Senior day03_Flink API

Today’s goal

  • The four cornerstones of Flink
  • Flink Window operations
  • Flink Time
  • Flink Watermark mechanism
  • Flink state management – Keyed state and operator state

The four cornerstones of Flink

  • Checkpoint: distributed consistency; prevents data loss and restores state after a failure
  • State: Keyed State and Operator State. By data structure: ValueState, ListState, MapState, BroadcastState
  • Time: event time (EventTime), ingestion time (IngestionTime), and processing time (ProcessingTime)
  • Window: time windows (TimeWindow), count windows (CountWindow), and session windows (SessionWindow)

Window operations

Classification of the Window

  • time
    • Tumbling and sliding windows are the ones used most often
  • count

How to use

case

  • Requirement

    Input: traffic-light id and number of cars that passed it, one record per line, e.g. 9,3 / 9,2 / 9,7 / 4,9 / 2,6 / 1,5 / 2,3 / 5,7 / 5,4

    Requirement 1: every 5 seconds, count the cars that passed each traffic light in the last 5 seconds — a time-based tumbling window

    Requirement 2: every 5 seconds, count the cars that passed each traffic light in the last 10 seconds — a time-based sliding window

  • Analysis

  • code

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.util.concurrent.TimeUnit;
    
    /** * Author itcast * Date 2021/5/7 9:13 * * number of signal lamp and number of vehicles passing the signal lamp * 9,3, 9,2, 9,7, 4,9, 2,6, 1,5, 2,3,5,7,5,4, demand 1: every 5 seconds, within the last 5 seconds, Number of cars passing traffic lights at each intersection -- time based rolling window * Demand 2: Every 5 seconds, the last 10 seconds, the number of cars passing traffic lights at each intersection -- time based sliding window */
    public class WindowDemo {
        public static void main(String[] args) throws Exception {
            //1. Create a flow environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2. Obtain the data source
            DataStreamSource<String> source = env.socketTextStream("node1".9999);
            //3. The conversion operation is based on key window statistics
            DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new CartInfo(split[0], Integer.parseInt(split[1])); }});// Statistics scroll window
            DataStream<CartInfo> result = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    // The processing time is used
                    // Statistics are collected every 5 seconds, within the latest 5 seconds
                    .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                    .sum("count");
            // Statistics slide window
            DataStream<CartInfo> result1 = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    // The processing time is used
                    // Statistics are collected every 5 seconds, within the latest 5 seconds
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                    .sum("count");
    
            //4
            result1.print();
            //5. Execution flow environment
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;// Signal id
            private Integer count;// The number of cars passing the signal}}Copy the code
  • Requirement 2 – CountWindow

    Requirement 1: count the cars passing each intersection over the last 5 messages, computed once every 5 occurrences of the same key — a count-based tumbling window

    Requirement 2: count the cars passing each intersection over the last 5 messages, computed once every 3 occurrences of the same key — a count-based sliding window

  • code

    /** * Author itcast * Date 2021/5/7 9:13 * * number of the signal lamp and number of vehicles passing the signal lamp * 9,3, 9,2, 9,7, 4,9, 2,6, 1,5, 2,3, 5,7, 5,4 Requirement 1: count the number of cars passing through each intersection in the last 5 messages, and make statistics every 5 occurrences of the same key -- rolling window based on quantity Requirement 2: Count the number of cars passing through each intersection in the last 5 messages, and make statistics every 3 occurrences of the same key -- sliding window based on quantity */
    public class WindowDemo02 {
        public static void main(String[] args) throws Exception {
            //1. Create a flow environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2. Obtain the data source
            DataStreamSource<String> source = env.socketTextStream("node1".9999);
            //3. The conversion operation is based on key window statistics
            DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new CartInfo(split[0], Integer.parseInt(split[1])); }});// Statistics scroll counting window
            DataStream<CartInfo> result = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    // The processing time is used
                    // Every 5 items of data are counted
                    .countWindow(5)
                    .sum("count");
            // Statistics sliding counting window
            DataStream<CartInfo> result1 = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    // The processing time is used
                    // Statistics are collected every 5 seconds, within the latest 5 seconds
                    .countWindow(10.5)
                    .sum("count");
    
            //4
            //result.printToErr();
            result1.print();
            //5. Execution flow environment
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;// Signal id
            private Integer count;// The number of cars passing the signal}}Copy the code
  • Session window: a window that receives no data does not trigger any computation. With the session gap (timeout) set to 10s, the previous window's computation is triggered once no data has arrived for 10s.

  • Example: Set the session timeout period to 10s. If no data arrives within 10s, the calculation of the previous window is triggered

  • code

    /** * Author itcast * Date 2021/5/7 9:13 * * the number of the signal lamp and the number of cars passing the signal lamp * 9,3, 9,2, 9,7, 4,9,2, 9,2,6, 1,5, 2,3, 5,7, 5,4 sets the session timeout time to 10s. If no data arrives within 10s, the calculation of the previous window will be triggered */
    public class WindowDemo03 {
        public static void main(String[] args) throws Exception {
            //1. Create a flow environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            env.setParallelism(1);
            //2. Obtain the data source
            DataStreamSource<String> source = env.socketTextStream("node1".9999);
            //3. The conversion operation is based on key window statistics
            DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new CartInfo(split[0], Integer.parseInt(split[1])); }});// Statistics session window
            DataStream<CartInfo> result = cartInfoDS
                    .keyBy(t -> t.getSensorId())
                    // The processing time is used
                    // Every 5 items of data are counted
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                    .sum("count");
    
            //4
            //result.printToErr();
            result.print();
            //5. Execution flow environment
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;// Signal id
            private Integer count;// The number of cars passing the signal}}Copy the code

