Author | Liao Jiayi
Abstract: This article introduces two features that ByteDance has recently built on Flink: single point recovery at the network layer, and Regional Checkpoint at the checkpoint layer. The contents include:
- Single point recovery mechanism
- Regional Checkpoint
- Other optimizations at Checkpoint
- Challenges & Future planning
Author’s original video sharing: www.bilibili.com/video/BV13a…
1. Single point recovery mechanism
In ByteDance's real-time recommendation scenario, we use Flink to join user features with user behaviors in real time, and the joined samples serve as the input to the real-time model. The latency and stability of this joining service directly affect the recommendation quality that online products deliver to users. In Flink, the joining service is implemented as something similar to a dual-stream Join, where the failure of any Task or node in the job leads to a Failover of the whole job, and the real-time recommendation quality of the corresponding services is affected.
Before introducing single point recovery, let’s review Flink’s Failover strategy.
- Individual-Failover:
This strategy applies only when Tasks are not connected to one another, so its application scenarios are limited.
- Region-Failover:
This strategy divides all Tasks of a job into several Regions. When a Task fails, it tries to find the minimum set of Regions that needs to be restarted for recovery. Compared with the global-restart strategy, this one restarts fewer Tasks in some scenarios.
If the Region-Failover strategy is used, the job forms one large Region because its topology is fully connected, and restarting that Region is equivalent to restarting the entire job. We therefore considered whether Flink's Individual-Task-Failover strategy could replace Region-Failover, but it is not applicable to this topology either. So we needed to design and develop a new Failover strategy for the following scenario:
- Multi-stream Join topology
- Large traffic (30M QPS) and high parallelism (16K*16K)
- A small amount of data loss within a short period is tolerable
- High requirement for continuous data output
Before going into the technical solution, take a look at Flink’s existing data transfer mechanism.
From left to right (SubTask A):
- The RecordWriter receives the incoming data first
- The RecordWriter shuffles the data and selects a channel based on information in the record, such as the key
- The data is serialized into a buffer and placed in the buffer queue of that channel
- The buffers are sent downstream through the Netty Server
- The downstream Netty Client receives the data
- Based on the partition information in the buffer, the data is forwarded to the corresponding downstream channel
- The InputProcessor takes the data out of the buffer and executes the operator logic
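To make the upstream half of this path concrete, below is a minimal, self-contained sketch of channel selection and per-channel buffering. The class and method names are purely illustrative, not Flink's actual internals.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the upstream send path: select a channel by key,
// append the serialized record to that channel's buffer queue, and let a
// (here omitted) network layer drain the queues toward the downstream task.
public class RecordWriterSketch {

    private final Deque<byte[]>[] channelQueues;

    @SuppressWarnings("unchecked")
    public RecordWriterSketch(int numChannels) {
        channelQueues = new Deque[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelQueues[i] = new ArrayDeque<>();
        }
    }

    // Channel selection: hash the key onto one of the downstream channels.
    int selectChannel(String key) {
        return Math.abs(key.hashCode()) % channelQueues.length;
    }

    // "Load data into a buffer and place it in the buffer queue" of the channel.
    public void emit(String key, byte[] serializedRecord) {
        int channel = selectChannel(key);
        channelQueues[channel].addLast(serializedRecord);
    }

    public static void main(String[] args) {
        RecordWriterSketch writer = new RecordWriterSketch(4);
        writer.emit("user-42", "clicked item 7".getBytes());
        System.out.println("record routed to channel " + writer.selectChannel("user-42"));
    }
}
```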
Following this design, we need to solve the following problems:
- How do upstream Tasks sense downstream failures
- If a downstream Task fails, how can the upstream Task keep sending data to the Tasks that are still healthy
- If an upstream Task fails, how can the downstream Task continue consuming the data already in its buffers
- How to handle incomplete records on the upstream and downstream sides
- How to establish new connections
The following sections address these problems one by one.
■ How do upstream tasks sense downstream failures
The downstream SubTask proactively sends a failure message to the upstream, or the upstream Netty Server senses that the TaskManager has shut down. The X in the figure represents an unavailable SubPartition.
First, SubPartition1 and its corresponding View (a structure the Netty Server uses to fetch SubPartition data) are marked unavailable.
Later, when the RecordWriter receives new data that should go to SubPartition1, it first checks availability and only sends the data normally when the SubPartition is available.
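A minimal sketch of that availability check follows. It assumes records destined for an unavailable SubPartition are simply dropped during the short recovery window, which the scenario's tolerance for a small amount of short-term data loss allows; all names are illustrative.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// When a downstream consumer fails, its SubPartition is flagged as unavailable;
// the writer checks the flag before every emit and skips records for that
// SubPartition until a new consumer registers and the flag is reset.
public class AvailabilityAwareWriter {

    static class SubPartition {
        final AtomicBoolean available = new AtomicBoolean(true);
        void add(byte[] record) { /* enqueue for the Netty server to send */ }
    }

    private final SubPartition[] subPartitions;

    AvailabilityAwareWriter(int n) {
        subPartitions = new SubPartition[n];
        for (int i = 0; i < n; i++) subPartitions[i] = new SubPartition();
    }

    // Called when the downstream task reports failure or its TM is lost.
    void markUnavailable(int index) { subPartitions[index].available.set(false); }

    // Called after the rescheduled downstream task registers again.
    void markAvailable(int index) { subPartitions[index].available.set(true); }

    void emit(int index, byte[] record) {
        if (subPartitions[index].available.get()) {
            subPartitions[index].add(record);   // normal send path
        }
        // otherwise: drop the record, accepting a small, short-lived data loss
    }
}
```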
■ The upstream Task receives a new connection from the downstream Task
After the downstream SubTask is rescheduled and started, it sends a Partition Request to the upstream Netty Server. On receiving the Partition Request, the upstream Netty Server creates a new View for that downstream SubTask, and the upstream RecordWriter can then write data normally again.
■ The downstream Task senses that the upstream Task fails
Similarly, the downstream Netty Client can sense that an upstream SubTask has failed; it finds the corresponding channel and inserts an unavailable event at the tail of its queue (denoted by an exclamation mark in the figure). The goal is to lose as little data as possible, so the buffers already in the channel can still be consumed by the InputProcessor until the "unavailable event" is read. Then the channel is marked unavailable and the corresponding buffer queue is cleaned up.
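The drain-then-clean behavior can be sketched as follows, using a sentinel object as the "unavailable event"; again the names are illustrative rather than the actual implementation.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Downstream view of a failed upstream producer: instead of dropping the
// channel's queued buffers immediately, an end-marker is appended so the
// InputProcessor can drain what is already buffered before the channel is
// released and later recreated.
public class ChannelDrainSketch {

    private static final byte[] UNAVAILABLE_EVENT = new byte[0]; // sentinel

    private final LinkedBlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();

    // Netty client notices that the upstream subtask is gone.
    void onUpstreamFailure() {
        bufferQueue.add(UNAVAILABLE_EVENT);
    }

    void enqueue(byte[] buffer) {
        bufferQueue.add(buffer);
    }

    // InputProcessor loop: keep consuming until the sentinel is read,
    // then clean up the channel state.
    void drain() throws InterruptedException {
        while (true) {
            byte[] buffer = bufferQueue.take();
            if (buffer == UNAVAILABLE_EVENT) {
                bufferQueue.clear();   // release remaining state of the dead channel
                return;
            }
            process(buffer);
        }
    }

    private void process(byte[] buffer) { /* hand the record to the operator */ }
}
```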
■ Incomplete data in Buffer
First, we need to know where incomplete data lives. It lives inside the InputProcessor, which maintains a small buffer queue for each channel. A received buffer may contain only part of a record; once the next buffer arrives, the pieces are spliced into a complete record and sent to the operator.
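A rough sketch of this per-channel splicing, under the simplifying assumption that the caller can tell after each buffer whether the record is complete; names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Per-channel reassembly: a record split across network buffers is kept as a
// partial fragment until the next buffer for the same channel arrives, then
// spliced together and handed to the operator.
public class PartialRecordSplicer {

    // partial bytes waiting for the rest of the record, per channel
    private final Map<Integer, byte[]> partialPerChannel = new HashMap<>();

    // Returns the complete record once all pieces are present, or null while waiting.
    public byte[] onBuffer(int channel, byte[] buffer, boolean recordComplete) {
        byte[] previous = partialPerChannel.remove(channel);
        byte[] joined;
        if (previous == null) {
            joined = buffer;
        } else {
            joined = new byte[previous.length + buffer.length];
            System.arraycopy(previous, 0, joined, 0, previous.length);
            System.arraycopy(buffer, 0, joined, previous.length, buffer.length);
        }
        if (!recordComplete) {
            partialPerChannel.put(channel, joined);  // keep waiting for the next buffer
            return null;
        }
        return joined;                                // full record, send to the operator
    }
}
```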
■ Reconnect downstream tasks to upstream tasks
When the failed upstream Task is rescheduled, it calls the TaskManager API to notify the downstream. On receiving the notification, the downstream Shuffle Environment checks the state of the corresponding channel: if the channel is already marked unavailable, it creates a new channel and releases the old one; if the channel is still available, that means its buffers have not yet been fully consumed, so the replacement has to wait until those buffers are consumed.
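The replace-now-or-after-draining decision might look like the following sketch (illustrative names, not the real Shuffle Environment code).

```java
// When the rescheduled upstream task announces itself, the downstream side
// either swaps in a fresh channel right away (old channel already unavailable
// and drained) or defers the swap until the remaining buffers are consumed.
public class ChannelReconnectSketch {

    enum ChannelState { AVAILABLE, UNAVAILABLE, DRAINING }

    static class Channel {
        ChannelState state = ChannelState.AVAILABLE;
        int pendingBuffers = 0;
    }

    Channel channel = new Channel();

    void onUpstreamRestarted() {
        if (channel.state == ChannelState.UNAVAILABLE && channel.pendingBuffers == 0) {
            channel = new Channel();                // release the old channel, create a new one
        } else {
            channel.state = ChannelState.DRAINING;  // swap once the buffers are consumed
        }
    }

    void onBufferConsumed() {
        channel.pendingBuffers--;
        if (channel.state == ChannelState.DRAINING && channel.pendingBuffers == 0) {
            channel = new Channel();
        }
    }
}
```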
■ Business gains
The figure above shows a comparison test on a job with a parallelism of 4000. The business logic joins a user impression stream with a user behavior stream, and the whole job has 12000 Tasks.
In the figure, **single point recovery (reserved resources)** is a feature built by the scheduling team: extra resources are requested up front when the job applies for resources, so that when a failover occurs the time spent requesting resources from YARN is saved.
In the end, the job's output dropped by only about one thousandth and recovery took about 5 seconds. Because the whole recovery process is so short, it is essentially imperceptible to downstream consumers.
2. Regional Checkpoint
Consider a classic data integration scenario: importing and exporting data, for example from Kafka to Hive. It has the following characteristics:
- There is no all-to-all connection in the topology
- It relies heavily on Checkpoint to achieve Exactly-Once output
- The Checkpoint interval is long, and a high success rate is required
In this scenario the data is not shuffled at all.
What are the problems encountered in the data integration scenario?
- A single Task's Checkpoint failure affects the global Checkpoint output
- Network jitter, write timeouts/failures, and storage environment jitter have an outsized impact on the job
- With more than 2000 parallel Tasks, the success rate drops significantly, below business expectations
In this scenario, since the job topology is divided into multiple Regions under the Region-Failover strategy, can Checkpoint take a similar approach and be managed at the Region level? The answer is yes.
In that case, there is no need to wait until every Task has completed its checkpoint before archiving (for example, renaming HDFS files); once a Region completes, checkpoint archiving can be performed for that Region, as sketched below.
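As a rough illustration of Region-level archiving, the sketch below finalizes a Region's files as soon as all of that Region's Tasks have acked, instead of waiting for the whole job; doArchive() stands in for the real action such as an HDFS rename, and all names are hypothetical.

```java
import java.util.*;

// Region-level archiving: each region's files are finalized as soon as that
// region's own tasks have all acked, independently of the other regions.
public class RegionArchiverSketch {

    private final Map<String, Set<String>> pendingTasksPerRegion = new HashMap<>();

    public RegionArchiverSketch(Map<String, List<String>> tasksPerRegion) {
        tasksPerRegion.forEach((region, tasks) ->
                pendingTasksPerRegion.put(region, new HashSet<>(tasks)));
    }

    // Called when a task reports a successful snapshot.
    public void onTaskAck(String region, String task) {
        Set<String> pending = pendingTasksPerRegion.get(region);
        pending.remove(task);
        if (pending.isEmpty()) {
            doArchive(region);   // archive this region without waiting for the others
        }
    }

    private void doArchive(String region) {
        System.out.println("archiving checkpoint data of " + region);
    }
}
```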
Before introducing the solution, let's briefly review Flink's existing checkpoint mechanism, which I believe you are already familiar with.
[Figure: Flink's existing checkpoint mechanism]
The diagram above shows a topology with a Kafka source and a Hive sink operator, each with a parallelism of 4.
The CheckpointCoordinator triggers the triggerCheckpoint operation and sends it to each source Task. After a Task receives the request, it triggers the operators inside the Task to take snapshots. In this example there are eight operator states.
After every operator completes its snapshot, the Task sends an ACK message to the CheckpointCoordinator to indicate that its checkpoint is complete.
When the Coordinator has received ACK messages from all Tasks, the checkpoint is considered successful. It then triggers the finalize operation to persist the corresponding metadata and notifies all Tasks that the checkpoint is complete.
What problems do we run into when managing checkpoints by Region?
- How to divide Checkpoint regions
Tasks that are connected to each other are grouped into the same Region, so Tasks with no connection between them fall into different Regions. In this example there are obviously four Regions.
- How to handle the Checkpoint result of a failed Region
If the first checkpoint completes successfully, the state of every operator is written to the HDFS checkpoint1 directory, and the eight operator states are mapped to the four checkpoint Regions through a logical mapping. Note that this is only a logical mapping; no physical files are moved or changed.
Suppose in the second checkpoint the Region region-4-data (kafka-4, hive-4) fails. The checkpoint2 directory (job/chk_2) then lacks the kafka-4-state and hive-4-state files, so checkpoint2 is incomplete. To keep it complete, the successful state files of region-4-data are looked up in the last (or an earlier) successful checkpoint and logically mapped in. The Region state files of the current checkpoint are then complete, and the checkpoint as a whole can be considered complete.
If most or even all Regions fail, and each of them simply references a previous checkpoint, the current checkpoint is effectively no different from the previous one, which makes it meaningless.
To avoid this, a maximum Region failure ratio can be configured, for example 50%: with four Regions in this example, at most two Regions may fail.
- How to avoid storing too much historical Checkpoint data on the file system
If a Region keeps failing (due to dirty data or code logic problems), the mechanism above would keep all historical checkpoint files around, which is obviously unreasonable.
For this we support configuring the maximum number of consecutive Region failures. For example, a value of 2 means a Region may reference results from at most the two previous successful checkpoints. A sketch of both limits follows below.
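The two limits can be combined into a small acceptance policy, sketched below with illustrative names: the failure ratio is checked against the current checkpoint, and the consecutive-failure counter bounds how long a Region may keep referencing older successful checkpoints.

```java
import java.util.HashMap;
import java.util.Map;

// Guard rails for the Regional Checkpoint: a checkpoint is only accepted if
// the fraction of failed regions stays under the configured ratio, and a
// region may fall back to earlier successful checkpoints for at most N
// consecutive failures.
public class RegionCheckpointPolicySketch {

    private final double maxFailedRegionRatio;       // e.g. 0.5
    private final int maxConsecutiveRegionFailures;  // e.g. 2
    private final Map<String, Integer> consecutiveFailures = new HashMap<>();

    public RegionCheckpointPolicySketch(double maxFailedRegionRatio, int maxConsecutiveRegionFailures) {
        this.maxFailedRegionRatio = maxFailedRegionRatio;
        this.maxConsecutiveRegionFailures = maxConsecutiveRegionFailures;
    }

    // Record whether a region's part of the current checkpoint succeeded.
    public void onRegionResult(String region, boolean success) {
        consecutiveFailures.put(region,
                success ? 0 : consecutiveFailures.getOrDefault(region, 0) + 1);
    }

    // Decide whether the current checkpoint may still be declared complete.
    public boolean canCommit() {
        int total = consecutiveFailures.size();
        long failedNow = consecutiveFailures.values().stream().filter(c -> c > 0).count();
        boolean ratioOk = total == 0 || (double) failedNow / total <= maxFailedRegionRatio;
        boolean historyOk = consecutiveFailures.values().stream()
                .allMatch(c -> c <= maxConsecutiveRegionFailures);
        return ratioOk && historyOk;
    }

    public static void main(String[] args) {
        // four regions as in the kafka-i -> hive-i example, 50% ratio, 2 consecutive failures
        RegionCheckpointPolicySketch policy = new RegionCheckpointPolicySketch(0.5, 2);
        for (int i = 1; i <= 3; i++) policy.onRegionResult("region-" + i, true);
        policy.onRegionResult("region-4", false);
        System.out.println(policy.canCommit());   // true: 1 of 4 regions failed once
    }
}
```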
■ Difficulties in the engineering implementation
- How to handle Task failures and checkpoint timeouts
- How to handle the state of SubTasks in the same Region whose snapshots have already succeeded
- How to remain compatible with the CheckpointCoordinator
Let's first look at what Flink currently does.
[Figure: existing CheckpointCoordinator]
When a Task fails, the JobMaster's FailoverStrategy is notified first, and the FailoverStrategy then notifies the CheckpointCoordinator to cancel the checkpoint.
What about checkpoint timeouts? When the Coordinator triggers a checkpoint, a CheckpointCanceller is started as well. The Canceller has a timer: if the timeout is reached before the Coordinator has finished the checkpoint, it notifies the Coordinator to cancel it.
Both Task failure and timeout ultimately point to a pending checkpoint, and the checkpoint currently pointed to is discarded.
Before modifying any of this, we also need to understand the checkpoint messages and how the CheckpointCoordinator responds to them.
The global checkpoint is Flink's existing mechanism.
To remain compatible with the CheckpointCoordinator, a CheckpointHandle interface was added, with two implementations, GlobalCheckpointHandle and RegionalCheckpointHandle, which implement the global checkpoint and the Regional Checkpoint respectively by filtering messages.
For the Regional Checkpoint, if the handler receives a failure message it marks the Region as failed and tries to logically map that Region to the previous successful checkpoint. Likewise, the notify-complete messages sent by the Coordinator are filtered by the handler, and the ones destined for failed Tasks are filtered out.
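A minimal sketch of this handler split is shown below. The interface and class names are illustrative (they mirror, but are not, the CheckpointHandle implementations mentioned above): the global handler fails the whole checkpoint on any failure message, while the regional handler only marks the affected Region, which then falls back to the last successful checkpoint.

```java
import java.util.*;

// The coordinator hands ack / failure messages to a handler; the two
// implementations below differ only in how they react to a failure.
interface CheckpointHandlerSketch {
    void onAck(String task);
    void onFailure(String task);
    boolean isCheckpointComplete();
}

class GlobalCheckpointHandler implements CheckpointHandlerSketch {
    private final Set<String> pending;
    private boolean failed;

    GlobalCheckpointHandler(Collection<String> tasks) { pending = new HashSet<>(tasks); }

    public void onAck(String task)     { pending.remove(task); }
    public void onFailure(String task) { failed = true; }               // one failure fails all
    public boolean isCheckpointComplete() { return !failed && pending.isEmpty(); }
}

class RegionalCheckpointHandler implements CheckpointHandlerSketch {
    private final Map<String, String> taskToRegion;
    private final Set<String> pending;
    private final Set<String> failedRegions = new HashSet<>();

    RegionalCheckpointHandler(Map<String, String> taskToRegion) {
        this.taskToRegion = taskToRegion;
        this.pending = new HashSet<>(taskToRegion.keySet());
    }

    public void onAck(String task) { pending.remove(task); }

    public void onFailure(String task) {
        // only this region is marked failed; its state will be logically
        // mapped from the last successful checkpoint instead
        failedRegions.add(taskToRegion.get(task));
        pending.removeIf(t -> taskToRegion.get(t).equals(taskToRegion.get(task)));
    }

    // complete once every healthy task has acked; failed regions reference
    // their previous successful results (ratio / history limits elided here)
    public boolean isCheckpointComplete() { return pending.isEmpty(); }
}
```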
■ Business gains
A test at a parallelism of 5000, assuming a 99.99% success rate for each individual Task snapshot: the global checkpoint achieves only a 60.65% success rate, while the Regional Checkpoint still maintains 99.99%.
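As a quick sanity check of these numbers, assuming snapshot failures are independent across the 5000 Tasks:

```latex
P_{\text{global}} = 0.9999^{5000} \approx e^{-5000 \times 10^{-4}} = e^{-0.5} \approx 60.65\%
```

With the Regional Checkpoint, a failed Region falls back to its last successful checkpoint (within the ratio and history limits described earlier), so the overall success rate stays close to the per-snapshot rate.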
3. Other optimizations on Checkpoint
■ Restore the operator state in parallel
Union state is a special kind of operator state: during recovery, the state of every Task of the job needs to be collected and the full union restored into each single Task. If the job parallelism is very large, say 10000, then restoring the union state of each Task requires reading at least 10000 files. If the state in those 10000 files is restored serially, the recovery time is predictably very long.
Although the data structure backing the OperatorState cannot be operated on in parallel, the reading of the files can be. During recovery of the OperatorStateBackend, the HDFS files are read in parallel, and only after all state files have been parsed into memory are they merged by a single thread. This reduces the state recovery time from tens of minutes to a few minutes.
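A simplified sketch of the parallel-read, single-threaded-merge idea, with placeholder types and file handling (not the actual OperatorStateBackend code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// The per-task state files are read and deserialized by a thread pool, and
// only the final merge into the (non-thread-safe) operator state happens on
// a single thread.
public class ParallelUnionStateRestore {

    static byte[] readAndParseStateFile(String path) {
        // placeholder for: open the HDFS file, deserialize the state handle
        return new byte[0];
    }

    public static List<byte[]> restore(List<String> stateFilePaths, int ioThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(ioThreads);
        try {
            List<Future<byte[]>> futures = new ArrayList<>();
            for (String path : stateFilePaths) {
                futures.add(pool.submit(() -> readAndParseStateFile(path)));  // parallel I/O
            }
            List<byte[]> parsedStates = new ArrayList<>();
            for (Future<byte[]> f : futures) {
                parsedStates.add(f.get());   // collected on the single restoring thread
            }
            return parsedStates;             // merged into the union state sequentially
        } finally {
            pool.shutdown();
        }
    }
}
```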
■ Improved CheckpointScheduler and Checkpoint triggering
Flink's checkpoint interval and timeout cannot be changed after a job has been submitted; they can only be set based on experience when the job first goes online. However, during peak hours it is often found that parameters such as the interval and timeout are not set appropriately. One option is to modify the parameters and restart the job, but that has a big impact on the business and is obviously unreasonable.
For this, we refactored the checkpoint triggering mechanism inside the CheckpointCoordinator, abstracting the existing triggering process so that customized triggering mechanisms can be built quickly on top of the abstract classes. For example, for data import scenarios we implemented an hour-aligned triggering mechanism so that Hive partitions are created faster and downstream consumers can see the data as soon as possible.
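A possible shape of such a trigger abstraction is sketched below; the class and method names are made up for illustration, and the hour-aligned policy simply fires at every hour boundary.

```java
import java.time.*;
import java.time.temporal.ChronoUnit;

// The default policy fires on a fixed interval, while the hour-aligned policy
// triggers at the top of each hour so Hive partitions can be committed sooner.
abstract class CheckpointTriggerPolicy {
    // Return how long to wait before the next checkpoint is triggered.
    abstract Duration nextDelay(Instant lastTrigger, Instant now);
}

class FixedIntervalTrigger extends CheckpointTriggerPolicy {
    private final Duration interval;
    FixedIntervalTrigger(Duration interval) { this.interval = interval; }

    Duration nextDelay(Instant lastTrigger, Instant now) {
        Duration elapsed = Duration.between(lastTrigger, now);
        return elapsed.compareTo(interval) >= 0 ? Duration.ZERO : interval.minus(elapsed);
    }
}

class HourAlignedTrigger extends CheckpointTriggerPolicy {
    // Fire at every hour boundary so the corresponding Hive partition can be
    // finalized as soon as the hour is over.
    Duration nextDelay(Instant lastTrigger, Instant now) {
        ZonedDateTime current = now.atZone(ZoneOffset.UTC);
        ZonedDateTime nextHour = current.truncatedTo(ChronoUnit.HOURS).plusHours(1);
        return Duration.between(current, nextHour);
    }
}
```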
There are many other optimizations that I will not list one by one here.
4. Challenges & Future planning
Currently, the state of a single ByteDance job can reach up to 200TB, and the RocksDB StateBackend cannot cope well with jobs of such large traffic and state. Therefore, we will continue to invest in the performance and stability of state and checkpointing, for example strengthening the existing StateBackend, solving checkpoint slowness caused by skew, slimming down checkpoints, and enhancing debugging capabilities.