Introduction

When a user runs a Flink job, Flink first generates a StreamGraph: a topology that represents the application, built from the code the user writes with the DataStream API. This article focuses on what happens internally when those API methods are called and on how the StreamGraph is generated. The WordCount program used here is simple and consists of five calls: addSource(), flatMap(), keyBy(), sum(), and addSink(), as shown below. Note: this article is based on Flink 1.10.

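import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class WordCountDemo {
    private static final Logger log = LoggerFactory.getLogger(WordCountDemo.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: emits a random "nameX,nameY" line every second until cancelled.
        DataStream<String> sourceStream = env.addSource(new SourceFunction<String>() {
            private volatile boolean flag = true;
            private Random random = new Random();

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (flag) {
                    ctx.collect("name" + random.nextInt(10) + "," + "name" + random.nextInt(10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                this.flag = false;
            }
        });

        // flatMap: splits each line and emits a (name, 1) pair per name.
        DataStream<Tuple2<String, Integer>> pairStream = sourceStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> ctx) throws Exception {
                String[] names = value.split(",");
                for (int i = 0; i < names.length; i++) {
                    ctx.collect(Tuple2.of(names[i], 1));
                }
            }
        });

        // keyBy + sum: groups by the name (field 0) and sums the counts (field 1).
        DataStream<Tuple2<String, Integer>> summedStream = pairStream
                .keyBy(0)
                .sum(1);

        // Sink: logs every updated (name, count) pair.
        summedStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                log.info(value.toString());
            }
        });

        env.execute();
    }
}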

Reading the source code

Reading the source code of the WordCount operators

To run a Flink job, you first obtain the execution environment env, a StreamExecutionEnvironment. This step is not the focus of this article, so it is not covered in detail. In short, it creates the StreamExecutionEnvironment object that matches the current execution context and sets the default parallelism.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

To keep the source code easy to follow, each API call is read and documented separately.

addSource()

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

    if (typeInfo == null && function instanceof ResultTypeQueryable) {
        typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
    }
    if (typeInfo == null) {
        try {
            typeInfo = TypeExtractor.createTypeInfo(
                    SourceFunction.class,
                    function.getClass(), 0, null, null);
        } catch (final InvalidTypesException e) {
            typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
        }
    }

    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);

    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

The addSource() method does the following:

  • Obtains the type of the data emitted by the current SourceFunction, using the TypeExtractor utility class.
  • Determines whether the source can emit data in parallel, based on whether the current SourceFunction implements the ParallelSourceFunction interface.
  • Runs the closure cleaner on function.
  • Creates the StreamSource object.
  • Builds the DataStreamSource object.

The first and third steps were described in a previous article and are not repeated here; this section analyzes the fourth and fifth. StreamSource extends the abstract class AbstractUdfStreamOperator, which is the base class of operators that wrap a user-defined function. Constructing a StreamSource therefore also initializes its AbstractUdfStreamOperator part, which stores function, and the StreamSource constructor sets the operator's position in the Flink operator chain (ChainingStrategy.HEAD).

/**
 * {@link StreamOperator} for streaming sources.
 */
public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> {
    ...

    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);

        this.chainingStrategy = ChainingStrategy.HEAD;
    }
    ...
}

/**
 * This is used as the base class for operators that have a user-defined
 * function. This class handles the opening and closing of the user-defined functions,
 * as part of the operator life cycle.
 *
 * @param <OUT>
 *            The output type of the operator
 * @param <F>
 *            The type of the user function
 */
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
    ...

    /** The user function. */
    protected final F userFunction;

    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        ...
    }
    ...
}
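
As a rough illustration of the relationship just described, the sketch below models an operator base class that stores a user function, and a source operator that marks itself as the head of a chain. It uses plain Java only; UdfOperatorSketch, UdfOperator, SourceOperator, and Chaining are hypothetical names invented for this example and are not Flink classes.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class UdfOperatorSketch {

    // Hypothetical stand-in for Flink's ChainingStrategy enum.
    enum Chaining { ALWAYS, NEVER, HEAD }

    // Hypothetical stand-in for AbstractUdfStreamOperator: it only stores the user function.
    abstract static class UdfOperator<F> {
        protected final F userFunction;
        protected Chaining chaining; // set by the concrete operators in this sketch

        UdfOperator(F userFunction) {
            this.userFunction = Objects.requireNonNull(userFunction);
        }
    }

    // Hypothetical stand-in for StreamSource: a source starts a chain, so it uses HEAD.
    static class SourceOperator<T> extends UdfOperator<Supplier<T>> {
        SourceOperator(Supplier<T> sourceFunction) {
            super(sourceFunction);
            this.chaining = Chaining.HEAD;
        }
    }

    public static void main(String[] args) {
        SourceOperator<String> op = new SourceOperator<>(() -> "name1,name2");
        // The operator keeps the function and the strategy; it does not run the job itself.
        System.out.println(op.chaining + " " + op.userFunction.get());
    }
}
```

The point of the split is that the base class owns the user function while each concrete operator only decides its own chaining behavior.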

DataStreamSource represents the starting point of a Flink topology. It extends SingleOutputStreamOperator, which in turn extends DataStream. When a DataStreamSource is constructed, a SourceTransformation is first created from the StreamSource operator, outTypeInfo, the parallelism, and the source name. The DataStream is then built from the StreamExecutionEnvironment object (environment) and that SourceTransformation, and returned.

/**
 * The DataStreamSource represents the starting point of a DataStream.
 *
 * @param <T> Type of the elements in the DataStream created from the this source.
 */
@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(
            StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo,
            StreamSource<T, ?> operator,
            boolean isParallel,
            String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }
    ...
}

/**
 * {@code SingleOutputStreamOperator} represents a user defined transformation
 * applied on a {@link DataStream} with one predefined output type.
 *
 * @param <T> The type of the elements in this stream.
 */
@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    ...

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        super(environment, transformation);
    }
    ...
}

public class DataStream<T> {

    protected final StreamExecutionEnvironment environment;

    protected final Transformation<T> transformation;

    /**
     * Create a new {@link DataStream} in the given execution environment with
     * partitioning set to forward by default.
     */
    public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }
    ...
}

SourceTransformation extends PhysicalTransformation, which extends Transformation. A Transformation represents the operation that creates a DataStream. It does not necessarily correspond to a physical operation at runtime; some operations are purely logical concepts, such as union, split/select, and partitioning. When a SourceTransformation is created, SimpleOperatorFactory.of(operator) first returns the factory that wraps the StreamOperator, chosen according to the operator's type, and that factory is assigned to the operatorFactory field of the SourceTransformation. The constructor chain finally reaches Transformation, which initializes the basic attributes of the current operator: id, name, outputType, parallelism, and slotSharingGroup.

public class SourceTransformation<T> extends PhysicalTransformation<T> {

    private final StreamOperatorFactory<T> operatorFactory;

    /**
     * Creates a new {@code SourceTransformation} from the given operator.
     *
     * @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log
     * @param operator The {@code StreamSource} that is the operator of this Transformation
     * @param outputType The type of the elements produced by this {@code SourceTransformation}
     * @param parallelism The parallelism of this {@code SourceTransformation}
     */
    public SourceTransformation(
        String name,
        StreamSource<T, ?> operator,
        TypeInformation<T> outputType,
        int parallelism) {
            this(name, SimpleOperatorFactory.of(operator), outputType, parallelism);
    }

    public SourceTransformation(
        String name,
        StreamOperatorFactory<T> operatorFactory,
        TypeInformation<T> outputType,
        int parallelism) {
            super(name, outputType, parallelism);
            this.operatorFactory = operatorFactory;
    }
    ...
}

/**
 * A {@link Transformation} that creates a physical operation. It enables setting {@link ChainingStrategy}.
 *
 * @param <T> The type of the elements that result from this {@code Transformation}
 * @see Transformation
 */
@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {

    /**
     * Creates a new {@code Transformation} with the given name, output type and parallelism.
     *
     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and the Log
     * @param outputType The output type of this {@code Transformation}
     * @param parallelism The parallelism of this {@code Transformation}
     */
    PhysicalTransformation(
        String name,
        TypeInformation<T> outputType,
        int parallelism) {
            super(name, outputType, parallelism);
    }
    ...
}

/**
 * A {@code Transformation} represents the operation that creates a
 * DataStream. Every DataStream has an underlying
 * {@code Transformation} that is the origin of said DataStream.
 *
 * <p>API operations such as DataStream#map create
 * a tree of {@code Transformation}s underneath. When the stream program is to be executed
 * this graph is translated to a StreamGraph using StreamGraphGenerator.
 *
 * <p>A {@code Transformation} does not necessarily correspond to a physical operation
 * at runtime. Some operations are only logical concepts. Examples of this are union,
 * split/select data stream, partitioning.
 *
 * <p>The following graph of {@code Transformations}:
 * <pre>{@code
 *   Source              Source
 *     +                   +
 *     |                   |
 *     v                   v
 * Rebalance          HashPartition
 *     +                   +
 *     |                   |
 *     |                   |
 *     +------>Union<------+
 *               +
 *               |
 *               v
 *             Split
 *               +
 *               |
 *               v
 *             Select
 *               +
 *               v
 *              Map
 *               +
 *               |
 *               v
 *             Sink
 * }</pre>
 *
 * <p>Would result in this graph of operations at runtime:
 * <pre>{@code
 *  Source              Source
 *    +                   +
 *    |                   |
 *    |                   |
 *    +------->Map<-------+
 *              +
 *              |
 *              v
 *             Sink
 * }</pre>
 *
 * <p>The information about partitioning, union, split/select end up being encoded in the edges
 * that connect the sources to the map operation.
 *
 * @param <T> The type of the elements that result from this {@code Transformation}
 */
@Internal
public abstract class Transformation<T> {
    ...

    /**
     * Creates a new {@code Transformation} with the given name, output type and parallelism.
     *
     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and the Log
     * @param outputType The output type of this {@code Transformation}
     * @param parallelism The parallelism of this {@code Transformation}
     */
    public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
        this.id = getNewNodeId();
        this.name = Preconditions.checkNotNull(name);
        this.outputType = outputType;
        this.parallelism = parallelism;
        this.slotSharingGroup = null;
    }
    ...
}

The implementation of SimpleOperatorFactory.of(operator) is simple: it returns the SimpleOperatorFactory variant that matches the type of operator. In the WordCount job, for example, a SimpleUdfStreamOperatorFactory is created here, and operator is assigned both to its own operator field and to the operator field of its parent class.

public class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> {

    private final StreamOperator<OUT> operator;

    /**
     * Create a SimpleOperatorFactory from existed StreamOperator.
     */
    @SuppressWarnings("unchecked")
    public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
        if (operator == null) {
            return null;
        } else if (operator instanceof StreamSource &&
            ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
            return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);
        } else if (operator instanceof StreamSink &&
            ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {    
            return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
        } else if (operator instanceof AbstractUdfStreamOperator) { // Execute here
            return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);
        } else {
            return new SimpleOperatorFactory<>(operator);
        }
    }
    ...
}

public class SimpleUdfStreamOperatorFactory<OUT> extends SimpleOperatorFactory<OUT> implements UdfStreamOperatorFactory<OUT> {

    private final AbstractUdfStreamOperator<OUT, ?> operator;

    public SimpleUdfStreamOperatorFactory(AbstractUdfStreamOperator<OUT, ?> operator) {
        super(operator);
        this.operator = operator;
    }
    ...
}
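
The type-based selection in of() can be pictured with a much smaller model. The following is a minimal sketch in plain Java, assuming a made-up two-level hierarchy; Operator, UdfOperator, SourceOperator, PlainOperator, SimpleFactory, and UdfFactory are hypothetical names, and the real method has more branches than shown here.

```java
public class OperatorFactorySketch {

    // Hypothetical, simplified operator hierarchy (not Flink classes).
    interface Operator { }
    static class UdfOperator implements Operator { }      // wraps a user function
    static class SourceOperator extends UdfOperator { }   // e.g. a source
    static class PlainOperator implements Operator { }    // no user function

    // Hypothetical general-purpose factory that just keeps the operator.
    static class SimpleFactory {
        final Operator operator;

        SimpleFactory(Operator operator) {
            this.operator = operator;
        }
    }

    // Hypothetical UDF-specific factory: same object, kept again with its more specific type.
    static class UdfFactory extends SimpleFactory {
        final UdfOperator udfOperator;

        UdfFactory(UdfOperator operator) {
            super(operator);
            this.udfOperator = operator;
        }
    }

    // Picks the factory from the runtime type of the operator.
    static SimpleFactory of(Operator operator) {
        if (operator == null) {
            return null;
        } else if (operator instanceof UdfOperator) {
            return new UdfFactory((UdfOperator) operator);
        } else {
            return new SimpleFactory(operator);
        }
    }

    public static void main(String[] args) {
        System.out.println(of(new SourceOperator()).getClass().getSimpleName());
        System.out.println(of(new PlainOperator()).getClass().getSimpleName());
    }
}
```

In this model a source lands in the UDF-specific branch because it is also a UdfOperator, which mirrors why the WordCount source ends up in the AbstractUdfStreamOperator branch above.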

What is the difference between AbstractUdfStreamOperator and SingleOutputStreamOperator? At first glance the two class names look similar, but they serve different purposes. AbstractUdfStreamOperator mainly holds the userFunction and invokes its lifecycle methods, such as open() and close(), as well as important methods such as initializeState() and snapshotState(). SingleOutputStreamOperator extends DataStream; many Flink API methods, such as flatMap(), map(), and process(), return an instance of this class.

flatMap()

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

   TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);

   return flatMap(flatMapper, outType);
}

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
    return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}

The flatMap() method does several things:

  • Obtains the type of the data emitted by the current FlatMapFunction, using the TypeExtractor utility class.
  • Creates a StreamFlatMap object from the flatMapper variable.
  • Calls transform(), which completes the important work: assigning AbstractUdfStreamOperator#userFunction, creating the Transformation instance, initializing the DataStream, and so on.

StreamFlatMap extends AbstractUdfStreamOperator and implements the OneInputStreamOperator interface. As with StreamSource, its constructor assigns flatMapper to userFunction and sets chainingStrategy to ChainingStrategy.ALWAYS, so the details are not repeated here.

/**
 * A {@link StreamOperator} for executing {@link FlatMapFunction FlatMapFunctions}.
 */
@Internal
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    ...

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }
    ...
}

Here we focus on the transform() method and follow the source code down into the lower-level API. First, transform() wraps the operator in a SimpleOperatorFactory instance via SimpleOperatorFactory.of(operator), as described in the addSource() section, and passes it to doTransform() as an argument.

public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {

    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

The structure of doTransform() is clear. For convenience, part of the explanation is given as comments directly in the source code. OneInputTransformation extends PhysicalTransformation and therefore indirectly extends Transformation. Unlike the transformations seen so far, OneInputTransformation has an input field that refers to the upstream Transformation instance; it is assigned in the constructor. Next, a SingleOutputStreamOperator is created from environment and resultTransform; this is the DataStream produced by the current flatMap operator. Every operator creates a Transformation underneath, and the StreamExecutionEnvironment instance provides a transformations field of type ArrayList<Transformation<?>>, to which addOperator() adds the new Transformation.

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // Here, transformation is the Transformation of the upstream operator; calling getOutputType() checks that its output type can be inferred correctly.
    transformation.getOutputType();
    
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                        this.transformation,
                        operatorName,
                        operatorFactory,
                        outTypeInfo,
                        environment.getParallelism());

    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

/**
 * This Transformation represents the application of a
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
 * {@link Transformation}.
 *
 * @param <IN> The type of the elements in the input {@code Transformation}
 * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
 */
@Internal
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {

    private final Transformation<IN> input;
    ...

    public OneInputTransformation(
            Transformation<IN> input,
            String name,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<OUT> outputType,
            int parallelism) {
        super(name, outputType, parallelism);
        this.input = input;
        this.operatorFactory = operatorFactory;
    }
    ...
}
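
The linking and registering done by doTransform() can be modeled in a few lines. This is a minimal sketch in plain Java, not Flink code: Node, Env, create(), createAndRegister(), and the node name "Sink" are hypothetical, and it assumes, as this article describes for the WordCount job, that only the flatMap, sum, and sink transformations are added to the environment's list.

```java
import java.util.ArrayList;
import java.util.List;

public class TransformationChainSketch {

    // Hypothetical stand-in for Transformation: an id, a name, and an optional upstream input.
    static class Node {
        final int id;
        final String name;
        final Node input; // null for a source

        Node(int id, String name, Node input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    // Hypothetical stand-in for the environment: hands out ids and keeps the registered list.
    static class Env {
        private int nextId = 0;
        final List<Node> registered = new ArrayList<>();

        // Creates a node linked to its upstream without registering it.
        Node create(String name, Node upstream) {
            return new Node(++nextId, name, upstream);
        }

        // Same shape as doTransform(): link to the upstream node, then register the new node.
        Node createAndRegister(String name, Node upstream) {
            Node result = create(name, upstream);
            registered.add(result);
            return result;
        }
    }

    // Builds the WordCount chain and returns its last node.
    static Node buildWordCount(Env env) {
        Node source = env.create("Source", null);
        Node flatMap = env.createAndRegister("Flat Map", source);
        Node partition = env.create("Partition", flatMap);
        Node sum = env.createAndRegister("Keyed Aggregation", partition);
        return env.createAndRegister("Sink", sum);
    }

    // Follows the input links from a node back to the source.
    static String pathToSource(Node node) {
        StringBuilder path = new StringBuilder(node.name);
        for (Node n = node.input; n != null; n = n.input) {
            path.append(" <- ").append(n.name);
        }
        return path.toString();
    }

    public static void main(String[] args) {
        Env env = new Env();
        Node sink = buildWordCount(env);
        for (Node n : env.registered) {
            System.out.println(n.id + ": " + n.name);
        }
        System.out.println(pathToSource(sink));
    }
}
```

Even though only three nodes are registered in this model, every node stays reachable from the sink through its input link, which is what lets a later traversal recover the whole chain.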

keyBy()

For keyBy(), we focus on how the PartitionTransformation instance and the KeyedStream object are created. Details that are specific to keyBy itself, such as how the KeySelector and the KeyGroupStreamPartitioner are generated, are not essential to generating the StreamGraph topology and are not covered here. For those, you can refer to the article on Flink keyBy source code analysis.

public KeyedStream<T, Tuple> keyBy(int... fields) {
    if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
        return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
    } else {
        return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    }
}

private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
    return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
            getType(), getExecutionConfig())));
}

KeyedStream extends DataStream. Constructing a KeyedStream creates a PartitionTransformation object and assigns its input and partitioner fields.

public class KeyedStream<T, KEY> extends DataStream<T> {

    /**
     * The key selector that can get the key by which the stream is partitioned from the elements.
     */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned. */
    private final TypeInformation<KEY> keyType;

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(
            dataStream,
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
    }

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
     * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
     *
     * @param stream
     *            Base stream of data
     * @param partitionTransformation
     *            Function that determines how the keys are distributed to downstream operator(s)
     * @param keySelector
     *            Function to extract keys from the base stream
     * @param keyType
     *            Defines the type of the extracted keys
     */
    @Internal
    KeyedStream(
        DataStream<T> stream,
        PartitionTransformation<T> partitionTransformation,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {

        super(stream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = clean(keySelector);
        this.keyType = validateKeyType(keyType);
    }
    ...
}

/**
 * This transformation represents a change of partitioning of the input elements.
 *
 * <p>This does not create a physical operation, it only affects how upstream operations are
 * connected to downstream operations.
 *
 * @param <T> The type of the elements that result from this {@code PartitionTransformation}
 */
@Internal
public class PartitionTransformation<T> extends Transformation<T> {

    private final Transformation<T> input;

    private final StreamPartitioner<T> partitioner;

    private final ShuffleMode shuffleMode;
    ...

    /**
     * Creates a new {@code PartitionTransformation} from the given input and
     * {@link StreamPartitioner}.
     *
     * @param input The input {@code Transformation}
     * @param partitioner The {@code StreamPartitioner}
     * @param shuffleMode The {@code ShuffleMode}
     */
    public PartitionTransformation(
            Transformation<T> input,
            StreamPartitioner<T> partitioner,
            ShuffleMode shuffleMode) {
        super("Partition", input.getOutputType(), input.getParallelism());
        this.input = input;
        this.partitioner = partitioner;
        this.shuffleMode = checkNotNull(shuffleMode);
    }
    ...
}

Unlike the SourceTransformation and OneInputTransformation seen above, PartitionTransformation does not produce an operator node. It only affects how an upstream operator is connected to a downstream operator, that is, how upstream data is distributed across the downstream partitions.
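
One way to picture "no node, only an effect on the connection" is to fold the partition step into the edge between its physical neighbors. The sketch below is a simplified model in plain Java under that assumption; Node, incomingEdge(), and the labels "HASH" and "FORWARD" are hypothetical, and it does not reproduce how Flink actually records this information.

```java
public class PartitionEdgeSketch {

    // Hypothetical stand-in for a transformation in a linear chain.
    static class Node {
        final String name;
        final Node input;         // null for a source
        final String partitioner; // non-null marks a purely logical partition node

        Node(String name, Node input, String partitioner) {
            this.name = name;
            this.input = input;
            this.partitioner = partitioner;
        }

        boolean isPhysical() {
            return partitioner == null;
        }
    }

    // Describes the edge feeding `downstream`: logical partition nodes are skipped and
    // their partitioner becomes the edge label. "FORWARD" is this sketch's default label.
    static String incomingEdge(Node downstream) {
        String label = null;
        Node upstream = downstream.input;
        while (upstream != null && !upstream.isPhysical()) {
            if (label == null) {
                label = upstream.partitioner; // sketch choice: keep the one closest to the consumer
            }
            upstream = upstream.input;
        }
        if (upstream == null) {
            return "(no input)";
        }
        return upstream.name + " -[" + (label == null ? "FORWARD" : label) + "]-> " + downstream.name;
    }

    public static void main(String[] args) {
        Node source = new Node("Source", null, null);
        Node flatMap = new Node("Flat Map", source, null);
        Node partition = new Node("Partition", flatMap, "HASH");
        Node sum = new Node("Keyed Aggregation", partition, null);

        System.out.println(incomingEdge(flatMap));
        System.out.println(incomingEdge(sum));
    }
}
```

In this model the partition node never appears as an endpoint of an edge; it only changes the label on the connection between Flat Map and Keyed Aggregation.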

sum()

sum() is a method of KeyedStream, which means it can only be called after keyBy(). There are several similar methods, such as min(), max(), and reduce(). The internal logic of these methods is not our concern here; readers who need it can refer to a source analysis of Flink's reduce, sum, aggregate, and related methods. Internally, sum() calls aggregate(). SumAggregator implements the summation applied to the grouped data; it extends AggregationFunction and thereby indirectly implements the ReduceFunction interface.

public SingleOutputStreamOperator<T> sum(int positionToSum) {
    return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}

public class SumAggregator<T> extends AggregationFunction<T> {
    ...

    public SumAggregator(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) {
        ...
    }
    ...
}

The aggregate() method first creates the StreamGroupedReduce object and then calls transform() to obtain the DataStream for the current operator. The transform() implementation is the same as described above and is not repeated here.

protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
    StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
			clean(aggregate), getType().createSerializer(getExecutionConfig()));
    return transform("Keyed Aggregation", getType(), operator);
}

StreamGroupedReduce extends the abstract class AbstractUdfStreamOperator. When a StreamGroupedReduce is constructed, the aggregate variable is assigned to AbstractUdfStreamOperator#userFunction, and the member variable TypeSerializer serializer of the StreamGroupedReduce object is assigned as well.

public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {
    ...

    private TypeSerializer<IN> serializer;

    public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }
    ...
}
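
To give a feel for what a keyed reduce with a summing function produces for WordCount, here is a minimal sketch in plain Java. It is an illustration only and an assumption about the observable behavior, not the operator's implementation: KeyedReduceSketch and rollingReduce() are hypothetical names, a HashMap stands in for keyed state, and every input word is assumed to carry the count 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class KeyedReduceSketch {

    // Applies `reducer` per key and records the running result after each element.
    static List<String> rollingReduce(List<String> keys, BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new HashMap<>(); // stand-in for per-key state
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            // First element of a key stores 1; later ones combine the old value with 1.
            int current = state.merge(key, 1, reducer);
            emitted.add("(" + key + "," + current + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("name1", "name2", "name1");
        // Integer::sum plays the role of the summing reduce function.
        System.out.println(rollingReduce(words, Integer::sum));
    }
}
```

With the input name1, name2, name1 the sketch records (name1,1), (name2,1), (name1,2): one updated result per incoming element rather than a single final total.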

addSink()

The addSink() method is broadly similar to addSource(). It mainly does the following:

  • Creates the StreamSink object.
  • Creates the DataStreamSink object.
  • Adds the Transformation instance of the current operator to the transformations collection of the StreamExecutionEnvironment.

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // Here, transformation is the Transformation of the upstream operator; calling getOutputType() checks that its output type can be inferred correctly.
    transformation.getOutputType();

    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}

StreamSink extends AbstractUdfStreamOperator. Its constructor assigns sinkFunction to userFunction and sets the chainingStrategy of the current operator to ChainingStrategy.ALWAYS.

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {
    ...

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }
    ...
}

Surprisingly, DataStreamSink does not extend DataStream. It has a single member variable, transformation, and a few basic methods such as name() and setParallelism(). Constructing a DataStreamSink essentially just assigns its transformation field.

@Public
public class DataStreamSink<T> {

    private final SinkTransformation<T> transformation;

    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(
            inputStream.getTransformation(),
            "Unnamed",
            operator,
            inputStream.getExecutionEnvironment().getParallelism());
    }
    ...
}

SinkTransformation also extends PhysicalTransformation and therefore indirectly extends the abstract class Transformation. The logic is similar to what was described above, so it is not repeated here.

public class SinkTransformation<T> extends PhysicalTransformation<Object> {

    private final Transformation<T> input;
    ...

    public SinkTransformation(
		Transformation<T> input,
		String name,
		StreamSink<T> operator,
		int parallelism) {
        this(input, name, SimpleOperatorFactory.of(operator), parallelism);
    }

    public SinkTransformation(
		Transformation<T> input,
		String name,
		StreamOperatorFactory<Object> operatorFactory,
		int parallelism) {
        super(name, TypeExtractor.getForClass(Object.class), parallelism);
        this.input = input;
        this.operatorFactory = operatorFactory;
    }
    ...
}

A mind map of how the WordCount task calls the API

The figure summarizes the operator source code; reading it alongside the source helps you understand and remember it.

Reading the StreamGraph generation process

First, let's look at the Transformation of each operator in the WordCount job and at how they are connected. As the figure below shows, the job contains five operators, with Transformation IDs from 1 to 5. The underlying transformations of flatMap, sum, and addSink are saved in the transformations variable held by the StreamExecutionEnvironment object. The StreamGraph is generated inside the execute() method that the user calls, which obtains the StreamGraph topology of the current job through getStreamGraph().

public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    return execute(getStreamGraph(jobName));
}

The getStreamGraph() method shows that the StreamGraph is actually produced by StreamGraphGenerator#generate(). In addition, when getStreamGraphGenerator() creates the StreamGraphGenerator, important variables such as transformations are passed into the StreamGraphGenerator object. The StreamGraph is built around these variables. Next, we focus on the implementation details of the generate() method.

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator() {
    ...
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
		.setStateBackend(defaultStateBackend)
		.setChaining(isChainingEnabled)
		.setUserArtifacts(cacheFile)
		.setTimeCharacteristic(timeCharacteristic)
		.setDefaultBufferTimeout(bufferTimeout);
}

The generate() method first creates the StreamGraph object and initializes the alreadyTransformed variable, a map that holds the transformations already traversed. It then builds the StreamGraph by iterating over the transformations collection in a for loop and calling transform() on each one.

public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    ...
    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;
    ...
    return builtStreamGraph;
}

The following code is the core of StreamGraph generation and is also somewhat tricky. In the transform() method, if the current transformation has already been traversed, the cached result is returned directly. Otherwise, the method sets the maximum parallelism, checks that the outputType of the current transformation can be determined, and then calls the matching transformXXXX() method according to the concrete type of the transformation. Each transformXXXX() method in turn calls transform() recursively on its inputs.

private Collection<Integer> transform(Transformation<?> transform) {
    // Because the recursion walks upstream first, the current transform may already have been traversed
    // and recorded in the alreadyTransformed map, in which case we can return directly.
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {
        // Sets the maximum parallelism for the current operator node.
    }

    // If the data output type of the current transform cannot be inferred, an exception is thrown.
    transform.getOutputType();

    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    // Add the current transform to the alreadyTransformed map so that later calls to this method
    // for the same transform return the cached IDs.
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    // Do various configurations for the current StreamGraph node, such as uid and bufferTimeout, which are not expanded here.
    return transformedIds;
}
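The role of alreadyTransformed can be illustrated with a small standalone model. The sketch below is not Flink code: the Node class, the conversions list, and the MemoizedTraversalSketch name are hypothetical stand-ins, and only the memoized, upstream-first recursion mirrors transform().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MemoizedTraversalSketch {

    /** Hypothetical stand-in for a Transformation: an id, a name and at most one input. */
    static final class Node {
        final int id;
        final String name;
        final Node input; // null for a source

        Node(int id, String name, Node input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    // Plays the role of alreadyTransformed: node -> id produced for it.
    private final Map<Node, Integer> alreadyTransformed = new HashMap<>();
    // Names of the nodes in the order in which they were actually converted.
    final List<String> conversions = new ArrayList<>();

    int transform(Node node) {
        Integer cached = alreadyTransformed.get(node);
        if (cached != null) {
            return cached; // already converted during an earlier recursion
        }
        if (node.input != null) {
            transform(node.input); // walk upstream first
        }
        conversions.add(node.name);
        alreadyTransformed.put(node, node.id);
        return node.id;
    }

    public static void main(String[] args) {
        Node source = new Node(1, "source", null);
        Node flatMap = new Node(2, "flatMap", source);
        Node sum = new Node(4, "sum", flatMap);
        Node sink = new Node(5, "sink", sum);

        MemoizedTraversalSketch sketch = new MemoizedTraversalSketch();
        // Like the transformations list, the loop does not contain the source itself.
        for (Node node : Arrays.asList(flatMap, sum, sink)) {
            sketch.transform(node);
        }
        System.out.println(sketch.conversions); // [source, flatMap, sum, sink]
    }
}
```

Each node is converted exactly once even though the loop reaches the source three times through recursion, which is the effect the early return in transform() is meant to achieve.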

First, the first transformation in the list is traversed (OneInputTransformation{id=2, name='Flat Map', outputType=Java Tuple2<String, Integer>, parallelism=8}), which executes the transformOneInputTransform(transformation) method.

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    // As you can see, the StreamGraph is generated by recursing upstream from the current node.
    // The return value is inputIds, which, as the name implies, holds the IDs of the transformations that feed the current transformation.
    Collection<Integer> inputIds = transform(transform.getInput());

    // The recursive call above may already have converted the current transform into a StreamNode, in which case we return its IDs directly
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // Get the slotSharingGroup of the current transform, default is "default"
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    
    // The addOperator() method generates the StreamNode and sets the serializers for the input and output data types
    streamGraph.addOperator(transform.getId(),
                                slotSharingGroup,
                                transform.getCoLocationGroupKey(),
                                transform.getOperatorFactory(),
                                transform.getInputType(),
                                transform.getOutputType(),
                                transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    // Sets the parallelism and maximum parallelism for the current StreamNode
    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // StreamNodes are connected by StreamEdges. This step connects the upstream and downstream StreamNodes via a StreamEdge
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }
    // Returns the transformId currently traversed
    return Collections.singleton(transform.getId());
}
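The parallelism fallback used in transformOneInputTransform() can be isolated into a tiny function. This is a minimal sketch rather than Flink code: resolveParallelism and ParallelismSketch are hypothetical names, and the constant simply mirrors ExecutionConfig.PARALLELISM_DEFAULT, which is -1 in Flink.

```java
public class ParallelismSketch {

    /** Mirrors the value of ExecutionConfig.PARALLELISM_DEFAULT in Flink (-1). */
    static final int PARALLELISM_DEFAULT = -1;

    /**
     * Returns the operator's own parallelism if it was set explicitly,
     * otherwise falls back to the job-wide parallelism.
     */
    static int resolveParallelism(int operatorParallelism, int jobParallelism) {
        return operatorParallelism != PARALLELISM_DEFAULT ? operatorParallelism : jobParallelism;
    }

    public static void main(String[] args) {
        // Operator left at the default: the job-wide value is used.
        System.out.println(resolveParallelism(PARALLELISM_DEFAULT, 8)); // 8
        // Operator configured explicitly: its own value wins.
        System.out.println(resolveParallelism(12, 8)); // 12
    }
}
```

The same expression appears again in transformSource() and transformSink(), so every StreamNode ends up with a concrete parallelism before edges are added.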

The recursion then reaches the first operator, the source, and calls the transformSource() method. The process is similar to the one above; the main difference is the call to addSource(), which delegates to addOperator() and is covered in more detail below.

private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
    String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
    
    // The addOperator() method is called underneath addSource()
    streamGraph.addSource(source.getId(),
                            slotSharingGroup,
                            source.getCoLocationGroupKey(),
                            source.getOperatorFactory(),
                            null,
                            source.getOutputType(),
                            "Source: " + source.getName());
                            
    if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) {
        streamGraph.setInputFormat(source.getId(),
                                        ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat());
    }
    
    int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        source.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(source.getId(), parallelism);
    streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());

    return Collections.singleton(source.getId());
}

public <IN, OUT> void addSource(Integer vertexID,
                                    @Nullable String slotSharingGroup,
                                    @Nullable String coLocationGroup,
                                    StreamOperatorFactory<OUT> operatorFactory,
                                    TypeInformation<IN> inTypeInfo,
                                    TypeInformation<OUT> outTypeInfo,
                                    String operatorName) {
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
    sources.add(vertexID);
}

The determineSlotSharingGroup() method returns the slotSharingGroup of the current transform. If the user has not specified one, the value is derived from the upstream nodes and falls back to "default".

private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (specifiedGroup != null) {
        return specifiedGroup;
    } else {
        String inputGroup = null;
        // If the user does not specify a slotSharingGroup for the current transform, it is derived from the upstream transforms:
        // 1) If the current transform is a source node (no inputs), "default" is used
        // 2) If all upstream nodes share the same slotSharingGroup, that group is used
        // 3) If the upstream nodes have different slotSharingGroups, "default" is used
        for (int id : inputIds) {
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate;
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }
}
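To make the rules above concrete, here is a minimal standalone sketch of the same decision logic. It is not the Flink implementation: the groupOfNode map is a hypothetical replacement for streamGraph.getSlotSharingGroup(id), and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SlotSharingGroupSketch {

    static final String DEFAULT_GROUP = "default";

    /**
     * @param specifiedGroup group set by the user, or null
     * @param inputIds       ids of the upstream nodes
     * @param groupOfNode    hypothetical lookup table: node id -> slot sharing group
     */
    static String determine(String specifiedGroup,
                            Collection<Integer> inputIds,
                            Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit setting always wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_GROUP; // upstream nodes disagree
            }
        }
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        Map<Integer, String> groups = new HashMap<>();
        groups.put(1, "groupA");
        groups.put(2, "groupA");
        groups.put(3, "groupB");

        System.out.println(determine(null, Collections.<Integer>emptyList(), groups)); // default
        System.out.println(determine(null, Arrays.asList(1, 2), groups));              // groupA
        System.out.println(determine(null, Arrays.asList(1, 3), groups));              // default
        System.out.println(determine("mine", Arrays.asList(1, 3), groups));            // mine
    }
}
```

Note that a single upstream node simply passes its group downstream, which is why a group set on one operator tends to propagate along a chain unless it is overridden.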

Next, let’s focus on the implementation of the addOperator() method. This method generates StreamNode nodes and sets serializers for input and output data types.

public <IN, OUT> void addOperator(
                                    Integer vertexID,
                                    @Nullable String slotSharingGroup,
                                    @Nullable String coLocationGroup,
                                    StreamOperatorFactory<OUT> operatorFactory,
                                    TypeInformation<IN> inTypeInfo,
                                    TypeInformation<OUT> outTypeInfo,
                                    String operatorName) {
    
    // The vertex class passed to addNode() depends on the kind of node: a source node is given SourceStreamTask,
    // while all other nodes are given OneInputStreamTask.
    if (operatorFactory.isStreamSource()) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
    }
    
    // Sets the serializers for the input and output data types of the current StreamNode
    TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
    TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;

    setSerializers(vertexID, inSerializer, null, outSerializer);

    if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
        // sets the output type which must be know at StreamGraph creation time
        operatorFactory.setOutputType(outTypeInfo, executionConfig);
    }

    if (operatorFactory.isInputTypeConfigurable()) {
        operatorFactory.setInputType(inTypeInfo, executionConfig);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}

public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
    StreamNode vertex = getStreamNode(vertexID);
    vertex.setSerializerIn1(in1);
    vertex.setSerializerIn2(in2);
    vertex.setSerializerOut(out);
}

Following the previous step, we can see from the code below that the addNode() method is mainly used to generate the corresponding StreamNode node based on the Transform information.

protected StreamNode addNode(Integer vertexID,
                                    @Nullable String slotSharingGroup,
                                    @Nullable String coLocationGroup,
                                    Class<? extends AbstractInvokable> vertexClass,
                                    StreamOperatorFactory<?> operatorFactory,
                                    String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    StreamNode vertex = new StreamNode(
                                        vertexID,
                                        slotSharingGroup,
                                        coLocationGroup,
                                        operatorFactory,
                                        operatorName,
                                        new ArrayList<OutputSelector<?>>(),
                                        vertexClass);

    streamNodes.put(vertexID, vertex);

    return vertex;
}

StreamNodes are connected by StreamEdges. addEdge() connects an upstream and a downstream StreamNode and simply delegates to addEdgeInternal(). addEdgeInternal() checks whether the upstream node is a virtual node or a real StreamNode; for a virtual node, it calls itself recursively with the virtual node's own upstream.

public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID,
                        downStreamVertexID,
                        typeNumber,
                        null,
                        new ArrayList<String>(),
                        null,
                        null);
}

private void addEdgeInternal(Integer upStreamVertexID,
                                Integer downStreamVertexID,
                                int typeNumber,
                                StreamPartitioner<?> partitioner,
                                List<String> outputNames,
                                OutputTag outputTag,
                                ShuffleMode shuffleMode) {
    // There are three types of virtual nodes. Only virtualPartitionNodes is discussed here; the other two work similarly.
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
        // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        // Get the upstream of the virtual node and connect the upstream and downstream streamNodes of the virtual node by recursively calling addEdgeInternal().
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        // In this example, upStreamVertexID = 6 (the virtual node) and downStreamVertexID = 4. The recursive call to
        // addEdgeInternal() therefore actually connects StreamNode 2 and StreamNode 4.
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        // Get upstream and downstream StreamNodes according to vertexID
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
        
        // During StreamGraph generation, the partitioning strategy between StreamNodes is determined:
        // 1. If no partitioner is specified and the upstream and downstream parallelism are equal, ForwardPartitioner is used
        // 2. If no partitioner is specified and the parallelism differs, RebalancePartitioner is used
        // 3. Otherwise the explicitly specified partitioner is kept (for example, the hash partitioner set by keyBy)
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                    "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                    ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                    " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }
        // Connect upstream and downstream StreamNodes via StreamEdge
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
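The two ideas in addEdgeInternal(), skipping virtual nodes and choosing a default partitioner, can be sketched in isolation. The model below is a simplification under stated assumptions: partitioners are plain strings, only virtual partition nodes are modeled, the parallelism values in main() are illustrative, and all class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EdgeResolutionSketch {

    /** Simplified virtual partition node: the real upstream id plus a partitioner name. */
    static final class VirtualNode {
        final int upstreamId;
        final String partitioner;

        VirtualNode(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();
    final Map<Integer, Integer> parallelism = new HashMap<>(); // StreamNode id -> parallelism
    final List<String> edges = new ArrayList<>();

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualNode virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Skip the virtual node: keep its partitioner and recurse with its real upstream.
            String effective = partitioner != null ? partitioner : virtual.partitioner;
            addEdgeInternal(virtual.upstreamId, downstreamId, effective);
            return;
        }
        boolean sameParallelism = parallelism.get(upstreamId).equals(parallelism.get(downstreamId));
        if (partitioner == null) {
            partitioner = sameParallelism ? "FORWARD" : "REBALANCE";
        }
        if ("FORWARD".equals(partitioner) && !sameParallelism) {
            throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism");
        }
        edges.add(upstreamId + "->" + downstreamId + " [" + partitioner + "]");
    }

    public static void main(String[] args) {
        EdgeResolutionSketch graph = new EdgeResolutionSketch();
        // Illustrative parallelism values (assumptions, not taken from a real job).
        graph.parallelism.put(1, 1);
        graph.parallelism.put(2, 8);
        graph.parallelism.put(4, 8);
        graph.parallelism.put(5, 12);
        // Virtual node 6 stands for keyBy and points back to StreamNode 2.
        graph.virtualPartitionNodes.put(6, new VirtualNode(2, "HASH"));

        graph.addEdge(1, 2); // 1->2 [REBALANCE]
        graph.addEdge(6, 4); // 2->4 [HASH]
        graph.addEdge(4, 5); // 4->5 [REBALANCE]
        for (String edge : graph.edges) {
            System.out.println(edge);
        }
    }
}
```

The edge requested as 6 -> 4 is stored as 2 -> 4 with the partitioner carried by the virtual node, so the virtual node never appears in the resulting graph.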

After the code above, the StreamNodes for the addSource and flatMap operators have been created and connected to each other through a StreamEdge. Next, the second transformation in the list is traversed (OneInputTransformation{id=4, name='Keyed Aggregation', outputType=Java Tuple2<String, Integer>, parallelism=8}). Again the recursion first walks upstream, this time reaching the third operator, keyBy, which executes the transformPartition() method. For the Transformation instance generated by the keyBy operator, a virtual node is created instead of a StreamNode.

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    // Get the PartitionTransformation upstream Transformation
    Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();
    // Again, recursively transform the upstream transformation to obtain the IDs that feed the current transformation
    Collection<Integer> transformedIds = transform(input);
    for (Integer transformedId: transformedIds) {
        // For each upstream transformation, a VirtualPartitionNode record is generated. Of course VirtualPartitionNode is not a concrete class,
        // It is just a Map element that contains the current virtual ID, upstream TransformationID, and Partitioner and is stored in the virtualPartitionNodes collection.
        int virtualId = Transformation.getNewNodeId();
        streamGraph.addVirtualPartitionNode(
                        transformedId, virtualId, partition.getPartitioner(),partition.getShuffleMode());
        resultIds.add(virtualId);
    }

    return resultIds;
}

// private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes;
public void addVirtualPartitionNode(
        Integer originalId,
        Integer virtualId,
        StreamPartitioner<?> partitioner,
        ShuffleMode shuffleMode) {

    if (virtualPartitionNodes.containsKey(virtualId)) {
        throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
    }

    virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode));
}
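The bookkeeping done by transformPartition() and addVirtualPartitionNode() can be modeled in a few lines. This is a hedged sketch, not Flink code: it assumes, as the value 6 in the earlier comment suggests, that virtual IDs come from the same counter as the transformation IDs, it starts that counter at 5 for the WordCount example, and it stores the partitioner as a plain string. All names are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VirtualPartitionSketch {

    // Assumption for this sketch: ids 1..5 were already handed out to the five WordCount transformations.
    private int idCounter = 5;

    // virtualId -> (upstream id, partitioner name); a simplified version of virtualPartitionNodes.
    final Map<Integer, Map.Entry<Integer, String>> virtualPartitionNodes = new HashMap<>();

    private int getNewNodeId() {
        return ++idCounter;
    }

    /** Records one virtual node per upstream id and returns the new virtual ids. */
    List<Integer> transformPartition(Collection<Integer> upstreamIds, String partitioner) {
        List<Integer> resultIds = new ArrayList<>();
        for (Integer upstreamId : upstreamIds) {
            int virtualId = getNewNodeId();
            if (virtualPartitionNodes.containsKey(virtualId)) {
                throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
            }
            virtualPartitionNodes.put(virtualId, new SimpleImmutableEntry<>(upstreamId, partitioner));
            resultIds.add(virtualId);
        }
        return resultIds;
    }

    public static void main(String[] args) {
        VirtualPartitionSketch sketch = new VirtualPartitionSketch();
        // keyBy sits downstream of flatMap (id 2) and uses a hash partitioner.
        List<Integer> ids = sketch.transformPartition(Collections.singletonList(2), "HASH");
        System.out.println(ids); // [6]
        System.out.println(sketch.virtualPartitionNodes.get(6).getKey()); // 2
    }
}
```

The returned virtual ID is what the downstream operator later passes to addEdge(), where it is resolved back to the real upstream node.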

From the second transformation, we can see that the keyBy transformation does not form a StreamNode. Instead, a VirtualPartitionNode record is created that holds the virtualId, the upstream transformation ID, the partitioner, and so on. Next, the last transformation in the list is traversed (SinkTransformation{id=5, name='Unnamed', outputType=GenericType<java.lang.Object>, parallelism=12}). For a SinkTransformation, the transformSink() method is called.

private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
    // Recursively transform the upstream transformation first; the call returns the upstream IDs
    Collection<Integer> inputIds = transform(sink.getInput());
    
    String slotSharingGroup = determineSlotSharingGroup(sink.getSlotSharingGroup(), inputIds);
    
    // Underneath the addSink method is the addOperator method, which generates the StreamNode node and sets the serializer for the input and output data types
    streamGraph.addSink(sink.getId(),
                        slotSharingGroup,
                        sink.getCoLocationGroupKey(),
                        sink.getOperatorFactory(),
                        sink.getInput().getOutputType(),
                        null,
                        "Sink: " + sink.getName());

    StreamOperatorFactory operatorFactory = sink.getOperatorFactory();
    if (operatorFactory instanceof OutputFormatOperatorFactory) {
        streamGraph.setOutputFormat(sink.getId(), ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat());
    }

    int parallelism = sink.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        sink.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(sink.getId(), parallelism);
    streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());

    // Use StreamEdge to connect the current StreamNode to the upstream StreamNode
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId,
                            sink.getId(),
                            0);
    }

    if (sink.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
    }

    return Collections.emptyList();
}

After the code above, we obtain the StreamGraph of the current WordCount task.
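As a recap, the whole walkthrough can be replayed on a toy model. The sketch below is a deliberately reduced imitation of the generator, not the Flink implementation: ToyTransformation, Kind, and ToyStreamGraph are hypothetical names, partitioners and parallelism are left out, and the ID counter is assumed to start after the five transformation IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ToyStreamGraph {

    enum Kind { SOURCE, ONE_INPUT, PARTITION, SINK }

    /** Hypothetical, heavily simplified transformation with at most one input. */
    static final class ToyTransformation {
        final int id;
        final Kind kind;
        final String name;
        final ToyTransformation input;

        ToyTransformation(int id, Kind kind, String name, ToyTransformation input) {
            this.id = id;
            this.kind = kind;
            this.name = name;
            this.input = input;
        }
    }

    private int idCounter; // assumed to be shared by transformation ids and virtual ids
    private final Map<ToyTransformation, List<Integer>> alreadyTransformed = new HashMap<>();
    private final Map<Integer, Integer> virtualPartitionNodes = new HashMap<>(); // virtual id -> upstream id
    final Map<Integer, String> streamNodes = new TreeMap<>();
    final List<String> streamEdges = new ArrayList<>();

    ToyStreamGraph(int lastUsedId) {
        this.idCounter = lastUsedId;
    }

    List<Integer> transform(ToyTransformation t) {
        List<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        // Walk upstream first, as the transformXXXX() methods do.
        List<Integer> inputIds = t.input == null ? Collections.<Integer>emptyList() : transform(t.input);
        List<Integer> resultIds;
        if (t.kind == Kind.PARTITION) {
            // keyBy: record a virtual node per input instead of creating a StreamNode.
            resultIds = new ArrayList<>();
            for (int inputId : inputIds) {
                int virtualId = ++idCounter;
                virtualPartitionNodes.put(virtualId, inputId);
                resultIds.add(virtualId);
            }
        } else {
            streamNodes.put(t.id, t.name);
            for (int inputId : inputIds) {
                addEdge(inputId, t.id);
            }
            resultIds = t.kind == Kind.SINK
                    ? Collections.<Integer>emptyList()
                    : Collections.singletonList(t.id);
        }
        alreadyTransformed.put(t, resultIds);
        return resultIds;
    }

    private void addEdge(int upstreamId, int downstreamId) {
        Integer realUpstream = virtualPartitionNodes.get(upstreamId);
        if (realUpstream != null) {
            addEdge(realUpstream, downstreamId); // resolve the virtual node
            return;
        }
        streamEdges.add(upstreamId + "->" + downstreamId);
    }

    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation(1, Kind.SOURCE, "Source", null);
        ToyTransformation flatMap = new ToyTransformation(2, Kind.ONE_INPUT, "Flat Map", source);
        ToyTransformation keyBy = new ToyTransformation(3, Kind.PARTITION, "Partition", flatMap);
        ToyTransformation sum = new ToyTransformation(4, Kind.ONE_INPUT, "Keyed Aggregation", keyBy);
        ToyTransformation sink = new ToyTransformation(5, Kind.SINK, "Sink", sum);

        ToyStreamGraph graph = new ToyStreamGraph(5);
        for (ToyTransformation t : Arrays.asList(flatMap, sum, sink)) {
            graph.transform(t);
        }
        System.out.println(graph.streamNodes.keySet()); // [1, 2, 4, 5]
        System.out.println(graph.streamEdges);          // [1->2, 2->4, 4->5]
    }
}
```

In this model the result has four nodes (1, 2, 4, 5) and three edges, with keyBy (id 3) present only as the partitioning information that would sit on the edge from node 2 to node 4.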