Time

  • The importance of EventTime
    • Prevent network jitter from causing data out of order and data statistics loss
  • Window: Start time – End time

Watermark time

  • Watermark mechanism

    • A watermark is a timestamp
    • watermark = maximum event time seen so far – maximum allowed delay (maxDelayTime)
  • A window's computation is triggered when watermark >= window end time

Watermark case

  • demand

    Given order data in the format: (order ID, user ID, timestamp/event time, order amount)

    Requirement: every 5 seconds, compute each user's total order amount over the last 5 seconds.

    A watermark is added to handle a certain amount of data delay and out-of-order data.

    The basic version:

    package cn.itcast.sz22.day03;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    
    import java.time.Duration;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */
    public class WatermarkDemo01 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //
            env.setParallelism(1);
            //2.Source
            // Simulate real-time order data (data is delayed and out of order)
            DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
                private boolean flag = true;
    
                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    Random random = new Random();
                    while (flag) {
                        String orderId = UUID.randomUUID().toString();
                        int userId = random.nextInt(3);
                        int money = random.nextInt(100);
                        // Simulate data delay and out of order!
                        long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                        ctx.collect(new Order(orderId, userId, money, eventTime));
                        TimeUnit.SECONDS.sleep(1); }}@Override
                public void cancel(a) {
                    flag = false; }});// The maximum allowed delay for adding watermarks is 3 seconds
            //orderDS.printToErr();
            // Allocate watermark mechanism
            SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                    // Specify the maximum delay time
                    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    Long extractTimestamp(T element, long recordTimestamp);
                    .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                    // Count the purchase amount of each user
                    .keyBy(t -> t.getUserId())
                    // Specify a window to collect data within 5 seconds every 5 seconds
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .sum("money");
            sum.print();
    
            //
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order {
            private String orderId;
            private Integer userId;
            private Integer money;
            privateLong eventTime; }}Copy the code

    Extended version:

    package cn.itcast.sz22.day03;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.commons.lang3.time.FastDateFormat;
    import org.apache.flink.api.common.eventtime.*;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */
    public class WatermarkDemo02 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //
            env.setParallelism(1);
            //2.Source
            // Simulate real-time order data (data is delayed and out of order)
            DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
                private boolean flag = true;
    
                @Override
                public void run(SourceContext<Order> ctx) throws Exception {
                    Random random = new Random();
                    while (flag) {
                        String orderId = UUID.randomUUID().toString();
                        int userId = random.nextInt(3);
                        int money = random.nextInt(100);
                        // Simulate data delay and out of order!
                        long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                        ctx.collect(new Order(orderId, userId, money, eventTime));
                        TimeUnit.SECONDS.sleep(1); }}@Override
                public void cancel(a) {
                    flag = false; }}); DataStream<Order> WatermarkDS = orderDS .assignTimestampsAndWatermarks(new WatermarkStrategy<Order>() {
                                @Override
                                public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                    return new WatermarkGenerator<Order>() {
                                        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
                                        private int userId = 0;
                                        private long eventTime = 0L;
                                        private final long outOfOrdernessMillis = 3000;
                                        private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    
                                        @Override
                                        public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                            userId = event.userId;
                                            eventTime = event.eventTime;
                                            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                        }
    
                                        @Override
                                        public void onPeriodicEmit(WatermarkOutput output) {
                                            //Watermark = Current maximum event time - maximum allowed delay time or out-of-order time timestamp
                                            Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                            System.out.println("key:" + userId + ", system time: + df.format(System.currentTimeMillis()) + ", event time :" + df.format(eventTime) + ", watermark time :"+ df.format(watermark.getTimestamp())); output.emitWatermark(watermark); }}; } }.withTimestampAssigner((event, timestamp) -> event.getEventTime()) );// The code is already added to the Watermark! Now you can do the window calculation
            // Every 5s, calculate the total amount of each user's order within 5 seconds (time-based scrolling window)
           /* DataStream
            
              result = WatermarkDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); * /
            
    
            // Use the above code to calculate the business
            // You can use the following code to output data in more detail, such as event time of data in each window when the output window triggers,Watermark time
            DataStream<String> result = WatermarkDS
                    .keyBy(Order::getUserId)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    // Apply the apply function to the data in the window
                    //WindowFunction<IN, OUT, KEY, W extends Window>
                    .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
                        @Override
                        public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                            // Prepare a collection to store the event time of the data belonging to the window
                            List<String> eventTimeList = new ArrayList<>();
                            for (Order order : input) {
                                Long eventTime = order.eventTime;
                                eventTimeList.add(df.format(eventTime));
                            }
                            String outStr = String.format("Key :%s, window start end :[%s~%s), event time belonging to this window :%s", key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList); out.collect(outStr); }});// The maximum allowed delay for adding watermarks is 3 seconds
            //orderDS.printToErr();
            result.printToErr();
            env.execute();
            // Allocate watermark mechanism
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order {
                private String orderId;
                private Integer userId;
                private Integer money;
                privateLong eventTime; }}Copy the code

Allowed lateness

  • case

    Given order data in the format: (order ID, user ID, timestamp/event time, order amount)

    Requirement: every 5 seconds, compute each user's total order amount over the last 5 seconds.

    A watermark is added to handle a certain amount of data delay and out-of-order data; data that is still too late is captured with allowed lateness and a side output.

package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/** * Author itcast * Date 2021/5/7 14:51 (Order ID, user ID, timestamp/event time, order amount) * Calculate the total amount of each user's order within 5 seconds every 5 seconds * and add Watermark to solve some data delay and data out of order. * /
public class WatermarkDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        // Simulate real-time order data (data is delayed and out of order)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate data delay and out of order!
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1); }}@Override
            public void cancel(a) {
                flag = false; }}); OutputTag<Order> oot =new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));
        EventTime The default value is maxDelay 3 seconds
        SingleOutputStreamOperator<Order> result = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                .keyBy(t -> t.getUserId())
                // Window set every 5s, calculate within 5 seconds
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // The instantiation side output stream is mainly used for data that is 3 seconds later than the maximum delay
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(oot)
                / / statistics
                .sum("money");

        result.print("Normal data");
        result.getSideOutput(oot).print("Seriously late data.");
        env.execute();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        privateLong eventTime; }}Copy the code

State Management

  • Stateful management scenario

![Stateful computation scenarios](assets/image-20210507151242102.png)

  • Depending on whether the state is managed by Flink, state falls into two categories

    1. managed state

      Flink manages the state itself

      Data structure: valueState ListState mapState

    2. raw state

      The user (the programmer) has to maintain the state themselves

      Data structure: ListState

  • Whether state management is performed based on keys

    1. keyed state

      Data structure: valueState ListState mapState

      reducingState

    2. operator state

      Data structure: ListState

Keyed state

  • Example – use ValueState (a keyed state) to obtain the maximum value in the data (in real projects, simply use maxBy). Implement it with a custom ValueState: the input is Tuple2<String /*word*/, Long /*length*/> and the output is Tuple3<String /*word*/, Long /*length*/, Long /*historical maximum*/>
    1. Apply a map transformation
    2. Define a ValueState that holds the historical maximum so far
    3. Output Tuple3<String /*word*/, Long /*length*/, Long /*historical maximum*/>
package cn.itcast.sz22.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** * Author itcast * Date 2021/5/7 15:58 * Use ValueState in KeyState to get the maximum value in the data
public class StateDemo01 {
    public static void main(String[] args) throws Exception {
        Env Sets the concurrency to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("Beijing".1L),
                Tuple2.of("Shanghai".2L),
                Tuple2.of("Beijing".6L),
                Tuple2.of("Shanghai".8L),
                Tuple2.of("Beijing".3L),
                Tuple2.of("Shanghai".4L));//3.Transformation
        // Use ValueState in KeyState to get the maximum value of the stream data.
        // Implementation 1: Use maxBy directly -- use this method in development
        DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                .maxBy(1);

        //min will select the smallest field
        //minBy calculates the smallest field and the corresponding other fields
        // Max will only find the largest field
        //maxBy calculates the largest field and other corresponding fields
        //实现方式2:通过managed state输入的state
        / / Tuple2 < String / * * / words, Long length / * * / > output Tuple3 < String / * * / words, Long length / * * /, Long historical maximum / * * / >
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> maxCount = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    // Save the state of the current maximum value in memory
                    private transient ValueState<Long> currentMaxValue;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // A description of the data structure stored in memory
                        ValueStateDescriptor desc = new ValueStateDescriptor("maxCount", TypeInformation.of(Long.class));
                        currentMaxValue = getRuntimeContext().getState(desc);
                    }

                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        String city = value.f0;
                        Long currentValue = value.f1;
                        if (currentMaxValue.value() == null || currentMaxValue.value() < currentValue) {
                            currentMaxValue.update(currentValue);
                            return Tuple3.of(city, currentValue, currentMaxValue.value());
                        } else {
                            returnTuple3.of(city, currentValue, currentMaxValue.value()); }}});Tuple3
      
        //-1. Define the state of the value type to store the maximum value
        //3.2. Override the open method of RichMapFunction
        //-2. Define the state descriptor
        //-3. Get the state value in memory from the current context
        //3.3. Override the map method
        //-4. Get the historical maximum value of state and the current maximum value of the element and compare them
        //-5. Update the status if the current value is large or the history value is empty; Returns the Tuple3 ancestor result
        //4.Sink prints output
// result1.printToErr();
        maxCount.print();
        Execute Execution environmentenv.execute(); }}Copy the code

