[Source Code Analysis] The propagation of Flink watermarks, seen from the source code
0x00 Abstract
Through source-code analysis, this article walks you through the propagation of Flink watermarks, and along the way gives you a general grasp of Flink's overall logic.
Due to limited space, the article is split into two parts. This is the first part, which introduces the overall propagation process.
0x01 Overview
Statically, the watermark is a core concept of stream computing; dynamically, watermarks permeate the entire streaming program. To explain how watermarks propagate, we therefore need to understand many Flink modules and concepts, touching almost every stage. I will first explain the concepts, then walk through an example program from several angles: program logic, the computation-graph model, and program execution. Finally comes a detailed source-code analysis (slightly lengthy, optional reading).
0x02 Related Concepts
Stream computation is commonly abstracted into four questions: what, where, when, and how.
Windows answer where: they divide unbounded data into bounded pieces.
When is a window's data evaluated? This is answered by watermarks and triggers: the watermark marks how complete the window's input is, while the trigger defines the conditions under which the window's data is computed.
1. Out-of-order processing
Out-of-order problems generally come up with event time; for the processing time of a stream-processing system there is no out-of-order problem. The watermark/allowedLateness mechanisms below therefore only take effect when event time is the primary notion of time.
In Flink, handling out-of-order data relies on watermark + window + trigger, which is a global mechanism. Flink additionally provides the allowedLateness setting on windows, which tolerates out-of-orderness to a greater extent and is a local mechanism.
The watermark is global and not tied to any particular window computation, whereas allowedLateness lets a particular window operator control its own strategy for handling delayed data; allowedLateness is a property of the window operator.
2. Watermark
The watermark is a mechanism streaming systems use to cope with out-of-order data. It marks how far the currently processed data has progressed, meaning that data with timestamps earlier than the watermark may be discarded. This lets the engine automatically track the current event time in the data and clean up old state accordingly.
The watermark declares how far back data will no longer be updated. You define watermarking for a query by specifying the event-time column and an estimate, in terms of that event time, of how late data may be. Each time the window slides, the watermark is recomputed; when the event time of a group of data, or of newly received data, is less than the watermark, that data is no longer updated and its state no longer needs to be kept in memory.
Put differently, data lagging within the threshold is still aggregated, but data arriving beyond the threshold (i.e., whose event time is less than the watermark) is discarded.
The watermark flows through the pipeline as an ordinary message, just like the data itself.
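As a concrete illustration, here is a minimal plain-Java sketch of this rule (no Flink types; the 2000 ms out-of-orderness bound is an assumption for the example). The watermark trails the largest timestamp seen so far, and an element whose timestamp does not exceed the current watermark is treated as late.

```java
// Toy model of the watermark/lateness decision; not Flink API.
public class WatermarkSketch {
    private final long maxOutOfOrderness;             // allowed lateness of arrival, in ms (assumed)
    private long maxTimestampSeen = Long.MIN_VALUE;

    public WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every element; tracks the largest event time observed.
    public void onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    // The current watermark: everything at or below this time is considered complete.
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrderness;
    }

    // An element is late (a candidate for being dropped) if it does not advance past the watermark.
    public boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(2000);
        w.onEvent(5000);                          // watermark becomes 5000 - 2000 = 3000
        System.out.println(w.currentWatermark()); // 3000
        System.out.println(w.isLate(2500));       // true: 2500 <= 3000
        System.out.println(w.isLate(4000));       // false
    }
}
```

Note that a late element (e.g. timestamp 2500 after the watermark reached 3000) never moves the watermark backwards.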
3. Trigger
The trigger specifies the conditions under which the window computation fires, based on the time at which data is processed and on specific attributes of the events. A typical trigger implementation starts the window computation when the watermark satisfies some time condition or the window's data satisfies some condition.
Each window assigner comes with a default trigger. If the default trigger does not meet your needs, you can specify a custom trigger. The Flink Trigger interface has the following methods that allow a trigger to react to different events:
- onElement(): called for every element that enters the window.
- onEventTime(): called when a registered event-time timer fires.
- onProcessingTime(): called when a registered processing-time timer fires.
- onMerge(): relevant for stateful triggers; merges the state of two triggers when their corresponding windows merge, for example with session windows.
- clear(): performs any cleanup required when the window is removed.
Each firing of the trigger recomputes the newly added data together with the affected windows and outputs the result. There are three output modes: Complete, Append, and Update:
- Complete mode: the result table outputs the recomputed results of all windows. In this mode, every time new input is read, the results of all windows in the result table are emitted.
- Append mode (default): only rows added to the result table since the last trigger are output. Since only newly added rows are emitted, this mode is unsuitable if old data may change: an updated window is not re-emitted, as that would duplicate keys in external storage.
- Update mode: any row that is updated is output; an enhanced version of Append mode. For the same key, external storage is updated rather than appended, so the external store must support key-value operations. Only new and updated windows are output.
The streaming framework maintains a result table for the window data.
4. allowedLateness
Flink uses watermarks together with windows and triggers to handle out-of-order events based on event time, but what about "late elements"?
Some may ask: what is the difference between an out-of-order element and a late element? Aren't they the same thing? The answer: they are the same thing; both concepts exist to deal with out-of-orderness. The differences can be summarized as follows:
- Handling out-of-orderness with the watermark mechanism is the first layer of protection, a global one; it deals with out-of-orderness in the common case.
- Handling out-of-orderness with the allowedLateness mechanism on a window is the second layer of protection, specific to a particular window operator. The "late element" problem refers to this kind of case.
By default, once the watermark passes the end of a window, the window's state is deleted, so data for it that arrives afterwards is dropped. To avoid such late data being discarded, the concept of allowedLateness was introduced.
Simply put, allowedLateness works in event time: after the watermark passes the end of the window, a further period (also measured in event time) is allowed during which earlier data may still arrive and be processed again.
5. The message-processing flow
- The state is keyed by key + window; the value is the data belonging to that key and window.
- A timer with the structure [key, window, window end - 1] is registered and placed into a timer collection.
- When the WindowOperator receives a watermark, it takes from the collection every timer smaller than the watermark and fires its window. During firing, the data for that key and window is fetched from state (going through deserialization) and handed to the WindowFunction for computation.
- The data is sent to the WindowFunction, which implements the window's computation logic.
For example, suppose a window receives three records: [key A, window A, 0], [key A, window A, 4999], [key A, window A, 5000].
For a fixed (tumbling) window, [key A, window A, 0] and [key A, window A, 4999] are computed when the first watermark (watermark 5000) arrives; [key A, window A, 5000] is computed when the second watermark (watermark 9999) arrives.
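That firing sequence can be simulated with a small plain-Java sketch. It follows the [key, window, window end - 1] timer layout described above; the 5000 ms tumbling window and the single key are assumptions for the example, and the real WindowOperator is of course far richer.

```java
import java.util.*;

// Toy model of keyed window state + event-time timers; not Flink API.
public class WindowTimerSketch {
    static final long WINDOW_SIZE = 5000;

    // per-(key, windowStart) buffered elements, keyed as "key@windowStart"
    private final Map<String, List<Long>> state = new HashMap<>();
    // registered timers: firing time (window end - 1) -> state key
    private final TreeMap<Long, String> timers = new TreeMap<>();
    final List<String> fired = new ArrayList<>();

    public void processElement(String key, long ts) {
        long windowStart = ts - (ts % WINDOW_SIZE);
        String stateKey = key + "@" + windowStart;
        state.computeIfAbsent(stateKey, k -> new ArrayList<>()).add(ts);
        timers.put(windowStart + WINDOW_SIZE - 1, stateKey); // timer at window end - 1
    }

    public void processWatermark(long watermark) {
        // fire every timer whose time is covered by the watermark
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, String> t = timers.pollFirstEntry();
            fired.add(t.getValue() + " -> " + state.remove(t.getValue()));
        }
    }

    public static void main(String[] args) {
        WindowTimerSketch op = new WindowTimerSketch();
        op.processElement("A", 0);
        op.processElement("A", 4999);
        op.processElement("A", 5000);
        op.processWatermark(5000);  // fires window A@0 (elements 0 and 4999)
        op.processWatermark(9999);  // fires window A@5000 (element 5000)
        System.out.println(op.fired);
    }
}
```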
6. Putting it all together
The watermark is a global parameter that manages message out-of-orderness. When the watermark passes the end time of a window, the window computation is triggered. In general, after the computation fires, the window is destroyed and subsequent data for it is not computed.
With allowedLateness, the computation changes. A window's allowedLateness attribute defaults to 0. If allowedLateness > 0, a window that has already fired is retained until a particular watermark arrives; this retention exists mainly for messages belonging to that window.
What is that particular watermark? One satisfying watermark - allowedLateness >= window end time. When that watermark arrives, the window disappears, and subsequent messages belonging to it are dropped. Between watermark = window end time and watermark = end time + allowedLateness, the corresponding window may be computed multiple times. A window is removed once its end time + allowedLateness <= watermark.
For example, if the window's end time is 5000 and allowedLateness = 0, the window is cleared as soon as watermark 5000 arrives. But if allowedLateness = 1000, the window is cleared only after watermark 6000 (end time + allowedLateness) arrives.
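The retention arithmetic can be written down directly. A minimal sketch, assuming the same numbers as the example above:

```java
// Toy model of window-state cleanup under allowedLateness; not Flink API.
public class LatenessSketch {
    // cleanup time of an event-time window: window end + allowedLateness
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return windowEnd + allowedLateness;
    }

    // the window's state is dropped once the watermark reaches the cleanup time
    static boolean isWindowCleared(long watermark, long windowEnd, long allowedLateness) {
        return watermark >= cleanupTime(windowEnd, allowedLateness);
    }

    public static void main(String[] args) {
        // end time 5000, allowedLateness 0: cleared as soon as watermark 5000 arrives
        System.out.println(isWindowCleared(5000, 5000, 0));    // true
        // end time 5000, allowedLateness 1000: still retained at watermark 5500
        System.out.println(isWindowCleared(5500, 5000, 1000)); // false
        // ... and cleared once watermark 6000 arrives
        System.out.println(isWindowCleared(6000, 5000, 1000)); // true
    }
}
```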
Flink's allowedLateness can be used with TumblingEventTimeWindows, SlidingEventTimeWindows, and EventTimeSessionWindows, and may cause a window to fire again, effectively revising the window's previous result (accumulating, or accumulating-and-retracting, computation).
Note: with the default EventTimeTrigger, allowedLateness triggers the window computation again while the previously fired data stays buffered; the window's data and metadata are deleted only once the watermark passes end-of-window + allowedLateness. This re-firing corresponds to the accumulating mode of the Dataflow model.
Meanwhile, for a session window, a late element arriving within the allowedLateness period may cause windows to merge, so that the data of a previous window is accumulated into the new one. This corresponds to AccumulatingAndRetracting in the Dataflow model.
7. Watermark propagation
A production pipeline usually has multiple stages, and watermarks generated at the source are passed across them. Understanding how the watermark travels across the stages of a pipeline gives a better sense of its impact on the pipeline as a whole and on result latency. We define watermarks on the boundary of each pipeline stage as follows:
- Input watermark: captures the data-processing progress of the upstream stages. For a source operator, the input watermark is produced by a special function that generates watermarks for the incoming data. For non-source operators, the input watermark is the minimum of the watermarks produced by all shards/partitions/instances of the upstream stage.
- Output watermark: captures the progress of the stage itself, essentially the minimum over the stage's input watermarks and the event times of all non-late data still held in the stage, for example data buffered while waiting to be aggregated.
Processing within a stage does not advance linearly. Conceptually, each stage can be divided into components, each of which affects the pipeline's output watermark; the characteristics of each component depend on how it is implemented and which operators it contains. Such operators may buffer data until a computation is triggered; for example, some data is cached in state until an aggregation fires and the result is written to the downstream stage.
The output watermark can be taken as the minimum of:
- Per-source watermark: one for each stage that sends data.
- Per-external-input watermark: one for each data source external to the pipeline.
- Per-state-component watermark: one for each kind of state being written to.
- Per-output-buffer watermark: one for each receiving stage.
Such a precise watermark describes the internal state of the system better: it is easier to track the state of data flowing through each buffer of the system, which helps troubleshoot data backlogs.
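A tiny plain-Java sketch of the minimum rule for a stage's input watermark (the channel values are made up for the example):

```java
import java.util.Arrays;

// Toy model: a multi-input stage's watermark is the minimum of the watermarks
// last received on each upstream channel/partition; not Flink API.
public class InputWatermarkSketch {
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] channels = {7000, 5000, 9000};
        System.out.println(combinedWatermark(channels)); // 5000: the slowest channel dominates
        channels[1] = 8000;                              // the slow channel catches up
        System.out.println(combinedWatermark(channels)); // 7000
    }
}
```

The design consequence: one lagging partition holds back the watermark (and thus window firing) of every downstream operator.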
Watermarks are broadcast from operator to operator. So what does an operator do when it receives a watermark?
- Update the operator's current time.
- Traverse the timer queue and fire the due callbacks.
- Emit the watermark downstream.
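These three steps can be sketched as a toy operator in plain Java, with a simple priority queue standing in for Flink's internal timer service:

```java
import java.util.*;

// Toy model of an operator's watermark handling; not Flink API.
public class OperatorWatermarkSketch {
    private long currentTime = Long.MIN_VALUE;
    private final PriorityQueue<Long> timerQueue = new PriorityQueue<>();
    final List<String> emitted = new ArrayList<>();   // what is sent downstream

    void registerTimer(long time) {
        timerQueue.add(time);
    }

    // The three steps from the text, in order:
    void processWatermark(long watermark) {
        currentTime = watermark;                              // 1. update operator time
        while (!timerQueue.isEmpty() && timerQueue.peek() <= watermark) {
            emitted.add("fired timer @" + timerQueue.poll()); // 2. fire due timer callbacks
        }
        emitted.add("watermark " + watermark);                // 3. emit watermark downstream
    }

    public static void main(String[] args) {
        OperatorWatermarkSketch op = new OperatorWatermarkSketch();
        op.registerTimer(4999);
        op.registerTimer(9999);
        op.processWatermark(5000);
        System.out.println(op.emitted); // only the 4999 timer fired; 9999 is still pending
    }
}
```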
0x03 Flink Program Structure & Core Concepts
1. Program structure
Flink programs transform data sets like regular programs, and each program consists of the following parts:
- Get an execution environment
- Load/create initialization data
- Specify transformations to data
- Specify where to put the results of the computation (e.g., print them or write them to a file)
- Trigger program execution
The core idea of Flink stream computing is the flow of data from the input stream, one record at a time, through a chain of operators and finally to the output stream. Logically, each processing step on the data is an operator; for locality and efficiency, several operators can also be chained together and processed as a unit.
The diagram below shows how Flink views a user's processing flow: user operations are abstracted into a series of operators. The flow starts with a source and ends with a sink; the operators in between are called transformations, and several operations may execute together.
Source ---> Transformation ----> Transformation ----> Sink
Here is a sample code that will be used for subsequent analysis.
DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2);

counts.print();
System.out.println(env.getExecutionPlan());
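The BoundedOutOfOrdernessGenerator used above is not shown in the snippet. Here is a plausible sketch of its core logic, written as self-contained plain Java rather than against Flink's watermark-assigner interface; the 3500 ms bound and the method shapes are assumptions for illustration:

```java
// Plain-Java stand-in for a bounded-out-of-orderness watermark generator; not Flink API.
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness = 3500; // tolerate 3.5 s of out-of-orderness (assumed)
    private long currentMaxTimestamp = Long.MIN_VALUE + 3500;

    // per element: remember the largest event timestamp seen so far
    public long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(elementTimestamp, currentMaxTimestamp);
        return elementTimestamp;
    }

    // called periodically: the emitted watermark trails the max timestamp by the bound
    public long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch();
        gen.extractTimestamp(10_000);
        System.out.println(gen.getCurrentWatermark()); // 6500
        gen.extractTimestamp(8_000);                   // an out-of-order element arrives
        System.out.println(gen.getCurrentWatermark()); // still 6500: watermarks never regress
    }
}
```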
2. Core classes/interfaces
When the user writes the program, it maps onto the following core classes/interfaces:
- DataStream: describes a data stream of a single data type. It is backed by a Transformation and provides the APIs for transforming the data on the stream.
- Transformation: describes the operation that builds a DataStream, along with its parallelism, output data type, and a property holding the concrete StreamOperator instance.
In the code above, the following operations are applied to the data stream: filter, map, assignTimestampsAndWatermarks, keyBy, timeWindow, sum. Each transformation generates a new DataStream.
For example, timeWindow in the sample code eventually produces a WindowedStream. The apply method executed on the WindowedStream generates a WindowOperator, initialized with the trigger and allowedLateness values. The subsequent transform call is actually DataStream.transform, which finally creates a SingleOutputStreamOperator. The name SingleOutputStreamOperator is a bit misleading: it is in fact a subclass of DataStream.
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
    KeySelector<T, K> keySel = input.getKeySelector(); // get the key selector from the KeyedStream
    WindowOperator<K, T, Iterable<T>, R, W> operator =
        new WindowOperator<>(windowAssigner, ...,
            new InternalIterableWindowFunction<>(function),
            trigger,
            allowedLateness,
            legacyWindowOpType);
    // execute KeyedStream.transform with the operator name, the window function's result type, and the window operator
    return input.transform(opName, resultType, operator);
}
0x04 Flink Execution Graph Model
The graphs in Flink can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
- StreamGraph: a mapping of the user's logic that represents the topology of the program; the initial graph generated from the code the user writes with the Stream API.
- JobGraph: produced by optimizing the StreamGraph; the data structure submitted to the JobManager. The main optimization is chaining multiple eligible nodes into one node, which reduces the serialization/deserialization/transmission cost of data flowing between nodes.
- ExecutionGraph: the JobManager generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of the scheduling layer.
- Physical execution graph: the "graph" formed after the JobManager schedules the job according to the ExecutionGraph and deploys tasks on the TaskManagers; it is not a concrete data structure.
We’ll focus on StreamGraph here, and the relevant key data structures are:
- StreamNode: a logical node describing an operator, carrying all related properties such as parallelism and in/out edges.
- StreamEdge: an edge describing the link between two StreamNodes (operators).
We can print the execution plan directly:
System.out.println(env.getExecutionPlan());
Internally this calls StreamExecutionEnvironment.getExecutionPlan, which builds the StreamGraph:
public String getExecutionPlan() {
    return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
}
StreamGraph’s conversion flow is:
Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink
Below is the static structure of the sample code, obtained by printing the StreamGraph. You can see that the transformations in the code are translated into the following execution units (in the figure, execution order is top-down).
+-----> Data Source (ID = 1) [Source: Socket Stream]
|           // env.socketTextStream(hostname, port) generates the Data Source
|
+-----> Operator (ID = 2) [Filter]
|
+-----> Operator (ID = 3) [Map]
|
+-----> Operator (ID = 4) [Timestamps/Watermarks]
|
+-----> Operator (ID = 6) [Window(SumAggregator)]
|           // several operators are built into one Operator Chain
|
+-----> Data Sink (ID = 7) [Sink: Print to Std. Out]
            // counts.print() adds a Data Sink at the end of the data stream to receive the results
In the sample code, Flink generates a StreamGraph as follows:
- First handle Source: generate the Source StreamNode.
- Handle Filter: generate the Filter StreamNode, and a StreamEdge connecting the upstream Source to Filter.
- Handle Map: generate the Map StreamNode, and a StreamEdge connecting the upstream Filter to Map.
- Handle assignTimestampsAndWatermarks: generate the Timestamps/Watermarks StreamNode, and a StreamEdge connecting the upstream Map to Timestamps/Watermarks.
- Handle keyBy/timeWindow/sum: generate the Window StreamNode (as an Operator Chain), and a StreamEdge connecting the upstream Timestamps/Watermarks to Window.
- Finally handle Sink: create the Sink StreamNode, and a StreamEdge connecting the upstream Window to it.
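The node-and-edge bookkeeping of these steps can be sketched in a few lines of plain Java (a toy stand-in for the real StreamGraph addNode/addEdge logic):

```java
import java.util.*;

// Toy graph builder mimicking the StreamNode/StreamEdge construction order; not Flink API.
public class StreamGraphSketch {
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    // create a StreamNode and, if there is an upstream node, a StreamEdge connecting the two
    void addOperator(String name, String upstream) {
        nodes.add(name);
        if (upstream != null) {
            edges.add(upstream + " -> " + name);
        }
    }

    public static void main(String[] args) {
        StreamGraphSketch g = new StreamGraphSketch();
        g.addOperator("Source", null);
        g.addOperator("Filter", "Source");
        g.addOperator("Map", "Filter");
        g.addOperator("Timestamps/Watermarks", "Map");
        g.addOperator("Window(SumAggregator)", "Timestamps/Watermarks");
        g.addOperator("Sink", "Window(SumAggregator)");
        System.out.println(g.edges);
    }
}
```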
0x05 Execution Module Lifecycle
The main core classes are:
- Function: users implement their own data-processing logic by extending subclasses of this interface. For example, the subclass SocketTextStreamFunction implements the logic of receiving data from a given hostname and port and forwarding the strings.
- Task: the basic unit of execution in Flink. It represents one parallel subtask on a TaskManager, wraps and runs a Flink operator, and provides the surrounding services: consuming input data, producing IntermediateResultPartitions (Flink's abstraction for intermediate results), and interacting with the JobManager.
- StreamTask: the basic unit of local execution, deployed by the TaskManagers. It contains multiple StreamOperators, which encapsulate the operators' processing logic.
- StreamOperator: each transformation on a DataStream corresponds to a StreamOperator, the runtime implementation that determines how the user-defined function (UDF) is invoked.
- StreamSource: a concrete implementation class of the StreamOperator interface whose constructor takes a subclass of SourceFunction; here this is an instance of SocketTextStreamFunction.
Tasks are managed and scheduled directly by the TaskManager; a Task in turn calls a StreamTask (mostly its subclasses), which encapsulates the processing logic of the StreamOperators. StreamSource is the operator that starts the whole stream. Now for the dynamic logic.
In our example code, all program logic runs inside a StreamTask (mostly its subclasses); filter/map correspond to StreamOperators; assignTimestampsAndWatermarks generates the watermarks that are passed downstream to keyBy.timeWindow (the WindowOperator); and keyBy/timeWindow/sum are built into an OperatorChain. Let's go through each of these concepts.
1. Task
Each Task is a set of operators chained together and executed as a unit. The execution of a Flink job can be seen as a DAG; Tasks are the vertices of that DAG, linked to each other through data transfers to form the job's execution graph.
Tasks are managed and scheduled directly by the TaskManager. Flink finally submits the Task via an RPC call that ends up in the TaskExecutor.submitTask method. This method creates the actual Task and then calls task.startTaskThread() to start execution; startTaskThread reaches the Task.run method by starting the executing thread. Its core flow is as follows:
public class Task implements Runnable
// The Task represents one execution of a parallel subtask on a TaskManager.
// A Task wraps a Flink operator (which may be a user function) and runs it.

-- doRun()
 |
 +----> request BufferPools from the NetworkEnvironment:
 |          the receiving pool of each InputGate and the output pool of each
 |          ResultPartition of the task
 +----> invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass)
 |          loads and instantiates the task's invokable code via reflection;
 |          invokable is the operator object instance, e.g. OneInputStreamTask, SourceStreamTask, etc.
 |          OneInputStreamTask extends StreamTask, so what is actually called here is StreamTask's invoke() method
 +----> invokable.invoke()
 |          runs the invokable

class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>>
Where does this nameOfInvokableClass come from? It is already determined when the StreamGraph is generated, as seen in the StreamGraph.addOperator method:
if (operatorObject instanceof StoppableStreamSource) {
addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
} else if (operatorObject instanceof StreamSource) {
addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
} else {
addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
}
Here OneInputStreamTask.class becomes the vertexClass of the generated StreamNode. This value is then passed along:
StreamGraph --> JobVertex.invokableClass --> ExecutionJobVertex.TaskInformation.invokableClassName --> Task
2. StreamTask
StreamTask is the basic unit of local execution, deployed by the TaskManagers. A StreamTask contains a headOperator and an operatorChain, which encapsulate the operators' processing logic. StreamTask provides the execution-process framework, while the OperatorChain (of StreamOperators) supplies the concrete operator logic embedded into that framework.
You can see the StreamTask lifecycle directly in the StreamTask Javadoc comments.
Each operator's open() method is called by StreamTask's openAllOperators() method, which performs all operational initialization, such as registering timers with a timer service. A single task may execute multiple operators, each consuming the output of its predecessor. In that case, opening starts from the last operator, the one whose output is also the output of the task itself, so that all downstream operators are ready to receive output by the time the first operator starts processing the task's input.
The OperatorChain is created in StreamTask's invoke method. At execution time, if an operator cannot be chained, the chain contains only the headOperator and no other operators.
Note that the operators in a task are opened from last to first.
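A toy illustration of this reverse opening order in plain Java (the operator names are just examples):

```java
import java.util.*;

// Toy model: chain holds operators in topological order (head first);
// open() is called in reverse, so every downstream operator is ready
// before its upstream starts emitting. Not Flink API.
public class OpenOrderSketch {
    static List<String> openAllOperators(List<String> chain) {
        List<String> openOrder = new ArrayList<>();
        for (int i = chain.size() - 1; i >= 0; i--) {
            openOrder.add(chain.get(i)); // "open" the i-th operator
        }
        return openOrder;
    }

    public static void main(String[] args) {
        System.out.println(openAllOperators(List.of("Map", "Filter", "Sink")));
        // [Sink, Filter, Map]
    }
}
```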
Take OneInputStreamTask as an example: the core of task execution is OneInputStreamTask.invoke, which calls StreamTask.invoke.
The life cycle of the task (StreamTask) is as follows:

-- setInitialState -> provides the state of all operators in the chain
 |
 +----> reinitializes the task's state; this is especially important in two cases:
 |          1. when the task recovers from failure and restarts from the last successful checkpoint
 |          2. when recovering from a savepoint
-- invoke()
 |
 +----> create basic utils (config, etc.) and load the chain of operators
 +----> operators.setup()            // create the OperatorChain and set it as the Output of the headOperator
 +----> task-specific init()
 +----> initialize-operator-states()
 +----> open-operators()             // openAllOperators(): call open() on all operators in the operatorChain
 +----> run()                        // runMailboxLoop() runs until there is no more input data
 |        --------> mailboxProcessor.runMailboxLoop();
 |        --------> StreamTask.processInput()
 |        --------> StreamTask.inputProcessor.processInput()
 |        --------> indirectly invokes the operators' processElement() and processWatermark() methods
 +----> close-operators()            // call close() on all operators in the operator chain
 +----> dispose-operators()
 +----> common cleanup
 +----> task-specific cleanup()
3. OneInputStreamTask
OneInputStreamTask is one of the representative implementation classes of StreamTask; it performs most of the execution in our sample code.
How is OneInputStreamTask generated?
-- when the StreamNode is generated: StreamGraph.addOperator()
 |
 +----> calls addNode(..., OneInputStreamTask.class, operatorObject, operatorName);
 |          sets OneInputStreamTask, a StreamTask subclass, as a property of the StreamNode

-- when the JobVertex node is built, initialization also does:
 |
 +----> jobVertex.setInvokableClass(streamNode.getJobVertexClass());
This property of the JobVertex is retrieved later when the TaskDeploymentDescriptor is instantiated.
What do OneInputStreamTask's init() and run() do?
class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>>

-- init()
 |
 +----> get the TypeSerializer for the operator's input
 +----> CheckpointedInputGate inputGate = createCheckpointedInputGate();
 |          obtains the input InputGate[]. InputGate is one of the core abstractions of Flink's
 |          network stack: it encapsulates message reception and memory management, and the data
 |          sent by upstream can be read from it
 +----> inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain)
 |          1. StreamInputProcessor is the component inside StreamTask that processes records;
 |             it encapsulates the external I/O logic (spilling buffers to disk when memory is
 |             insufficient) and the watermark alignment logic
 |          2. output is a StreamTaskNetworkOutput and input is a StreamTaskNetworkInput;
 |             the two are aggregated into the StreamOneInputProcessor
 +----> headOperator.getMetricGroup().gauge
 +----> getEnvironment().getMetricGroup().gauge
 |          set up metrics

-- run() (StreamTask.run)
 +----> StreamTask.runMailboxLoop
 |          starting from StreamTask.runMailboxLoop, the layers of the call chain are:
 -----> StreamTask.processInput()
 -----> StreamTask.inputProcessor.processInput()
 -----> StreamOneInputProcessor.processInput
 -----> input.emitNext(output)
 -----> StreamTaskNetworkInput.emitNext()
 |          while (true) { read a record from the input source; the output is StreamTaskNetworkOutput }
 -----> StreamTaskNetworkInput.processElement()   // handle the record
 |          handle differently according to the type of StreamElement:
 |          if (recordOrMark.isRecord())        output.emitRecord()
 ------------> StreamTaskNetworkOutput.emitRecord()
 ----------------> operator.processElement(record)
 |          if (recordOrMark.isWatermark())     statusWatermarkValve.inputWatermark()
 |          if (recordOrMark.isLatencyMarker()) output.emitLatencyMarker()
 |          if (recordOrMark.isStreamStatus())  statusWatermarkValve.inputStreamStatus()
4. OperatorChain
An operator in Flink represents a top-level API operation. For streaming, operations on a DataStream such as map/reduce/keyBy each generate an operator.
An Operator Chain means that, during the JobGraph generation stage, eligible operators of the job are linked according to certain policies (for example, operators with a single output can be chained) and executed together in one Task thread. This reduces data transfer and thread switching, lowers system overhead, and improves resource utilization and job performance.
Chained operators are actually created and set up one by one, backwards from downstream to upstream. If the chained operators are StreamGroupedReduce - StreamFilter - StreamSink, the actual initialization order is the reverse: StreamSink - StreamFilter - StreamGroupedReduce.
OperatorChain(
    StreamTask<OUT, OP> containingTask,
    RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)

-- collect
 |
 +----> pushToOperator(StreamRecord<X> record)
 +---------> operator.processElement(castRecord);
            Here operator is the chained operator, i.e. every operator in the chain other than
            the headOperator. operator.processElement in turn calls the collect of its own
            output, and so on along the operator chain up to the chain end.
            For example, operator A's ChainingOutput.collect calls A's processElement, which
            calls operator B's ChainingOutput.collect, and so on. This performs the chainable
            part locally; finally a RecordWriterOutput sends the record over the network to
            the downstream node.
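The chained collect -> processElement hand-off can be sketched in plain Java; ChainingOutput and RecordWriterOutput are reduced here to a next pointer and a sink list, and the two operators are invented for the example:

```java
import java.util.*;
import java.util.function.Function;

// Toy model of operator chaining; not Flink API.
public class ChainSketch {
    // a chained operator: processElement applies its function and pushes the result
    // to the next operator's processElement (the ChainingOutput role), or, at the
    // end of the chain, to the sink (the RecordWriterOutput role)
    static class Operator {
        final Function<String, String> fn;
        Operator next;                 // null for the last operator in the chain
        final List<String> sink;

        Operator(Function<String, String> fn, List<String> sink) {
            this.fn = fn;
            this.sink = sink;
        }

        void processElement(String record) {
            String out = fn.apply(record);
            if (next != null) {
                next.processElement(out); // ChainingOutput.collect -> next.processElement
            } else {
                sink.add(out);            // chain end: emit to the "network"
            }
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        Operator map = new Operator(s -> s.toUpperCase(), sink);
        Operator tag = new Operator(s -> s + "!", sink);
        map.next = tag;                   // chain: map -> tag
        map.processElement("hello");
        System.out.println(sink);         // [HELLO!]
    }
}
```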
5. StreamOperator
StreamTask calls the Operator, so we need to look at the Operator's lifecycle.
The logical operator, i.e. the Transformation, ultimately corresponds to a physical operator, which is the StreamOperator concept.
StreamOperator is the root interface; all streaming operators inherit from it. The interfaces extending StreamOperator are OneInputStreamOperator and TwoInputStreamOperator. The abstract class implementing StreamOperator is AbstractStreamOperator, with its subclass AbstractUdfStreamOperator.
The elements an operator processes may be one of: an input element, a watermark, or a checkpoint barrier. Each has a dedicated handler: elements are handled by the processElement() method, watermarks by processWatermark(), and checkpoint barriers by the asynchronously called snapshotState() method, which performs a checkpoint.
The processElement() method is also where the UDF logic is invoked, for example the map() method of a MapFunction.
AbstractUdfStreamOperator is the base class for all operators that execute UDFs.

// initialization phase
// initialize operator-specific facilities, such as the RuntimeContext and metric collection
OPERATOR::setup
    UDF::setRuntimeContext
// the call chain of setup: invoke(StreamTask) -> constructor(OperatorChain) -> setup
// when setup is called, the StreamTask is already running on some TaskManager node
// start the operator with state
OPERATOR::initializeState
// perform all operator-specific initialization
OPERATOR::open
    UDF::open

// processing phase (called on every element/watermark)
OPERATOR::processElement
    UDF::run   // a given operator may have a user-defined function (UDF)
OPERATOR::processWatermark

// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState

// termination phase
OPERATOR::close
    UDF::close
OPERATOR::dispose
Parallel to OneInputStreamOperator is TwoInputStreamOperator. The two interfaces are very similar: both essentially deal with the three element kinds that exist on a stream, StreamRecord, Watermark, and LatencyMarker. One handles single-stream input, the other dual-stream input.
6. StreamSource
StreamSource is the operator that starts the whole dataflow (it inherits AbstractUdfStreamOperator). Because a StreamSource has no input, it implements neither OneInputStreamOperator nor TwoInputStreamOperator. Notably, its ChainingStrategy is initialized to HEAD.
At run time, the SourceStreamTask calls StreamSource's run method, which in turn calls the SourceFunction's run method to start the source.
```java
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> {

    // after the source reads data it hands it to ctx.collect, which routes it to the
    // appropriate place (e.g. the next chained operator, or over the network)
    private transient SourceFunction.SourceContext<OUT> ctx;
    private transient volatile boolean canceledOrStopped = false;
    private transient volatile boolean hasSentMaxWatermark = false;

    public void run(final Object lockingObject,
                    final StreamStatusMaintainer streamStatusMaintainer,
                    final Output<StreamRecord<OUT>> collector,
                    final OperatorChain<?, ?> operatorChain) throws Exception {

        // latencyEmitter = new LatencyMarksEmitter(...)
        //   -- produces LatencyMarkers for latency monitoring
        // this.ctx = StreamSourceContexts.getSourceContext(...)
        //   -- builds the SourceContext matching the time characteristic
        //      (EventTime / IngestionTime / ProcessingTime)

        userFunction.run(ctx); // call SourceFunction.run to start the source and forward data
    }
}
```
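A stripped-down analogue shows the essential shape of this push model (the names `SourceSketch`, `SourceContext`, and `SourceFunction` here are simplified stand-ins, not Flink's real interfaces): the source loops on its own, and every produced record is handed to `ctx.collect`, which feeds the operator chain.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceSketch {
    // minimal stand-ins for Flink's SourceContext / SourceFunction
    interface SourceContext<T> { void collect(T value); }
    interface SourceFunction<T> { void run(SourceContext<T> ctx); }

    public static List<Integer> runSource(int n) {
        List<Integer> downstream = new ArrayList<>(); // stands in for the operator chain
        SourceFunction<Integer> source = ctx -> {
            for (int i = 0; i < n; i++) {
                ctx.collect(i); // the source pushes; it is never polled per element
            }
        };
        source.run(downstream::add); // ctx.collect routes each record downstream
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(runSource(3)); // [0, 1, 2]
    }
}
```

This is why a source operator needs no input interface: it only drives output, never consumes.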
7. StreamMap
In their processElement implementations, StreamFilter, StreamMap, and StreamFlatMap call the UDFs FilterFunction, MapFunction, and FlatMapFunction respectively, passing the results downstream. Here's StreamMap as an example:
```java
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
```
8. WindowOperator
Flink injects watermarks into the event stream through watermark assigner operators (TimestampsAndPeriodicWatermarksOperator and TimestampsAndPunctuatedWatermarksOperator).
In our sample code, timeWindow() eventually corresponds to a WindowedStream, and WindowOperator is the low-level implementation of the window mechanism. assignTimestampsAndWatermarks corresponds to TimestampsAndPeriodicWatermarksOperator, which conveys the Watermark to the WindowOperator.
As elements flow into the WindowOperator in the streaming dataflow engine, they fall into two categories: normal events and watermarks.

- If it is a normal event, processElement is called. The window assigner first assigns windows to the incoming element, then the trigger's onElement method is called for per-element triggering. Time-based triggers usually register event-time or processing-time timers, which are kept in the WindowOperator's processing-time timer queue and event-time (watermark) timer queue. If the trigger result is FIRE, the window is evaluated.
- If it is a watermark (event-time scenario), processWatermark is called, which works through the timers in the event-time timer queue. Every timer whose timestamp is covered by the watermark is handed to the trigger's onEventTime method.

For the processing-time scenario, the WindowOperator registers itself as a processing-time trigger target. Its trigger method consumes the processing-time timer queue and calls the window trigger's onProcessingTime when a timer is due; whether to evaluate the window is then decided by the trigger result.
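The event-time timer mechanics described above can be reduced to a few lines (a minimal sketch; `TimerSketch`, `EventTimeService`, and `Triggerable` are hypothetical simplifications of Flink's InternalTimerService and Triggerable): triggers register timers such as `window.maxTimestamp`, and an arriving watermark fires every timer whose timestamp is at or behind it, in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TimerSketch {
    interface Triggerable { void onEventTime(long timestamp); }

    static class EventTimeService {
        // analogue of eventTimeTimersQueue: a min-heap ordered by timestamp
        private final PriorityQueue<Long> timers = new PriorityQueue<>();
        private final Triggerable target;
        EventTimeService(Triggerable target) { this.target = target; }

        void registerEventTimeTimer(long ts) { timers.add(ts); }

        void advanceWatermark(long watermark) {
            // fire every timer whose timestamp is <= the new watermark
            while (!timers.isEmpty() && timers.peek() <= watermark) {
                target.onEventTime(timers.poll());
            }
        }
    }

    public static List<Long> fired(long[] timers, long watermark) {
        List<Long> out = new ArrayList<>();
        EventTimeService svc = new EventTimeService(out::add);
        for (long t : timers) svc.registerEventTimeTimer(t);
        svc.advanceWatermark(watermark);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fired(new long[]{10, 5, 20}, 12)); // [5, 10]
    }
}
```

The real implementation additionally tracks per-timer keys and namespaces and must set the key context before each callback; this sketch only shows the ordering and firing condition.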
```java
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W>

-- processElement()
   +----> windowAssigner.assignWindows
   |        // the WindowAssigner assigns a series of windows to the element
   +----> windowState.add(element.getValue())
   |        // add the current element to the buffer state
   +----> TriggerResult triggerResult = triggerContext.onElement(element)
   |        // call the trigger's onElement to obtain a TriggerResult
   +----> Trigger.OnMergeContext.onElement()
   +----> trigger.onElement(element.getValue(), element.getTimestamp(), window, ...)
   +----> EventTimeTriggers.onElement()
   |        // if window.maxTimestamp is already behind the current watermark, FIRE now;
   |        // otherwise register window.maxTimestamp with the TimerService and wait
   +----> contents = windowState.get(); emitWindowContents(actualWindow, contents)
            // act on the TriggerResult; if FIRE, actually evaluate the elements in the window

-- processWatermark()  // eventually lands in the base class
   -----> AbstractStreamOperator.processWatermark(watermark)
   -----> timeServiceManager.advanceWatermark(mark)   // step 1: advance the timers
   -----> output.emitWatermark(mark)                  // step 2: forward the watermark downstream
   -----> InternalTimeServiceManager.advanceWatermark
```
0x06. Brief process for processing Watermark
Finally, here is a brief walkthrough of Watermark handling, using OneInputStreamTask as the example.

```java
-- OneInputStreamTask.invoke()
   +----> StreamTask.init
   |        // StreamTaskNetworkOutput / StreamTaskNetworkInput are aggregated
   |        // into a StreamOneInputProcessor
   +----> StreamTask.runMailboxLoop
            // starting from runMailboxLoop, the call chain is:
   -----> StreamTask.processInput()
   -----> StreamTask.inputProcessor.processInput()
   -----> StreamOneInputProcessor.processInput
   -----> input.emitNext(output)
   -----> StreamTaskNetworkInput.emitNext()
   -----> StreamTaskNetworkInput.processElement()   // dispatches each unit of input

-- StreamTaskNetworkInput.processElement(), for a normal Record:
   -----> output.emitRecord(recordOrMark.asRecord())
   -----> StreamTaskNetworkOutput.emitRecord()
   -----> operator.processElement(record)
            // enters the concrete operator's processElement, e.g.
   -----> StreamFlatMap.processElement(record)
   -----> userFunction.flatMap()

-- StreamTaskNetworkInput.processElement(), for a Watermark:
   -----> StatusWatermarkValve.inputWatermark()
   -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
   -----> output.emitWatermark()
   -----> StreamTaskNetworkOutput.emitWatermark()
   -----> operator.processWatermark(watermark)
            // concrete operator handling, e.g. WindowOperator / KeyedProcessOperator
            // processWatermark; eventually reaches the base class
   -----> AbstractStreamOperator.processWatermark(watermark)
   -----> timeServiceManager.advanceWatermark(mark)   // step 1: process the watermark
   -----> output.emitWatermark(mark)                  // step 2: forward it downstream

   // step 1 in detail:
   -----> InternalTimeServiceManager.advanceWatermark
   -----> InternalTimerServiceImpl.advanceWatermark
            // every timer whose timestamp is <= the watermark must fire: timers are
            // popped from eventTimeTimersQueue while they are behind the watermark.
            // Note that a watermark carries no key, so one watermark may trigger timers
            // belonging to many different keys; the keyContext must be set before each
            // callback, otherwise keyed state access would get mixed up
   -----> triggerTarget.onEventTime(timer)
            // triggerTarget is the concrete operator, wired up in open() via
            // InternalTimeServiceManager.getInternalTimerService
   -----> KeyedProcessOperator.onEventTime()
            // calls the user's KeyedProcessFunction.onTimer to do the actual work.
            // For windows, onEventTime / onProcessingTime reads the keyed window state,
            // hands it to the window function, and sends the result downstream
   -----> invokeUserFunction(TimeDomain.EVENT_TIME, timer)
   -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector)

-- Emitting watermarks: when assignTimestampsAndWatermarks is configured on the
   DataStream, a TimestampsAndPeriodicWatermarksOperator is chained in:
   StreamTaskNetworkInput.processElement()
   -----> TimestampsAndPeriodicWatermarksOperator.processElement
            // calls AssignerWithPeriodicWatermarks.extractTimestamp to extract the
            // event time, then updates the StreamRecord's timestamp
   -----> WindowOperator.processElement
            // windowAssigner.assignWindows uses the element's timestamp to assign windows
```
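The valve step above, finding the new minimum watermark across aligned channels, is worth a small sketch of its own (`ValveSketch` and `WatermarkValve` are hypothetical simplifications of StatusWatermarkValve; channel-status handling is omitted): each input channel remembers its latest watermark, and the operator only sees a new watermark when the minimum over all channels advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ValveSketch {
    static class WatermarkValve {
        private final long[] channelWatermarks; // latest watermark seen per channel
        private long lastEmitted = Long.MIN_VALUE;

        WatermarkValve(int channels) {
            channelWatermarks = new long[channels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /** Returns the new aligned watermark to emit, or null if the minimum did not advance. */
        Long inputWatermark(int channel, long watermark) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > lastEmitted) {
                lastEmitted = min;
                return min;
            }
            return null;
        }
    }

    public static List<Long> emitted(int channels, int[][] inputs) {
        WatermarkValve valve = new WatermarkValve(channels);
        List<Long> out = new ArrayList<>();
        for (int[] in : inputs) { // each pair is {channelIndex, watermarkTimestamp}
            Long wm = valve.inputWatermark(in[0], in[1]);
            if (wm != null) out.add(wm);
        }
        return out;
    }

    public static void main(String[] args) {
        // channel 0 reaches 10; nothing is emitted until channel 1 catches up
        System.out.println(emitted(2, new int[][]{{0, 10}, {1, 4}, {1, 12}})); // [4, 10]
    }
}
```

This minimum-over-channels rule is what makes a multi-input operator's event-time clock safe: it never runs ahead of its slowest input.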
0xEE Personal information
★★★★★ Thoughts on life and technology ★★★★★
WeChat official account: Rosie's Thoughts
Follow it for timely notice of new articles and for recommended technical reading.