In Flink, state is used to store intermediate results or cached data. Many DataStream operators rely on such intermediate results, that is, on state, for their computation; examples include deduplication, CEP pattern matching, and exactly-once processing. State is therefore an important part of Flink's processing model.
1. Types of state data structures
The state inheritance hierarchy is complex and has many levels, but the following state data structures are the ones mainly used:
- ValueState<T>: a single-valued state of type T. The value is read with the value() method and updated with the update() method.
- ListState<T>: a list of elements of type T.
- ReducingState<T>: calling reducingState.add(value: T) immediately aggregates the value using a ReduceFunction; the aggregated result is read with reducingState.get().
- MapState<UK, UV>: holds a set of key-to-value mappings and provides many methods similar to those of a Java Map.
- AggregatingState<IN, OUT>: behaves similarly to ReducingState, but is based on AggregateFunction, which is more general.
- Internal*State: mainly used to access state data inside the system; generally not used by users.
- BroadcastState: stores the state data of a BroadcastStream. The data in a BroadcastState is sent to all instances of the specified operator, so every instance holds the same data.
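To make the add()/get() contract of ReducingState concrete, here is a minimal plain-Java model. It is only a sketch: SimpleReducingState is a hypothetical class, not a Flink type, and java.util.function.BinaryOperator stands in for Flink's ReduceFunction.

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-in for ReducingState<T>; this is NOT a Flink class.
final class SimpleReducingState<T> {
    // Plays the role of the ReduceFunction<T> (assumption for this sketch).
    private final BinaryOperator<T> reduceFunction;
    // Holds only the aggregated value; null until the first add().
    private T current;

    SimpleReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    // Aggregates immediately: the inputs themselves are never stored.
    void add(T value) {
        current = (current == null) ? value : reduceFunction.apply(current, value);
    }

    // Returns the aggregated value, or null if nothing was added yet.
    T get() {
        return current;
    }
}
```

With Integer::sum as the reduce function, adding 3, 4, and 5 leaves a single stored value, their sum, which is what distinguishes this kind of state from a ListState that would keep all three elements.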
2. KeyedState and OperatorState
State in a DataStream job is divided into KeyedState and OperatorState, depending on whether the stream is partitioned by key.
KeyedState: each key on a KeyedStream corresponds to its own State object.
OperatorState: bound to a parallel operator instance; there is only one state for the whole operator instance.
State | KeyedState | OperatorState |
---|---|---|
Scope of application | Operators on a KeyedStream only | All types of operators |
Number of states | Each key on the KeyedStream corresponds to one State object | The whole operator instance corresponds to one operator state |
Access | Extend a RichFunction | Implement the CheckpointedFunction or ListCheckpointed interface |
Supported state data structures | ValueState, ListState, ReducingState, AggregatingState, MapState | ListState, BroadcastState |
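The difference in the "number of states" row can be illustrated with a small plain-Java model of one parallel operator instance. This is a sketch under assumptions: CountingOperatorModel and its methods are hypothetical names and do not use the Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of ONE parallel operator instance; this is NOT Flink API.
final class CountingOperatorModel {
    // Keyed state: one independent value per key seen on the stream.
    private final Map<String, Long> keyedCounts = new HashMap<>();
    // Operator state: a single value for everything this instance processes.
    private long operatorCount = 0L;

    // Processes one record that belongs to the given key.
    void process(String key) {
        keyedCounts.merge(key, 1L, Long::sum);
        operatorCount++;
    }

    // Count for one key; 0 if the key was never seen.
    long keyedCount(String key) {
        return keyedCounts.getOrDefault(key, 0L);
    }

    // Count across all keys handled by this instance.
    long operatorCount() {
        return operatorCount;
    }
}
```

In real Flink code the per-key map is not written by hand: the framework scopes a ValueState or similar to the current key, while operator state is registered once per operator instance.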
3. StateBackend: where state is stored and maintained
A StateBackend has the following responsibilities:
- It creates state and provides an interface for accessing the state data.
- When the system takes a checkpoint, it stores the state persistently.
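The two responsibilities above can be sketched with a tiny in-memory model. This is an illustration only: InMemoryBackendModel is a hypothetical class, not Flink's StateBackend interface, and the copied map merely stands in for data written to persistent storage.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a state backend's two jobs; this is NOT Flink's StateBackend.
final class InMemoryBackendModel {
    // Live state, addressed by state name.
    private final Map<String, Object> live = new HashMap<>();

    // Responsibility 1: create state on first use and expose read/write access.
    void update(String stateName, Object value) {
        live.put(stateName, value);
    }

    Object value(String stateName) {
        return live.get(stateName);
    }

    // Responsibility 2: at checkpoint time, capture a copy that later updates cannot change.
    // A real backend would write this data to durable storage instead of keeping it in memory.
    Map<String, Object> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(live));
    }
}
```

The key property is that a snapshot taken at checkpoint time is isolated from later updates, so the job can be restored to exactly the state it had when the checkpoint was taken.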
3.1 Implementation of KeyedStateBackend
As the figure below shows, the keyed state backend classes implement the KeyedStateBackend interface, which in turn extends PriorityQueueSetFactory and KeyedStateFactory. Implementations therefore provide methods for creating priority queues and InternalKvState objects, as well as a snapshot method for taking snapshots of the state data held in the KeyedStateBackend. HeapKeyedStateBackend and RocksDBKeyedStateBackend are the two basic implementations of KeyedStateBackend. RocksDBKeyedStateBackend ships in a separate JAR that must be added as a dependency; without that JAR, the inheritance relationship for RocksDBKeyedStateBackend is not visible.
3.1.2 Introduction to HeapKeyedStateBackend
HeapKeyedStateBackend stores state in JVM heap memory and is the KeyedStateBackend that Flink uses by default.
- HeapKeyedStateBackend state types
Because HeapKeyedStateBackend keeps its state on the heap, the names of its state types mostly start with Heap, for example HeapAggregatingState, HeapListState, and HeapReducingState. These state types all extend the AbstractHeapState class.
- Design and implementation of StateTable
- HeapKeyedStateBackend mainly uses the StateTable data structure to store state data. AbstractHeapState declares the StateTable, and the concrete implementation of StateTable is chosen when the AbstractHeapState is instantiated.
- HeapKeyedStateBackend keeps these tables in a Map<String, StateTable<K, ?, ?>> named registeredKVStates: the key is the name under which the state was registered, and the value is the StateTable that holds the state data.
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
- StateTable has two implementations: CopyOnWriteStateTable and NestedMapsStateTable. CopyOnWriteStateTable is generally used by default; it stores data in the CopyOnWriteStateMap data structure and supports asynchronous snapshots during checkpoints. NestedMapsStateTable stores its elements in the NestedStateMap data structure and supports only synchronous snapshots during checkpoints.
- CopyOnWriteStateTable sacrifices some performance and memory in exchange for asynchronous snapshots and incremental rehashing.
- CopyOnWriteStateTable differs from an ordinary hash table in that it uses two keys together, the key K and the namespace N, to locate a value, without a nested structure. The main storage structure is StateMapEntry<K, N, S>[] primaryTable, an array of StateMapEntry objects.
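The idea of locating a value by the (key, namespace) pair in one flat table, rather than through nested maps, can be sketched in plain Java as follows. FlatStateTable and CompositeKey are hypothetical names for illustration; the sketch does not model copy-on-write snapshots or incremental rehashing, and it delegates to java.util.HashMap instead of managing its own entry array.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical flat table: a single lookup on the (key, namespace) pair, no nested maps.
// This is NOT Flink's CopyOnWriteStateTable.
final class FlatStateTable<K, N, S> {

    // Combines key and namespace into one hashable lookup key.
    private static final class CompositeKey {
        final Object key;
        final Object namespace;

        CompositeKey(Object key, Object namespace) {
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) {
                return false;
            }
            CompositeKey other = (CompositeKey) o;
            return Objects.equals(key, other.key) && Objects.equals(namespace, other.namespace);
        }

        @Override
        public int hashCode() {
            // Both parts contribute to the hash, so one probe finds the entry.
            return Objects.hash(key, namespace);
        }
    }

    private final Map<CompositeKey, S> entries = new HashMap<>();

    void put(K key, N namespace, S state) {
        entries.put(new CompositeKey(key, namespace), state);
    }

    // Returns the state for (key, namespace), or null if none is stored.
    S get(K key, N namespace) {
        return entries.get(new CompositeKey(key, namespace));
    }

    int size() {
        return entries.size();
    }
}
```

A nested design would instead look up the namespace first and then the key in an inner map; the flat design keeps one entry per (key, namespace) pair in a single table.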
3.2 Implementation of OperatorStateBackend
OperatorState can be created through two kinds of state management backend: OperatorStateBackend and RawKeyedStateInput. OperatorStateBackend is Flink's default backend for managed operator state. Managed state is state managed by the Flink framework, such as ValueState, ListState, and MapState; users do not need to be aware of how it is managed. Raw state is user-defined state that developers must manage themselves, using byte arrays to read and write the state content.
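What "managing state as byte arrays" means for raw state can be illustrated with the standard java.io streams. This is a sketch only: RawCounterState and its layout (a name followed by a long) are assumptions made for the example, not a format that Flink defines.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical example of user-managed (raw) state: the developer decides the byte layout.
final class RawCounterState {

    // Serializes the state as: UTF-encoded name, then an 8-byte counter.
    static byte[] write(String name, long count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeLong(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the counter back, checking that the bytes belong to the expected state.
    static long readCount(byte[] raw, String expectedName) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            String name = in.readUTF();
            if (!name.equals(expectedName)) {
                throw new IllegalStateException("unexpected state name: " + name);
            }
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With managed state, Flink handles this serialization through its type serializers; with raw state, the framework only sees opaque bytes, so layout and versioning are the developer's responsibility.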
As shown in the following figure, OperatorStateBackend implements four interfaces. Among them, the OperatorStateStore interface provides methods for obtaining BroadcastState and ListState, and for listing the names of the states registered in the OperatorStateStore. OperatorStateBackend has only one default implementation class, DefaultOperatorStateBackend. In addition, the state data of all operators can only be stored in JVM heap memory.
Overall design of StateBackend
StateBackend is an interface that defines how the state of a streaming application is stored and checkpointed. Different StateBackends store data in different ways. As shown in the following figure, the main implementation classes of StateBackend are MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. The StateBackend interface declares the createKeyedStateBackend() and createOperatorStateBackend() methods, which return the KeyedStateBackend and OperatorStateBackend described above. The main difference among the three state backends is that they create different KeyedStateBackend and CheckpointStorage implementations.
Name | MemoryStateBackend | FsStateBackend | RocksDBStateBackend |
---|---|---|---|
KeyedStateBackend | HeapKeyedStateBackend | HeapKeyedStateBackend | RocksDBKeyedStateBackend |
Checkpoint data | JobManager heap memory | Checkpoint data is stored in files at a specified location | Embedded local database RocksDB |
Usage scenarios | Local development, testing, and debugging | Usable in production environments; suitable for high-availability setups | Compared with FsStateBackend, it can hold a larger amount of state data thanks to RocksDB's LSM-tree data structure. It is currently the only backend that supports incremental checkpoints |
Points to note | Limited by the size of the JobManager's memory. Each state is limited to 5 MB by default, which can be adjusted via the constructor. Each state cannot exceed the Akka frame size | State data is stored in the TaskManager and cannot exceed the TaskManager's memory. The TaskManager asynchronously writes the state data to external storage | The total state size is limited by disk size, not memory. RocksDB needs an external file system configured for saving state. RocksDB's JNI API is based on byte arrays, so a single key or value cannot exceed 2^31 bytes |
4. References
Flink Design and Implementation: Core Principles and Source Code Analysis