Operator state

  • Mostly used in sources (reading from an external system), with the ListState data structure
  • Example – use ListState to store an offset, simulating how Kafka offsets are maintained
package cn.itcast.sz22.day03;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/** * Author itcast * Date 2021/5/7 16:59 * use ListState to store offset */
public class OperatorStateDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Use the following code to set the Checkpoint interval, disk path, and restart policy if the code encounters an exception
        env.enableCheckpointing(1000);// Perform Checkpoint every 1s
        // Where do I save the global state? hdfs://node1:8020/checkpoint/
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        // Check whether checkpoint is saved when the current task is canceledenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELL ATION);// The current checkpoint mechanism EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        Fixed delay restart policy: When the program is abnormal, restart the program twice. Each delay is 3 seconds. If the program restarts twice, exit
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3.3000));

        //2.Source
        DataStreamSource<String> sourceData = env.addSource(new MyMoniKafkaSource());

        //3.Transformation
        //4.Sink
        sourceData.print();

        //5.execute
        env.execute();
    }
    / / 1. Create the class MyMoniKafkaSource RichparallelSourceFunction and implement CheckpointedFunction inheritance
    public static class MyMoniKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction{
        //1.1. Define ListState
      
        to store offsetState, offset, and flag
      
        ListState<Long> offsetState;
        Long offset = 0L;
        boolean flag = true;
        //1.2. Override the initializeState method
        // // Create a List state descriptor
        // // initializes the state through the context according to the state descriptor
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offsetState",
                    TypeInformation.of(Long.class));
            offsetState = context.getOperatorStateStore()
                    .getListState(desc);
        }


        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Get and iterate over the values in ListState, if any, assigned to offset
            Iterable<Long> offsets = offsetState.get();
            if(offsets.iterator().hasNext()){
                offset = offsets.iterator().next();
            }
            //	//while(flag)
            while(flag){
                // Get the Index of the current subtask
                offset += 1;
                // CTX collect ID and offset (" partition :"+ ID +" consume to offset position :"+offset
                int id = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("Partition."+id+"Consume to offset :"+offset);
                Thread.sleep(2000);
                //) and output
                // // sleep for 2 seconds and save state to checkpoint
                // // Simulated exceptions are thrown every 5 seconds to see if offset can be restored later
                if(offset%5= =0){
                    System.out.println("Bug in current program");
                    throw new Exception("Bug in current program"); }}}@Override
        public void cancel(a) {
            flag  = false;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Clear offset stored in memory offsetState
            offsetState.clear();
            // Add offset to stateoffsetState.add(offset); }}}Copy the code

“+offset int id = getRuntimeContext().getIndexOfThisSubTask (); Ctx.collect (” partition :”+id+” consume to offset :”+offset); Thread.sleep(2000); // // sleep for 2 seconds, save state to checkpoint // // If (offset%5==0){system.out.println (” error “); Throw new Exception(” current program has a bug”); }}}

@Override public void cancel() { flag = false; } @override public void snapshotState(FunctionSnapshotContext Context) throws Exception {// Clears the offset stored in memory offsetState offsetState.clear(); // Add offset to state offsetstate. add(offset); }}Copy the code

}

Copy the code