Welcome to my GitHub

All of Xinchen's original articles (with supporting source code) are categorized and summarized here: github.com/zq2599/blog…

Overview of this article

  • This article is the final part of the CoProcessFunction trilogy. It uses timers and side outputs in CoProcessFunction to enhance the feature built in the previous part.
  • The previous part worked like this: stream 1 receives aaa and keeps it in state until stream 2 also receives aaa; the two aaa values are then added together and output downstream.
  • That feature has a problem: if stream 2 never receives aaa, nothing for aaa is ever output downstream, so the aaa that entered stream 1 is effectively lost;
  • This article fixes that problem: after aaa appears in one stream, if it appears in the other stream within 10 seconds, the values are added and output downstream as before. If it does not appear in the other stream within 10 seconds, the waiting value is sent to a side output, and then all state is cleared.
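The rule described above can be modelled without Flink. The following is a minimal plain-Java sketch, not the article's implementation: the class `JoinWithTimeoutSketch`, its `Event` and `Pending` types, and the output strings are all hypothetical names chosen for illustration. It also simplifies one case: a repeated key on the same stream just overwrites the waiting value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the join-or-timeout rule. This is not Flink code. */
class JoinWithTimeoutSketch {

    static final long WAIT_TIME = 10_000L; // same 10-second wait as in the article

    /** One input record: source stream (1 or 2), key, value, arrival time in ms. */
    static final class Event {
        final int stream;
        final String key;
        final int value;
        final long time;

        Event(int stream, String key, int value, long time) {
            this.stream = stream;
            this.key = key;
            this.value = value;
            this.time = time;
        }
    }

    /** A value waiting for its partner, and the time at which it gives up. */
    static final class Pending {
        final int stream;
        final int value;
        final long deadline;

        Pending(int stream, int value, long deadline) {
            this.stream = stream;
            this.value = value;
            this.deadline = deadline;
        }
    }

    /** Moves every entry whose deadline has passed to the side output. */
    static void expire(Map<String, Pending> waiting, long now, List<String> out) {
        Iterator<Map.Entry<String, Pending>> it = waiting.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            Pending p = e.getValue();
            if (p.deadline <= now) {
                out.add("side" + p.stream + ": key [" + e.getKey() + "], value [" + p.value + "]");
                it.remove();
            }
        }
    }

    /** Processes events in arrival order, then advances the clock to endTime. */
    static List<String> run(List<Event> events, long endTime) {
        Map<String, Pending> waiting = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            expire(waiting, e.time, out);
            Pending p = waiting.get(e.key);
            if (p != null && p.stream != e.stream) {
                // Partner found in time: add the two values and emit downstream
                out.add("main: (" + e.key + "," + (p.value + e.value) + ")");
                waiting.remove(e.key);
            } else {
                // First sighting (or a repeat on the same stream, which this sketch
                // simply overwrites): remember the value and start the 10-second wait
                waiting.put(e.key, new Pending(e.stream, e.value, e.time + WAIT_TIME));
            }
        }
        expire(waiting, endTime, out);
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1, "aaa", 1, 0L),       // stream 1 sees aaa
                new Event(2, "aaa", 2, 5_000L),   // stream 2 sees aaa 5 s later: joined
                new Event(1, "bbb", 7, 20_000L)); // bbb never shows up on stream 2
        for (String line : run(events, 40_000L)) {
            System.out.println(line);
        }
    }
}
```

With the three events in `main`, aaa is joined on the main output and bbb ends up on the side output of stream 1, which is the behaviour the rest of the article builds with real Flink state and timers.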

Reference articles

  1. Understanding states: “In-depth understanding of State Operations for ProcessFunction (Flink-1.10)”
  2. Understanding timers: “Understanding the Timer logic of ProcessFunction”

Sorting out the flow

  • To get the code logic right, let's first sort out the normal and abnormal flows;
  • The normal flow: aaa appears in stream 1 and then, within 10 seconds, in stream 2; the two values are added and sent downstream.
  • The abnormal flow: stream 1 receives aaa at 16:14:01, but stream 2 never receives aaa. Ten seconds later, at 16:14:11, the timer fires. State 1 shows that stream 1 received aaa 10 seconds earlier, so the data is sent to stream 1's side output.
  • Next, write the code that implements this;
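The timing in the abnormal flow is simple arithmetic: arrival time plus the 10-second wait. A small sketch with `java.time` follows; the class and method names are hypothetical, and treating an arrival exactly at the deadline as too late is a simplification made here, not something the article specifies.

```java
import java.time.LocalTime;

/** Works out when the timeout timer fires for a given arrival time. */
class TimerDeadlineSketch {

    static final long WAIT_SECONDS = 10; // the article's 10-second wait

    /** The moment at which the waiting value gives up and goes to the side output. */
    static LocalTime deadline(LocalTime arrival) {
        return arrival.plusSeconds(WAIT_SECONDS);
    }

    /** True when the second stream's element arrives strictly before the deadline. */
    static boolean joinsInTime(LocalTime firstArrival, LocalTime secondArrival) {
        return secondArrival.isBefore(deadline(firstArrival));
    }

    public static void main(String[] args) {
        LocalTime arrival = LocalTime.of(16, 14, 1);
        System.out.println(deadline(arrival));                              // 16:14:11
        System.out.println(joinsInTime(arrival, LocalTime.of(16, 14, 6)));  // true
        System.out.println(joinsInTime(arrival, LocalTime.of(16, 14, 20))); // false
    }
}
```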

Download the source code

If you don't want to write the code yourself, the source code for the whole series can be downloaded from GitHub. The addresses and links are listed below (github.com/zq2599/blog…):

Name | Link | Note
Project home page | github.com/zq2599/blog… | The project's home page on GitHub
Git repository address (HTTPS) | github.com/zq2599/blog… | Repository address of the project source, HTTPS protocol
Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, SSH protocol

The Git project contains multiple folders. The code for this article is in the FlinkStudy folder (marked with a red box in the original screenshot).

A subclass of CoProcessFunction

  1. In the previous two articles, the CoProcessFunction subclasses were written as anonymous classes (marked with a red box in the original screenshot).

  2. In this article the CoProcessFunction subclass uses member variables of the outer class, so it is no longer written as an anonymous class. Create a new CoProcessFunction subclass in ExecuteWithTimeoutCoProcessFunction.java; the key points are explained after the code:
package com.bolingcavalry.coprocessfunction;

import com.bolingcavalry.Utils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Function class that implements the dual-stream business logic
 */
public class ExecuteWithTimeoutCoProcessFunction extends CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private static final Logger logger = LoggerFactory.getLogger(ExecuteWithTimeoutCoProcessFunction.class);

    /**
     * Wait time, in milliseconds
     */
    private static final long WAIT_TIME = 10000L;

    public ExecuteWithTimeoutCoProcessFunction(OutputTag<String> source1SideOutput, OutputTag<String> source2SideOutput) {
        super();
        this.source1SideOutput = source1SideOutput;
        this.source2SideOutput = source2SideOutput;
    }

    private OutputTag<String> source1SideOutput;

    private OutputTag<String> source2SideOutput;

    // The state of a key stored in processElement1
    private ValueState<Integer> state1;

    // The state of a key stored in processElement2
    private ValueState<Integer> state2;

    // If a timer is created, save its trigger timestamp in state so that it can be deleted later
    private ValueState<Long> timerState;

    // Saves the current key so that onTimer can read it
    // (KeyedProcessFunction's OnTimerContext has an API for this, but CoProcessFunction's OnTimerContext does not)
    private ValueState<String> currentKeyState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialization state
        state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState1", Integer.class));
        state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState2", Integer.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
        currentKeyState = getRuntimeContext().getState(new ValueStateDescriptor<>("currentKeyState", String.class));
    }

    /**
     * Clears all state
     */
    private void clearAllState() {
        state1.clear();
        state2.clear();
        currentKeyState.clear();
        timerState.clear();
    }

    @Override
    public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        logger.info("ProcessElement1: Processing element 1: {}", value);

        String key = value.f0;

        Integer value2 = state2.value();

        // value2 is null, which means processElement2 has not yet handled this key,
        // so save value1
        if(null==value2) {
            logger.info("ProcessElement1: Stream 2 has not received [{}], save the value [{}] received by stream 1", key, value.f1);
            state1.update(value.f1);

            currentKeyState.update(key);

            // Register a timer that fires 10 seconds after this element's timestamp
            long timerKey = ctx.timestamp() + WAIT_TIME;
            ctx.timerService().registerProcessingTimeTimer(timerKey);
            // Save the timer's trigger time so that the timer can be deleted later
            timerState.update(timerKey);
            logger.info("ProcessElement1: Creates timer [{}] and waits for stream 2 to receive data", Utils.time(timerKey));
        } else {
            logger.info("ProcessElement1: Stream 2 has received [{}] with the value [{}], now add the two values and output", key, value2);

            // Emit a new element to the downstream operator
            out.collect(new Tuple2<>(key, value.f1 + value2));

            // Delete the timer (it was created by processElement2)
            long timerKey = timerState.value();
            logger.info("ProcessElement1: New element for [{}] has been output downstream, remove timer [{}]", key, Utils.time(timerKey));
            ctx.timerService().deleteProcessingTimeTimer(timerKey);

            clearAllState();
        }
    }

    @Override
    public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        logger.info("ProcessElement2: Processing element 2: {}", value);

        String key = value.f0;

        Integer value1 = state1.value();

        // value1 is null, which means processElement1 has not yet handled this key,
        // so save value2
        if(null==value1) {
            logger.info("ProcessElement2: Stream 1 has not received [{}], save the value [{}] received by stream 2", key, value.f1);
            state2.update(value.f1);

            currentKeyState.update(key);

            // Register a timer that fires 10 seconds after this element's timestamp
            long timerKey = ctx.timestamp() + WAIT_TIME;
            ctx.timerService().registerProcessingTimeTimer(timerKey);
            // Save the timer's trigger time so that the timer can be deleted later
            timerState.update(timerKey);
            logger.info("ProcessElement2: Creates timer [{}] and waits for stream 1 to receive data", Utils.time(timerKey));
        } else {
            logger.info("ProcessElement2: Stream 1 has received [{}] with the value [{}], now add the two values and output", key, value1);

            // Emit a new element to the downstream operator
            out.collect(new Tuple2<>(key, value.f1 + value1));

            // Delete the timer (it was created by processElement1)
            long timerKey = timerState.value();
            logger.info("ProcessElement2: New element for [{}] has been output downstream, remove timer [{}]", key, Utils.time(timerKey));
            ctx.timerService().deleteProcessingTimeTimer(timerKey);

            clearAllState();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        super.onTimer(timestamp, ctx, out);

        String key = currentKeyState.value();

        // The timer fired, which means the key appeared in only one of the two streams
        logger.info("[{}]: timer [{}] was triggered", key, Utils.time(timestamp));

        Integer value1 = state1.value();
        Integer value2 = state2.value();

        if(null!=value1) {
            logger.info("Only stream 1 received [{}] with a value of [{}]", key, value1);
            // Side output
            ctx.output(source1SideOutput, "source1 side, key [" + key+ "], value [" + value1 + "]");
        }

        if(null!=value2) {
            logger.info("Only stream 2 received [{}] with a value of [{}]", key, value2);
            // Side output
            ctx.output(source2SideOutput, "source2 side, key [" + key+ "], value [" + value2 + "]");
        }

        clearAllState();
    }
}
  1. Key point 1: the new state timerState saves the timer's trigger timestamp, which is needed to delete the timer later;
  2. Key point 2: the current key is not available in the onTimer of CoProcessFunction (KeyedProcessFunction can get it, because its OnTimerContext class provides an API), so the new state currentKeyState is added to let onTimer know the current key;
  3. Key point 3: when processElement1 handles aaa and stream 2 has not yet received aaa, it saves the value in state and starts a 10-second timer;
  4. Key point 4: when processElement2 handles aaa and finds that stream 1 has already received aaa, it adds the values and outputs the sum downstream, deletes the timer created in processElement1, and clears all state related to aaa;
  5. Key point 5: if aaa appears in both streams within 10 seconds, the sum is always sent downstream and the timer is deleted. So whenever onTimer runs, aaa has appeared in only one stream and 10 seconds have passed; onTimer can then send the value to the side output.
  6. That is the dual-stream processing logic and code. Next, write the subclass of AbstractCoProcessFunctionExecutor.
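Key points 1, 4 and 5 depend on one detail: deleting a timer requires the exact trigger time it was registered with, which is why that time is kept in state. The plain-Java sketch below models this bookkeeping. It is not Flink's TimerService; `TimerBookkeepingSketch` and its methods are hypothetical names, and the `timerState` map only stands in for the article's keyed `ValueState<Long>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** In-memory stand-in for per-key timers plus the saved trigger time. */
class TimerBookkeepingSketch {

    // Registered timers per key (a key could in principle hold several)
    private final Map<String, TreeSet<Long>> timersByKey = new TreeMap<>();

    // Stands in for timerState: the trigger time saved when the timer was created
    private final Map<String, Long> timerState = new HashMap<>();

    /** Registers a timer for the key and remembers its trigger time. */
    void register(String key, long triggerTime) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(triggerTime);
        timerState.put(key, triggerTime);
    }

    /** Deletes the key's timer using the saved trigger time; false if none was saved. */
    boolean delete(String key) {
        Long saved = timerState.remove(key);
        TreeSet<Long> timers = timersByKey.get(key);
        return saved != null && timers != null && timers.remove(saved);
    }

    /** Fires every timer with a trigger time at or before now; returns the keys fired. */
    List<String> advanceTo(long now) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Long>> e : timersByKey.entrySet()) {
            TreeSet<Long> timers = e.getValue();
            while (!timers.isEmpty() && timers.first() <= now) {
                timers.pollFirst();
                timerState.remove(e.getKey());
                fired.add(e.getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerBookkeepingSketch timers = new TimerBookkeepingSketch();

        timers.register("aaa", 10_000L);              // stream 1 saw aaa at t=0
        System.out.println(timers.delete("aaa"));      // partner arrived: timer removed
        System.out.println(timers.advanceTo(20_000L)); // nothing left to fire for aaa

        timers.register("bbb", 30_000L);               // bbb never gets a partner
        System.out.println(timers.advanceTo(30_000L)); // bbb's timer fires
    }
}
```

A fired timer here corresponds to the article's onTimer call, where the waiting value goes to the side output and the state is cleared.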

The business execution class AddTwoSourceValueWithTimeout

  1. This class runs the whole feature. It is a subclass of the abstract class AbstractCoProcessFunctionExecutor; the code follows, and the key points are explained after it:
package com.bolingcavalry.coprocessfunction;

import com.bolingcavalry.Utils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author will
 * @email [email protected]
 * @date 2020-11-11 09:48
 * @description When a key appears in one stream, wait a limited time for it to appear in the other stream;
 *              if it does not appear within the wait time, emit it through a side output
 */
public class AddTwoSourceValueWithTimeout extends AbstractCoProcessFunctionExecutor {

    private static final Logger logger = LoggerFactory.getLogger(AddTwoSourceValueWithTimeout.class);

    // If aaa arrives on source1 and source2 does not receive aaa within 10 seconds, the aaa from source1 goes to source1SideOutput
    final OutputTag<String> source1SideOutput = new OutputTag<String>("source1-sideoutput") {};

    // If aaa arrives on source2 and source1 does not receive aaa within 10 seconds, the aaa from source2 goes to source2SideOutput
    final OutputTag<String> source2SideOutput = new OutputTag<String>("source2-sideoutput") {};

    /**
     * Overrides the parent method: the parent logic is unchanged, and a timestamp assigner is added for the elements
     * @param port
     * @return
     */
    @Override
    protected KeyedStream<Tuple2<String, Integer>, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {
        return env
                // Listen on the port
                .socketTextStream("localhost", port)
                // The resulting string "aaa,3" is converted into an instance of Tuple2, f0="aaa", f1=3
                .map(new WordCountMap())
                // Set the timestamp allocator to use the current time as the timestamp
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        long timestamp = System.currentTimeMillis();
                        logger.info("Add timestamp, value: {}, timestamp: {}", element, Utils.time(timestamp));
                        // Use the current system time as the timestamp
                        return timestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        // No watermark is needed in this example, so return null
                        return null;
                    }
                })
                // Partition by the word (field 0) as the key
                .keyBy(0);
    }

    @Override
    protected CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> getCoProcessFunctionInstance() {
        return new ExecuteWithTimeoutCoProcessFunction(source1SideOutput, source2SideOutput);
    }

    @Override
    protected void doSideOutput(SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream) {
        // Both side outputs are printed directly
        mainDataStream.getSideOutput(source1SideOutput).print();
        mainDataStream.getSideOutput(source2SideOutput).print();
    }

    public static void main(String[] args) throws Exception {
        new AddTwoSourceValueWithTimeout().execute();
    }
}
  1. Key point 1: add the member variables source1SideOutput and source2SideOutput, which are used for side output;
  2. Key point 2: override the parent class's buildStreamFromSocket method and add a timestamp assigner so that every element carries a timestamp;
  3. Key point 3: override the parent class's doSideOutput method, which prints the side outputs;
  4. That is all of the code; next, let's verify it;
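The comment in buildStreamFromSocket says that a line such as "aaa,3" is converted into a tuple with f0="aaa" and f1=3. The WordCountMap class that does this is not shown in this article, so the following is only a plain-Java sketch of that parsing step under the assumption of comma-separated input; `ParseLineSketch` and `parse` are hypothetical names, and `Map.Entry` stands in for Flink's Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/** Parses "key,value" lines such as "aaa,3". Not the article's WordCountMap. */
class ParseLineSketch {

    /** Splits one line into a key and an integer value. */
    static Map.Entry<String, Integer> parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected key,value but got: " + line);
        }
        // Integer.parseInt throws NumberFormatException for a non-numeric value
        return new SimpleEntry<>(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> entry = parse("aaa,3");
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```

Rejecting malformed lines early keeps bad input from reaching the state and timer logic; whether the real WordCountMap does the same is not known from this article.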

Verification (no timeout)

  1. Open ports 9998 and 9999 on the local machine. I am on a MacBook, so I run nc -l 9998 and nc -l 9999 in two separate consoles;
  2. Start the Flink application. If you are on a Mac like me, simply run the AddTwoSourceValueWithTimeout.main method (I have not tried this on Windows, but you can also build a jar and deploy it to a Flink cluster);
  3. Enter aaa,1 in the console listening on port 9998. The Flink console then prints the following: processElement1 finds that stream 2 has not yet received aaa, so it saves the value in state and creates a timer:
18:18:10,472 INFO  AddTwoSourceValueWithTimeout - Add timestamp, value: (aaa,1), timestamp: 2020-11-12 06:18:10
18:18:10,550 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement1: Processing element 1: (aaa,1)
18:18:10,550 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement1: Stream 2 has not received [aaa], save the value [1] received by stream 1
18:18:10,553 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement1: Creates timer [2020-11-12 06:18:20] and waits for stream 2 to receive data
  4. Quickly (within 10 seconds) enter aaa,2 in the console listening on port 9999. The Flink log below shows that the two values are added, the sum is output downstream, and the timer is deleted:
18:18:15,813 INFO  AddTwoSourceValueWithTimeout - Add timestamp, value: (aaa,2), timestamp: 2020-11-12 06:18:15
18:18:15,887 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement2: Processing element 2: (aaa,2)
18:18:15,887 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement2: Stream 1 has received [aaa] with the value [1], now add the two values and output
(aaa,3)
18:18:15,888 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement2: New element for [aaa] has been output downstream, remove timer [2020-11-12 06:18:20]
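In these logs the line prefix reads 18:18 while the value printed through Utils.time reads 06:18. Utils.time is not shown in this article, so the cause is not confirmed; one plausible explanation, offered only as an assumption, is a 12-hour pattern (hh) where a 24-hour pattern (HH) was intended. The sketch below shows the difference with `java.time`; `HourPatternSketch` is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Compares the 12-hour (hh) and 24-hour (HH) hour patterns. */
class HourPatternSketch {

    /** Formats a timestamp with the given pattern. */
    static String format(LocalDateTime time, String pattern) {
        return DateTimeFormatter.ofPattern(pattern).format(time);
    }

    public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2020, 11, 12, 18, 18, 10);
        System.out.println(format(time, "yyyy-MM-dd hh:mm:ss")); // 2020-11-12 06:18:10
        System.out.println(format(time, "yyyy-MM-dd HH:mm:ss")); // 2020-11-12 18:18:10
    }
}
```

The timer itself is unaffected either way, since it is registered with the millisecond timestamp and only the log text is formatted.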

Verification (timeout)

  1. Having tested the normal flow, now check whether the timeout flow behaves as expected;
  2. Enter aaa,1 in the console listening on port 9998 and wait ten seconds. The Flink console output below shows that the timer fires and aaa goes to the side output of stream 1:
18:23:37,393 INFO  AddTwoSourceValueWithTimeout - Add timestamp, value: (aaa,1), timestamp: 2020-11-12 06:23:37
18:23:37,417 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement1: Processing element 1: (aaa,1)
18:23:37,417 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement1: Stream 2 has not received [aaa], save the value [1] received by stream 1
18:23:37,417 INFO  ExecuteWithTimeoutCoProcessFunction - ProcessElement1: Creates timer [2020-11-12 06:23:47] and waits for stream 2 to receive data
18:23:47,398 INFO  ExecuteWithTimeoutCoProcessFunction - [aaa]: timer [2020-11-12 06:23:47] was triggered
18:23:47,399 INFO  ExecuteWithTimeoutCoProcessFunction - Only stream 1 received [aaa] with a value of [1]
source1 side, key [aaa], value [1]
  • This completes the hands-on CoProcessFunction trilogy. I hope these three articles serve as a useful reference and help you understand and master CoProcessFunction more quickly;

You are not alone: Xinchen's original articles are with you all the way

  1. Java series
  2. Spring series
  3. Docker series
  4. Kubernetes series
  5. Database + middleware series
  6. The conversation series

Follow the WeChat official account: programmer Xin Chen

Search WeChat for “programmer Xin Chen”. I am Xin Chen, and I look forward to exploring the Java world with you…
