The wordCount example we wrote earlier did not involve state management: if a task fails during processing, its state in memory is lost and all the data has to be recalculated. To provide fault tolerance and message-processing guarantees (at-least-once and exactly-once semantics), Flink introduces state and checkpoints.
In other words, Flink supports exactly-once semantics precisely because of the introduction of state and checkpoints.
Let’s first distinguish between two concepts:
State:
State generally refers to the state of a specific task/operator:
- State data is stored by default in Java heap memory, i.e., in the memory of the TaskManager node.
- Operator state covers the intermediate results that operators produce while running.
Checkpoint:
A checkpoint is a global state snapshot of a Flink job at a specific point in time, i.e., a snapshot of the states of all tasks and operators.
Note: a task (subtask) is the basic unit of execution in Flink, while an operator refers to an operator transformation.
Because state is recorded, data can be recovered in the event of a failure.
There are two basic types of state in Flink:
- Keyed State
- Operator State
Keyed State and Operator State can each exist in two forms:
- Raw State
- Managed State
Managed state is state managed by the Flink framework: the framework itself manages the data structures in which the operator's intermediate results are held.
With raw state, users manage the state's data structure themselves; the framework reads and writes the state as byte[] and knows nothing about its internal structure.
Managed state is generally recommended on DataStream; raw state is used when implementing user-defined operators.
1. State-Keyed State
Keyed State is state on a KeyedStream, bound to a specific key: for each key on a KeyedStream (e.g., after stream.keyBy(…)) there is a separate state instance. Keyed State can be interpreted as partitioned Operator State, with one state partition per key.
Data structures for saving state:
- ValueState[T]: a single-value state of type T, bound to the corresponding key; this is the simplest kind of state. The value can be updated with the update() method and read with the value() method.
- ListState[T]: the state value for a key is a list. Values can be appended to the list with the add() method, and the state can be iterated over via the Iterable returned by the get() method.
- ReducingState[T]: this state is created with a user-supplied ReduceFunction, which is called each time the add() method is invoked, merging all values into a single state value (a short sketch appears below).
- MapState[K, V]: the state value is a map. Elements are added with the put() or putAll() methods.
It is important to note that the State objects described above are only used to interact with the state (update, delete, clear, etc.); the actual state values may live in memory, on disk, or in another distributed storage system. In other words, we only hold a handle to the state.
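ListState and ReducingState do not get dedicated examples below, so here is a minimal sketch of how a ReducingState handle could be declared and used inside a rich function. The class name RunningSum, the state name "sum", and the surrounding pipeline are illustrative assumptions, not code from the original examples:
import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration

// Sketch: keep a per-key running sum in a ReducingState (names are illustrative)
class RunningSum extends RichMapFunction[(String, Long), (String, Long)] {
  private var sumState: ReducingState[Long] = _

  override def open(parameters: Configuration): Unit = {
    sumState = getRuntimeContext.getReducingState(
      new ReducingStateDescriptor[Long](
        "sum", // state name (assumed)
        new ReduceFunction[Long] {
          // invoked on every add(), merging values into a single state value
          override def reduce(a: Long, b: Long): Long = a + b
        },
        TypeInformation.of(new TypeHint[Long] {})))
  }

  override def map(value: (String, Long)): (String, Long) = {
    sumState.add(value._2)     // merged by the ReduceFunction above
    (value._1, sumState.get()) // read the current aggregated value
  }
}
Used on a keyed stream (e.g., stream.keyBy(0).map(new RunningSum)), each key keeps a single aggregated value instead of a full list of elements.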
1. ValueState
Group the following data and aggregate it, using ValueState to save the intermediate (count, sum) result; the example emits the per-key average once three elements have arrived for a key.
Development steps:
1. Obtain the stream-processing execution environment
2. Load the data source
3. Group the data
4. Transform the data: define the ValueState and save intermediate results
5. Print the data
6. Trigger execution
ValueState test data source:
List(
(1L, 4L),
(2L, 3L),
(3L, 1L),
(1L, 2L),
(3L, 2L),
(1L, 2L),
(2L, 2L),
(2L, 9L)
)
Sample code:
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object TestKeyedState {

  class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

    /** ValueState handle: the first field is the count, the second field is the running sum. */
    private var sum: ValueState[(Long, Long)] = _

    override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
      // Get the current state value
      val tmpCurrentSum: (Long, Long) = sum.value
      // Fall back to the default value if the state has not been set yet
      val currentSum = if (tmpCurrentSum != null) {
        tmpCurrentSum
      } else {
        (0L, 0L)
      }
      // Update count and sum
      val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
      // Write the new value back to the state
      sum.update(newSum)
      // If count >= 3, emit the average, then clear the state and start over
      if (newSum._1 >= 3) {
        out.collect((input._1, newSum._2 / newSum._1))
        sum.clear()
      }
    }

    override def open(parameters: Configuration): Unit = {
      sum = getRuntimeContext.getState(
        new ValueStateDescriptor[(Long, Long)](
          "average", // state name
          TypeInformation.of(new TypeHint[(Long, Long)]() {}) // state type
        )
      )
    }
  }

  def main(args: Array[String]): Unit = {
    // Initialize the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Build the data source
    val inputStream: DataStream[(Long, Long)] = env.fromCollection(
      List(
        (1L, 4L),
        (2L, 3L),
        (3L, 1L),
        (1L, 2L),
        (3L, 2L),
        (1L, 2L),
        (2L, 2L),
        (2L, 9L)
      )
    )
    // Process the data
    inputStream
      .keyBy(0)
      .flatMap(new CountWithKeyedState)
      .setParallelism(1)
      .print
    // Run the job
    env.execute
  }
}
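With the test data above, keys 1 and 2 each reach a count of three, so the job should print (1,2) and (2,4): for key 1 the sum 4 + 2 + 2 = 8 divided by the count 3 gives 2 under integer division, and for key 2 the sum 3 + 2 + 9 = 14 divided by 3 gives 4. Key 3 only sees two elements, so its state is never flushed and it produces no output.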
2. MapState
Group and sum the following data using MapState to save the intermediate results:
Development steps:
1. Obtain the stream-processing execution environment
2. Load the data source
3. Group the data
4. Transform the data: define the MapState and save intermediate results
5. Print the data
6. Trigger execution
MapState test data source:
List(
("java", 1),
("python", 3),
("java", 2),
("scala", 2),
("python", 1),
("java", 1),
("scala", 2)
)
Sample code:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object MapState {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    /**
     * Group and sum the following data, using MapState to save the intermediate results:
     * 1. Obtain the stream-processing execution environment
     * 2. Load the data source
     * 3. Group the data
     * 4. Transform the data: define the MapState and save intermediate results
     * 5. Print the data
     * 6. Trigger execution
     */
    val source: DataStream[(String, Int)] = env.fromCollection(List(
      ("java", 1),
      ("python", 3),
      ("java", 2),
      ("scala", 2),
      ("python", 1),
      ("java", 1),
      ("scala", 2)))
    source.keyBy(0)
      .map(new RichMapFunction[(String, Int), (String, Int)] {
        var mste: MapState[String, Int] = _

        override def open(parameters: Configuration): Unit = {
          val msState = new MapStateDescriptor[String, Int]("ms",
            TypeInformation.of(new TypeHint[String] {}),
            TypeInformation.of(new TypeHint[Int] {}))
          mste = getRuntimeContext.getMapState(msState)
        }

        override def map(value: (String, Int)): (String, Int) = {
          // Read the running sum for this key and add the new value
          val i: Int = mste.get(value._1)
          mste.put(value._1, value._2 + i)
          (value._1, value._2 + i)
        }
      }).print()
    env.execute()
  }
}
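A note on this example (my reading of the code, not stated in the original): for the first occurrence of a key, mste.get returns null, which Scala unboxes to 0 for an Int, so each running sum effectively starts from 0. With the test data above, the job should therefore print one running sum per record: (java,1), (python,3), (java,3), (scala,2), (python,4), (java,4), (scala,4).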
2. State-Operator State
Operator State is independent of any key: the state is bound to the operator, and there is only one state for the entire operator (per parallel instance).
Data structure for saving state:
- ListState[T]
Flink's Kafka connector, for example, uses operator state: each connector instance stores a mapping of (partition, offset) for the topics it consumes.
Steps:
- Obtain the execution environment
- Configure the checkpoint mechanism: storage path and restart strategy
- Define a custom data source:
  - Inherit a parallel source function and implement CheckpointedFunction
  - Define the ListState, obtained through the context object
  - Process the data, keeping track of the offset
  - Take the snapshot
- Print the data
- Trigger execution
Sample code:
import java.util

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object ListOperate {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Restart strategy: at most 3 failures within 1 minute, 5 seconds between attempts
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
    // Simulate a Kafka offset
    env.addSource(new MyRichParrelSourceFun)
      .print()
    env.execute()
  }
}

class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
  with CheckpointedFunction {

  var listState: ListState[Long] = _
  var offset: Long = 0L

  // Run the task: restore the offset from state, then keep emitting
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val iterState: util.Iterator[Long] = listState.get().iterator()
    while (iterState.hasNext) {
      offset = iterState.next()
    }
    while (true) {
      offset += 1
      ctx.collect("offset:" + offset)
      Thread.sleep(1000)
      // Deliberately fail so the restart strategy and state recovery kick in
      if (offset > 10) {
        1 / 0
      }
    }
  }

  // Cancel the task
  override def cancel(): Unit = ???

  // Take a snapshot: persist the current offset into the list state
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    listState.clear()
    listState.add(offset)
  }

  // Initialize state: obtain the ListState from the operator state store
  override def initializeState(context: FunctionInitializationContext): Unit = {
    listState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Long]("listState", TypeInformation.of(new TypeHint[Long] {}))
    )
  }
}
3. Broadcast State
Broadcast State is a new feature introduced in Flink 1.5. If, during development, you need to send/broadcast low-throughput events such as configurations and rules to all downstream tasks, you can use the Broadcast State feature. Downstream tasks receive these configurations and rules, save them as Broadcast State, and apply them to computations on another data stream.
1) API introduction
Ordinarily, we first create a keyed or non-keyed data stream, then create a broadcast stream, and finally connect the data stream to the broadcast stream (by calling the connect method); this broadcasts the Broadcast State to every task downstream of the data stream.
If the data stream is a keyed stream, then when connecting it to the broadcast stream, the processing function to use is KeyedBroadcastProcessFunction. Its API is shown below:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
The meanings of the parameters in the above generics are described as follows:
- KS: the type of the key used when the Flink program calls keyBy to build the stream from the most upstream Source operator;
- IN1: the type of the data records in the non-broadcast data stream;
- IN2: the type of the data records in the broadcast stream;
- OUT: the type of the data records output after processing by KeyedBroadcastProcessFunction's processElement() and processBroadcastElement() methods.
If the data stream is a non-keyed stream, the processing function used when connecting it to the broadcast stream is BroadcastProcessFunction. Its API is shown below:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
The generic parameters have the same meanings as the last three generic types of KeyedBroadcastProcessFunction above; since keyBy is not called to partition the original stream, the KS generic parameter is not needed.
Matters needing attention:
- Broadcast State is of Map type, i.e., a key-value type.
- Broadcast State can only be modified in the broadcast-side method processBroadcastElement; in the non-broadcast-side method processElement, it is read-only.
- Broadcast State is stored in memory at runtime.
2) Scenario examples
- Dynamic update of computation rules: if the event stream needs to be computed according to the latest rules, the rules can be broadcast to the downstream tasks as broadcast state (see the sketch below).
- Adding additional fields in real time: if basic user information needs to be attached to the event stream in real time, the user information can be broadcast to the downstream tasks as broadcast state.
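To make the first scenario concrete, here is a minimal sketch that broadcasts rule updates to a non-keyed event stream through BroadcastProcessFunction. It assumes a Flink version whose Scala DataStream API exposes broadcast/connect for broadcast streams; the descriptor name "rules", the rule "lengthLimit", and the sample data are all illustrative assumptions, not part of the original article:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastStateSketch {
  // Broadcast state descriptor: rule name -> rule value (names are illustrative)
  val ruleStateDescriptor = new MapStateDescriptor[String, String](
    "rules",
    TypeInformation.of(new TypeHint[String] {}),
    TypeInformation.of(new TypeHint[String] {}))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Low-throughput rule updates, simulated here with a single element
    val ruleStream = env.fromElements(("lengthLimit", "5"))
    // The event stream the rules are applied to
    val eventStream = env.fromElements("java", "python", "javascript")

    eventStream
      .connect(ruleStream.broadcast(ruleStateDescriptor))
      .process(new BroadcastProcessFunction[String, (String, String), String] {
        // Non-broadcast side: the broadcast state is read-only here
        override def processElement(
            value: String,
            ctx: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          val limit = ctx.getBroadcastState(ruleStateDescriptor).get("lengthLimit")
          // Keep events that satisfy the current rule (pass everything until a rule arrives)
          if (limit == null || value.length <= limit.toInt) {
            out.collect(value)
          }
        }
        // Broadcast side: the only place the broadcast state may be updated
        override def processBroadcastElement(
            value: (String, String),
            ctx: BroadcastProcessFunction[String, (String, String), String]#Context,
            out: Collector[String]): Unit = {
          ctx.getBroadcastState(ruleStateDescriptor).put(value._1, value._2)
        }
      })
      .print()

    env.execute()
  }
}
Because the two streams race at startup, whether "javascript" is filtered depends on whether the rule arrives before it; in a real job the rules would come from a durable source such as a Kafka topic, matching the dynamic-rules scenario above.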