This article was first published at www.yuque.com/17sing

Version   Date        Note
1.0       2022.1.26   First published

0. Foreword

A while ago, a friend in my group asked me: "Why does Flink have to go through so many graph transformations to get from our code to an executable state? What's the benefit?" I had the same doubts when I first saw this design; I looked up some information and then moved on, since I was focused on something else at the time. Having run into the question again, I might as well figure it out in this article.

The source code in this article is based on Flink 1.14.0.

1. Layered design

This picture is from Jark's blog: wuchong.me/blog/2016/0…

The figure above shows Flink's graph layers; in the rest of this article we will uncover them one by one to understand why each of them exists.

1.1 The Batch API's OptimizedPlan

In this section, we look at how a DataSet program is converted from a Plan to an OptimizedPlan. To give the reader a clear picture, let's first explain some terms:

  • DataSet: the user-facing batch processing API.
  • Plan: describes how the DataSources, DataSinks, and Operations interact.
  • OptimizedPlan: An optimized execution plan.
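
To make these terms concrete, here is a minimal sketch of a DataSet program (the class name and data are made up for illustration). The source, the flatMap operation, and the sink created by print() are exactly the pieces a Plan describes; print() also triggers execution, which is when the Plan gets built:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class PlanDemo {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // DataSource -> Operation: the building blocks a Plan describes
            DataSet<String> words =
                    env.fromElements("flink graph", "flink plan")
                            .flatMap(
                                    new FlatMapFunction<String, String>() {
                                        @Override
                                        public void flatMap(String line, Collector<String> out) {
                                            for (String w : line.split(" ")) {
                                                out.collect(w);
                                            }
                                        }
                                    });

            // print() adds a DataSink and triggers execution, at which point
            // the Plan is created and handed to the Optimizer.
            words.print();
        }
    }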

Code entry:

    |-- CliFrontend#main
     \-- parseAndRun
      \-- runApplication
       \-- getPackagedProgram
       \-- buildProgram
       \-- executeProgram
        |-- ClientUtils#executeProgram
        |-- PackagedProgram#invokeInteractiveModeForExecution
         \-- callMainMethod // calls the user program's entry point
          |-- ExecutionEnvironment#execute
           \-- executeAsync // creates the Plan
            |-- PipelineExecutorFactory#execute
            |-- EmbeddedExecutor#execute
             \-- submitAndGetJobClientFuture
              |-- PipelineExecutorUtils#getJobGraph
               |-- FlinkPipelineTranslationUtil#getJobGraph
                |-- FlinkPipelineTranslator#translateToJobGraph // a StreamGraph is converted to a JobGraph directly here
                 |-- PlanTranslator#translateToJobGraph
                  \-- compilePlan

Let’s take a look at this code:

    private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
        Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
        OptimizedPlan optimizedPlan = optimizer.compile(plan);

        JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
        return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
    }

It's very clear: Optimizer#compile compiles the Plan into an OptimizedPlan, and the JobGraphGenerator then turns the OptimizedPlan into a JobGraph. Let's first look at the comment on the method signature of Optimizer#compile:


    /**
     * Translates the given program to an OptimizedPlan. The optimized plan describes for each
     * operator which strategy to use (such as hash join versus sort-merge join), what data exchange
     * method to use (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined,
     * batch), where to cache intermediate results, etc,
     *
     * <p>The optimization happens in multiple phases:
     *
     * <ol>
     *   <li>Create optimizer dag implementation of the program.
     *       <p><tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute
     *       size estimates.
     *   <li>Compute interesting properties and auxiliary structures.
     *   <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting
     *       property computation (as opposed to the Database approaches), because we support plans
     *       that are not trees.
     * </ol>
     *
     * @param program The program to be translated.
     * @param postPasser The function to be used for post passing the optimizer's plan and setting
     *     the data type specific serialization routines.
     * @return The optimized plan.
     * @throws CompilerException Thrown, if the plan is invalid or the optimizer encountered an
     *     inconsistent situation during the compilation process.
     */
    private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser)

The comment mentions several optimization phases:

  1. Create the optimizer DAG of the program: OptimizerNode representations of the operators, with parallelism assigned and size estimates computed.
  2. Compute interesting properties and auxiliary data structures.
  3. Enumerate the alternative plans.

In the implementation of the method, a number of visitors are created to iterate over the program.

1.1.1 GraphCreatingVisitor

The first step is to create a GraphCreatingVisitor, which optimizes the original Plan by turning each operator into an OptimizerNode; the OptimizerNodes are connected by DagConnections. A DagConnection is an edge model with a source and a target, representing an OptimizerNode's inputs and outputs. The following things happen in this process:

  1. Create an OptimizerNode for each operator — a node representation closer to the execution description (estimating data sizes, where the data flow splits and merges, etc.)
  2. Connect them with channels.
  3. Record the strategy candidates to choose from: which operator strategy to execute with, e.g. Hash Join or Sort-Merge Join; which data exchange strategy to use between operators, e.g. Local Pipe Forward, Shuffle, or Broadcast; and which data exchange mode to use between operators, Pipelined or Batch. (A sketch of influencing these choices follows this list.)
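
As a hedged illustration of step 3: in the DataSet API, a join hint lets the user pin the operator strategy instead of leaving the choice to the cost-based optimizer. The data below is made up:

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinHintDemo {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> left = env.fromElements(Tuple2.of(1, "a"));
            DataSet<Tuple2<Integer, String>> right = env.fromElements(Tuple2.of(1, "b"));

            // JoinHint.OPTIMIZER_CHOOSES would leave the decision to the optimizer;
            // an explicit hint such as REPARTITION_SORT_MERGE pins the strategy.
            left.join(right, JoinHint.REPARTITION_SORT_MERGE)
                    .where(0)
                    .equalTo(0)
                    .print();
        }
    }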

1.1.2 IdAndEstimatesVisitor

As the name implies, this visitor generates an ID for each operator and estimates its data volume. For an implementation of the estimation, see OptimizerNode#computeOutputEstimates — this is an abstract method; we can look at the implementation in DataSourceNode, which derives its estimates from attributes of the upstream data source (such as row count and size), provided a statistics object is available that can tell us a bit about the file.

1.1.3 UnionParallelismAndForwardEnforcer

This enforces that a UnionNode's parallelism matches that of its downstream node, avoiding incorrect results caused by wrong data distribution (see github.com/apache/flin…

1.1.4 BranchesVisitor

Records the unclosed branches of the DAG. See the definition of an unclosed branch:


    /**
     * Description of an unclosed branch. An unclosed branch is when the data flow branched (one
     * operator's result is consumed by multiple targets), but these different branches (targets)
     * have not been joined together.
     */
    public static final class UnclosedBranchDescriptor {

1.1.5 InterestingPropertyVisitor

Estimates the cost based on each node's attributes.

For the estimation algorithms, see node.computeInterestingPropertiesForInputs in:

  • WorksetIterationNode
  • TwoInputNode
  • SingleInputNode
  • BulkIterationNode

A series of execution plans are then calculated based on cost:

        // the final step is now to generate the actual plan alternatives
        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);

In this step, OptimizerNodes are optimized into PlanNodes, the final optimized node type. A PlanNode contains more node attributes, and the nodes are connected by Channels, which are also an edge model. This step also determines the data exchange strategies between nodes: ShipStrategyType and DataExchangeMode. ShipStrategyType represents the data transmission strategy between two nodes — for example, whether to partition the data, and whether to use hash or range partitioning. DataExchangeMode represents the data exchange mode between two nodes, either PIPELINED or BATCH; together with the ExecutionMode, it determines whether data is sent downstream directly or spilled to disk first.
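
As a small, hedged example of the knob mentioned above: the ExecutionMode can be set on the execution config, and the optimizer takes it into account when choosing the DataExchangeMode:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ExecutionModeDemo {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // PIPELINED (the default) favors direct, streaming-style exchanges;
            // BATCH allows the runtime to materialize intermediate data first.
            env.getConfig().setExecutionMode(ExecutionMode.BATCH);
        }
    }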

1.1.6 PlanFinalizer.createFinalPlan

The final plan is created by PlanFinalizer#createFinalPlan. Roughly, it adds the nodes to sources, sinks, and allNodes, and may set the memory each task occupies.

1.1.7 BinaryUnionReplacer

As the name implies, when a Union's input is itself a Union, the two are replaced and merged into one. In my view, this reduces the number of generated nodes while producing equivalent output.

1.1.8 RangePartitionRewriter

When using range partitioning, you need to ensure that the data sets processed by each partition are balanced as much as possible to maximize the use of computing resources and reduce the execution time of jobs. To this end, the optimizer provides a range partitioning rewriter to optimize the partitioning strategy for range partitioning so that it allocates data as evenly as possible and avoids data skew.

If you want to distribute the data as evenly as possible, you must estimate the data source. But it is obviously impossible to read all the data for the estimation. Here Flink adopts an improved version of the reservoir sampling algorithm — see the paper Optimal Random Sampling from Distributed Streams Revisited. In the code it is implemented by org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement and org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement.
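
A hedged sketch of the API that goes down this path: partitionByRange is the DataSet call whose plan the RangePartitionRewriter rewrites (the data and output path are made up):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class RangePartitionDemo {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> data =
                    env.fromElements(Tuple2.of(3, "c"), Tuple2.of(1, "a"), Tuple2.of(2, "b"));

            // The rewriter injects a sampling step here to compute balanced
            // range boundaries, mitigating data skew across partitions.
            data.partitionByRange(0).writeAsText("/tmp/range-partitioned");
            env.execute("range partition demo");
        }
    }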

It is worth noting that both Plan and OptimizerNode implement the Visitable interface, a typical application of the visitor pattern. This makes the code very flexible: as the comments note, the traversal code is written only once, while different visitors can perform different operations during the traversal.

package org.apache.flink.util;

import org.apache.flink.annotation.Internal;

/**
 * This interface marks types as visitable during a traversal. The central method <i>accept(...)</i>
 * contains the logic about how to invoke the supplied {@link Visitor} on the visitable object, and
 * how to traverse further.
 *
 * <p>This concept makes it easy to implement for example a depth-first traversal of a tree or DAG
 * with different types of logic during the traversal. The <i>accept(...)</i> method calls the
 * visitor and then send the visitor to its children (or predecessors). Using different types of
 * visitors, different operations can be performed during the traversal, while writing the actual
 * traversal code only once.
 *
 * @see Visitor
 */
@Internal
public interface Visitable<T extends Visitable<T>> {

    /**
     * Contains the logic to invoke the visitor and continue the traversal. Typically invokes the
     * pre-visit method of the visitor, then sends the visitor to the children (or predecessors) and
     * then invokes the post-visit method.
     *
     * <p>A typical code example is the following:
     *
     * <pre>{@code
     * public void accept(Visitor<Operator> visitor) {
     *     boolean descend = visitor.preVisit(this);
     *     if (descend) {
     *         if (this.input != null) {
     *             this.input.accept(visitor);
     *         }
     *         visitor.postVisit(this);
     *     }
     * }
     * }</pre>
     *
     * @param visitor The visitor to be called with this object as the parameter.
     * @see Visitor#preVisit(Visitable)
     * @see Visitor#postVisit(Visitable)
     */
    void accept(Visitor<T> visitor);
}

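To see how the pattern is used, here is a minimal, hypothetical visitor that counts the operator nodes of a Plan (Plan implements Visitable<Operator<?>>, so it can accept this visitor directly):

    import org.apache.flink.api.common.operators.Operator;
    import org.apache.flink.util.Visitor;

    /** A hypothetical visitor that counts visits to the operators reachable from a Plan's sinks. */
    public class OperatorCountingVisitor implements Visitor<Operator<?>> {
        private int count = 0;

        @Override
        public boolean preVisit(Operator<?> visitable) {
            count++;
            return true; // returning true means: descend into this node's inputs
        }

        @Override
        public void postVisit(Operator<?> visitable) {
            // nothing to do on the way back up
        }

        public int getCount() {
            return count;
        }
    }

    // usage: plan.accept(new OperatorCountingVisitor());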

1.2 The Stream API's StreamGraph

The entry function for constructing a StreamGraph is StreamGraphGenerator#generate(), which is triggered by the program's StreamExecutionEnvironment#execute() call. Just like the OptimizedPlan, the StreamGraph is constructed on the Client side.

In this process, the user pipeline is first converted into a pipeline of Transformations, which is then mapped to a StreamGraph. The StreamGraph has nothing to do with execution concerns and focuses on expressing the logic of the computation.

For the background of Transformation, see the community issue: issues.apache.org/jira/browse…

Transformation focuses on attributes that are internal to the framework, such as name, uid, bufferTimeout, parallelism, outputType, slotSharingGroup, etc. In addition, Transformations are either physical or virtual, which relates to the StreamGraph implementation at the next level.
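
A hedged sketch of setting some of these attributes through the DataStream API — each fluent call below ends up as an attribute on the underlying Transformation (the names and values are made up):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TransformationAttributesDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b")
                    .map(String::toUpperCase)
                    .name("to-upper")          // Transformation name
                    .uid("to-upper-uid")       // stable uid, e.g. for savepoint state mapping
                    .setParallelism(2)         // per-transformation parallelism
                    .slotSharingGroup("demo")  // slot sharing group
                    .print();

            env.execute("transformation attributes demo");
        }
    }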

StreamGraph has two core objects:

  • StreamNode: a node in the StreamGraph that can have multiple outputs and inputs. Physical StreamNodes eventually become physical operators, while virtual StreamNodes are attached to StreamEdges.
  • StreamEdge: an edge of the StreamGraph used to connect two StreamNodes. As mentioned above, a StreamNode can have multiple out-edges and in-edges. A StreamEdge carries information such as side outputs, partitioners, and field filtering (the same logic as selecting fields in a SQL SELECT); a short sketch follows this list.
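
A hedged sketch of two features that end up on StreamEdges: the side output below becomes a virtual node attached to an edge, and rebalance() records a partitioner on the edge instead of creating a physical operator (the tag name and data are made up):

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class StreamEdgeDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final OutputTag<String> side = new OutputTag<String>("side") {};

            SingleOutputStreamOperator<String> mainStream =
                    env.fromElements("a", "bb")
                            .process(
                                    new ProcessFunction<String, String>() {
                                        @Override
                                        public void processElement(
                                                String value, Context ctx, Collector<String> out) {
                                            if (value.length() == 1) {
                                                out.collect(value);      // main output
                                            } else {
                                                ctx.output(side, value); // side output
                                            }
                                        }
                                    });

            mainStream.getSideOutput(side).print(); // side output: a virtual node on the edge
            mainStream.rebalance().print();         // partitioner recorded on the StreamEdge
            env.execute("stream edge demo");
        }
    }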

The specific conversion code is in org.apache.flink.streaming.api.graph.StreamGraphGenerator; each Transformation has a corresponding translator:

    static {
        @SuppressWarnings("rawtypes")
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                tmp = new HashMap<>();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
        tmp.put(
                TimestampsAndWatermarksTransformation.class,
                new TimestampsAndWatermarksTransformationTranslator<>());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
        tmp.put(
                KeyedBroadcastStateTransformation.class,
                new KeyedBroadcastStateTransformationTranslator<>());
        translatorMap = Collections.unmodifiableMap(tmp);
    }

1.3 The Stream-Batch Unified JobGraph

The code entry is almost the same as in section 1.1: the entry class for DataSet is ExecutionEnvironment, while for DataStream it is StreamExecutionEnvironment, and the PlanTranslator is replaced by a StreamGraphTranslator. So the StreamGraph-to-JobGraph conversion also happens on the Client side, and it mainly performs optimizations. One of the most important is the operator chain: operators that meet the chaining conditions are merged together to avoid cross-thread and cross-network data transfer.

Whether to enable operator chaining can be explicitly configured in the program.
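
A hedged sketch of those explicit adjustments (the operators and data are made up):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Global switch: disable operator chaining for the whole job.
            // env.disableOperatorChaining();

            env.fromElements("a", "b")
                    .map(String::toUpperCase)
                    .startNewChain()       // may chain with downstream, but not with upstream
                    .filter(s -> !s.isEmpty())
                    .disableChaining()     // never chained with its neighbors
                    .print();

            env.execute("chaining demo");
        }
    }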

So let's see what a JobGraph is. First, look at the comments:


    /**
     * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
     * All programs from higher level APIs are transformed into JobGraphs.
     *
     * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
     * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
     * but inside certain special vertices that establish the feedback channel amongst themselves.
     *
     * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate
     * result define the characteristics of the concrete operation and intermediate data.
     */
    public class JobGraph implements Serializable {

It is a graph consisting of vertices and intermediate results, and it is a low-level API built for the JobMaster — all higher-level APIs are converted into JobGraphs. The objects we need to focus on next are JobVertex, JobEdge, and IntermediateDataSet: the input of a JobVertex is a JobEdge and its output is an IntermediateDataSet.

1.3.1 JobVertex

After optimization, multiple qualifying StreamNodes may be fused together to form a JobVertex, i.e., a JobVertex contains one or more operators (interested readers can look at StreamingJobGraphGenerator#buildChainedInputsAndGetHeadInputs or read the related issue: issues.apache.org/jira/browse…

1.3.2 JobEdge

A JobEdge is the edge connecting an IntermediateDataSet and a JobVertex, representing a data flow channel in the JobGraph: its upstream is an IntermediateDataSet and its downstream is a JobVertex — data is passed from the IntermediateDataSet to the target JobVertex via the JobEdge.

Here, we focus on one of its members:


/**
 * A distribution pattern determines, which sub tasks of a producing task are connected to which
 * consuming sub tasks.
 *
 * <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected
 * in {@link EdgeManagerBuildUtil}
 */
public enum DistributionPattern {

    /** Each producing sub task is connected to each sub task of the consuming task. */
    ALL_TO_ALL,

    /** Each producing sub task is connected to one or more subtask(s) of the consuming task. */
    POINTWISE
}


This distribution pattern directly determines the data connection relationships between tasks at execution time: point-to-point or all-to-all (as used for broadcast).
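
For intuition, a hedged sketch under the assumption that up- and downstream parallelisms allow it: rescale-style connections produce POINTWISE edges, while keyed repartitioning produces ALL_TO_ALL (data and functions are made up):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DistributionPatternDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3, 4)
                    .rescale()         // POINTWISE: each producer feeds a subset of consumers
                    .map(i -> i * 2)
                    .keyBy(i -> i % 2) // ALL_TO_ALL: any producer may feed any consumer
                    .reduce(Integer::sum)
                    .print();

            env.execute("distribution pattern demo");
        }
    }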

1.3.3 IntermediateDataSet

An IntermediateDataSet is a logical structure used to represent the output of JobVertex, that is, the data set that will be produced by the operators contained in that JobVertex. Here we need to focus on ResultPartitionType:

  • Blocking: as the name implies, the downstream starts consuming only after the upstream side has fully produced its data. This partition type can be consumed multiple times and concurrently. The partition is not destroyed automatically; that is left to the scheduler to decide.
  • BlockingPersistent: similar to Blocking, but with a lifecycle specified by the user. It is destroyed by calling the JobMaster or ResourceManager APIs rather than being controlled by the scheduler.
  • Pipelined: the streaming exchange mode, usable for both bounded and unbounded streams. Data in this partition type can be consumed only once by each consumer, and the partition may hold an arbitrary amount of data.
  • PipelinedBounded: unlike Pipelined, this partition retains only a bounded amount of data, so it does not delay data and checkpoints for too long. It applies to streaming scenarios (note that there are no CheckpointBarriers in batch mode).
  • Pipelined_Approximate: introduced in 1.12 to support approximate fast failover of a single task. Interested readers can read the related issue: issues.apache.org/jira/browse…

Under different execution modes, the corresponding result partition type is different, which determines the mode of data exchange at the execution time.
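
A hedged sketch of the corresponding knob at the DataStream level (available since 1.12): the runtime execution mode influences which result partition types and scheduling the runtime picks.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RuntimeModeDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // STREAMING keeps pipelined partitions end to end; BATCH allows blocking
            // partitions and stage-by-stage scheduling; AUTOMATIC decides from the sources.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
    }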

The number of IntermediateDataSets equals the number of out-edges of the StreamNode corresponding to the JobVertex, and can be one or more.

1.4 ExecutionGraph

Once the JobManager receives the submitted JobGraph and its dependency jars from the Client, the job can be scheduled to run; but the JobGraph is still a logical graph, and it needs to be further transformed into a parallelized, schedulable execution graph. This transformation is triggered by the SchedulerBase in the JobMaster, and the actual work is done in DefaultExecutionGraphBuilder#buildGraph. In this process, the ExecutionJobVertex (a logical concept) and ExecutionVertices corresponding to each JobVertex are generated, as well as the IntermediateResult (a logical concept) and IntermediateResultPartitions corresponding to each IntermediateDataSet; the so-called parallelism is realized through these classes.

I'm going to talk about the ExecutionGraph and the logical concepts involved in some detail, so I've drawn a diagram here for reference.
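
In case the diagram does not render here, a rough text sketch of the correspondences (assuming a parallelism of 2):

    JobGraph                           ExecutionGraph
    --------                           --------------
    JobVertex A          ──────────▶   ExecutionJobVertex A
                                         ├─ ExecutionVertex A[0]
                                         └─ ExecutionVertex A[1]
    IntermediateDataSet  ──────────▶   IntermediateResult
                                         ├─ IntermediateResultPartition[0]
                                         └─ IntermediateResultPartition[1]
    JobVertex B          ──────────▶   ExecutionJobVertex B
                                         ├─ ExecutionVertex B[0]
                                         └─ ExecutionVertex B[1]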

1.4.1 ExecutionJobVertex and ExecutionVertex

An ExecutionJobVertex corresponds to a JobVertex in the JobGraph. The object also contains a group of ExecutionVertices, whose number matches the parallelism of the StreamNodes contained in the JobVertex: as shown in the figure above, if the parallelism is N, there will be N ExecutionVertices. So each instance that actually executes in parallel is an ExecutionVertex. The IntermediateResult for the ExecutionJobVertex's output is also built at this point.

So ExecutionJobVertex is more of a logical concept.

1.4.2 IntermediateResult and IntermediateResultPartition

An IntermediateResult represents the output of an ExecutionJobVertex and corresponds to an IntermediateDataSet in the JobGraph; this object is also a logical concept. Similarly, an ExecutionJobVertex can have multiple IntermediateResults, depending on how many out-edges the current JobVertex has.

An IntermediateResult contains multiple IntermediateResultPartitions; their number equals the parallelism of the JobVertex, also called the operator parallelism. Each IntermediateResultPartition represents the output of one ExecutionVertex.

1.4.3 Execution

An ExecutionVertex corresponds to a Task at runtime. When it is actually executed, an ExecutionVertex is wrapped into an Execution.

How the JobGraph is submitted to the JobMaster is not the focus of this article; interested readers can trace the call stack around org.apache.flink.runtime.dispatcher.DispatcherGateway#submitJob.

1.4.5 From JobGraph to ExecutionGraph

Several important concepts were introduced above. Now let's take a look at how the ExecutionGraph is built. The main method to reference is org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#attachJobGraph.

The first step is to build the ExecutionJobVertices (see the constructor): set the parallelism, slot sharing group, and CoLocationGroup, and build the IntermediateResults and IntermediateResultPartitions; create the ExecutionVertices according to the parallelism, and check the IntermediateResults for duplicate references. Finally, the splittable data sources are split into input splits.

The second step is to build the edges (see org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil#connectVertexToResult). The EdgeManager is created according to the DistributionPattern, and the ExecutionVertices and IntermediateResults are associated with each other to establish the physical transmission channels for data exchange between tasks at runtime.

1.4.6 Appetizer: From ExecutionGraph to actual execution

When the JobMaster has generated the ExecutionGraph, it enters the job scheduling phase. This involves different scheduling policies, resource requests, task distribution, and failover management. There is a lot to cover, so I will discuss it in another article. Curious readers can first look at DefaultExecutionGraphDeploymentTest#setupScheduler; the code inside is relatively simple and lets you observe the process from the ExecutionGraph to scheduling.

    private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2)
            throws Exception {
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);

        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);

        DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();

        // execution graph that executes actions synchronously
        final SchedulerBase scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(v1, v2),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
                        .setFutureExecutor(executorService)
                        .setBlobWriter(blobWriter)
                        .build();
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        checkJobOffloaded((DefaultExecutionGraph) eg);

        // schedule, this triggers mock deployment
        scheduler.startScheduling();

        Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
        assertEquals(dop1 + dop2, executions.size());

        return scheduler;
    }

2. Summary

Through this article, we have seen why each layer of graph exists:

  • StreamGraph and OptimizedPlan: move from the external API to the internal API and generate the basic properties of the graph; in the batch case, a series of optimizations is also performed.
  • JobGraph: the stream-batch unified graph. Generic optimizations, like the operator chain, are done here.
  • ExecutionGraph: the execution-level graph. Its construction takes care of many execution details, such as parallelism, checkpoint configuration validity, monitoring point settings, duplicate-reference checks, splitting of splittable data sources, and so on.

Through the layering of graphs, Flink places different optimizations and checks at the level where they belong, which is also an embodiment of the single-responsibility principle.