checkpoint & savepoint
What:
- Snapshot: a copy of the global state of the data at one point in time; Flink refers to this as a globally consistent image.
- Checkpoint: a snapshot taken automatically by Flink for fault recovery. Users rarely interact with individual checkpoints directly.
- Savepoint: a snapshot triggered manually by the user (or through an API call) for operational purposes such as stateful redeployment, upgrades, or rescaling. Savepoints are always complete and are optimized for operational flexibility.
why:
Ensure application fault tolerance:
1. Allow a Flink job to restart from a recent state, reducing the cost of a restart.
2. Serve as one of the preconditions for exactly-once semantics (each event affects Flink state only once); end-to-end exactly-once additionally requires a two-phase commit together with an exactly-once source and sink.
how:
Flink uses asynchronous Barrier snapshots to persist the global state of jobs.
1. Barrier
A barrier is generated by the JobManager (checkpoint coordinator) and injected into the data stream. A barrier can be regarded as a special stream element: it divides the stream into old data (before the barrier) and new data (after it).
2. Asynchronous
Snapshots are generated asynchronously, based on copy-on-write.
Copy-on-write: if the in-memory data is not modified, no copy is made; a pointer to the data is used while the local state is synchronized to the state backend. If the data is updated during the snapshot, additional memory is allocated and two copies are maintained: the snapshot data and the updated data. A simplified sketch of this idea follows below.
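The following is a hypothetical, simplified illustration of the copy-on-write idea (it is not Flink's internal implementation; this toy version copies the whole map on the first write after a snapshot, while Flink copies at a much finer granularity):

```java
import java.util.HashMap;
import java.util.Map;

// Toy copy-on-write state map: taking a snapshot only captures a reference,
// and the first write after the snapshot copies the data, so the snapshot
// keeps seeing the old values while new writes go to the copy.
public class CopyOnWriteState<K, V> {
    private Map<K, V> current = new HashMap<>();
    private Map<K, V> snapshot;            // data frozen for the async snapshot
    private boolean snapshotActive = false;

    // Called when the checkpoint barrier arrives: freeze the current data cheaply.
    public Map<K, V> takeSnapshot() {
        snapshot = current;                // just a pointer, no copy yet
        snapshotActive = true;
        return snapshot;
    }

    // Called once the snapshot has been written to the state backend.
    public void releaseSnapshot() {
        snapshot = null;
        snapshotActive = false;
    }

    public void put(K key, V value) {
        if (snapshotActive && current == snapshot) {
            current = new HashMap<>(current);   // copy on the first write after the snapshot
        }
        current.put(key, value);
    }

    public V get(K key) {
        return current.get(key);
    }
}
```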
Snapshot Generation Process
1. The checkpoint coordinator sends a checkpoint command and injects a barrier at the sources of the data flow. Each source records the Kafka partition offsets it is currently consuming and notifies the checkpoint coordinator once the offsets have been recorded.
2. The barrier then travels downstream. When an operator that is neither a source nor a sink receives the barrier, it asynchronously saves its state, notifies the checkpoint coordinator, and broadcasts the barrier to its downstream operators.
3. When a sink receives the barrier from upstream, there are two processing modes depending on the fault-tolerance guarantee:
   1. Exactly-once within the engine: the sink operator saves its state snapshot and notifies the checkpoint coordinator. When the coordinator has received the snapshot-complete message from every operator, it sends a checkpoint-complete message to all operators.
   2. End-to-end exactly-once: the sink operator saves its state snapshot, pre-commits the transaction, and then notifies the checkpoint coordinator. When the coordinator has received the snapshot-complete message from every operator, it sends a checkpoint-complete message to all operators, and the sink then commits the transaction (a sink sketch follows below).
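The pre-commit/commit split in mode 2 corresponds to Flink's TwoPhaseCommitSinkFunction. Below is a minimal sketch: the BufferingTwoPhaseCommitSink class and its in-memory Txn "transaction" are invented for illustration (a real sink would open a transaction against Kafka, a database, etc.), while beginTransaction/invoke/preCommit/commit/abort are the hooks the base class defines.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class BufferingTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, BufferingTwoPhaseCommitSink.Txn, Void> {

    // Illustrative "transaction": records are buffered here until commit.
    public static class Txn {
        public StringBuilder buffer = new StringBuilder();
    }

    public BufferingTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn();                      // open a transaction for the next checkpoint interval
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        txn.buffer.append(value).append('\n'); // write records into the open transaction
    }

    @Override
    protected void preCommit(Txn txn) {
        // flush/prepare the transaction; called when the sink snapshots its state
    }

    @Override
    protected void commit(Txn txn) {
        // make the data visible; called after the coordinator confirms the checkpoint
        System.out.print(txn.buffer);
    }

    @Override
    protected void abort(Txn txn) {
        // discard the transaction if the checkpoint fails
    }
}
```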
Barrier alignment: when an operator has multiple upstream inputs and barrier A from one input arrives first, the operator buffers the subsequent data from that input until barrier B arrives from the other input, and only then takes its snapshot. During alignment, processing of the already-arrived input and the checkpoint itself are blocked.
Unaligned checkpoints were introduced in Flink 1.11 and are disabled by default. With unaligned checkpoints, the operator starts its checkpoint as soon as the first barrier from any input arrives, and the in-flight records of the other streams are asynchronously stored as part of the snapshot. A snippet enabling unaligned checkpoints follows below.
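Unaligned checkpoints are switched on via the checkpoint config; a minimal sketch (the 1000 ms interval is just an illustrative value):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        // available since Flink 1.11, disabled by default
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}
```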
The stored checkpoint consists of two parts:
a metadata file and data files. The data files store the state images; the metadata file stores pointers to the data files.
Checkpoint file format (FsStateBackend):
Savepoint file format:
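The original notes appear to reference diagrams here that are not reproduced. As a rough placeholder based on the Flink documentation, a FsStateBackend checkpoint directory and a savepoint directory look approximately like this (names are illustrative):

```
<checkpoint-dir>/<job-id>/
  ├── chk-1/
  │     ├── _metadata       # metadata file: pointers to the data files
  │     └── <data files>    # state images
  ├── shared/               # state shared across checkpoints (incremental checkpointing)
  └── taskowned/            # state owned by the task managers

<savepoint-dir>/savepoint-:shortjobid-:uuid/
  ├── _metadata
  └── <data files>
```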
Difference:
1. Trigger timing is different
- Checkpoint is triggered automatically by Flink.
- Savepoint is triggered manually by the user.
2. Different life cycle management methods
- Checkpoints are created, managed, and deleted by Flink itself; by default they are deleted after the job ends.
- Savepoints are created and deleted manually by the user, so they can continue to exist after the job stops.
3. Different use scenarios
- Checkpoint is used for fault recovery (if no checkpoint was taken after the savepoint, Flink reads the savepoint for recovery).
- Savepoint is used for adjusting user logic, A/B experiments, changing parallelism, version upgrades, etc.
4. Implementation differences
Current implementations of checkpoints and savepoints use essentially the same code and generate the same format. However, there is currently one exception, and more differences may be introduced in the future: incremental checkpoints on the RocksDB state backend. They use some of RocksDB's internal formats instead of Flink's native savepoint format, which makes them the first instance of a more lightweight checkpoint mechanism compared to savepoints. A snippet enabling incremental RocksDB checkpoints follows below.
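For reference, a sketch of enabling incremental checkpoints on the RocksDB state backend (the HDFS path and interval are illustrative; newer releases prefer EmbeddedRocksDBStateBackend together with state.backend.incremental: true):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the second constructor argument enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000);
    }
}
```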
configuration
checkpoint
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// keep externalized checkpoints, but delete them when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// checkpoint storage location (path truncated in the original)
env.getCheckpointConfig().setCheckpointStorage("file:///...");
```
- Maximum concurrent checkpoints: env.getCheckpointConfig().setMaxConcurrentCheckpoints(), default 1.
- Minimum pause between checkpoints: env.getCheckpointConfig().setMinPauseBetweenCheckpoints(), default 0; setting this value also implies that only one checkpoint runs at a time.

Restore from a checkpoint:

```shell
bin/flink run -s :checkpointMetaDataPath [:runArgs]
```
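For reference, a minimal sketch that applies these settings in code (the interval, pause, and concurrency values are illustrative):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 60 s with engine-level exactly-once guarantees
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // at most one checkpoint in flight at a time (this is the default)
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // leave at least 30 s between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
    }
}
```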
savepoint
```shell
# trigger a savepoint
bin/flink savepoint :jobId [:targetDirectory]
# trigger a savepoint for a job running on YARN
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
# take a savepoint and cancel the job
bin/flink cancel -s [:targetDirectory] :jobId
# restore from a savepoint
bin/flink run -s :savepointPath [:runArgs]
#   -n (--allowNonRestoredState) allows skipping savepoint state that cannot be restored,
#   e.g. state of an operator that has been deleted from the job
# delete (dispose of) a savepoint
bin/flink savepoint -d :savepointPath
```
www.ververica.com/blog/differ…
- Objective: Conceptually, Flink's Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. Checkpoints' primary objective is to act as a recovery mechanism in Apache Flink, a fault-tolerant processing framework that can recover from potential job failures. Conversely, Savepoints' primary goal is to act as the way to restart, continue, or reopen a paused application after a manual backup-and-resume activity by the user.
- Implementation: Checkpoints and Savepoints differ in their implementation. Checkpoints are designed to be lightweight and fast. They might (but don't necessarily have to) make use of specific features of the underlying state backend and try to restore data as fast as possible. As an example, incremental Checkpoints with the RocksDB state backend use RocksDB's internal format instead of Flink's native format. This serves to speed up RocksDB's checkpointing process, which makes them the first instance of a more lightweight checkpointing mechanism. On the contrary, Savepoints are designed to focus more on the portability of data and support any changes made to the job, which makes them slightly more expensive to produce and restore.
- Lifecycle: Checkpoints are automatic and periodic in nature. They are owned, created and dropped automatically and periodically by Flink, without any user interaction, to ensure full recovery in case of an unexpected job failure. On the contrary, Savepoints are owned and managed (i.e. they are scheduled, created, and deleted) manually by the user.