Welcome to my GitHub

Github.com/zq2599/blog…

Contents: a categorized index of all my original articles plus their supporting source code, covering Java, Docker, Kubernetes, DevOps, and more;

Links to the Flink Processing Functions in Action series

  1. In depth on state operations in ProcessFunction (Flink-1.10);
  2. The ProcessFunction class;
  3. The KeyedProcessFunction class;
  4. ProcessAllWindowFunction (window processing);
  5. CoProcessFunction (dual-stream processing);

Overview

This article is the third part of the Flink Processing Functions in Action series. The previous article covered ProcessFunction, the simplest processing function. This one studies KeyedProcessFunction and some of the features that come with it.

About KeyedProcessFunction

Comparing the class diagrams shows that KeyedProcessFunction has no direct relationship with ProcessFunction: neither one extends the other. KeyedProcessFunction processes the elements of a KeyedStream and offers more than ProcessFunction, because state handling and timers are features unique to KeyedProcessFunction. With that introduction, let's learn by example.
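To make "keyed state" concrete, here is a toy model in plain Java. It is a sketch that assumes nothing from Flink. ToyKeyedValueState is a hypothetical class whose value() and update() only ever see the entry for the key currently being processed. In Flink, the runtime sets the current key before calling processElement or onTimer, and the data lives in the configured state backend. Here a HashMap stands in for both.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model (not Flink's API): a value state whose reads and writes
// are scoped to whichever key is currently being processed.
class ToyKeyedValueState<K, V> {
    private final Map<K, V> backend = new HashMap<>(); // stands in for the state backend
    private K currentKey;

    // In Flink the runtime sets the current key; here the caller does it.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns null if nothing has been stored for the current key yet.
    V value() {
        return backend.get(currentKey);
    }

    // Stores a value for the current key only.
    void update(V value) {
        backend.put(currentKey, value);
    }
}

class ToyKeyedStateDemo {
    public static void main(String[] args) {
        ToyKeyedValueState<String, Long> count = new ToyKeyedValueState<>();
        for (String word : new String[] {"aaa", "bbb", "aaa"}) {
            count.setCurrentKey(word);
            Long current = count.value();
            count.update(current == null ? 1L : current + 1);
            System.out.println(word + " -> " + count.value());
        }
    }
}
```

Running main prints "aaa -> 1", "bbb -> 1" and "aaa -> 2". The two words keep separate counts even though they share one state object.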

Version information

  1. Operating system: MacBook Pro 13 inch, macOS Catalina 10.15.3
  2. Development tool: IDEA ULTIMATE 2018.3
  3. JDK: 1.8.0_211
  4. Maven: 3.6.0
  5. Flink: 1.9.2

Download the source code

If you'd rather not write the code yourself, the source code for the whole series can be downloaded from GitHub. The addresses and links are listed below (github.com/zq2599/blog…):

  1. Project home page: Github.com/zq2599/blog… (the project's home page on GitHub)
  2. Git repository address (HTTPS): Github.com/zq2599/blog… (the project's source repository address, HTTPS protocol)
  3. Git repository address (SSH): git@github.com:zq2599/blog_demos.git (the project's source repository address, SSH protocol)

The Git project contains multiple folders. The code for this article is in the FlinkStudy folder, shown in the red box below:

About the hands-on example

This hands-on example is designed to learn KeyedProcessFunction. It works as follows:

  1. Read strings from port 9999 on the local machine;
  2. Split each string on spaces and turn each word into a Tuple2 instance, where f0 is the word and f1 is 1;
  3. Partition the Tuple2 instances by the f0 field to get a KeyedStream;
  4. The KeyedStream is processed by a custom KeyedProcessFunction;
  5. The custom KeyedProcessFunction records the last time each word appeared and then sets a ten-second timer. If the word has not appeared again when the timer fires ten seconds later, the function sends the word and its total number of occurrences to the downstream operator.
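The five steps can be simulated in plain Java before touching Flink. The sketch below is a hypothetical, single-threaded model, not Flink code. Processing time is passed in explicitly as milliseconds, a PriorityQueue stands in for Flink's timer service, and a HashMap stands in for keyed state. Each element registers its own timer at lastModified + 10 seconds. A firing timer emits only if its timestamp still equals the word's lastModified + 10 seconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical plain-Java simulation of the experiment's logic (no Flink involved).
class WordTimeoutSimulation {
    static final long TIMEOUT_MS = 10_000;

    // Per-word state, mirroring the CountWithTimestamp bean.
    static class CountWithTimestamp {
        String key;
        long count;
        long lastModified;
    }

    // A registered timer: the word it belongs to and when it fires.
    static class Timer {
        final String key;
        final long fireAt;
        Timer(String key, long fireAt) { this.key = key; this.fireAt = fireAt; }
    }

    private final Map<String, CountWithTimestamp> state = new HashMap<>();
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<Timer>((a, b) -> Long.compare(a.fireAt, b.fireAt));
    final List<String> emitted = new ArrayList<>();

    // Counterpart of processElement: update the word's state and register a timer.
    void processElement(String word, long now) {
        fireTimersUpTo(now); // timers that are already due fire first
        CountWithTimestamp current = state.get(word);
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = word;
            state.put(word, current);
        }
        current.count++;
        current.lastModified = now;
        timers.add(new Timer(word, now + TIMEOUT_MS));
    }

    // Counterpart of onTimer: emit only if the word has not reappeared since.
    private void onTimer(Timer timer) {
        CountWithTimestamp result = state.get(timer.key);
        if (timer.fireAt == result.lastModified + TIMEOUT_MS) {
            emitted.add("(" + result.key + "," + result.count + ")");
        }
    }

    // Advances simulated processing time, firing every timer that is due.
    void fireTimersUpTo(long now) {
        while (!timers.isEmpty() && timers.peek().fireAt <= now) {
            onTimer(timers.poll());
        }
    }

    public static void main(String[] args) {
        WordTimeoutSimulation sim = new WordTimeoutSimulation();
        sim.processElement("aaa", 0);      // first occurrence
        sim.processElement("aaa", 20_000); // after the first one timed out
        sim.processElement("aaa", 25_000); // within 10 s of the previous one
        sim.fireTimersUpTo(60_000);
        System.out.println(sim.emitted);   // [(aaa,1), (aaa,3)]
    }
}
```

The inputs in main follow the same pattern as the validation later in this article: one word, then the same word twice within 10 seconds. Only the counts 1 and 3 are emitted. The timer registered by the second occurrence fires after the third occurrence has already moved lastModified forward, so its check fails.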

Coding

  1. Continue using the FlinkStudy project created in the article “Flink Processing Functions in Action II: ProcessFunction Class”;
  2. Create the bean class CountWithTimestamp with three fields, all public for convenience:
package com.bolingcavalry.keyedprocessfunction;

public class CountWithTimestamp {
    public String key;

    public long count;

    public long lastModified;
}
  3. Create Splitter, an implementation of FlatMapFunction. It splits each string into words and emits one Tuple2 instance per word, where f0 is the word and f1 is 1:
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        // Split on spaces; each word becomes a Tuple2 of (word, 1)
        for(String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
  4. Finally, the main logic is in ProcessTime.java. It contains the custom KeyedProcessFunction subclass and the main method that serves as the program entry. The code is listed below, and the key parts are explained afterwards:
package com.bolingcavalry.keyedprocessfunction;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;


/**
 * @author will
 * @email [email protected]
 * @date 2020-05-17
 * @description Experience the KeyedProcessFunction class (time type is processing time)
 */
public class ProcessTime {

    /**
     * A subclass of KeyedProcessFunction that records the latest occurrence time of each word in the state backend
     * and creates a timer. When the timer fires, it checks whether 10 seconds have passed since the word last
     * appeared. If so, it sends the word's information to the downstream operator.
     */
    static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

        // Custom state
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialize the state; its name is myState
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // Get the current word
            Tuple currentKey = ctx.getCurrentKey();

            // Get the myState of the current word from the state backend
            CountWithTimestamp current = state.value();

            // If myState has never been assigned a value, initialize it here
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // The number of words increases by one
            current.count++;

            // Takes the timestamp of the current element as the last occurrence of the word
            current.lastModified = ctx.timestamp();

            // Save to backend, including the number of occurrences of the word and the last occurrence time
            state.update(current);

            // Create a timer for the current word, trigger after 10 seconds
            long timer = current.lastModified + 10000;

            ctx.timerService().registerProcessingTimeTimer(timer);

            // Prints all information to check data correctness
            System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",
                    currentKey.getField(0),
                    current.count,
                    current.lastModified,
                    time(current.lastModified),
                    timer,
                    time(timer)));

        }

        /**
         * The method executed after the timer fires.
         * @param timestamp the trigger time of this timer
         * @param ctx context of the firing timer
         * @param out collector for emitting results downstream
         * @throws Exception
         */
        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // Get the current word
            Tuple currentKey = ctx.getCurrentKey();

            // Get the myState of the word
            CountWithTimestamp result = state.value();

            // Indicates whether the current element has not appeared for 10 consecutive seconds
            boolean isTimeout = false;

            // timestamp is the timer's trigger time. If it equals the word's last update time + 10 seconds,
            // the word has not been received again during the past 10 seconds.
            // A word that has not appeared for ten seconds is sent to the downstream operator
            if (timestamp == result.lastModified + 10000) {
                // Send
                out.collect(new Tuple2<String, Long>(result.key, result.count));

                isTimeout = true;
            }

            // Print data to check if it meets expectations
            System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",
                    currentKey.getField(0), result.count, result.lastModified, time(result.lastModified), timestamp, time(timestamp), String.valueOf(isTimeout)));
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parallelism 1
        env.setParallelism(1);

        // Processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Connect to port 9999 on localhost and read strings line by line
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // All entered words that do not reappear for more than 10 seconds can be retrieved by CountWithTimeoutFunction
        DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                // Split the received string on spaces to get multiple words
                .flatMap(new Splitter())
                // Set the timestamp assigner to use the current time as the timestamp
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // Use the current system time as the timestamp
                        return System.currentTimeMillis();
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        // Watermarks are not needed in this example, so null is returned
                        return null;
                    }
                })
                // Partition by word (field f0) as the key
                .keyBy(0)
                // Use custom KeyedProcessFunction to process the partitioned data by word
                .process(new CountWithTimeoutFunction());

        // All entered words are printed here if they do not appear again after 10 seconds
        timeOutWord.print();

        env.execute("ProcessFunction demo : KeyedProcessFunction");
    }

    public static String time(long timeStamp) {
        // "HH" is the 24-hour clock; "hh" would print 1 AM and 1 PM identically
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
    }
}
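The Splitter step can be checked without starting a job. This plain-Java sketch is an approximation, not the class above. A List stands in for Flink's Collector, AbstractMap.SimpleEntry stands in for Tuple2<String, Integer>, and String.trim().isEmpty() roughly approximates StringUtils.isNullOrWhitespaceOnly. The sketch also shows why the separator passed to split must be a space: an empty separator cuts the line into single characters.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of what Splitter emits for one input line.
class SplitterSketch {
    static List<Map.Entry<String, Integer>> split(String s) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        // Roughly the same guard as Splitter: skip null, empty or blank lines.
        if (s == null || s.trim().isEmpty()) {
            return out;
        }
        // One (word, 1) pair per space-separated word.
        for (String word : s.split(" ")) {
            out.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split("aaa bbb aaa")); // [aaa=1, bbb=1, aaa=1]
        // An empty separator splits into single characters instead of words:
        System.out.println(Arrays.toString("aaa bbb".split(""))); // [a, a, a,  , b, b, b]
    }
}
```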

A few points to note about the code above:

  1. assignTimestampsAndWatermarks sets each element's timestamp. getCurrentWatermark returns null because this example needs no watermarks;
  2. In the processElement method, state.value() gets the state of the current word and state.update(current) saves it. See “In Depth on State Operations in ProcessFunction (Flink-1.10)” for more details on this feature;
  3. The registerProcessingTimeTimer method sets the timer's trigger time. Note that the timer here is based on processing time, unlike the official demo, which uses event time;
  4. When the timer fires, the onTimer method is executed. Its parameters carry all the information about the timer, especially timestamp, which is the trigger time originally set for the timer.
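Points 3 and 4 depend on how timers are stored and fired. The toy model below is a sketch, not Flink's implementation. Timers are kept in a TreeMap ordered by trigger time, and a set per timestamp keeps at most one timer per key and timestamp. That matches Flink's documented behavior of calling onTimer only once when the same key registers the same timestamp more than once. advanceTo passes the registered trigger time to onTimer even when the clock has already moved past it. This is what lets the onTimer above compare timestamp with lastModified + 10000 exactly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model (not Flink's implementation) of a processing-time timer service.
class ToyTimerService {
    // trigger time -> keys that registered a timer for that time
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    final List<String> fired = new ArrayList<>();

    void registerProcessingTimeTimer(String key, long time) {
        // A set per timestamp makes a duplicate (key, time) registration a no-op.
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(key);
    }

    // Fires every timer whose trigger time is <= now, passing the registered
    // trigger time (not "now") to onTimer.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Long time = timers.firstKey();
            for (String key : timers.remove(time)) {
                onTimer(key, time);
            }
        }
    }

    void onTimer(String key, long timestamp) {
        fired.add(key + "@" + timestamp);
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.registerProcessingTimeTimer("aaa", 10_000);
        service.registerProcessingTimeTimer("aaa", 10_000); // duplicate, kept once
        service.registerProcessingTimeTimer("bbb", 10_000);
        service.registerProcessingTimeTimer("aaa", 15_000);
        service.advanceTo(12_000);
        System.out.println(service.fired); // [aaa@10000, bbb@10000]
        service.advanceTo(20_000);
        System.out.println(service.fired); // [aaa@10000, bbb@10000, aaa@15000]
    }
}
```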

validation

  1. Run the command nc -l 9999 in a console. Whatever you type there is sent over port 9999 on this machine;
  2. Run ProcessTime's main method directly in IDEA. The program connects to port 9999 and starts reading strings;
  3. In the nc console, type AAA and press Enter. Ten seconds later, the IDEA console prints the following message, as expected:

  4. Type AAA and press Enter twice more, less than 10 seconds apart. The result is shown in the figure below. Each Tuple2 element gets its own timer. When the timer for the second AAA fires, the comparison in onTimer finds that 10 seconds have not yet passed since the latest (i.e. the third) occurrence of AAA. The second element is therefore not emitted to the downstream operator:
  5. The downstream operator prints every timeout record it receives, as shown in the red box below. Only the records with counts 1 and 3 appear. The record with count 2 is missing because AAA was entered again within 10 seconds, so no timeout occurred and nothing reached the downstream operator:

This completes our study of the KeyedProcessFunction processing function. Reading and writing its state and using its timers are very practical capabilities. I hope this article serves as a useful reference for you.

You are not alone: Xinchen's original articles are with you all the way

  1. Java series
  2. Spring series
  3. Docker series
  4. Kubernetes series
  5. Database + middleware series
  6. The conversation series

Welcome to follow my WeChat official account: Programmer Xin Chen

Search for “Programmer Xin Chen” on WeChat. I am Xin Chen, and I look forward to exploring the Java world with you…
