
Overview

State refers to the intermediate results that a Flink program computes and keeps while it processes a stream.

Flink supports several types of state and provides a dedicated mechanism and state management for persisting it.

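Flink divides computations into stateful and stateless, depending on whether intermediate results need to be saved:

  • Stateful computing: the result for an event depends on events that come before or after it, so intermediate results must be kept

  • Stateless computing: each event is processed independently of all others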
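To make the distinction concrete, here is a minimal plain-Java sketch. It does not use the Flink API, and the class and method names are made up for illustration. The stateless function looks only at the current event, while the stateful one keeps a running sum between events.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: plain Java, no Flink dependency. All names are hypothetical.
class StatefulVsStateless {

    // Stateless: the result depends only on the current event.
    static int doubleIt(int event) {
        return event * 2;
    }

    // Stateful: the result depends on the events seen so far.
    static class RunningSum {
        private int sum = 0; // the "state": an intermediate result kept between events

        int add(int event) {
            sum += event;
            return sum;
        }
    }

    // Feeds the events through one RunningSum and collects each intermediate result.
    static List<Integer> runningSums(int... events) {
        RunningSum state = new RunningSum();
        List<Integer> out = new ArrayList<>();
        for (int e : events) {
            out.add(state.add(e));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(doubleIt(3));          // 6, regardless of earlier events
        System.out.println(runningSums(3, 5, 7)); // [3, 8, 15]
    }
}
```

If the stateful version were restarted without saving `sum`, its later results would be wrong, which is why stateful computation needs state persistence.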

Flink’s official website also lists several typical use cases for stateful computation:

  • Complex event processing: finding events that match a specific pattern or timing rule

  • Aggregation

  • Model training for machine learning

  • Computations that need access to historical data

Depending on the data structure, Flink defines several kinds of state for different scenarios:

  • ValueState: holds a single value of type T. It is bound to the current key and is the simplest kind of state. The value is set with the update method and read with the value() method.

  • ListState: the state for a key is a list. Elements are appended with the add method, and the current contents can be iterated over through the Iterable returned by the get() method.

  • ReducingState: created with a user-supplied ReduceFunction. Each call to the add method invokes that function to merge the new value into the existing one, so the state always holds a single aggregated value.

  • FoldingState: similar to ReducingState, except that the type of the stored value can differ from the type of the elements passed to the add method (this state is deprecated and will be removed in a future version of Flink).

  • MapState: the state for a key is a map. Entries are added with the put or putAll methods.
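The difference between a list-style state and a reducing-style state is easiest to see side by side. The sketch below is an in-memory model written for this article, not Flink's classes: `ListCell` and `ReducingCell` are hypothetical names that only mimic the `add`/`get` behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative in-memory model of the state semantics; these are NOT Flink's classes.
class StateSemanticsSketch {

    // Like ListState: add() appends, get() returns everything added so far.
    static class ListCell<T> {
        private final List<T> values = new ArrayList<>();
        void add(T value) { values.add(value); }
        Iterable<T> get() { return values; }
    }

    // Like ReducingState: add() merges the new value into a single stored value.
    static class ReducingCell<T> {
        private final BinaryOperator<T> reduce;
        private T current; // null until the first add()
        ReducingCell(BinaryOperator<T> reduce) { this.reduce = reduce; }
        void add(T value) { current = (current == null) ? value : reduce.apply(current, value); }
        T get() { return current; }
    }

    // Adds all inputs to a ListCell and returns its contents.
    static List<Long> listAfter(long... inputs) {
        ListCell<Long> cell = new ListCell<>();
        for (long v : inputs) cell.add(v);
        List<Long> out = new ArrayList<>();
        cell.get().forEach(out::add);
        return out;
    }

    // Adds all inputs to a summing ReducingCell and returns the merged value.
    static Long sumAfter(long... inputs) {
        ReducingCell<Long> cell = new ReducingCell<>(Long::sum);
        for (long v : inputs) cell.add(v);
        return cell.get();
    }

    public static void main(String[] args) {
        System.out.println(listAfter(3, 5, 7)); // [3, 5, 7]
        System.out.println(sumAfter(3, 5, 7));  // 15
    }
}
```

The list keeps every element and grows with the input, while the reducing cell stays the size of one value, which is usually the deciding factor when state is large.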

Each type of state can be used in one of two modes:

  1. Managed State: the data structure is defined by the engine, and the Flink runtime is responsible for serializing it and writing it to the state backend. When the parallelism changes, Flink redistributes managed state across the operator instances. The engine is also responsible for optimizing how managed state is stored.

  2. Raw State (unmanaged): the data structure is defined by the application, and the engine writes it to the state backend as a byte stream.

State classification

In Flink, state is divided into two types, depending on whether the stream is partitioned by a key:

  • Keyed State

  • Operator State (non-keyed state)

As shown in the figure below, Keyed State is state on a stream that has been partitioned by key, and each key has its own state.

The octagons, circles, and triangles in the figure each manage their own state, and only records with a given key can access and update the state for that key.

Unlike Keyed State, Operator State can be used in any operator. Each operator subtask (a parallel instance of an operator) has one state, which is shared by all records flowing into that subtask and can be read and updated by them.

Note, however, that both Keyed State and Operator State are local: each operator subtask maintains its own state storage, and subtasks cannot access each other's state.
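The following plain-Java sketch models these two properties: state is scoped to a key, and it lives only in the subtask that owns that key. It is a simplification with hypothetical names; in particular, routing by `hashCode` modulo the parallelism is an assumption for illustration, whereas Flink assigns keys to subtasks through key groups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; Flink's real key-to-subtask assignment uses key groups.
class KeyedStateSketch {

    // One subtask: owns a private map from key to that key's state (here, a count).
    static class Subtask {
        final Map<String, Integer> countPerKey = new HashMap<>();

        void process(String key) {
            countPerKey.merge(key, 1, Integer::sum); // touches only the current key's state
        }
    }

    // Hypothetical routing: every record with the same key goes to the same subtask.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes each record to its subtask and returns the subtasks with their local state.
    static List<Subtask> run(int parallelism, String... keys) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) subtasks.add(new Subtask());
        for (String key : keys) {
            subtasks.get(subtaskFor(key, parallelism)).process(key);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = run(2, "circle", "triangle", "circle", "octagon");
        for (int i = 0; i < subtasks.size(); i++) {
            System.out.println("subtask " + i + " -> " + subtasks.get(i).countPerKey);
        }
    }
}
```

Whichever subtask ends up owning "circle", it is the only one holding a count for that key, and no subtask can read another's map.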

Case – Using state to compute an average

Original data:

(1, 3) (1, 5) (1, 7) (1, 5) (1, 2)

Ideas:

  1. Read the data source
  2. Group the data by key
  3. For each key group, invoke stateful processing on the stream data

State processing: for each key, the state holds a pair of (element count, running sum). Each time the count reaches two, the sum is divided by the count, the result is emitted, and the state is cleared.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StateDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L))
                .keyBy(0)
                .flatMap(new CountWindowAverage())
                .printToErr();

        env.execute("submit job");
    }

    public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        // Keyed state holding (element count, running sum) for the current key
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

            Tuple2<Long, Long> currentSum;

            // Read the ValueState; it is null the first time a key is seen
            if (sum.value() == null) {
                currentSum = Tuple2.of(0L, 0L);
            } else {
                currentSum = sum.value();
            }

            // Increment the element count
            currentSum.f0 += 1;

            // Add the incoming value to the running sum
            currentSum.f1 += input.f1;

            // Write the updated pair back to the state
            sum.update(currentSum);

            // Once two elements have been counted, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state's name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}) // the state's type
                    );

            // TTL can be enabled on any state descriptor by passing it a StateTtlConfig
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.seconds(10))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            descriptor.enableTimeToLive(ttlConfig);

            sum = getRuntimeContext().getState(descriptor);
        }
    }
}

The following output is displayed:

6> (1,4)
6> (1,6)

State is accessed from a rich function, here by extending RichFlatMapFunction. A handle to the state is obtained in open() by calling getRuntimeContext().getState(descriptor).
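The count-and-average logic can also be checked without a Flink runtime. The sketch below is a plain-Java re-enactment under stated assumptions: a `HashMap` stands in for the keyed `ValueState`, the TTL setting is ignored, and the class name `CountAverageSketch` is made up for this example. It feeds in the same values that the job passes to `fromElements`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java re-enactment of the flatMap logic; no Flink dependency, TTL not modeled.
class CountAverageSketch {

    // Per-key state: [0] = element count, [1] = running sum (stands in for ValueState).
    private final Map<Long, long[]> stateByKey = new HashMap<>();

    // Processes one (key, value) record; returns the emitted average, or null if nothing is emitted.
    Long process(long key, long value) {
        long[] current = stateByKey.getOrDefault(key, new long[] {0L, 0L});
        current[0] += 1;
        current[1] += value;
        stateByKey.put(key, current);
        if (current[0] >= 2) {
            stateByKey.remove(key);         // same role as sum.clear()
            return current[1] / current[0]; // integer division, as with Long in the job
        }
        return null;
    }

    // Runs all values for one key through the logic and collects the emitted averages.
    static List<Long> averages(long key, long... values) {
        CountAverageSketch sketch = new CountAverageSketch();
        List<Long> out = new ArrayList<>();
        for (long v : values) {
            Long avg = sketch.process(key, v);
            if (avg != null) out.add(avg);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(averages(1L, 3, 5, 7, 5, 2)); // [4, 6]
    }
}
```

The first two values give (3 + 5) / 2 = 4 and the next two give (7 + 5) / 2 = 6. The last value, 2, stays in the state with a count of one and produces no output.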

State backend types and configuration

Flink’s state data can be kept in JVM heap memory, in off-heap memory, or in third-party storage.

By default, Flink keeps state in TaskManager memory. Flink provides three state backends for storing state in different situations:

  • MemoryStateBackend

  • FsStateBackend

  • RocksDBStateBackend

(1)MemoryStateBackend

MemoryStateBackend stores state data in memory and is mainly used for local debugging. Note the following when using MemoryStateBackend:

  • The default size limit for each individual state is 5 MB, which can be increased through the constructor.
  • The state size cannot exceed the Akka frame size (default: 10 MB).
  • The aggregated state must fit into JobManager memory. That is, the total amount of state data received by the JobManager cannot exceed the JobManager's memory.

The following figure shows where MemoryStateBackend stores data:

MemoryStateBackend is suitable for:

  • Local development and debugging
  • Jobs with very small states

MemoryStateBackend can be specified explicitly in code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));

The second argument, false, in the MemoryStateBackend constructor call disables the asynchronous snapshot mechanism.

In short, MemoryStateBackend is intended for local debugging and for jobs whose state is very small.

(2)FsStateBackend

FsStateBackend keeps the state of a running job in TaskManager memory.

At checkpoint time, a state snapshot is written to the configured file system directory, and a small amount of metadata is stored in JobManager memory.

FsStateBackend requires a file path, typically an HDFS path such as hdfs://namenode:40010/flink/checkpoints.

Applicable scenarios for FsStateBackend:

  • Jobs with large state, long windows, or large key/value state
  • High-availability setups

The following figure shows where FsStateBackend stores data:

FsStateBackend can also be specified in code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false));

(3)RocksDBStateBackend

RocksDBStateBackend is similar to FsStateBackend in some respects. Both need an external file storage path, for example hdfs://namenode:40010/flink/checkpoints, and both are suitable for large jobs with large state that require high availability.

Unlike FsStateBackend, however, RocksDBStateBackend keeps the state of a running job in a RocksDB database. By default, RocksDBStateBackend stores that data in the data directory of the node on which the TaskManager runs.

This means that RocksDBStateBackend can hold far more state than FsStateBackend, which avoids the out-of-memory (OOM) errors that a surge in state can cause with FsStateBackend. The cost is lower throughput, because state has to be read from and written to the RocksDB database.

Note also that, of the three, RocksDBStateBackend is the only state backend that supports incremental snapshots.
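The idea behind an incremental snapshot, as opposed to a full one, can be sketched in a few lines of plain Java. This is a toy model with hypothetical names, not how RocksDB does it: the real backend works on RocksDB's on-disk files, while this sketch simply tracks which keys were written since the last snapshot and does not handle deletions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: real incremental checkpoints operate on RocksDB's files,
// not on a per-key dirty set. All names here are hypothetical.
class IncrementalSnapshotSketch {

    private final Map<String, Long> state = new HashMap<>();
    private final Set<String> changedSinceLastSnapshot = new HashSet<>();

    void put(String key, long value) {
        state.put(key, value);
        changedSinceLastSnapshot.add(key);
    }

    // Full snapshot: copies every entry, however little has changed.
    Map<String, Long> fullSnapshot() {
        changedSinceLastSnapshot.clear();
        return new HashMap<>(state);
    }

    // Incremental snapshot: copies only entries written since the previous snapshot.
    Map<String, Long> incrementalSnapshot() {
        Map<String, Long> delta = new HashMap<>();
        for (String key : changedSinceLastSnapshot) {
            delta.put(key, state.get(key));
        }
        changedSinceLastSnapshot.clear();
        return delta;
    }

    public static void main(String[] args) {
        IncrementalSnapshotSketch s = new IncrementalSnapshotSketch();
        s.put("a", 1);
        s.put("b", 2);
        s.put("c", 3);
        System.out.println(s.fullSnapshot().size());        // 3
        s.put("b", 20);
        System.out.println(s.incrementalSnapshot().size()); // 1
    }
}
```

When the state is large and only a small part of it changes between checkpoints, writing just the delta is what keeps checkpoint time and storage traffic manageable.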

The following figure shows where RocksDBStateBackend stores data:

RocksDBStateBackend applies to the following scenarios:

  • Jobs with very large state, long windows (for example, spanning days), or large key/value state

  • High-availability setups