Preface
This article takes a look at the split operation of Flink's DataStream API.
Example
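```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;

// someDataStream is an existing DataStream<Integer>
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        // tag each element with the name of the output it belongs to
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
```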
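- This example splits someDataStream into two logical outputs: even values are tagged with the outputName "even" and odd values with "odd". The split itself only tags elements; the stream for a given name is then obtained with SplitStream.select, e.g. split.select("even")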
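To make the effect of the example concrete without running Flink, here is a minimal plain-Java sketch. It assumes that split followed by select amounts to routing each element to every name its selector returns. The class SplitRoutingSketch and its route method are hypothetical, not part of Flink's API, and the selector is modeled as a java.util.function.Function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SplitRoutingSketch {
    // Hypothetical model: put each element into the bucket of every
    // output name the selector assigns to it.
    static <T> Map<String, List<T>> route(List<T> elements, Function<T, Iterable<String>> selector) {
        Map<String, List<T>> byName = new LinkedHashMap<>();
        for (T element : elements) {
            for (String name : selector.apply(element)) {
                byName.computeIfAbsent(name, k -> new ArrayList<>()).add(element);
            }
        }
        return byName;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        // same even/odd rule as the OutputSelector in the example
        Map<String, List<Integer>> routed = route(input, value -> List.of(value % 2 == 0 ? "even" : "odd"));
        System.out.println(routed); // {odd=[1, 3, 5], even=[2, 4]}
    }
}
```

In this model, selecting "even" corresponds to reading the "even" bucket.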
DataStream.split
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
```java
@Public
public class DataStream<T> {

    //......

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
        return new SplitStream<>(this, clean(outputSelector));
    }

    //......
}
```
- DataStream’s split method takes an OutputSelector, passes it through clean(), and returns a new SplitStream built from the current stream and that selector
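Calling split processes no data by itself: it only constructs a SplitStream, and the selector is applied later when the job runs. The sketch below models that laziness in plain Java. SplitNode, execute and the selectorCalls counter are hypothetical names chosen for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class LazySplitSketch {
    // Counts how often the selector has been invoked so far.
    static final AtomicInteger selectorCalls = new AtomicInteger();

    // Hypothetical stand-in for SplitStream/SplitTransformation: constructing
    // it only records the input and the selector; the selector is not called.
    static final class SplitNode<T> {
        private final List<T> input;
        private final Function<T, Iterable<String>> selector;

        SplitNode(List<T> input, Function<T, Iterable<String>> selector) {
            this.input = input;
            this.selector = selector;
        }

        // Stands in for job execution: only now is the selector applied.
        List<String> execute() {
            List<String> routed = new ArrayList<>();
            for (T element : input) {
                for (String name : selector.apply(element)) {
                    routed.add(element + "->" + name);
                }
            }
            return routed;
        }
    }

    // Even/odd selector that also bumps the call counter.
    static Iterable<String> evenOdd(Integer value) {
        selectorCalls.incrementAndGet();
        return List.of(value % 2 == 0 ? "even" : "odd");
    }

    public static void main(String[] args) {
        SplitNode<Integer> split = new SplitNode<>(List.of(1, 2, 3), LazySplitSketch::evenOdd);
        System.out.println(selectorCalls.get()); // 0
        System.out.println(split.execute());     // [1->odd, 2->even, 3->odd]
        System.out.println(selectorCalls.get()); // 3
    }
}
```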
OutputSelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
```java
@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

    Iterable<String> select(OUT value);

}
```
- OutputSelector defines a single select method that returns the outputNames assigned to an element; since it returns an Iterable<String>, one element can be assigned several names
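To illustrate an element carrying more than one name, the sketch below declares a local copy of the interface shape shown above (so it compiles without Flink on the classpath) and a hypothetical ParityAndSizeSelector that tags numbers below 10 as "small" in addition to "even" or "odd".

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class MultiNameSelectorSketch {
    // Local copy of the OutputSelector shape, not the Flink interface itself.
    interface OutputSelector<OUT> extends Serializable {
        Iterable<String> select(OUT value);
    }

    // Hypothetical selector: parity name plus "small" for values below 10.
    static final class ParityAndSizeSelector implements OutputSelector<Integer> {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> names = new ArrayList<>();
            names.add(value % 2 == 0 ? "even" : "odd");
            if (value < 10) {
                names.add("small");
            }
            return names;
        }
    }

    public static void main(String[] args) {
        OutputSelector<Integer> selector = new ParityAndSizeSelector();
        System.out.println(selector.select(4));  // [even, small]
        System.out.println(selector.select(15)); // [odd]
    }
}
```

With split/select, an element such as 4 would presumably be visible both through select("small") and through select("even").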
SplitStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
```java
@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }

        SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }
}
```
- SplitStream extends DataStream. Its constructor wraps the parent stream's transformation in a SplitTransformation, and its select method picks the split DataStream by outputNames: it rejects null names and returns a new DataStream backed by a SelectTransformation
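The select semantics can be approximated as follows. This is a hypothetical plain-Java model, not Flink code: it mirrors the null check in selectOutput and assumes an element is kept when any of its assigned names is among the selected names, emitting it once even if several names match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class SelectSketch {
    // Hypothetical model of SplitStream.select over an in-memory list.
    static <T> List<T> select(List<T> elements, Function<T, Iterable<String>> selector, String... outputNames) {
        // mirrors the check in selectOutput
        for (String name : outputNames) {
            if (name == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        Set<String> wanted = new HashSet<>(Arrays.asList(outputNames));
        List<T> result = new ArrayList<>();
        for (T element : elements) {
            for (String name : selector.apply(element)) {
                if (wanted.contains(name)) {
                    result.add(element); // in this model, emitted once per select
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        Function<Integer, Iterable<String>> evenOdd = v -> List.of(v % 2 == 0 ? "even" : "odd");
        System.out.println(select(input, evenOdd, "even"));        // [2, 4]
        System.out.println(select(input, evenOdd, "even", "odd")); // [1, 2, 3, 4]
        try {
            select(input, evenOdd, "even", null);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Selected names must not be null
        }
    }
}
```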
StreamGraphGenerator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
```java
@Internal
public class StreamGraphGenerator {

    //......

    private Collection<Integer> transform(StreamTransformation<?> transform) {
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {
            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }
        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }
        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        StreamTransformation<T> input = select.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform might have already transformed this
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualResultIds = new ArrayList<>();
        for (int inputId : resultIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
        StreamTransformation<T> input = split.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform call might have transformed this already
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }
        return resultIds;
    }

    //......
}
```
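- StreamGraphGenerator's transform method dispatches on the transformation type: a SelectTransformation is handled by transformSelect and a SplitTransformation by transformSplit
- transformSelect first transforms its input, then, for each resulting node ID, allocates a new virtual node ID and registers it via streamGraph.addVirtualSelectNode together with select.getSelectedNames()
- transformSplit first transforms its input, then registers split.getOutputSelector() on each resulting node via streamGraph.addOutputSelector; it creates no node of its own and returns the input's node IDs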
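The bookkeeping of transformSplit and transformSelect can be modeled with a tiny stand-in graph. MiniStreamGraph, its per-graph ID counter (standing in for the static StreamTransformation.getNewNodeId()) and the use of a String in place of an OutputSelector are all simplifications made up for this sketch. What it shows: split adds no node and returns the input IDs, while each select yields fresh virtual IDs that map back to the original node.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GraphGenerationSketch {
    // Hypothetical stand-in for StreamGraph, recording only what
    // transformSplit and transformSelect register.
    static final class MiniStreamGraph {
        final Map<Integer, List<String>> outputSelectors = new LinkedHashMap<>();
        final Map<Integer, Integer> virtualSelectInput = new LinkedHashMap<>();
        final Map<Integer, List<String>> virtualSelectNames = new LinkedHashMap<>();
        private int nextId = 100; // stands in for StreamTransformation.getNewNodeId()

        int newNodeId() {
            return ++nextId;
        }

        void addOutputSelector(int nodeId, String selector) {
            outputSelectors.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(selector);
        }

        void addVirtualSelectNode(int originalId, int virtualId, List<String> selectedNames) {
            virtualSelectInput.put(virtualId, originalId);
            virtualSelectNames.put(virtualId, selectedNames);
        }
    }

    // Like transformSplit: attach the selector to every input node and
    // return the input IDs unchanged (no new node is created).
    static Collection<Integer> transformSplit(MiniStreamGraph graph, Collection<Integer> inputIds, String selector) {
        for (int inputId : inputIds) {
            graph.addOutputSelector(inputId, selector);
        }
        return inputIds;
    }

    // Like transformSelect: create one virtual node per input ID.
    static List<Integer> transformSelect(MiniStreamGraph graph, Collection<Integer> inputIds, List<String> names) {
        List<Integer> virtualIds = new ArrayList<>();
        for (int inputId : inputIds) {
            int virtualId = graph.newNodeId();
            graph.addVirtualSelectNode(inputId, virtualId, names);
            virtualIds.add(virtualId);
        }
        return virtualIds;
    }

    public static void main(String[] args) {
        MiniStreamGraph graph = new MiniStreamGraph();
        Collection<Integer> splitIds = transformSplit(graph, List.of(1), "evenOddSelector");
        List<Integer> evenIds = transformSelect(graph, splitIds, List.of("even"));
        List<Integer> oddIds = transformSelect(graph, splitIds, List.of("odd"));
        System.out.println(splitIds);                 // [1]
        System.out.println(evenIds + " " + oddIds);   // [101] [102]
        System.out.println(graph.outputSelectors);    // {1=[evenOddSelector]}
        System.out.println(graph.virtualSelectInput); // {101=1, 102=1}
    }
}
```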
Summary
- DataStream’s split operation takes an OutputSelector and creates and returns a SplitStream
- OutputSelector defines the select method, which assigns outputNames to an element
- SplitStream extends DataStream and defines a select method that picks the split DataStream by outputNames
- When the StreamGraph is generated, transformSplit attaches the OutputSelector to the input nodes, and transformSelect adds virtual select nodes carrying the selected names
doc
- DataStream Transformations