Flink's state management is based on local state. Since Flink is a distributed engine deployed on multiple nodes, and distributed systems frequently encounter killed processes, node failures, and network interruptions, how do we ensure that local state is not lost when a failure occurs? Flink periodically saves state data to persistent storage and recovers from the latest backup when a failure occurs. This whole mechanism is called the Checkpoint mechanism, and it provides Flink with its Exactly-Once delivery guarantee. This article introduces the principles of Flink's Checkpoint mechanism. It uses several concepts, such as Snapshot, Distributed Snapshot, and Checkpoint; they all refer to Flink's Checkpoint mechanism, and readers can treat them as equivalent.

Flink's Distributed Snapshot Process

Let’s start with a simple Checkpoint process:

  1. Pause processing of incoming data and cache the new data.
  2. Copy the local state data of each operator subtask to a remote persistent store.
  3. Continue processing incoming data, including the data just cached.

Flink's distributed snapshot is based on the Chandy-Lamport algorithm [1]. Before we get into the details of Flink's snapshot process, let's take a look at the concept of the Checkpoint Barrier. As shown in the figure below, Checkpoint Barriers are inserted into the data stream and split it into segments. The logic behind Flink's Checkpoint is that state changes as new data flows in, so an operator takes a snapshot of its state after receiving a Checkpoint Barrier. Each Checkpoint Barrier carries an ID that identifies the Checkpoint the data belongs to. As shown in the figure, when Checkpoint Barrier N reaches an operator, a snapshot is created for the state updates caused by the data between Barrier N-1 and Barrier N. A Checkpoint Barrier is a bit like a Watermark in Event Time: it is inserted into the stream without affecting the stream's original processing order.

Next, we build a parallel data flow diagram to demonstrate Flink's distributed snapshot mechanism. The data flow diagram has two Source subtasks, and data flows from the Sources to the Sinks through these parallel operators.

First, Flink’s Checkpoint Coordinator triggers a Checkpoint, and the request is sent to the various sub-tasks of the Source.

After receiving the Checkpoint request, each subtask of the Source operator writes its state to State Backend, generates a snapshot, and broadcasts a Checkpoint Barrier downstream.

The Source operator also sends an acknowledgement to the Checkpoint Coordinator after the snapshot is completed. This acknowledgement includes some metadata, such as the State handle, that is, a pointer to the state that was backed up to State Backend. At this point, the Source has completed one Checkpoint. Like a Watermark, the Checkpoint Barrier is sent by an operator subtask to all downstream operator subtasks it connects to.

A downstream operator may have multiple upstream inputs connected to it; we call the edges between operators channels. The Source broadcasts the Checkpoint Barrier with ID N to all downstream operators, which means a downstream operator receives the same Checkpoint Barrier on multiple inputs, and the Barrier may flow in at different rates on different inputs. Therefore, Barrier Alignment is required while Checkpoint Barriers propagate. Below, a small portion of the data flow diagram is selected to analyze how Checkpoint Barriers propagate and align among operators.

As shown in the figure above, alignment is divided into four steps:

  1. The operator subtask receives the first Checkpoint Barrier with ID N on one input channel, while the Checkpoint Barrier with ID N on the other input channels has not yet arrived. The operator subtask starts the alignment.
  2. The operator subtask caches the data from the first input channel while continuing to process data from the other input channels; this process is known as alignment.
  3. The Checkpoint Barrier of the second input channel reaches the operator subtask. The subtask takes a snapshot, writes its state to State Backend, and then broadcasts the Checkpoint Barrier with ID N to all downstream output channels.
  4. For this operator subtask, the snapshot is complete, and new incoming data on every channel continues to be processed, including the data that was just cached (a simplified code sketch follows this list).
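
To make the alignment steps more concrete, here is a greatly simplified, hypothetical sketch of what one operator subtask does. The types Record and Barrier and the helper methods are invented for this illustration and are not Flink APIs; the real alignment logic lives inside Flink's network stack.

import scala.collection.mutable

sealed trait Event
case class Record(value: String) extends Event
case class Barrier(checkpointId: Long) extends Event

class AligningSubtask(numChannels: Int) {
  // Channels whose Barrier has already arrived are blocked; their data is cached.
  private val blocked = Array.fill(numChannels)(false)
  private val cached = Array.fill(numChannels)(mutable.Queue.empty[Record])
  private var barriersSeen = 0

  def onEvent(channel: Int, event: Event): Unit = event match {
    // Step 2: cache data arriving behind a Barrier ...
    case r: Record if blocked(channel) => cached(channel).enqueue(r)
    // ... while data from channels without a Barrier keeps being processed.
    case r: Record => process(r)
    case Barrier(id) =>
      blocked(channel) = true
      barriersSeen += 1
      if (barriersSeen == numChannels) {
        // Step 3: all Barriers have arrived -> snapshot and broadcast the Barrier downstream.
        snapshotState(id)
        emitBarrierDownstream(id)
        // Step 4: drain the cached records and resume normal processing on every channel.
        cached.foreach(q => while (q.nonEmpty) process(q.dequeue()))
        (0 until numChannels).foreach(i => blocked(i) = false)
        barriersSeen = 0
      }
  }

  private def process(r: Record): Unit = println(s"processing ${r.value}")
  private def snapshotState(id: Long): Unit = println(s"snapshot for checkpoint $id")
  private def emitBarrierDownstream(id: Long): Unit = println(s"barrier $id sent downstream")
}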

Each subtask of the Sink operator in the data flow diagram needs to complete the same alignment, snapshot, and confirmation. Once all Sink subtasks have confirmed the snapshot, Checkpoint N is considered complete, and the Checkpoint Coordinator writes the metadata of this Checkpoint to State Backend.

The main reason for alignment is to ensure that the states of all operators in a Flink job are consistent: after a Checkpoint Barrier with ID N has flowed through all operator subtasks from front to back, every operator subtask has written a snapshot that reflects exactly the same set of input data.

Snapshot Performance Optimization

This method ensures data consistency, but has some potential problems:

  1. During each Checkpoint, processing of incoming data stops while the snapshot is taken. If the state is large, a snapshot may take several seconds or even minutes.
  2. During Checkpoint Barrier alignment, an operator must wait for the Barriers from all upstream channels. If one upstream channel is slow, the entire data flow may be blocked.

Flink already has some solutions to these problems and is constantly optimizing them.

For the first problem, Flink provides an Asynchronous Snapshot mechanism. When a snapshot is triggered, the operator immediately broadcasts the Checkpoint Barrier downstream, as if its snapshot were already complete, and at the same time starts a background thread that creates a copy of the local state. The background thread synchronizes this copy to State Backend and sends the acknowledgement to the Checkpoint Coordinator once the synchronization is complete. Copying the data takes extra memory, so Copy-on-Write optimization can be used. Copy-on-Write means that if a piece of in-memory data is not modified, no copy is made; only a pointer to the data is needed while the local data is synchronized to State Backend. If the data is updated during the snapshot, additional memory is allocated and two versions are maintained: one is the snapshot data, the other is the updated data.
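
The following is a conceptual sketch of the Copy-on-Write idea, not Flink's actual implementation: state is kept in an immutable map, so the snapshot thread only holds a reference, and a new version is created only when the processing thread performs an update.

import java.util.concurrent.atomic.AtomicReference

class CopyOnWriteState[K, V] {
  private val current = new AtomicReference(Map.empty[K, V])

  // Processing thread: an update builds a new map and leaves the old one untouched,
  // so a snapshot that still references the old map stays consistent.
  def put(key: K, value: V): Unit = {
    current.updateAndGet(m => m + (key -> value))
    ()
  }

  // Snapshot thread: grabbing the reference is enough; nothing is copied unless
  // the processing thread modifies the state afterwards.
  def snapshot(): Map[K, V] = current.get()
}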

For the second problem, Flink allows the alignment step to be skipped: an operator subtask broadcasts the Checkpoint Barrier, takes its snapshot, and continues processing subsequent incoming data without waiting for the Checkpoint Barriers from all upstream channels. To ensure data consistency, Flink must then also snapshot the elements of the slower data streams, and these elements are reprocessed after a restart.
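
In more recent Flink versions this capability is exposed as unaligned checkpoints on CheckpointConfig; whether the option is available depends on your Flink version, so treat the call below as a sketch rather than a universally applicable setting.

// Skip Barrier alignment by using unaligned checkpoints (requires a Flink version that supports it)
env.getCheckpointConfig.enableUnalignedCheckpoints()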

State Backend

We have covered Flink's snapshot mechanism above; State Backend plays the important role of persisting the state data. Flink abstracts State Backend as a pluggable component and provides three types of State Backend, each of which saves and restores data in slightly different ways. Let's take a closer look at Flink's State Backends.

MemoryStateBackend

As the name suggests, this State Backend is memory based and stores data on the Java heap. When a distributed snapshot is taken, all operator subtasks synchronize their in-memory state to the JobManager heap, so the total state of a job must be smaller than the JobManager's memory. This approach obviously cannot store much state data, otherwise an OutOfMemoryError will be thrown. Therefore, it is only suitable for debugging or experiments and is not recommended for production environments. The following code tells a Flink job to use memory as State Backend and specifies the maximum size of the state in the constructor parameter, which defaults to 5 MB.

env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE))

If no configuration is done, the default is to use memory as State Backend.

FsStateBackend

In this mode, data is persisted to a file system, which can be a local disk, the Hadoop Distributed File System (HDFS), or a cloud storage service such as Amazon S3 or Alibaba Cloud OSS. When using a file system, specify the path with its scheme prefix, such as file://, hdfs://, or s3://. In addition, this approach supports Asynchronous Snapshot, which is enabled by default to speed up data synchronization.

// HDFS is used as State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))

// Use Alibaba Cloud OSS as State Backend
env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"))

// Use Amazon S3 as State Backend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"))

// Disable Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false))

Flink keeps the local state on the TaskManager heap until the snapshot is written to the configured file system. Therefore, this approach enjoys the fast read and write access of local memory while also providing failure recovery for jobs with a large amount of state.

RocksDBStateBackend

With this State Backend, the local state is stored in a local RocksDB instance. RocksDB is an embedded key-value database that actually stores its data on local disk. Because it uses disk space, RocksDB can hold much more local state than FsStateBackend, which keeps the working state in memory; however, every read and write of local state goes through serialization and deserialization, so accessing local state is more expensive. During snapshot execution, Flink synchronizes the state stored in the local RocksDB instance to the remote storage, so this State Backend also requires the address of a distributed storage to be configured. Asynchronous Snapshot is enabled by default as well.

In addition, this State Backend supports Incremental Checkpoint. The core idea of Incremental Checkpoint is that each snapshot writes only the changed data to the distributed storage instead of copying all of the local state. Incremental Checkpoint is ideal for very large states: the time needed to take a snapshot drops significantly, at the expense of longer restart and recovery times. It is disabled by default and needs to be enabled manually.

// Enable Incremental Checkpoint
val enableIncrementalCheckpointing = true
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))

RocksDBStateBackend can support far more local and remote state than FsStateBackend; the Flink community has reported cases with terabytes of state.

In addition to the above three, developers can also develop their own implementations of State Backend.

Restart Recovery Process

Flink’s restart recovery logic is relatively simple:

  1. Restart the application and redeploy the data flow graph on the cluster.
  2. Read the last Checkpoint data from the persistent store and load it into the operator subtasks.
  3. Continue processing incoming data.

This mechanism guarantees Exactly-Once consistency for Flink's internal state. End-to-end Exactly-Once consistency depends on the specific implementations of the Source and Sink. When a failure occurs, some data may have entered the system without yet being covered by a Checkpoint. The Checkpoint of the Source records the input Offset; when Flink restarts, it restores the latest Checkpoint into memory and rewinds the Source to that Offset so the data is sent again, ensuring that data is neither lost nor duplicated. Message queues such as Kafka support this kind of replay, while socketTextStream does not, which means Exactly-Once delivery cannot be guaranteed with it.
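
As a hedged example, a Kafka source stores the consumed offsets in its Checkpoint, so after a restart Flink can ask Kafka to replay records from those offsets. The topic name, bootstrap servers, and group id below are placeholders.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder address
props.setProperty("group.id", "my-flink-job")            // placeholder consumer group

// The consumer's offsets are part of the Source state and are written to each Checkpoint
val kafkaSource = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)
val stream = env.addSource(kafkaSource)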

Checkpoint Configuration

By default, Checkpoint is disabled and must be enabled by calling env.enableCheckpointing(n), which triggers a Checkpoint every n milliseconds. A Checkpoint is a heavy task: if the state is large and n is small, the next Checkpoint may be triggered before the previous one has completed, occupying too many of the resources that should go to normal data processing. Increasing n means the job performs fewer Checkpoints and spends fewer resources on them, so more resources are available for normal stream processing. On the other hand, a larger n also means that after a restart the job has to reprocess data from an older Offset.
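
For example, the call below (with an arbitrarily chosen interval) enables Checkpointing every 60 seconds:

// Trigger a Checkpoint every 60 seconds (60,000 milliseconds)
env.enableCheckpointing(60 * 1000)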

There are also several other parameters that need to be configured, encapsulated in CheckpointConfig:

val cpConfig: CheckpointConfig = env.getCheckpointConfig

The default Checkpoint configuration supports Exactly-Once delivery, which guarantees that during restart and recovery each piece of data affects the state of every operator exactly once. Exactly-Once relies on Checkpoint Barrier alignment, which introduces some latency. If the job requires low latency, you can use At-Least-Once delivery instead, which skips alignment, but some data may then be processed more than once.

// Use At-Least-Once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

If a Checkpoint is not completed after a certain period of time, the Checkpoint is terminated to prevent it from occupying too many resources:

// The timeout period is 1 hour
env.getCheckpointConfig.setCheckpointTimeout(3600*1000)

If the interval between two Checkpoints is too short, normal processing gets too few resources while too many resources are spent on Checkpoints. Configuring this parameter properly ensures normal stream processing. For example, if it is set to 60 seconds, no new Checkpoint will be started within 60 seconds after the previous Checkpoint finishes. Note that this setting only takes effect when the job allows at most one concurrent Checkpoint.

// The interval between two checkpoints is 60 seconds
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)

By default, a job allows only one Checkpoint to execute at a time; if a Checkpoint is already in progress, a newly triggered Checkpoint has to wait. You can raise this limit to allow several Checkpoints to run concurrently.

// Perform a maximum of three checkpoints at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)

If this parameter is greater than 1, it will conflict with the minimum interval mentioned earlier.

The original purpose of Checkpoint is failure recovery: if a job fails due to an exception, Flink keeps the Checkpoint data in remote storage and uses it to recover. By default, if the developer cancels the job, the Checkpoint data in remote storage is deleted. If the developer wants to keep the remote Checkpoint data after cancelling the job, for example to debug with it, the following parameter can be set:

// Save Checkpoint after the job is canceled
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

In RETAIN_ON_CANCELLATION mode, users need to manually delete Checkpoint data from the remote storage.

By default, a Checkpoint failure causes the entire application to restart. You can disable this behavior so that a Checkpoint failure does not affect the running job:

env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

  1. K. Mani Chandy, Leslie Lamport: Distributed Snapshots: Determining Global States of Distributed Systems. ACM Transactions on Computer Systems, Vol. 3, No. 1, February 1985.