A first look at State

State refers to the intermediate computation results or metadata attributes that a compute node holds during stream processing. For example, if the computation is an aggregation, the intermediate aggregation result is recorded in State; if Apache Kafka is the data source, the offset of the last record read is recorded as well. This State data is persisted (inserted or updated) as the computation proceeds. State in Apache Flink is therefore a point-in-time snapshot of the internal data of an Apache Flink task (computation results and metadata attributes).

 

State: Keyed State and Operator State

Keyed State

Keyed State is state that is scoped to a key. It is always bound to a specific key, and the state of one key is independent of the state of every other key, so keys cannot affect each other.

Operator State

Operator State is bound to an operator instance. Each operator instance holds its own state, and instances do not affect each other. Flink’s Kafka connector, for example, uses Operator State: each connector instance stores the (partition, offset) mappings of the topics it consumes.

Raw State and Managed State

  • Raw State: The user manages the data structure of the raw state themselves. The framework reads and writes it as byte[] and knows nothing about its internal structure.
  • Managed State: Managed State is managed by the Flink framework, for example ValueState, ListState, MapState, etc.

Managed Keyed State

  • ValueState: This keeps a single value that can be updated and retrieved (scoped, as described above, to the key of the input element, so each key the operation sees may hold one value). The value is set with update(T) and retrieved with value(). Example:
package flinkscala.State.Keyed_State

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object valueStateTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.fromCollection(List(
      (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L),
      (1L, 6L), (1L, 2L), (1L, 9L), (1L, 2L), (1L, 3L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()
    env.execute("average Test")
  }
}

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // keyed ValueState holding (count, sum) for the current key
  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // access the ValueState; on first use for a key it is null, so fall back to (0, 0)
    val tmpCurrentSum = sum.value()
    val currentSum = if (tmpCurrentSum != null) tmpCurrentSum else (0L, 0L)

    // bump the count and add the new value to the running sum
    val newSum = (currentSum._1 + 1, currentSum._2 + value._2)
    sum.update(newSum)

    // once two elements have been seen, emit the average and reset the state
    if (newSum._1 >= 2) {
      out.collect((value._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

 

ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added with add(T) or addAll(List<T>), the Iterable is retrieved with get(), and the existing list can be overwritten with update(List<T>).

ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState, but elements added with add(T) are reduced to a single value using the specified ReduceFunction.

AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state. In contrast to ReducingState, the aggregate type may differ from the type of the elements added. The interface is the same as for ListState, but elements added with add(IN) are aggregated using the specified AggregateFunction.

FoldingState<T, ACC>: This keeps a single value that represents the aggregation of all values added to the state. In contrast to ReducingState, the aggregate type may differ from the type of the elements added. The interface is similar to ListState, but elements added with add(T) are folded into the aggregate using the specified FoldFunction. Deprecated since Flink 1.4; use AggregatingState instead.

MapState<UK, UV>: This keeps a set of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added with put(UK, UV) or putAll(Map<UK, UV>), and the value associated with a key is retrieved with get(UK). Iterable views over the mappings, keys, and values can be retrieved with entries(), keys(), and values() respectively. You can also use isEmpty() to check whether the map contains any key-value mappings.

We obtain state through the getState method of getRuntimeContext. Each state is bound to its own StateDescriptor; different states hold different values, and when you need several states in one function, you obtain each of them through its corresponding StateDescriptor, as the sketch below shows.
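For instance, here is a minimal sketch (the class and state names are invented for illustration) of one function holding two managed keyed states, each obtained through its own descriptor:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// to be applied on a keyed stream, e.g. stream.keyBy(_._1).flatMap(new MultiStateExample())
class MultiStateExample extends RichFlatMapFunction[(String, Long), String] {

  // each state is bound to its own descriptor: a name plus type information
  private var history: ListState[Long] = _
  private var counts: MapState[String, Long] = _

  override def open(parameters: Configuration): Unit = {
    history = getRuntimeContext.getListState(
      new ListStateDescriptor[Long]("history", createTypeInformation[Long]))
    counts = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long](
        "counts", createTypeInformation[String], createTypeInformation[Long]))
  }

  override def flatMap(value: (String, Long), out: Collector[String]): Unit = {
    // ListState: append an element
    history.add(value._2)
    // MapState: read-modify-write a key-value pair
    val seen = if (counts.contains(value._1)) counts.get(value._1) else 0L
    counts.put(value._1, seen + 1)
    out.collect(s"${value._1} seen ${seen + 1} times")
  }
}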

 

State Time-To-Live (TTL)

Sometimes we want to give state a time-to-live, or expiration: after a certain amount of time it is cleared automatically, or handled in some other way, much like key expiration in Redis.

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  // the time-to-live: state expires one second after the last relevant access
  .newBuilder(Time.seconds(1))
  // when the TTL is refreshed: OnCreateAndWrite refreshes only on creation and
  // write access, OnReadAndWrite also refreshes on read access
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  // state visibility: whether expired values that have not yet been cleared
  // may still be returned (NeverReturnExpired is the default)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build

val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

Cleanup of expired state

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  // disable the background cleanup of expired state
  .disableCleanupInBackground()
  .build
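Explicit cleanup strategies can also be configured. A short sketch, assuming a reasonably recent Flink version (which strategies are available depends on the version and the state backend):

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  // drop expired entries whenever a full snapshot is taken
  .cleanupFullSnapshot()
  // additionally inspect 10 entries on every state access (heap backends);
  // passing true would also run the cleanup for every processed record
  .cleanupIncrementally(10, false)
  .build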

Managed Operator State

As mentioned, Operator State is bound to operator instances. It too comes in a managed form, for which Flink provides the state structures, just as it provides ValueState and the other managed keyed states.

To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface or the ListCheckpointed<T extends Serializable> interface.

CheckpointedFunction

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

snapshotState() is called whenever a checkpoint has to be taken, while initializeState() is called every time a user-defined function is initialized, whether the function is being initialized for the first time or is actually being restored from an earlier checkpoint. initializeState() is therefore not only the place where the different kinds of state are initialized, but also where the state recovery logic lives.

Currently, list-style managed operator state is supported. The state is expected to be a list of serializable objects, independent of one another, which makes it possible to redistribute the state across instances when the parallelism changes. There are two redistribution schemes (a code sketch of how each is selected follows the list):

  • Even-split redistribution: Each operator returns a list of state elements. The whole state is logically the concatenation of all the lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty or contain one or more elements. For example, if the operator’s checkpointed state contains the elements element1 and element2 at parallelism 1, then after increasing the parallelism to 2, element1 may end up in operator instance 0 while element2 goes to operator instance 1.
  • Union redistribution: Each operator returns a list of state elements. Logically, the whole state is the concatenation of all the lists. On restore/redistribution, each operator gets the complete list of state elements.
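The scheme is chosen when the state is obtained from the OperatorStateStore. A sketch of the difference, reusing the names from the demonstration below (the union variant is commented out for contrast):

// inside a CheckpointedFunction implementation
override def initializeState(context: FunctionInitializationContext): Unit = {
  val descriptor = new ListStateDescriptor[(String, Int)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Int)] {})
  )
  // even-split redistribution: each instance gets a sublist on restore
  checkpointedState = context.getOperatorStateStore.getListState(descriptor)
  // union redistribution instead: each instance gets the complete list on restore
  // checkpointedState = context.getOperatorStateStore.getUnionListState(descriptor)
}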

Even-split redistribution demonstration

package flinkscala.State.Operatior_State

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

object opratorTest {
  def main(args: Array[String]): Unit = {
  }
}

class BufferingSink(threshold: Int = 0)
    extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  private var checkpointedState: ListState[(String, Int)] = _
  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send each buffered element to the external system here
      }
      bufferedElements.clear()
    }
  }

  // on each checkpoint, replace the checkpointed list with the current buffer
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  // called on first initialization and when restoring from a checkpoint
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)] {})
    )
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}

ListCheckpointed

The ListCheckpointed interface is a more restricted variant of CheckpointedFunction: it supports only list-style state and uses the even-split redistribution scheme on restore. It also requires implementing two methods:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

snapshotState() should return the list of objects to checkpoint, and restoreState() has to handle such a list when recovering. If the state is not repartitionable, you can always return Collections.singletonList(MY_STATE) from snapshotState(), as the sketch below illustrates.
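As a brief sketch, modeled on the stateful counter source example in the Flink documentation: a parallel source that checkpoints its offset as a single-element list, and on restore takes whatever sublist the even-split scheme assigns to this instance.

import java.util.Collections
import java.util.{List => JList}

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

class CounterSource extends RichParallelSourceFunction[Long] with ListCheckpointed[java.lang.Long] {

  @volatile private var isRunning = true
  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      // make record emission and offset update atomic with respect to checkpoints
      lock.synchronized {
        ctx.collect(offset)
        offset += 1
      }
    }
  }

  override def cancel(): Unit = isRunning = false

  // checkpoint the offset as a single-element list
  override def snapshotState(checkpointId: Long, timestamp: Long): JList[java.lang.Long] =
    Collections.singletonList(offset)

  // on restore, the even-split scheme hands this instance a (possibly empty) sublist
  override def restoreState(state: JList[java.lang.Long]): Unit =
    for (s <- state.asScala) {
      offset = s
    }
}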

Broadcast State

Let’s start with a scenario: imagine real-time matching of user actions against patterns on an e-commerce platform. All user action data is captured as a stream of user actions. The site’s operations team analyzes user actions to increase sales, improve the user experience, and detect and prevent malicious behavior. You implement a streaming application that detects patterns in the stream of user events. Of course, we could hard-code the pattern in our application, but then every pattern change would force a redeploy, which is far from ideal. And if, instead of using broadcast state, we shipped the patterns along with the UserActions stream directly, the same pattern data would be propagated to every partition over and over, wasting resources.

It works like this:

Define a User Actions stream to record individual User actions, such as logging in, adding to cart, paying, logging out, and so on.

Define a Patterns stream to record the patterns the platform needs to match. For example, one pattern is “the user logs out immediately after logging in”, another is “the user logs out immediately after adding to cart”; in short, one action followed by another forms a pattern. A pattern could also combine more actions, but for simplicity we stick to two actions here.

Something like this:

 

In the figure, the Patterns stream groups two actions into one Pattern and broadcasts it to the other operators, so that the operators holding the broadcast can match it against the User Actions stream.

 

Pattern broadcast: as the figure shows, the login -> logout Pattern has been broadcast to every operator instance.

 

The User Actions stream is then partitioned by key across the parallel instances. Each time an action arrives, the partition saves the previous action, combines it with the new action to form a candidate pattern, and matches that against the broadcast Pattern; finally, the new action replaces the saved one.

 

You can see that the action sequence for key1 matches the Pattern, and the match is collected.

 

If a new Pattern arrives, it replaces the original one, and subsequent actions are compared against the new Pattern.

Code implementation

package flinkjava.State;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class broadcastState {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // user actions arrive as "userId,action" lines on port 9000
        SingleOutputStreamOperator<Action> actions = env.socketTextStream("127.0.0.1", 9000)
                .map(new MapFunction<String, Action>() {
                    @Override
                    public Action map(String value) throws Exception {
                        String[] dataArray = value.split(",");
                        return new Action(Long.valueOf(dataArray[0].trim()), dataArray[1].trim());
                    }
                });

        // patterns arrive as "firstAction,secondAction" lines on port 9001
        SingleOutputStreamOperator<Pattern> patterns = env.socketTextStream("127.0.0.1", 9001)
                .map(new MapFunction<String, Pattern>() {
                    @Override
                    public Pattern map(String value) throws Exception {
                        String[] dataArray = value.split(",");
                        Pattern pattern = new Pattern();
                        pattern.setFirstAction(dataArray[0].trim());
                        pattern.setSecondAction(dataArray[1].trim());
                        return pattern;
                    }
                });

        KeyedStream<Action, Long> actionsByUser = actions.keyBy(action -> action.userId);

        // broadcast state descriptor: the single current Pattern is kept under a null (Void) key
        MapStateDescriptor<Void, Pattern> bcStateDescriptor =
                new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);

        SingleOutputStreamOperator<Tuple2<Long, Pattern>> matches =
                actionsByUser.connect(bcedPatterns).process(new PatternEvaluator());

        matches.map(new MapFunction<Tuple2<Long, Pattern>, Tuple3<Long, String, String>>() {
            @Override
            public Tuple3<Long, String, String> map(Tuple2<Long, Pattern> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1.getFirstAction(), value.f1.getSecondAction());
            }
        }).print();

        try {
            env.execute("broadcastJob");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

    // keyed state: the previous action of each user
    ValueState<String> prevActionState;
    // descriptor for the broadcast state
    MapStateDescriptor<Void, Pattern> patternDesc;

    @Override
    public void open(Configuration parameters) throws Exception {
        prevActionState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastAction", Types.STRING));
        patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
    }

    // called for each element of the keyed (non-broadcast) stream
    @Override
    public void processElement(Action value, ReadOnlyContext ctx,
                               Collector<Tuple2<Long, Pattern>> out) throws Exception {
        // read the current pattern from the (read-only) broadcast state
        Pattern pattern = ctx.getBroadcastState(this.patternDesc).get(null);
        // read this user's previous action from the keyed state
        String prevAction = prevActionState.value();
        if (pattern != null && prevAction != null) {
            // match if the previous action equals the pattern's first action
            // and the current action equals the pattern's second action
            if (pattern.getFirstAction().equals(prevAction)
                    && pattern.getSecondAction().equals(value.action)) {
                out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
            }
        }
        // the current action becomes the previous action for this user
        prevActionState.update(value.action);
    }

    // called for each element of the broadcast stream
    @Override
    public void processBroadcastElement(Pattern value, Context ctx,
                                        Collector<Tuple2<Long, Pattern>> out) throws Exception {
        BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
        // the new pattern replaces the old one under the single null key
        bcState.put(null, value);
    }
}

class Action {
    Long userId;
    String action;

    public Action(Long userId, String action) {
        this.userId = userId;
        this.action = action;
    }
}

// the Pattern POJO was not shown in the original; this minimal version is assumed
class Pattern {
    private String firstAction;
    private String secondAction;

    public Pattern() {}

    public String getFirstAction() { return firstAction; }
    public void setFirstAction(String firstAction) { this.firstAction = firstAction; }
    public String getSecondAction() { return secondAction; }
    public void setSecondAction(String secondAction) { this.secondAction = secondAction; }
}

Running results:

Let’s start by sending the Pattern (A, B):

 

Then send a few actions (UserId, Action):

 

Results obtained:

 

Now send a new Pattern (b, e):

 

New actions (UserId, Action):

 

Results:

 

KeyedBroadcastProcessFunction interface

The example above implements the KeyedBroadcastProcessFunction interface, which provides three methods:

  • processBroadcastElement(): called whenever an element of the broadcast stream arrives. In the scenario above, we use a MapState as the broadcast state and store the single Pattern under a null key.
  • processElement(): called for each element of the non-broadcast (keyed) stream.
  • onTimer(): timers can still be registered here, and our scenario can be optimized with one: if a user has not performed an action for a long time, clear that user’s last-action state, as the sketch below shows.
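A minimal sketch of that optimization in Scala (the class, its type parameters, and the one-hour timeout are assumptions for illustration, not part of the original example): each action re-arms a per-user processing-time timer, and only the most recently armed timer clears the stale state when it fires.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

class InactivityAwareEvaluator
    extends KeyedBroadcastProcessFunction[Long, (Long, String), String, String] {

  private val idleTimeout = 60 * 60 * 1000L // assumed: one hour of inactivity

  private var prevActionState: ValueState[String] = _
  private var timerState: ValueState[java.lang.Long] = _ // fire time of the latest timer

  override def open(parameters: Configuration): Unit = {
    prevActionState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastAction", Types.STRING))
    timerState = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("timer", Types.LONG))
  }

  override def processElement(
      value: (Long, String),
      ctx: KeyedBroadcastProcessFunction[Long, (Long, String), String, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // ... match the action against the broadcast pattern as before ...
    prevActionState.update(value._2)
    // re-arm the inactivity timer for this user
    val fireAt = ctx.timerService().currentProcessingTime() + idleTimeout
    ctx.timerService().registerProcessingTimeTimer(fireAt)
    timerState.update(fireAt)
  }

  override def processBroadcastElement(
      value: String,
      ctx: KeyedBroadcastProcessFunction[Long, (Long, String), String, String]#Context,
      out: Collector[String]): Unit = {
    // broadcast handling is unchanged and omitted here
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedBroadcastProcessFunction[Long, (Long, String), String, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // only the most recently armed timer clears the state; earlier ones are stale
    if (timerState.value() != null && timerState.value().longValue() == timestamp) {
      prevActionState.clear()
      timerState.clear()
    }
  }
}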