Welcome to my GitHub

Github.com/zq2599/blog…

Content: all original article classification summary and supporting source code, involving Java, Docker, Kubernetes, DevOPS, etc.;

Welcome to my GitHub

Here classification and summary of xinchen all original (including supporting source code) : github.com/zq2599/blog…

Flink processing function practice series links

  1. Learn more about the state operation of ProcessFunction (Flink-1.10);
  2. ProcessFunction;
  3. KeyedProcessFunction classes;
  4. ProcessAllWindowFunction(window processing);
  5. CoProcessFunction(dual-stream processing);

Confusion about ProcessFunction status

In the process of learning Flink’s ProcessFunction, the official document mentioned more than once the elements that only apply to keyed stream, as shown in the red box below:

Keyed Stream keyed Stream keyed Stream Keyed Stream Keyed Stream

  1. Why can only keyed stream elements read and write states?
  2. How does the state for each key operate?

Flink’s “state”

Let’s review the Flink” state “:

  1. Keyed state and operator state

The keyed Stream element has a key that matches ProcessFunction’s state, while the other Steam elements have no key that matches ProcessFunction’s state. The other State is Operator State, which is bound to an instance of a multi-parallelism Operator. For example, the current Operator consumes the latest offset of a kafka partition, while ProcessFunction handles the stream element. Operator State is not involved:

The official demo

In order to study ProcessFunction went to see the official demo, the address is: ci.apache.org/projects/fl… A brief description of the demo’s functions:

  1. The data source continuously generates words, each of which corresponds to an instance of Tuple2

    ;
    ,string>
  2. The data source is converted to a KeyedStream by the keyBy method, and the key is the F0 field of the Tuple2 instance;
  3. CountWithTimeoutFunction, a subclass of KeyedProcessFunction, is used to handle each element of the KeyedStream. The logic is to maintain a state for each key. The state contains the number of occurrences of that key and the time when it was last seen.
  4. If that key is not present for a minute, KeyedProcessFunction sends the element downstream;

The above is the function of the official demo. Originally, I wanted to deepen my understanding through the demo, but after reading the result, I did not understand, but more confused. The following picture is my confusion about the Demo code:

The processElement method is used to retrieve the status of aaa, update the status of aaa, and save the status of aaa. State.value () returns the aaa state. The value method does not take aaa as an input. If the f0 field of the next entry value is equal to BBB, does state.value() return the state of BBB? 7. Same confusion about the status update code state.update(current); 8. Then comes a new puzzle: Does the member variable state keep changing? Each time the processElement is executed, it becomes the state instance of the key.

First, reflect on why there are such doubts

  1. If a HashMap is used to obtain a value, the key is specified when the get method is called. If a HashMap is used to set a value, the key is specified when the put method is set. Therefore, it is not used to see that the state.value() method does not use a key
  2. The first thing to do to eliminate this mismatch is to remind yourself that processElement runs within the framework, and much of the data has been prepared by the framework before;
  3. The ProcessFunction is the lowest level abstraction (at the bottom of the image below). The ProcessFunction is the lowest level abstraction (at the bottom of the image below). The ProcessFunction is the lowest level abstraction (at the bottom).

Tracing the source code

  1. Let’s start with the stack of a breakpoint before executing the processElement method in the demo above. You can see that the source is the run method of the thread on which the KeyedProcessFunction operator performs the task:

Streamtask.run (); streamTask.run (); streamTask.run ();. 3. The following figure, StreamOneInputProcessor processInput removed KeyedStream an element, call processElement method, and the element as the reference, coupled with a picture you can see: When you subclass KeyedProcessFunction, each element of the KeyedStream will be passed as an input parameter when you call your overwritten processElement method. This should be taken into account when developing both ProcessFunction and KeyedProcessFunction:4. Next to the place where the most critical, below in the red box streamOperator. SetKeyContextElement1 (record) will answer questions in front of me, be sure to go in to see clearly, (behind the yellow line of code, you ought to have guessed, So that’s calling the processElement method in the demo.)5. The following figure, AbstractStreamOperator setKeyContextElement gives the answer: for each element KeyedStream, will work out the key here, again call setCurrentKey save the key:6. Expand setCurrentKey as shown in the following figure, and find that the saving of key is related to the current storage policy (StateBackend). Here is the default policy HeapKeyedStateBackend:7. Eventually, the key obtained from the current element is stored somewhere in the keyContext object of StateBackend. The implementation of StateBackend depends on the Flink Settings. I’m saving it in the currentKey variable of InternalKeyContextImpl instance:8. At this point in the code, you should be able to guess the answer to my previous doubts: In StateBackend, the keyContext is used to retrieve the saved key, and the state of the key can be checked like a HashMap. State contains a reference to the keyContext object of StateBackend, so access to the saved key is not a problem:The value() method is simple and straightforward, and the key saved by keyContext is used as an input parameter to retrieve the corresponding state:The stateMap implementation is an instance of CopyOnWriteStateMap:12. At this point in the code, there is only one more verification: The state.update(current) method that updates the state takes the key from StateBackend’s keyContext as its own key and updates the current as its value to stateMap. StateTable. Put = stateTable. Put = stateTable.The statetable. put method uses the key saved by the keyContext as its own key:15. The statemap. put method is called to save data to CopyOnWriteStateMap instance:16. Thanks to the standard, clear design and implementation of Flink code, as well as the powerful debug function of IDEA, the whole reading and analysis process is very smooth. The gains from this process will be effective in the future in-depth study of DataStreamAPI.

Finally, a simple flow chart is drawn according to the above analysis process, hoping to help you speed up your understanding:

Welcome to pay attention to the public number: programmer Xin Chen

Wechat search “programmer Xin Chen”, I am Xin Chen, looking forward to enjoying the Java world with you…

Github.com/zq2599/blog…