When we run streaming computations with Flink, we usually produce intermediate results of various kinds, which we call state. Wherever there is state, there is the question of how to store it. So what kinds of state storage does Flink define? Here is a brief introduction.
State Backends
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
MemoryStateBackend
As the name implies, MemoryStateBackend stores state data as objects on the Java heap.
When a checkpoint is executed, MemoryStateBackend takes a snapshot of the current state and sends it, as part of the checkpoint ACK message, to the JobManager (master node), which stores the received snapshot in its heap memory.
By default, MemoryStateBackend takes snapshots asynchronously so that the data flow is not blocked. This is the recommended setting, but asynchronous snapshots can also be disabled through the constructor:
// MAX_MEM_STATE_SIZE: maximum allowed size of each state, in bytes; false disables asynchronous snapshots
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
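To make the difference between the two snapshot modes concrete, here is a simplified, stdlib-only Java model. It is not Flink code, and the class and method names are hypothetical: the synchronous part only copies the working state (a full copy is the simplest stand-in), and the part a real backend would spend serializing and persisting runs on another thread, so record processing can continue. With asynchronous snapshots disabled, that whole write would run on the processing thread instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of an asynchronous snapshot, not Flink's implementation.
class AsyncSnapshotDemo {
    // Working state that the operator keeps updating.
    private final Map<String, Long> workingState = new HashMap<>();

    void update(String key, long delta) {
        workingState.merge(key, delta, Long::sum);
    }

    long value(String key) {
        return workingState.getOrDefault(key, 0L);
    }

    // Synchronous phase: copy the state when the checkpoint is triggered.
    // Asynchronous phase: hand the copy to another thread, where a real backend
    // would serialize and persist it.
    CompletableFuture<Map<String, Long>> snapshotAsync(ExecutorService pool) {
        Map<String, Long> copy = new HashMap<>(workingState);
        return CompletableFuture.supplyAsync(() -> copy, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AsyncSnapshotDemo op = new AsyncSnapshotDemo();
        op.update("a", 1);
        CompletableFuture<Map<String, Long>> snapshot = op.snapshotAsync(pool);
        op.update("a", 10); // processing continues while the snapshot is in flight
        System.out.println(snapshot.join()); // {a=1}
        System.out.println(op.value("a"));   // 11
        pool.shutdown();
    }
}
```

The snapshot reflects the state at the moment it was taken, while the working state keeps advancing.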
Limitations of MemoryStateBackend
- The size of each individual state is limited to 5 MB by default; the limit can be changed through the constructor
- Regardless of the configured maximum state size, a state cannot be larger than the Akka frame size
- The aggregate state must fit into the JobManager's memory
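The three limits above can be expressed as simple checks. The following is a hypothetical, stdlib-only model rather than Flink's own validation code: it treats one snapshot as one ACK message, and the Akka frame size and JobManager heap size are plain parameters (the 10,000,000-byte frame size in `main` is an arbitrary example value, not a Flink default).

```java
import java.util.Optional;

// Simplified model of the MemoryStateBackend limits listed above, not Flink's own checks.
class MemoryBackendLimits {
    // Default per-state limit: 5 MB.
    static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    // Returns a rejection reason, or empty if the snapshot may be sent to the JobManager.
    static Optional<String> checkSnapshot(long snapshotBytes, int maxStateSize, long akkaFrameSize) {
        if (snapshotBytes > maxStateSize) {
            return Optional.of("exceeds maxStateSize of " + maxStateSize + " bytes");
        }
        // A larger maxStateSize does not help: the snapshot still travels in one ACK message.
        if (snapshotBytes > akkaFrameSize) {
            return Optional.of("exceeds the Akka frame size of " + akkaFrameSize + " bytes");
        }
        return Optional.empty();
    }

    // All snapshots end up on the JobManager heap, so their sum must fit there.
    static boolean fitsJobManagerHeap(long[] snapshotSizes, long jobManagerHeapBytes) {
        long total = 0;
        for (long size : snapshotSizes) {
            total += size;
        }
        return total <= jobManagerHeapBytes;
    }

    public static void main(String[] args) {
        long frame = 10_000_000L; // hypothetical frame size, not a Flink default
        System.out.println(checkSnapshot(1024, DEFAULT_MAX_STATE_SIZE, frame)); // Optional.empty
        System.out.println(checkSnapshot(6L * 1024 * 1024, DEFAULT_MAX_STATE_SIZE, frame).isPresent()); // true
    }
}
```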
Based on these limitations, it is generally recommended to use MemoryStateBackend in the following scenarios:
- Local development and debugging
- Stateless jobs or jobs that hold a small amount of state
In addition, the official documentation recommends setting managed memory to 0, so that the maximum amount of memory is allocated to user code on the JVM.
FsStateBackend
FsStateBackend needs to be configured with a file system URL, such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”. FsStateBackend keeps the working state of a running job in TaskManager memory. When a checkpoint is executed, state snapshots are written to the configured file system directory, while a small amount of metadata is stored in JobManager memory.
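FsStateBackend also uses asynchronous snapshots by default. The snapshot mode can be changed when instantiating FsStateBackend:
// path: checkpoint directory URL, e.g. "hdfs://namenode:40010/flink/checkpoints"; false disables asynchronous snapshots
new FsStateBackend(path, false);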
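The FsStateBackend data flow (working state in memory, snapshots in a file system directory, only a location kept as metadata) can be sketched with the Java standard library. This is a hypothetical model, not Flink's implementation: it only accepts local `file://` URLs, uses Java serialization, and the `chk-<id>` file name is an illustrative assumption.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the FsStateBackend data flow, not Flink's implementation.
class FsCheckpointSketch {
    // Working state: kept in (TaskManager) memory while the job runs.
    final Map<String, Long> workingState = new HashMap<>();
    private final Path checkpointDir;

    FsCheckpointSketch(String checkpointUri) {
        URI uri = URI.create(checkpointUri);
        // Only local file:// URLs are handled here; hdfs:// would need a Hadoop client.
        if (!"file".equals(uri.getScheme())) {
            throw new IllegalArgumentException("unsupported scheme in this sketch: " + uri.getScheme());
        }
        this.checkpointDir = Paths.get(uri);
    }

    // Writes the snapshot to the checkpoint directory and returns its location, the
    // small piece of metadata that would be reported to the JobManager.
    Path checkpoint(long checkpointId) throws IOException {
        Files.createDirectories(checkpointDir);
        Path file = checkpointDir.resolve("chk-" + checkpointId); // hypothetical naming
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(new HashMap<>(workingState));
        }
        return file;
    }

    // Reads a snapshot back from the location recorded in the metadata.
    @SuppressWarnings("unchecked")
    static Map<String, Long> restore(Path handle) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(handle))) {
            return (Map<String, Long>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("fs-sketch");
        FsCheckpointSketch backend = new FsCheckpointSketch(dir.toUri().toString());
        backend.workingState.put("clicks", 42L);
        Path handle = backend.checkpoint(1);
        backend.workingState.put("clicks", 43L);
        System.out.println(restore(handle)); // {clicks=42}
    }
}
```

The point of the design is that the JobManager only needs the returned path, not the state itself, which is why FsStateBackend is not bound by the JobManager's memory the way MemoryStateBackend is.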
FsStateBackend is recommended in the following scenarios:
- Jobs with large state, long windows, or large key/value state
- High-availability setups
Here, too, the official recommendation is to set managed memory to 0, so that user code on the JVM gets as much memory as possible.
RocksDBStateBackend
RocksDBStateBackend also needs to be configured with a file system URL, such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
RocksDBStateBackend keeps the working state of a running job in a RocksDB database, which by default is stored in the TaskManager's data directory. When a checkpoint is executed, the entire RocksDB database is checkpointed into the configured file system directory, and only a small amount of metadata is stored in JobManager memory.
RocksDBStateBackend always takes snapshots asynchronously.
Limitations:
- Since RocksDB’s JNI bridge API is based on byte[], the maximum supported size per key and per value is 2^31 bytes. This is usually not a problem, but state that is built up through repeated merge operations (for example, ListState) can silently grow beyond this limit and then fail on retrieval.
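A bit of arithmetic shows how quickly merge-based state approaches the limit. The class below is a hypothetical guard, not part of Flink or RocksDB: it models a merge-based value as one growing byte[] and refuses an append that would push it past Integer.MAX_VALUE (2^31 - 1) bytes, the largest possible Java array length. Delimiter or serialization overhead per element is ignored.

```java
// Hypothetical guard, not part of Flink: models a merge-based value (such as ListState
// in RocksDB) as one growing byte[]. Java arrays are int-indexed, so a single value
// cannot hold more than Integer.MAX_VALUE (2^31 - 1) bytes.
class MergeSizeGuard {
    static final long MAX_VALUE_BYTES = Integer.MAX_VALUE;

    private long currentBytes;

    // Accepts the append only if the merged value would still fit into one byte[].
    boolean tryAppend(long elementBytes) {
        if (currentBytes + elementBytes > MAX_VALUE_BYTES) {
            return false;
        }
        currentBytes += elementBytes;
        return true;
    }

    long size() {
        return currentBytes;
    }

    public static void main(String[] args) {
        // With 1 KiB elements (ignoring any delimiter overhead), this many appends fit:
        System.out.println(MAX_VALUE_BYTES / 1024); // 2097151
    }
}
```

So a single key whose list receives a couple of million 1 KiB elements is already at the limit, which is easy to reach with long-lived keys.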
RocksDBStateBackend is recommended for the following scenarios:
- Jobs with large state, long windows, or large key/value state
- High-availability setups
At first glance this looks just like FsStateBackend, but there is an important difference: with RocksDBStateBackend, the amount of state a job can keep is limited only by the disk space available to it. This makes it possible to maintain much larger state than with FsStateBackend, which keeps its working state in TaskManager memory.
This comes at a cost, though: every read and write to this state backend goes through serialization and deserialization, which sacrifices some throughput compared with the heap-based backends.
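The serialization cost can be illustrated with a toy store that, like RocksDB, keeps values only as bytes. This is a hypothetical model, not Flink code; for brevity it keeps String keys as objects, whereas RocksDB also stores keys as bytes. It simply counts how often values are encoded and decoded.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Illustrative model, not Flink code: like RocksDB, this store keeps values only as
// bytes, so every read deserializes and every write serializes. A heap-based backend
// would instead hand back the stored object directly.
class SerializedStoreSketch {
    private final Map<String, byte[]> bytesStore = new HashMap<>();
    int serializations;
    int deserializations;

    void put(String key, long value) {
        serializations++;
        bytesStore.put(key, ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }

    long get(String key) {
        deserializations++;
        return ByteBuffer.wrap(bytesStore.get(key)).getLong();
    }

    // A read-modify-write, such as a counter update, pays for both directions.
    void add(String key, long delta) {
        put(key, get(key) + delta);
    }

    public static void main(String[] args) {
        SerializedStoreSketch store = new SerializedStoreSketch();
        store.put("count", 0);
        for (int i = 0; i < 3; i++) {
            store.add("count", 1);
        }
        // value, serializations, deserializations
        System.out.println(store.get("count") + " " + store.serializations + " " + store.deserializations); // 3 4 4
    }
}
```

Every state update in a RocksDB-backed job follows this read-decode-modify-encode-write pattern, which is where the lost throughput goes.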
Conclusion
- MemoryStateBackend and FsStateBackend are heap-based state backends
- RocksDBStateBackend is currently the only state backend that supports incremental checkpoints
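The idea behind incremental checkpoints can be sketched in a few lines. This is a simplified model, not Flink's implementation: because RocksDB writes immutable SST files, a checkpoint only has to upload the files that the previous checkpoint did not contain, since everything else is already durable. Deleting uploaded files that are no longer referenced is left out, and the file names are made up for the example.

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified model of incremental checkpointing, not Flink's implementation.
// RocksDB writes immutable SST files, so a checkpoint only has to upload the files
// that the previous checkpoint did not contain; the others are already durable.
class IncrementalCheckpointSketch {
    private Set<String> previousFiles = new TreeSet<>();

    // Returns the SST files that must be uploaded for this checkpoint.
    Set<String> checkpoint(Set<String> currentSstFiles) {
        Set<String> toUpload = new TreeSet<>(currentSstFiles);
        toUpload.removeAll(previousFiles);
        previousFiles = new TreeSet<>(currentSstFiles);
        return toUpload;
    }

    public static void main(String[] args) {
        IncrementalCheckpointSketch c = new IncrementalCheckpointSketch();
        System.out.println(c.checkpoint(Set.of("000001.sst", "000002.sst")));               // [000001.sst, 000002.sst]
        System.out.println(c.checkpoint(Set.of("000001.sst", "000002.sst", "000003.sst"))); // [000003.sst]
        // After a compaction merged 000001 and 000002 into 000004:
        System.out.println(c.checkpoint(Set.of("000003.sst", "000004.sst")));               // [000004.sst]
    }
}
```

With large, slowly changing state, each checkpoint then uploads only the recent changes instead of the whole database.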
Welcome to follow my WeChat official account, gao Gaomu, to be the first to read my latest experience sharing, and to communicate and grow together.