Advanced Day 03 – Flink API
Today's goals
- The four cornerstones of Flink
- Flink Window operations
- Flink Time
- The Flink Watermark mechanism
- Flink state management – keyed state and operator state
The four cornerstones of Flink
- Checkpoint: distributed consistent snapshots; addresses data loss and recovers data when faults occur
- State: the data Flink keeps for you, including Keyed State and Operator State; by data structure: ValueState, ListState, MapState, BroadcastState
- Time: EventTime, IngestionTime, and ProcessingTime
- Window: TimeWindow, CountWindow, SessionWindow
Window operations
Window classification
- Time-based: tumbling windows and sliding windows are used the most (see the quick-reference sketch below)
- Count-based
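Before the full cases, a compact reference may help. This sketch is not from the course code; it only shows how each window type from the classification above is declared on a keyed stream:

```java
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/** Quick reference for the window assigners used in the cases below (T = element type, K = key type). */
public class WindowQuickRef {
    public static <T, K> void declareWindows(KeyedStream<T, K> keyedDS) {
        // Time-based tumbling window: every 5s, the last 5s
        keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // Time-based sliding window: every 5s, the last 10s
        keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // Session window: fires once no data arrives for 10s
        keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        // Count-based tumbling window: fires every 5 elements of the same key
        keyedDS.countWindow(5);
        // Count-based sliding window: a window of 10 elements, sliding every 5 elements
        keyedDS.countWindow(10, 5);
    }
}
```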
How to use
Case 1 – TimeWindow
- Requirements
  The input is the signal light id and the number of vehicles passing that light, e.g. 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
  Requirement 1: every 5 seconds, count the cars passing each traffic light within the last 5 seconds -- a time-based tumbling window
  Requirement 2: every 5 seconds, count the cars passing each traffic light within the last 10 seconds -- a time-based sliding window
- Analysis
- Code
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 9:13
 * Input: signal light id and the number of vehicles passing it, e.g. 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
 * Requirement 1: every 5 seconds, count the cars passing each traffic light within the last 5 seconds -- time-based tumbling window
 * Requirement 2: every 5 seconds, count the cars passing each traffic light within the last 10 seconds -- time-based sliding window
 */
public class WindowDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Obtain the data source
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: window statistics per key
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        // Tumbling window: every 5 seconds, the data of the last 5 seconds
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                // processing time is used
                .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                .sum("count");
        // Sliding window: every 5 seconds, the data of the last 10 seconds
        DataStream<CartInfo> result1 = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");
        // 4. Sink
        result1.print();
        // 5. Execute the stream environment
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // signal light id
        private Integer count;   // number of cars passing the light
    }
}
```
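To try the demo, start a socket server on node1 first, e.g. with `nc -lk 9999`, then type lines such as `9,3`; each line becomes one CartInfo event.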
Case 2 – CountWindow
- Requirement 1: count the cars passing each intersection over the last 5 messages, firing every 5 occurrences of the same key -- a count-based tumbling window
- Requirement 2: count the cars passing each intersection over the last 5 messages, firing every 3 occurrences of the same key -- a count-based sliding window
- Code
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/5/7 9:13
 * Input: signal light id and the number of vehicles passing it, e.g. 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
 * Requirement 1: count the cars passing each intersection over the last 5 messages, firing every 5 occurrences of the same key -- count-based tumbling window
 * Requirement 2: count the cars passing each intersection over the last 5 messages, firing every 3 occurrences of the same key -- count-based sliding window
 */
public class WindowDemo02 {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Obtain the data source
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: window statistics per key
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        // Tumbling count window: fire every 5 elements of the same key
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        // Sliding count window: window of 10 elements, sliding every 5 elements
        DataStream<CartInfo> result1 = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .countWindow(10, 5)
                .sum("count");
        // 4. Sink
        //result.printToErr();
        result1.print();
        // 5. Execute the stream environment
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // signal light id
        private Integer count;   // number of cars passing the light
    }
}
```
Case 3 – SessionWindow
- If no data arrives in a window, its computation is not triggered.
- Case: set the session gap to 10s; if no data arrives within 10s, the computation of the previous window is triggered.
- Code
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Date 2021/5/7 9:13
 * Input: signal light id and the number of vehicles passing it, e.g. 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
 * Set the session gap to 10s: if no data arrives within 10s, the previous window's computation is triggered
 */
public class WindowDemo03 {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Obtain the data source
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: window statistics per key
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        // Session window: a 10-second gap with no data closes the session and fires the window
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");
        // 4. Sink
        //result.printToErr();
        result.print();
        // 5. Execute the stream environment
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // signal light id
        private Integer count;   // number of cars passing the light
    }
}
```
Time
- The importance of EventTime
  - Guards against out-of-order data caused by network jitter, so delayed records are not lost from the statistics
  - A window spans a start time and an end time
Watermark
The Watermark mechanism
- A watermark is a special timestamp
- watermark = max event time - max allowed delay (maxDelayTime)
- A window's computation is triggered once watermark >= window end time
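To make the trigger rule concrete, here is a tiny standalone simulation in plain Java. It is not the Flink API; the 3-second delay, the window bounds, and the event times are made-up values:

```java
/** A minimal sketch of the bounded-out-of-orderness watermark rule, not the Flink API. */
public class WatermarkRuleSketch {
    public static void main(String[] args) {
        long maxDelayMillis = 3_000L;                           // max allowed delay: 3s
        long windowEnd = 10_000L;                               // window [5s, 10s)
        long[] eventTimes = {6_000L, 9_500L, 8_000L, 13_500L};  // out-of-order arrival
        long maxEventTime = Long.MIN_VALUE;
        for (long t : eventTimes) {
            maxEventTime = Math.max(maxEventTime, t);           // track the largest event time seen
            long watermark = maxEventTime - maxDelayMillis;     // watermark = max event time - max delay
            System.out.printf("event=%d  watermark=%d  window fires=%b%n",
                    t, watermark, watermark >= windowEnd);
        }
        // Only the 13.5s event pushes the watermark to 10.5s >= 10s, firing the [5s, 10s) window,
        // so the out-of-order 8s event still made it into the window before it fired.
    }
}
```

Up to a one-millisecond adjustment, this is what `forBoundedOutOfOrderness(Duration.ofSeconds(3))` does in the demos below; the hand-written generator in the extended version makes the `maxTimestamp - outOfOrdernessMillis - 1` computation explicit.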
Watermark case
- Requirements
  Order data arrives in the format (order id, user id, timestamp/event time, order amount).
  Every 5s, compute each user's total order amount within the last 5 seconds.
  Add a Watermark to handle some of the delayed and out-of-order data.
The basic version:
```java
package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 11:04
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        // 2. Source: simulate real-time order data (delayed and out of order)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delayed and out-of-order data!
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        //orderDS.printToErr();
        // 3. Assign the watermark: the maximum allowed delay is 3 seconds
        SingleOutputStreamOperator<Order> sum = orderDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Specify the maximum delay time
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // long extractTimestamp(T element, long recordTimestamp)
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                // Total order amount per user
                .keyBy(t -> t.getUserId())
                // Window: every 5 seconds, the data of the last 5 seconds
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        sum.print();
        // 4. execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
```
Extended version:
```java
package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 11:04
 */
public class WatermarkDemo02 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        // 2. Source: simulate real-time order data (delayed and out of order)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delayed and out-of-order data!
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // Hand-written WatermarkStrategy that also logs what the generator is doing
        DataStream<Order> watermarkDS = orderDS
                .assignTimestampsAndWatermarks(new WatermarkStrategy<Order>() {
                    @Override
                    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Order>() {
                            FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
                            private int userId = 0;
                            private long eventTime = 0L;
                            private final long outOfOrdernessMillis = 3000;
                            private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

                            @Override
                            public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                userId = event.userId;
                                eventTime = event.eventTime;
                                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // Watermark = current max event time - max allowed delay (out-of-orderness)
                                Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                System.out.println("key: " + userId
                                        + ", system time: " + df.format(System.currentTimeMillis())
                                        + ", event time: " + df.format(eventTime)
                                        + ", watermark time: " + df.format(watermark.getTimestamp()));
                                output.emitWatermark(watermark);
                            }
                        };
                    }
                }.withTimestampAssigner((event, timestamp) -> event.getEventTime()));
        // The watermark is in place; now the window computation can run.
        // Every 5s, total order amount per user within the last 5 seconds (time-based tumbling window):
        /*
        DataStream<Order> result = watermarkDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        */
        // The commented code above does the actual business computation.
        // The code below prints more detail instead: when a window fires,
        // the event times of its data and the window bounds.
        DataStream<String> result = watermarkDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Apply a function over the data in the window
                // WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        // Collect the event times of the data belonging to this window
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            Long eventTime = order.eventTime;
                            eventTimeList.add(df.format(eventTime));
                        }
                        String outStr = String.format("key: %s, window: [%s~%s), event times in this window: %s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });
        result.printToErr();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
```
Allowed lateness
- Case
  Order data arrives in the format (order id, user id, timestamp/event time, order amount).
  Every 5s, compute each user's total order amount within the last 5 seconds.
  A Watermark handles moderately delayed and out-of-order data; on top of it, allowedLateness keeps the window around a little longer, so data arriving within the extra lateness still updates the window, while anything even later is routed to a side output.
```java
package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 14:51
 * Order data format: (order id, user id, timestamp/event time, order amount)
 * Every 5s, compute the total order amount per user within the last 5 seconds,
 * adding a Watermark to handle some of the delayed and out-of-order data.
 */
public class WatermarkDemo03 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source: simulate real-time order data (delayed and out of order)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delayed and out-of-order data!
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // Side-output tag for data arriving later than watermark + allowed lateness
        OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));
        // EventTime watermark with a maxDelay of 3 seconds
        SingleOutputStreamOperator<Order> result = orderDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                .keyBy(t -> t.getUserId())
                // Window: every 5s, compute within 5 seconds
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Keep the window for 3 more seconds so late data can still update it
                .allowedLateness(Time.seconds(3))
                // Data later than that goes to the side output
                .sideOutputLateData(oot)
                // Aggregate
                .sum("money");
        result.print("Normal data");
        result.getSideOutput(oot).print("Seriously late data");
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
```
State Management
- Stateful processing scenarios
- By whether the state is managed by Flink, it falls into two categories:
  - Managed state: Flink manages the state itself; data structures: ValueState, ListState, MapState
  - Raw state: users/programmers must maintain the state themselves; data structure: ListState
- By whether the state is scoped to a key:
  - Keyed state: data structures: ValueState, ListState, MapState, ReducingState
  - Operator state: data structure: ListState
Keyed state
- Example: use ValueState in keyed state to get the maximum value in the data (maxBy does this directly); implement it yourself with ValueState: input Tuple2<String /*word*/, Long /*length*/>, output Tuple3<String /*word*/, Long /*length*/, Long /*historical max*/>
  - Map each element
  - Define a ValueState that tracks the current historical maximum
  - Output Tuple3<String /*word*/, Long /*length*/, Long /*historical max*/>
```java
package cn.itcast.sz22.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/5/7 15:58
 * Use ValueState in keyed state to get the maximum value in the data
 */
public class StateDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env, with parallelism set to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("Beijing", 1L),
                Tuple2.of("Shanghai", 2L),
                Tuple2.of("Beijing", 6L),
                Tuple2.of("Shanghai", 8L),
                Tuple2.of("Beijing", 3L),
                Tuple2.of("Shanghai", 4L));
        // 3. Transformation
        // Use ValueState in keyed state to get the maximum value of the stream data.
        // Approach 1: use maxBy directly -- prefer this in development
        DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                .maxBy(1);
        // min selects only the smallest field
        // minBy returns the smallest field together with the other fields of that record
        // max returns only the largest field
        // maxBy returns the largest field together with the other fields of that record

        // Approach 2: track the maximum yourself with managed state
        // Input Tuple2<String /*word*/, Long /*length*/>,
        // output Tuple3<String /*word*/, Long /*length*/, Long /*historical max*/>
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> maxCount = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    // 3.1 State holding the current historical maximum
                    private transient ValueState<Long> currentMaxValue;

                    // 3.2 Override open(): define the state descriptor
                    //     and get the state from the runtime context
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Long> desc =
                                new ValueStateDescriptor<>("maxCount", TypeInformation.of(Long.class));
                        currentMaxValue = getRuntimeContext().getState(desc);
                    }

                    // 3.3 Override map(): compare the historical maximum in state
                    //     with the current element and return a Tuple3
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        String city = value.f0;
                        Long currentValue = value.f1;
                        // Update the state if the current value is larger or the history is empty
                        if (currentMaxValue.value() == null || currentMaxValue.value() < currentValue) {
                            currentMaxValue.update(currentValue);
                        }
                        return Tuple3.of(city, currentValue, currentMaxValue.value());
                    }
                });
        // 4. Sink
        //result1.printToErr();
        maxCount.print();
        // 5. Execute
        env.execute();
    }
}
```
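The demo above covers ValueState; the other keyed state types listed earlier (MapState, ListState, ReducingState) follow the same pattern: declare a descriptor, fetch the state from `getRuntimeContext()` in `open()`, then read and update it per element. A minimal MapState sketch, illustrative only and not from the course code, which counts per key how many times each value has been seen:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

/** Illustrative sketch: MapState keeps one small map per key. */
public class MapStateSketch extends RichMapFunction<Tuple2<String, Long>, String> {
    private transient MapState<Long, Long> seenCounts;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<Long, Long> desc =
                new MapStateDescriptor<>("seenCounts", Long.class, Long.class);
        seenCounts = getRuntimeContext().getMapState(desc);
    }

    @Override
    public String map(Tuple2<String, Long> value) throws Exception {
        Long old = seenCounts.get(value.f1);              // null the first time this value appears
        seenCounts.put(value.f1, old == null ? 1L : old + 1L);
        return value.f0 + ": value " + value.f1 + " seen " + seenCounts.get(value.f1) + " time(s)";
    }
}
```

It would be wired in the same way as the example above, e.g. `tupleDS.keyBy(t -> t.f0).map(new MapStateSketch()).print();`.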
Operator state
- Most scenarios simply track the read position of the source, using the ListState data structure
- Example: use ListState to store the offset, simulating Kafka offset maintenance
```java
package cn.itcast.sz22.day03;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Author itcast
 * Date 2021/5/7 16:59
 * Use ListState to store the offset, simulating Kafka offset maintenance
 */
public class OperatorStateDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Configure the checkpoint interval, the state backend path,
        // and the restart policy used when the code hits an exception
        env.enableCheckpointing(1000); // perform a checkpoint every 1s
        // Where the global state is saved, e.g. hdfs://node1:8020/checkpoint/
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        // Keep the checkpoint when the current job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Checkpointing mode: EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Fixed-delay restart policy: on exception, restart up to 3 times, 3 seconds apart, then fail
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        // 2. Source
        DataStreamSource<String> sourceData = env.addSource(new MyMoniKafkaSource());
        // 3. Transformation
        // 4. Sink
        sourceData.print();
        // 5. execute
        env.execute();
    }

    // 1. Create MyMoniKafkaSource, extending RichParallelSourceFunction
    //    and implementing CheckpointedFunction
    public static class MyMoniKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        // 1.1 Define the ListState that stores the offset, plus the offset and the running flag
        ListState<Long> offsetState;
        Long offset = 0L;
        boolean flag = true;

        // 1.2 Override initializeState: create a list state descriptor
        //     and initialize the state from the context using it
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offsetState",
                    TypeInformation.of(Long.class));
            offsetState = context.getOperatorStateStore().getListState(desc);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // Get and iterate over the values in the ListState; if present, restore the offset
            Iterable<Long> offsets = offsetState.get();
            if (offsets.iterator().hasNext()) {
                offset = offsets.iterator().next();
            }
            while (flag) {
                offset += 1;
                // Get the index of the current subtask and emit "partition: <id>, consumed to offset: <offset>"
                int id = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("partition: " + id + ", consumed to offset: " + offset);
                // Sleep for 2 seconds; checkpoints save the state in the background
                Thread.sleep(2000);
                // Throw a simulated exception every 5 offsets to check that the offset is restored afterwards
                if (offset % 5 == 0) {
                    System.out.println("Bug in current program");
                    throw new Exception("Bug in current program");
                }
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Clear the offset previously stored in offsetState
            offsetState.clear();
            // Add the current offset to the state
            offsetState.add(offset);
        }
    }
}
```
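When this job runs, it prints increasing offsets and throws a simulated exception at every 5th offset; the fixed-delay restart policy restarts it, and because snapshotState() writes the offset into the ListState on every checkpoint, the restarted source continues from the last checkpointed offset rather than from 0. A few offsets may repeat after a failure, since recovery rolls back to the most recent checkpoint.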