This is the fifth day of my participation in Gwen Challenge
Flink State
Flink is a stateful stream-processing engine. By default it keeps intermediate results (state) on the TaskManager heap. If a task fails, its state is lost with it, so the job can no longer guarantee a correct result; the only way to recover would be to recompute all the data, which is inefficient. To support at-least-once and exactly-once guarantees, state has to be persisted to more durable storage. Flink can keep state in on-heap memory, off-heap memory, HDFS, or RocksDB.
Let’s take a look at the states provided by Flink.
There are two types of states in Flink
- Keyed State
Keyed State is state on a KeyedStream and is bound to a specific key: each key on the KeyedStream has its own state. An operator can run as several parallel threads, but records with the same key are always processed by the same thread. A given keyed state therefore lives in exactly one thread, while one thread holds the keyed state of many keys.
- Non-keyed State (Operator State)
Operator State is independent of the key and is bound to the operator: each parallel operator instance has a single state. For example, the Kafka connector in Flink uses Operator State to store all (partition, offset) mappings of the consumed topic for each connector instance.
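The difference between the two kinds of state can be modeled in a few lines of plain Java. This is only a sketch of the semantics, not Flink code: the class name, the `process` method, and the modulo routing rule in `subtaskFor` are hypothetical (Flink's real key-to-subtask assignment goes through key groups).

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedVsOperatorStateSketch {
    // Keyed state model: one independent value per key (here: max speed per car ID).
    private final Map<String, Long> maxSpeedByKey = new HashMap<>();
    // Operator state model: one value for the whole operator instance (here: records seen).
    private long recordsSeen = 0L;

    // Processes one record and returns the updated keyed value for that record's key.
    public long process(String carId, long speed) {
        recordsSeen++;
        return maxSpeedByKey.merge(carId, speed, Math::max);
    }

    public long recordsSeen() {
        return recordsSeen;
    }

    // Hypothetical routing rule: the same key always maps to the same subtask index.
    public static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        KeyedVsOperatorStateSketch operator = new KeyedVsOperatorStateSketch();
        System.out.println(operator.process("1", 200L));
        System.out.println(operator.process("2", 101L));
        System.out.println(operator.process("1", 102L));
        System.out.println(operator.recordsSeen());
    }
}
```

Each key sees only its own value, while the record counter is shared by every key handled by that instance.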
- Flink provides the following data structures for Keyed State:
  - ValueState<T>: a single value of type T, bound to the corresponding key. This is the simplest state: update it with update() and read it with value().
  - ListState<T>: the state value for the key is a list. Append to it with add(), and read it with get(), which returns an Iterable over the stored values.
  - ReducingState<T>: each call to add() invokes the user-supplied ReduceFunction, so all added values are merged into a single state value.
  - MapState<UK, UV>: the state value is a map. Add entries with put() or putAll(), look up a value with get(key), and iterate with entries(), keys(), and values().
  - AggregatingState<IN, OUT>: keeps a single value that represents the aggregation of all values added to the state. In contrast to ReducingState, the aggregate type may differ from the type of the added elements. Elements added with add(IN) are aggregated by the user-specified AggregateFunction.
  - FoldingState<T, ACC>: deprecated; use AggregatingState instead. Keeps a single value that represents the aggregation of all values added to the state. In contrast to ReducingState, the aggregate type may differ from the type of the added elements. Elements added with add(T) are folded into the aggregate by the user-specified FoldFunction.
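The article has no demo for MapState, so here is a plain-Java model of its semantics. It is a sketch, not Flink code: the class and method names are hypothetical, the outer map stands in for "one state per key", and the inner map plays the role of the MapState for that key.

```java
import java.util.HashMap;
import java.util.Map;

public class MapStateSketch {
    // Outer map models "one state per key"; inner map models the MapState contents.
    private final Map<String, Map<Long, Integer>> stateByKey = new HashMap<>();

    // Counts how often each speed has been observed for the given car.
    public void record(String carId, long speed) {
        Map<Long, Integer> mapState = stateByKey.computeIfAbsent(carId, k -> new HashMap<>());
        Integer seen = mapState.get(speed);               // comparable to MapState get(key)
        mapState.put(speed, seen == null ? 1 : seen + 1); // comparable to MapState put(key, value)
    }

    // Returns how many times this speed was recorded for this car (0 if never).
    public int timesSeen(String carId, long speed) {
        Map<Long, Integer> mapState = stateByKey.get(carId);
        return mapState == null ? 0 : mapState.getOrDefault(speed, 0);
    }

    // Returns the number of distinct speeds recorded for this car.
    public int distinctSpeeds(String carId) {
        Map<Long, Integer> mapState = stateByKey.get(carId);
        return mapState == null ? 0 : mapState.size();
    }

    public static void main(String[] args) {
        MapStateSketch state = new MapStateSketch();
        state.record("2", 201L);
        state.record("2", 201L);
        state.record("1", 100L);
        System.out.println(state.timesSeen("2", 201L));
        System.out.println(state.distinctSpeeds("2"));
    }
}
```

In a real job the outer map is not written by hand: Flink scopes the state to the current key automatically once the stream has been keyed.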
- ValueState
ValueState: a single value of type T, bound to the corresponding key. This is the simplest state: it is updated with update() and read with value().
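```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// CarInfo is not shown in the article; it is assumed to be a POJO with a String carId
// and a Long speed, plus a builder and getters/setters (for example via Lombok).
public class ValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.socketTextStream("192.168.150.110", 8888);
        stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
            @Override
            public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                try {
                    // Each input line has the form "carId,speed"
                    String[] sArr = s.split(",");
                    collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                } catch (Exception ex) {
                    System.out.println(ex);
                }
            }
        }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, CarInfo>() {
            private ValueState<Long> valueState;

            @Override
            public void open(Configuration parameters) {
                ValueStateDescriptor<Long> state = new ValueStateDescriptor<Long>("state", BasicTypeInfo.LONG_TYPE_INFO);
                valueState = getRuntimeContext().getState(state);
            }

            @Override
            public CarInfo map(CarInfo carInfo) throws Exception {
                // Keep the largest speed seen so far for the current key
                if (valueState.value() == null || carInfo.getSpeed() > valueState.value()) {
                    valueState.update(carInfo.getSpeed());
                }
                carInfo.setSpeed(valueState.value());
                return carInfo;
            }
        }).print(">>>>>>>>");
        environment.execute("execute");
    }
}
```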
```
### console 192.168.88.180
nc -lk 8888
### Input parameters
1,200
2,101
3,103
1,102
2,201
3,303
#### sout
>>>>>>>>:2> CarInfo(carId=1, speed=200)
>>>>>>>>:1> CarInfo(carId=2, speed=201)
>>>>>>>>:2> CarInfo(carId=3, speed=303)
>>>>>>>>:2> CarInfo(carId=1, speed=200)
>>>>>>>>:1> CarInfo(carId=2, speed=201)
>>>>>>>>:2> CarInfo(carId=3, speed=303)
## Display the maximum speed for each vehicle
```
- ReducingState
Each call to add() invokes the ReduceFunction passed in by the user, so the added values are merged into a single state value. It provides aggregation similar to the reduce function.
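```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReducingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.socketTextStream("192.168.88.180", 8888);
        stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
            @Override
            public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                try {
                    // Each input line has the form "carId,speed"
                    String[] sArr = s.split(",");
                    collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                } catch (Exception ex) {
                    System.out.println(ex);
                }
            }
        }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, CarInfo>() {
            private ReducingState<Long> valueState;

            @Override
            public void open(Configuration parameters) {
                ReducingStateDescriptor<Long> state = new ReducingStateDescriptor<>("state", new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long t1, Long t2) {
                        // Merge the stored value with the newly added one
                        return t1 + t2;
                    }
                }, BasicTypeInfo.LONG_TYPE_INFO);
                valueState = getRuntimeContext().getReducingState(state);
            }

            @Override
            public CarInfo map(CarInfo carInfo) throws Exception {
                // add() runs the ReduceFunction; get() returns the running sum for the current key
                valueState.add(carInfo.getSpeed());
                carInfo.setSpeed(valueState.get());
                return carInfo;
            }
        }).print(">>>>>>>>");
        environment.execute("execute");
    }
}
```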
```
### console 192.168.88.180
nc -lk 8888
### Input parameters
1,100
2,101
3,103
1,101
2,201
3,303
1,100
2,101
3,103
1,101
2,201
3,303
#### sout
>>>>>>>>:2> CarInfo(carId=1, speed=100)
>>>>>>>>:1> CarInfo(carId=2, speed=201)
>>>>>>>>:2> CarInfo(carId=3, speed=103)
>>>>>>>>:2> CarInfo(carId=1, speed=201)
>>>>>>>>:1> CarInfo(carId=2, speed=302)
>>>>>>>>:2> CarInfo(carId=3, speed=406)
>>>>>>>>:1> CarInfo(carId=2, speed=503)
>>>>>>>>:2> CarInfo(carId=3, speed=509)
>>>>>>>>:2> CarInfo(carId=1, speed=301)
>>>>>>>>:2> CarInfo(carId=3, speed=812)
>>>>>>>>:2> CarInfo(carId=1, speed=402)
>>>>>>>>:1> CarInfo(carId=2, speed=604)
## Aggregation functionality similar to reduce functionality
```
- ListState
The state value for the key is a list. Values are appended with add(), and get() returns an Iterable over the stored values.
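```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.socketTextStream("192.168.150.110", 8888);
        stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
            @Override
            public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                try {
                    // Each input line has the form "carId,speed"
                    String[] sArr = s.split(",");
                    collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                } catch (Exception ex) {
                    System.out.println(ex);
                }
            }
        }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, String>() {
            private ListState<Long> listState;

            @Override
            public void open(Configuration parameters) {
                ListStateDescriptor<Long> state = new ListStateDescriptor<Long>("state", BasicTypeInfo.LONG_TYPE_INFO);
                listState = getRuntimeContext().getListState(state);
            }

            @Override
            public String map(CarInfo carInfo) throws Exception {
                // Append the new speed, then print the full history for the current key
                listState.add(carInfo.getSpeed());
                return String.format("Car ID :%s, speed history %s", carInfo.getCarId(), listState.get().toString());
            }
        }).print(">>>>>>>>");
        environment.execute("execute");
    }
}
```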
```
### console 192.168.88.180
nc -lk 8888
### Input parameters
1,100
2,201
3,301
1,102
2,201
3,303
1,101
2,201
3,301
1,102
2,201
3,302
#### sout
>>>>>>>>:2> Car ID :1, speed history [100]
>>>>>>>>:2> Car ID :3, speed history [301]
>>>>>>>>:2> Car ID :1, speed history [100, 101]
>>>>>>>>:1> Car ID :2, speed history [201]
>>>>>>>>:1> Car ID :2, speed history [201, 201]
>>>>>>>>:2> Car ID :3, speed history [301, 303]
>>>>>>>>:2> Car ID :1, speed history [100, 101, 102]
>>>>>>>>:1> Car ID :2, speed history [201, 201, 201]
>>>>>>>>:2> Car ID :3, speed history [301, 303, 301]
>>>>>>>>:1> Car ID :2, speed history [201, 201, 201, 201]
>>>>>>>>:2> Car ID :1, speed history [100, 101, 102, 102]
>>>>>>>>:2> Car ID :3, speed history [301, 303, 301, 302]
## You can see that every speed reading is recorded for each car
```
- AggregatingState
Keeps a single value that represents the aggregation of all values added to the state. In contrast to ReducingState, the aggregate type may differ from the type of the elements added to the state. Elements added with add(IN) are aggregated by the user-specified AggregateFunction.
```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class AggregatingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.socketTextStream("192.168.150.110", 8888);
        stream.flatMap(new RichFlatMapFunction<String, CarInfo>() {
            @Override
            public void flatMap(String s, Collector<CarInfo> collector) throws Exception {
                try {
                    // Each input line has the form "carId,speed"
                    String[] sArr = s.split(",");
                    collector.collect(CarInfo.builder().carId(sArr[0]).speed(Long.parseLong(sArr[1])).build());
                } catch (Exception ex) {
                    System.out.println(ex);
                }
            }
        }).keyBy(CarInfo::getCarId).map(new RichMapFunction<CarInfo, CarInfo>() {
            private AggregatingState<Long, Long> valueState;

            @Override
            public void open(Configuration parameters) {
                AggregatingStateDescriptor<Long, Long, Long> state = new AggregatingStateDescriptor<Long, Long, Long>("state", new AggregateFunction<Long, Long, Long>() {
                    // Initializes the accumulator value
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    // Add values to the accumulator
                    @Override
                    public Long add(Long v, Long acc) {
                        return v + acc;
                    }

                    // Return the final result
                    @Override
                    public Long getResult(Long result) {
                        return result;
                    }

                    // Merge two accumulator values
                    @Override
                    public Long merge(Long acc1, Long acc2) {
                        return acc1 + acc2;
                    }
                }, BasicTypeInfo.LONG_TYPE_INFO);
                valueState = getRuntimeContext().getAggregatingState(state);
            }

            @Override
            public CarInfo map(CarInfo carInfo) throws Exception {
                valueState.add(carInfo.getSpeed());
                carInfo.setSpeed(valueState.get());
                return carInfo;
            }
        }).print(">>>>>>>>");
        environment.execute("execute");
    }
}
```
```
### console 192.168.88.180
nc -lk 8888
### Input parameters
1,100
2,201
3,301
1,102
2,201
3,303
1,101
2,201
3,301
1,102
2,201
3,302
#### sout
>>>>>>>>:2> CarInfo(carId=1, speed=100)
>>>>>>>>:2> CarInfo(carId=1, speed=202)
>>>>>>>>:1> CarInfo(carId=2, speed=201)
>>>>>>>>:2> CarInfo(carId=3, speed=302)
>>>>>>>>:1> CarInfo(carId=2, speed=402)
>>>>>>>>:2> CarInfo(carId=3, speed=603)
>>>>>>>>:1> CarInfo(carId=2, speed=603)
>>>>>>>>:1> CarInfo(carId=2, speed=804)
>>>>>>>>:2> CarInfo(carId=3, speed=906)
>>>>>>>>:2> CarInfo(carId=1, speed=304)
>>>>>>>>:2> CarInfo(carId=3, speed=1207)
>>>>>>>>:2> CarInfo(carId=1, speed=405)
## id 1 -> 100+102+101+102=405
## id 2 -> 201+201+201+201=804
## id 3 -> 301+303+301+302=1207
```
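The demo above uses Long for the input, accumulator, and output types, so it does not show the point that the aggregate type may differ from the element type. The following plain-Java sketch illustrates that with an average. It is not Flink code: the nested `Agg` interface is a hypothetical local stand-in that only mirrors the four-method shape of the AggregateFunction used in the demo, and the `long[]{sum, count}` accumulator is an assumption made for this example.

```java
public class AverageAggregateSketch {
    // Hypothetical local stand-in with the same four-method shape as AggregateFunction.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // IN = Long (speed), ACC = long[]{sum, count}, OUT = Double (average speed).
    static final Agg<Long, long[], Double> AVERAGE = new Agg<Long, long[], Double>() {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public Double getResult(long[] acc) {
            // An empty accumulator yields 0.0 instead of dividing by zero
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    };

    // Feeds the speeds through the aggregate one by one, as repeated add() calls would.
    public static double averageOf(long... speeds) {
        long[] acc = AVERAGE.createAccumulator();
        for (long speed : speeds) {
            acc = AVERAGE.add(speed, acc);
        }
        return AVERAGE.getResult(acc);
    }

    public static void main(String[] args) {
        // Speeds entered for car 1 in the demo input
        System.out.println(averageOf(100L, 102L, 101L, 102L));
    }
}
```

Here the elements are Long values while the result is a Double, which a ReduceFunction cannot express because its input and output types are the same.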