I. Classification of states

Compared with other stream computing frameworks, one of Flink’s most important features is its support for stateful computation: intermediate results can be saved and made available to subsequent computations.

Specifically, Flink divides State into Keyed State and Operator State:

1.1 Operator State

Operator State: as the name implies, state that is bound to an operator; the state of one operator cannot be accessed by other operators. More precisely, each piece of operator state is bound to one parallel operator instance, so it is more accurate to say that an operator state is bound to a parallel instance of an operator. An operator running with a parallelism of two therefore has two corresponding operator state instances.

1.2 Keyed State

Keyed State: a special kind of operator state in which state is partitioned by key value, with Flink maintaining one state instance per key. For example, a stream containing four distinct key values corresponds to four separate state instances. Note that keyed state can only be used on a KeyedStream, which is obtained via stream.keyBy(...).

II. State programming

2.1 Keyed State

Flink provides the following data formats to manage and store Keyed State (a minimal usage sketch follows the list):

  • ValueState: stores a single value. Use update(T) to update the value and T value() to retrieve it.
  • ListState: stores a list of values. Use add(T) or addAll(List<T>) to add elements, and get() to retrieve the entire list.
  • ReducingState: stores the result computed by a ReduceFunction; use add(T) to add elements.
  • AggregatingState: stores the result computed by an AggregateFunction; use add(IN) to add elements.
  • FoldingState: marked as deprecated and slated for removal in a future release; the official recommendation is AggregatingState instead.
  • MapState: maintains state of map type.
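
To make the access pattern concrete, here is a minimal sketch that uses ValueState to count elements per key; the class name and logic are illustrative, not from the original article:

public class CountWithValueState extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Holds the running count for the current key
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // Obtain (or create) the state instance via its descriptor
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        // value() reads the current state; it is null if nothing has been stored yet
        Long current = countState.value();
        long updated = (current == null) ? 1L : current + 1;
        // update(T) writes the new value back
        countState.update(updated);
        out.collect(Tuple2.of(value.f0, updated));
    }
}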

There is no need to memorize all of the accessor methods above; in practice your IDE’s auto-completion will surface them as you type. Here is a concrete usage example. Suppose we are developing a monitoring system that must send an alert when the monitored value exceeds a threshold a certain number of times. Requiring multiple occurrences matters because an occasional, one-off spike above the threshold is not meaningful by itself; the alert should only fire after repeated violations, and this is exactly where Flink’s state programming comes in. The relevant code is as follows:

public class ThresholdWarning extends 
    RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    // Use ListState to store the status of abnormal data
    private transient ListState<Long> abnormalData;
    // Threshold to be monitored
    private Long threshold;
    // The number of times the alarm was triggered
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) {
        // Get the state instance by the state name (handle), or create it automatically if it doesn't exist
        abnormalData = getRuntimeContext().getListState(
            new ListStateDescriptor<>("abnormalData", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)
        throws Exception {
        Long inputValue = value.f1;
        // If the input value exceeds the threshold, the abnormal data is recorded
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
        // If abnormal data has occurred the specified number of times, emit an alert
        if (list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " exceeds the specified threshold", list));
            // Clear the state once the alert has been emitted
            abnormalData.clear();
        }
    }
}

Now invoke the custom state-monitoring function. Here, a and b represent two different types of monitored data, each monitored separately:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
    .keyBy(0)
    .flatMap(new ThresholdWarning(100L, 3))  // Alert once the threshold of 100 has been exceeded 3 times
    .printToErr();
env.execute("Managed Keyed State");

The following output is displayed:
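
With the sample data above, an alert fires for a key each time three values at or above the threshold accumulate, so three alerts are emitted in total. Assuming Flink’s default Tuple2 formatting, they would look roughly like this (an illustrative sketch, not a verbatim capture):

(a exceeds the specified threshold,[400, 100, 200])
(b exceeds the specified threshold,[100, 200, 200])
(b exceeds the specified threshold,[500, 600, 700])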

2.2 State Validity Period (TTL)

All of the keyed state types above support a time-to-live (TTL), as shown in the following example:

StateTtlConfig ttlConfig = StateTtlConfig
    // Set the validity period to 10 seconds
    .newBuilder(Time.seconds(10))  
    // Set the update rule for the validity period; here it is reset to the full 10 seconds on creation and on every write
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    // NeverReturnExpired: an expired value is never visible. The alternative, ReturnExpiredIfNotCleanedUp,
    // makes an expired value visible as long as it has not yet been physically removed.
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);
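By default, an expired value is only physically removed when it is explicitly read; newer Flink versions also let you configure background cleanup. A minimal sketch, assuming a Flink version where cleanupFullSnapshot() is available (1.6+):

StateTtlConfig ttlConfigWithCleanup = StateTtlConfig
    .newBuilder(Time.seconds(10))
    // Also drop expired entries when a full snapshot (e.g. a savepoint) is taken,
    // so they do not bloat the snapshot; this does not delete them from local state
    .cleanupFullSnapshot()
    .build();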

2.3 Operator State

Compared with keyed state, operator state currently supports only the following three storage types:

  • ListState: stores state of list type.
  • UnionListState: also stores state of list type. The difference from ListState lies in how state is redistributed when the parallelism changes: ListState gathers all parallel state instances of the operator and splits them evenly across the new tasks, whereas UnionListState simply merges all parallel state instances, leaving the concrete partitioning behavior to be defined by the user.
  • BroadcastState: operator state used in broadcast scenarios.

Continuing with the example above, suppose we no longer need to distinguish between the types of monitored data: an alert should fire whenever any monitored value exceeds the threshold the specified number of times. The code is as follows:

public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, 
    Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {

    // Buffered abnormal data
    private List<Tuple2<String, Long>> bufferedData;
    // checkpointed state
    private transient ListState<Tuple2<String, Long>> checkPointedState;
    // Threshold to be monitored
    private Long threshold;
    // Number of times the threshold must be exceeded before alerting
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Note that this obtains the OperatorStateStore
        checkPointedState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("abnormalData",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                })));
        // If the job is restarting, restore the state from the snapshot
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) throws Exception {
        Long inputValue = value.f1;
        // If the input value exceeds the threshold, record it
        if (inputValue >= threshold) {
            bufferedData.add(value);
        }
        // If the threshold has been exceeded the specified number of times, emit an alert
        if (bufferedData.size() >= numberOfTimes) {
            // Include the hashCode of the state instance in the output
            out.collect(Tuple2.of(checkPointedState.hashCode() + " threshold alert!", bufferedData));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // When a snapshot is taken, copy the buffered data into checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }
}

Invoke the custom operator-state function; note that the parallelism here needs to be set to 1:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpoint mechanism
env.enableCheckpointing(1000);
// Set parallelism to 1
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
    .flatMap(new ThresholdWarning(100L, 3))
    .printToErr();
env.execute("Managed Keyed State");
}
Copy the code

The output is as follows:
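
With the parallelism set to 1, every record flows through the same operator instance, so three alerts are emitted, all carrying the same state-instance hashCode. Assuming default Tuple2 formatting, the output would look roughly like this (the hash value below is illustrative and differs from run to run):

(1047723563 threshold alert!,[(a,400), (a,100), (a,200)])
(1047723563 threshold alert!,[(a,200), (b,100), (b,200)])
(1047723563 threshold alert!,[(b,200), (b,500), (b,600)])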

In the calling code above, we set the program’s parallelism to 1, and you can see that the hashCodes of the state instance in all three outputs are identical, proving that they come from the same state instance. Now suppose the parallelism is set to 2; the output becomes:

This time the hashCodes of the state instances in the two outputs differ, showing that they are not the same state instance, which confirms the earlier point that an operator state is bound to one parallel operator instance. Only two alerts appear here because, with two parallel subtasks, the abnormal values are split between them: one thread may receive five abnormal values and the other four, and since an alert fires only for every three, just two records are output. This is why the program’s parallelism needs to be set to 1 for this example.

III. Checkpoint mechanism

3.1 Checkpoints

To provide fault tolerance, Flink offers a checkpoint mechanism. Flink periodically injects checkpoint barriers into the data stream; when an operator receives a barrier, it takes a snapshot based on its current state and then forwards the barrier to the downstream operator, which in turn snapshots its own state, and so on until the barrier reaches the final Sink operator. When a failure occurs, Flink can restore every operator to its previous state from the latest snapshot data.

3.2 Enabling checkpoints

By default, the checkpoint mechanism is turned off and needs to be enabled in the program:

// Enable the checkpoint mechanism and specify the interval between status checkpoints
env.enableCheckpointing(1000); 

// Other optional configurations are as follows:
// Set the semantics
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set the minimum interval between two checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Set the timeout period for Checkpoint operation
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Sets the maximum number of concurrent checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Persist checkpoints to external storage
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Prefer the latest checkpoint for recovery even if a more recent savepoint exists
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

3.3 Savepoint mechanism

Savepoints are a special implementation of the checkpoint mechanism: they let you trigger a checkpoint manually and persist the result to a specified path. They are mainly used to prevent the Flink cluster from losing state when it is restarted or upgraded. The following is an example:

# Trigger a savepoint for the job with the specified ID and store the result in the specified directory
bin/flink savepoint :jobId [:targetDirectory]
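Two closely related commands from the Flink CLI are typically used alongside this one (the paths and IDs below are placeholders):

# Cancel the job with the specified ID, taking a savepoint at the same time
bin/flink cancel -s [:targetDirectory] :jobId

# Resume a job from an existing savepoint
bin/flink run -s :savepointPath [:runArgs]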

For more commands and configuration options, see the official Savepoints documentation.

IV. State backend

4.1 Classification of state backends

By default, all state is stored in the JVM’s heap memory. If there is too much state data, memory can easily be exhausted, so Flink provides alternative ways to store state data, collectively known as state backends (or state managers).

There are three main types:

1. MemoryStateBackend

The default approach, which is based on the JVM’s heap memory, is mainly suitable for local development and debugging.

2. FsStateBackend

Storage based on a file system, which can be a local file system or a distributed file system such as HDFS. Note that even with FsStateBackend, in-flight data is still held in the TaskManager’s memory; a state snapshot is written to the configured file system only when a checkpoint is taken.

3. RocksDBStateBackend

RocksDBStateBackend is a third-party state manager built into Flink. It uses the embedded key-value database RocksDB to store in-flight data, and persists the data to a configured file system when a checkpoint is taken, so a file system must be configured for persistent storage when using RocksDBStateBackend. The rationale is that keeping data only in the embedded RocksDB instance would be less safe, while reads from RocksDB are faster than the pure file-system approach; compared with the pure in-memory approach it can hold far more state, making it the more balanced solution.

4.2 Configuration Methods

Flink supports configuring backend managers in two ways:

The first method is code based and only applies to the current job:

// Configure FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// Configure RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

When configuring RocksDBStateBackend, you need to import the following additional dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

The second method is based on the flink-conf.yaml configuration file. The configuration takes effect for all jobs deployed on the cluster:

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
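To switch the whole cluster to RocksDB instead, the same file would contain something like the following (key names as in the Flink configuration docs; the incremental option is optional):

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints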

Note: All sample code for this article can be downloaded at flink-state-management

References

  • Working with State
  • Checkpointing
  • Savepoints
  • State Backends
  • Fabian Hueske, Vasiliki Kalavri. Stream Processing with Apache Flink. O’Reilly Media. 2019-4-30

See the GitHub open source project Getting Started with Big Data for more articles in the big data series.