Abstract: This article is based on a speech delivered by Meituan data platform engineers Feng Fei and Wang Feifan at Flink Forward Asia 2021. The main contents include:
- Background
- Deployment optimization for large jobs
- Cross-datacenter Checkpoint replication
- Other state stability optimizations
- Future plans
1. Background
Meituan's Flink applications cover the three scenarios defined by the community:
- Data pipeline scenarios are widely used, for example real-time ingestion into the ODS layer of the data warehouse and real-time data synchronization across data sources.
- Data analysis is a typical scenario, for example building and using real-time data warehouses; businesses produce real-time reports and dashboards to assist decision-making, or compute real-time features that serve production.
- Event-driven scenarios are mainly applied to security risk control and system monitoring alarms.
As the business grows and real-time computing iterates, real-time computing is applied ever more widely and deeply. We currently have nearly 50,000 jobs deployed on more than 15,000 machines, with a peak throughput of 540 million records per second, a significant increase over previous years. Besides the overall growth, this year we also saw a large increase in the scale of individual jobs: the parallelism of our largest jobs has reached 5,000 and their state has reached 10 TB. The growth of single jobs brings us new problems and challenges.
The first is job startup. Problems that are not obvious for small jobs are exposed during the deployment and startup of large jobs, such as slow Task deployment and startup, uneven Task distribution, and the pressure large jobs put on HDFS. In addition, very large state also has a non-negligible impact on jobs, mainly the production cost and recovery efficiency of Savepoints, as well as state disaster recovery.
2. Deployment optimization for large jobs
Our largest job now runs with a parallelism of 5,000, its topology contains more than 8,000 operators, it has two layers of data shuffle, and it requires more than 1,000 TaskManagers. At this scale we encountered problems we had not seen before.
- First, the deployment of a large number of Tasks may take a long time or fail due to RPC timeouts.
- Second, Tasks are not evenly distributed: some TaskManagers run short of Network Buffers, which can cause the job to fail to start.
- Third, the Checkpoints of large jobs put instantaneous pressure on HDFS and affect the HDFS usage of other jobs.
To solve these problems, we made the following optimizations.
We first analyzed the job deployment process from the perspective of the JobManager, hoping to understand which factors in each step affect deployment and how, so that we could address the problems appropriately.
As you can see, from receiving the JobGraph to starting all Tasks, the main steps are building the execution graph, applying for resources, deploying Tasks, and starting Tasks.
- Building the execution graph is mainly affected by job size and topology complexity. Its time complexity is currently relatively high, so for large jobs the build time grows significantly. We have not hit this problem at our current scale, and the community made a series of optimizations for it in version 1.13 that can be used as a reference.
- Applying for resources is mainly affected by resource demand, resource health, scheduling performance, and the scheduling policy. Problems such as insufficient resources, resource fragmentation, and scheduling onto bad nodes can prevent a job from starting normally.
- Deploying and starting Tasks is affected by the number of Tasks, the number of TaskManagers, topology complexity, and the size of the user JAR. For large jobs the JobManager, as the master node, can hit processing bottlenecks, making Task deployment slow or causing it to fail.
Focusing on the deploy-and-start-Task steps, we noticed the following: first, the network card of the machine running the JobManager was saturated during deployment; the TaskManager logs showed that downloading the user JAR took a long time; and comparative tests showed that shrinking the user JAR eliminated the RPC timeouts and reduced deployment time. So we looked at the user JAR download process. At startup, each Task thread needs to build the user code classloader from the user JAR, and this step downloads the user JAR from the JobManager's BlobServer.
In Flink's current implementation, each TaskManager downloads the user JAR once, and the Tasks in the same TaskManager reuse that JAR. When HA is enabled for a job, TaskManagers read the user JAR from HDFS, which relieves the JobManager of distribution work. But when the user JAR is large and HA is not enabled, the JobManager is under great pressure to distribute it, which may congest the network or even saturate the network card.
Knowing the cause, we could optimize user JAR distribution. We currently deploy in YARN session mode. To keep reusing the session cluster for fast job submission, and to get the same benefit when multiple jobs are submitted to the same session cluster, we did not rely on YARN to distribute the user JAR but optimized at the Flink level instead.
We let the TaskManagers on the same node download the user JAR only once and share the result among all TaskManagers on that node. The number of downloads thus drops from TaskManager granularity to machine granularity, by an order of magnitude, which greatly reduces the JobManager's distribution load.
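A minimal sketch of the per-host sharing idea, assuming an illustrative local cache directory and a simplified stand-in for the BlobServer client (neither is Meituan's or Flink's actual code):

```java
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hedged sketch: all TaskManagers on one host share a per-job UserJAR cache,
 * guarded by a file lock so only the first TaskManager downloads from the BlobServer.
 */
public class SharedUserJarCache {

    private final Path cacheDir = Paths.get("/data/flink/shared-jar-cache"); // assumed location

    public Path getOrDownload(String jobId, BlobClientLike blobClient) throws Exception {
        Path jarPath = cacheDir.resolve(jobId + "-user.jar");
        Path lockPath = cacheDir.resolve(jobId + ".lock");
        Files.createDirectories(cacheDir);

        try (FileChannel lockChannel = new RandomAccessFile(lockPath.toFile(), "rw").getChannel();
             FileLock ignored = lockChannel.lock()) {       // serialize TaskManagers on the same host
            if (!Files.exists(jarPath)) {
                // only the first TaskManager on this host reaches here
                blobClient.downloadTo(jobId, jarPath);
            }
        }
        return jarPath;                                     // every other TaskManager reuses the local copy
    }

    /** Stand-in for the real BlobServer client; an assumed interface for this sketch. */
    public interface BlobClientLike {
        void downloadTo(String jobId, Path target) throws Exception;
    }
}
```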
After optimizing user JAR distribution, we found that jobs with shuffles and high parallelism still hit some RPC timeouts during deployment, and the JobManager received a large number of requestPartitionState requests. When a downstream Task starts, it checks whether the partitions of its upstream Tasks are ready; if a partition is not ready, the downstream Task asks the JobMaster for the state of the upstream Task to decide whether to keep requesting the partition. When the job is large, Tasks easily start at inconsistent speeds, which leads to a flood of requestPartitionState requests on the JobMaster.
Our fix is simple: when a downstream Task fails to request a partition, it retries the request a few times instead of immediately asking the JobMaster. This significantly reduces the number of requestPartitionState RPCs on the JobMaster and leaves it more time to handle other RPC requests.
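Conceptually the change looks like the following sketch; the interface and method names are illustrative stand-ins, not Flink's internal API:

```java
/**
 * Hedged sketch of the retry idea: back off and retry locally before falling
 * back to the expensive requestPartitionState RPC on the JobMaster.
 */
class PartitionRequestRetrier {

    /** Stand-in for the downstream Task's view of the upstream result partition. */
    interface PartitionClientLike {
        boolean requestPartition();               // try to open the upstream result partition
        boolean askJobMasterForProducerState();   // the expensive requestPartitionState RPC
    }

    static boolean requestWithLocalRetries(PartitionClientLike client, int maxRetries, long backoffMs)
            throws InterruptedException {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (client.requestPartition()) {
                return true;                      // upstream partition became ready
            }
            Thread.sleep(backoffMs);              // back off locally before retrying
        }
        // only after local retries are exhausted ask the JobMaster
        return client.askJobMasterForProducerState();
    }
}
```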
After these two optimizations, the distribution pressure on the JobManager is greatly reduced. The figure above shows the effect of the user JAR distribution optimization: the larger the job, the more significant the improvement. At our current scale, RPC timeout exceptions have been eliminated and large jobs can be deployed successfully.
Let’s look at the problem of uneven distribution of tasks.
During the deployment of large jobs, Tasks were not evenly spread across TaskManagers, which left some TaskManagers without enough Network Buffers and caused job startup failures. We can work around this by increasing the total memory and adjusting the memory ratios, but that does not solve the underlying problem and wastes resources. The resources actually needed vary with the number and type of Tasks in each TaskManager, yet every TaskManager requests the maximum amount, so many TaskManagers request more than they need. On the other hand, TaskManagers with concentrated Tasks carry more computation and are more likely to become the bottleneck of the whole job.
We classify the uneven Task distribution into two kinds:
- One is uneven distribution of the number of Tasks, i.e. Tasks of different operators are concentrated in the same TaskManager. For example, in the figure on the left, multiple Source Tasks are concentrated in the first Slot.
- The other is uneven distribution of Task types, i.e. Tasks of the same operator are concentrated in one TaskManager. For example, in the figure on the right, two Source Tasks and two Sink Tasks are concentrated in the first TaskManager.
Uneven distribution of the number of Tasks comes mainly from SlotSharing, which allows Tasks of different operators to share the same Slot, while the choice of Slot does not take into account how many Tasks each Slot already holds. As a result, many Tasks may be packed into one Slot and therefore into the same TaskManager. This case typically occurs in the multi-source, multi-sink jobs that businesses use for traffic distribution and aggregation.
To solve the uneven distribution of Task numbers, we optimized the Slot selection strategy. The new strategy works as follows: a Task with no upstream is allocated to a new Slot until the number of Slots reaches the upper limit; a Task with upstreams is preferentially placed in the same Slot as its upstream Tasks to reduce unnecessary data transfer; and when multiple Slots are available, the one with the fewest Tasks is chosen. After this optimization, Tasks are evenly distributed across Slots. In the figure above, the four Source Tasks previously concentrated in one Slot are now spread over different Slots.
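The selection rule can be sketched roughly as follows, with simplified types rather than Flink's scheduler classes:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/**
 * Hedged sketch of the slot-selection policy described above.
 */
class BalancedSlotSelector {

    /** A candidate shared slot together with the number of tasks already assigned to it. */
    static class CandidateSlot {
        final String slotId;
        final int assignedTasks;

        CandidateSlot(String slotId, int assignedTasks) {
            this.slotId = slotId;
            this.assignedTasks = assignedTasks;
        }
    }

    static Optional<CandidateSlot> selectSlot(Optional<CandidateSlot> upstreamSlot,
                                              List<CandidateSlot> candidates) {
        // a task with an upstream prefers the upstream's slot to keep the data exchange local
        if (upstreamSlot.isPresent()) {
            return upstreamSlot;
        }
        // otherwise (e.g. a source task) pick the least-loaded candidate;
        // empty, freshly created slots naturally win until the slot limit is reached
        return candidates.stream().min(Comparator.comparingInt(s -> s.assignedTasks));
    }
}
```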
Now let's look at the uneven distribution of Task types. As we know, a Slot cannot contain multiple Tasks of the same type, so a Task type only becomes concentrated when multiple Slots containing that type land in the same TaskManager. Which TaskManager a Slot lands in is affected by the order in which Slots are requested; currently that order is effectively random and does not consider the distribution of Task types, so Tasks of the same type can cluster in the same TaskManager. This problem is fairly common and can occur whenever different operators have different parallelism.
To address the uneven distribution of Task types, we optimized the Slot request order: requests are ordered by the combination of Task types each Slot contains, so that Slots with the same combination are spread across different TaskManagers. For example, Slots containing Source, Process, and Sink Tasks end up evenly distributed among TaskManagers.
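One way to realize such an ordering, sketched under the assumption that requests are grouped by an illustrative task-combination key (e.g. "Source+Process+Sink"), is to interleave the groups so that same-combination requests are not issued back-to-back:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hedged sketch of the request-ordering idea; not Flink's actual scheduler code.
 */
class SlotRequestOrdering {

    static <T> List<T> interleaveByTaskCombination(Map<String, List<T>> requestsByCombination) {
        List<T> ordered = new ArrayList<>();
        int round = 0;
        boolean added = true;
        while (added) {
            added = false;
            for (List<T> group : requestsByCombination.values()) {
                if (round < group.size()) {
                    ordered.add(group.get(round));   // take one request per combination per round
                    added = true;
                }
            }
            round++;
        }
        return ordered;
    }
}
```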
The above two optimizations are independent and can be combined to achieve better optimization results, as shown in the figure above.
We verified the effect on a large production job. In terms of Task count, after the optimization each TaskManager runs 6 Tasks, an even distribution. In terms of resources and load, the number of Network Buffers, the number of Tasks, and the CPU usage of the TaskManagers are much more consistent than before, thanks to balancing both the number and the type of Tasks.
Finally, let's look at the HDFS pressure problem. There are two sources of pressure on HDFS:
- One is that the load on HDFS grows with normal business growth.
- The other is that deploying and running large jobs puts large instantaneous pressure on HDFS: while a large job produces a Checkpoint, HDFS receives a huge number of RPC requests, the NameNode's RPC CallQueue fills up, and the HDFS reads and writes of other jobs are affected.
It can be seen that the pressure on HDFS mainly comes from the NameNode.
Our approach to the NameNode pressure is also straightforward. First, multiple groups of HDFS NameNodes are deployed at the storage layer, so the underlying resources can scale horizontally. At the engine level, we provide a balancing strategy over the NameNode groups that decides which group each job uses, and the job then actually uses that group by dynamically resolving the relevant paths. In this way HDFS serving capacity can be scaled horizontally, and deploying and running large jobs no longer affects other jobs.
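A minimal sketch of such a balancing strategy, assuming illustrative nameservice URIs and a simple hash-based assignment:

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hedged sketch: route each job's checkpoint directory to one of several HDFS
 * nameservices by hashing the job name, spreading load across NameNode groups.
 */
class CheckpointNamespaceRouter {

    private static final List<String> NAMESERVICES = Arrays.asList(
            "hdfs://flink-checkpoint-ns1",   // illustrative nameservice URIs
            "hdfs://flink-checkpoint-ns2",
            "hdfs://flink-checkpoint-ns3");

    static String checkpointDirFor(String jobName) {
        int idx = Math.floorMod(jobName.hashCode(), NAMESERVICES.size());
        return NAMESERVICES.get(idx) + "/flink/checkpoints/" + jobName;
    }
}
```

The returned path could then be supplied to the job, for example as its state.checkpoints.dir.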
Besides the above, we made several other optimizations to help large jobs deploy and run better. We expose Flink's tuning parameters to users so that they can tune their own jobs individually, and we enforce a minimum Checkpoint interval to prevent unreasonably frequent Checkpoints from affecting other jobs on the cluster.
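A minimal sketch of such a guard, assuming an illustrative platform floor of one minute and a simple pause rule:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Hedged sketch: clamp the user-requested checkpoint interval to a platform-wide
 * floor before enabling checkpointing.
 */
class CheckpointIntervalGuard {

    private static final long PLATFORM_MIN_INTERVAL_MS = 60_000L;   // assumed platform floor

    static void enableCheckpointing(StreamExecutionEnvironment env, long requestedIntervalMs) {
        long interval = Math.max(requestedIntervalMs, PLATFORM_MIN_INTERVAL_MS);
        env.enableCheckpointing(interval);
        // a minimum pause between checkpoints further protects HDFS from back-to-back checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(interval / 2);
    }
}
```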
3. Cross-datacenter Checkpoint replication
At Meituan, Flink compute resources are spread across multiple data centers. The same project team may have resources in different data centers, so a job may need to be restarted in a different data center. In our practice, we prefer to restore from Retained Checkpoints rather than Savepoints: since we mainly use RocksDBStateBackend with incremental Checkpoints, they are cheaper to produce and faster to restore than Savepoints, can be taken at shorter intervals, and leave less data to backtrack after recovery. In addition, some important jobs require stronger state disaster recovery: if a data center fails, they must be able to switch to another one. Finally, although the cross-datacenter Savepoint replication we built in 2020 already solved part of the problem, businesses increasingly prefer to recover from Retained Checkpoints.
Two objectives can be drawn from the above background:
- First, all jobs need to be able to start from a Checkpoint. This requires that the Checkpoint in the original data center be copied to the target data center before the job is started there.
- Second, the Checkpoints of critical jobs must support cross-datacenter disaster recovery. This means that as Checkpoints keep completing, the new Checkpoints have to be replicated to the backup data center in real time, to guard against a sudden failure of the original data center.
Our analysis shows that these goals can be achieved in two steps.
- First, we need to modify the Flink engine so that Checkpoints are self-contained & relocatable. The concepts and principles are introduced in detail later; for now it is enough to know that only a Checkpoint satisfying this condition remains usable after being copied elsewhere.
- Second, we need to implement the replication capability itself, i.e. the Checkpoint Replicate Service.
The self-contained & relocatable feature is mainly intended to allow Checkpoints to be moved and copied as a whole. Flink has supported this for Savepoints since version 1.11, but Checkpoints are more complex and do not support it yet. To understand why, we first need to look at the Checkpoint directory structure.
Under the checkpoints/{job-id} directory there are three kinds of subdirectories. The first is the chk-{n} directory of each Checkpoint, also called its exclusive directory, which stores the files unique to that Checkpoint. The second is the shared directory, which stores files shared between Checkpoints. Finally, there is the taskowned directory, which holds files that may never be deleted by the JobManager.
Each Checkpoint also has a metadata file that stores the Checkpoint's metadata. As shown by the orange lines above, the metadata file holds references to the exclusive and shared files, so from it all files needed by a Checkpoint can be found.
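For reference, a typical layout of a job's checkpoint directory looks roughly like this (the checkpoint ID is illustrative):

```
<state.checkpoints.dir>/<job-id>/
├── chk-1234/        # exclusive directory of checkpoint 1234
│   ├── _metadata    # the checkpoint metadata file
│   └── ...          # files unique to this checkpoint
├── shared/          # files shared across (incremental) checkpoints
└── taskowned/       # files owned by TaskManagers, never deleted by the JobManager
```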
Why are Checkpoints not self-contained? In some cases, the metadata file of a Checkpoint references shared files that belong to another job instance. Here, different job instances mean the same job code started multiple times; each start gets its own Flink job ID, and each job ID corresponds to one job instance.
In the figure, job1, job2, and job3 are the same job started three times: job3 started from a Checkpoint retained by job2, and job2 started from a Checkpoint retained by job1, resulting in a long chain of references. This is very common in production: we often stop a job while retaining its Checkpoint, and restart from it after modifying parameters or code.
This raises two problems:
- First, Retained Checkpoints become difficult to clean up: before deleting one, we must make sure its files are not referenced by any other Checkpoint, so the job management platform has to maintain reference counts for Checkpoint files, which increases the complexity of platform management.
- Second, the Checkpoint copy is unusable across storage systems. For example, after copying a Checkpoint from HDFS1 to HDFS2, the files referenced across job instances do not exist on HDFS2, so the copied Checkpoint is unusable. We could avoid this by also copying all directly and indirectly referenced files to HDFS2, but that would greatly increase the complexity of making the copy.
So how do we make Checkpoint self-contained?
Note that although the discussion above is not limited to a specific StateBackend, the problem is mainly caused by the incremental Checkpoints of RocksDBStateBackend, which is the backend and Checkpoint mode we use by default in production. So let's first look at how RocksDBStateBackend's incremental Checkpoint works.
RocksDB is an LSM-tree-based KV storage engine that persists data in disk files. The directory structure of a RocksDB instance is shown above; its files fall into two categories:
- The first category is various metadata, such as the DB configuration and the version changelog, which may be updated continuously while the instance runs.
- The other is the SST files, RocksDB's data files, which contain the data, indexes, and so on. Once generated, an SST file is never modified; SST files are only created and deleted as data keeps being written and compacted.
When Checkpoint production starts, RocksDBStateBackend flushes the data and uploads all files of the DB instance to the configured Checkpoint storage (HDFS in our case). If the DB contains 01.sst, 02.sst, and 03.sst when Checkpoint3 is made, these SST files can be reused directly by subsequent incremental Checkpoints and are therefore placed in the shared directory, while the metadata files are stored in the exclusive directory.
Some time later, we make Checkpoint5 based on Checkpoint3. Why Checkpoint5 rather than Checkpoint4? Because a Savepoint may have been triggered in between, and Savepoints take up IDs in the Checkpoint sequence.
When Checkpoint5 is made, the files of the DB instance are as shown on the right of the figure above: 04.sst has been added and 01.sst has been removed. In this case only the metadata files and 04.sst need to be uploaded, while references to 02.sst and 03.sst are recorded in the metadata file.
How do we know that 02.sst and 03.sst have already been uploaded? A previous-sst-list records all the SST files of the previous Checkpoint, so incremental Checkpoints can be produced based on the previous-sst-list.
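A simplified sketch of that decision, with illustrative names rather than the actual RocksDBStateBackend classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hedged sketch of how the previous-sst-list drives an incremental checkpoint:
 * only SST files not present in the previous checkpoint's list are uploaded.
 */
class IncrementalUploadPlanner {

    static List<String> sstFilesToUpload(Set<String> previousSstList, List<String> currentSstFiles) {
        List<String> toUpload = new ArrayList<>();
        for (String sst : currentSstFiles) {
            if (!previousSstList.contains(sst)) {
                toUpload.add(sst);   // new since the last checkpoint, e.g. 04.sst
            }
            // files already in the list (e.g. 02.sst, 03.sst) are only referenced in the metadata
        }
        return toUpload;
    }
}
```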
When restoring from an incremental Checkpoint, the previous-sst-list is first rebuilt from the Checkpoint information, and the RocksDB instance is reconstructed by downloading the meta files and SST files to the right locations. This way, even the first Checkpoint after startup can be made incrementally on top of the restored Checkpoint.
However, this also means that the first Checkpoint of a restarted job may reference files of the Checkpoint it restored from, which is exactly the root cause of the cross-job-instance file references mentioned earlier.
The fix is to check, before restoring the previous-sst-list, whether the Checkpoint being restored belongs to the current job. If it does, the previous-sst-list is restored as before; if not, the job was started from a retained Checkpoint of another instance and the previous-sst-list is not restored. In the latter case the first Checkpoint after startup uploads all files, and later Checkpoints are incremental on top of it, so no cross-job file references can occur.
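A minimal sketch of this check, with illustrative names:

```java
import java.util.Collections;
import java.util.Set;

/**
 * Hedged sketch of the fix: the previous-sst-list is only restored when the
 * restored checkpoint was produced by the current job.
 */
class PreviousSstListRestorer {

    static Set<String> restore(String restoredCheckpointJobId,
                               String currentJobId,
                               Set<String> sstFilesInRestoredCheckpoint) {
        if (currentJobId.equals(restoredCheckpointJobId)) {
            return sstFilesInRestoredCheckpoint;   // same job: continue the incremental chain
        }
        // started from a retained checkpoint of another job instance: return an empty list,
        // so the first checkpoint re-uploads everything and no cross-job references remain
        return Collections.emptySet();
    }
}
```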
The remaining question is how to know which job a restored Checkpoint belongs to. The figure above shows the Checkpoint metadata structure; it does not contain the job ID, so we need to add it. The key is the Keyed State Handle in the red box, which has many implementations, each corresponding to a Checkpoint or Savepoint mode.
If it is an IncrementalRemoteKeyedStateHandle, the Checkpoint is an incremental Checkpoint of RocksDBStateBackend. So we only need to add a jobID field to IncrementalRemoteKeyedStateHandle and serialize it into the metadata file when the Checkpoint is made; at restore time the job ID of the Checkpoint is then known.
As for relocatability, the metadata records references to exclusive and shared files as absolute paths, so when a Checkpoint is copied as a whole to another directory these references become invalid. The fix is simply to record relative paths instead: the location of a referenced file can then be computed from the Checkpoint's exclusive directory and the relative path, and the Checkpoint can be moved around freely. With that, the relocatable problem is solved.
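Purely as an illustration of the resolution step (Flink's own path handling differs):

```java
import java.nio.file.Path;

/**
 * Hedged, illustrative sketch of resolving a relative reference against the
 * checkpoint's exclusive directory at restore time.
 */
class RelativeReferenceResolver {

    static Path resolve(Path checkpointExclusiveDir, String relativeReference) {
        // e.g. "../shared/04.sst" resolved against ".../chk-5/"
        return checkpointExclusiveDir.resolve(relativeReference).normalize();
    }
}
```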
With self-contained & relocatable Checkpoints, replicas can be made anywhere, so we can start building cross-datacenter Checkpoint replication.
When we first evaluated how to implement cross-datacenter replication, there were several alternatives:
- The first was to copy the whole Checkpoint directory across data centers with distcp, as we already do for Savepoint replicas, and this works well for Savepoints. However, distcp starts a heavyweight MapReduce job for every copy, Checkpoints are far more frequent than Savepoints, and the job keeps running while distcp is copying, so files may be deleted during the copy. distcp can be configured to ignore this, but it causes other problems, so this option is not suitable.
- The second approach is to build a Checkpoint Replicate Service that connects to multiple HDFS clusters and is dedicated to replicating Checkpoints.
- The third is to modify the Flink engine so that Checkpoint data is written to two HDFS clusters during Checkpoint production. This would add instability to the engine, and we are not willing to sacrifice job stability and efficiency to handle the low-probability event of a data center failure.
- Finally, Flink's CheckpointCoordinator could be modified to trigger a distcp after each Checkpoint completes and to hold back subsequent Checkpoints until the distcp finishes. This avoids the file-change problem of option 1 during the distcp copy, but it was also abandoned because of distcp's efficiency.
The Checkpoint Replicate Service is implemented as follows: each node of the service holds multiple HDFS clients. In the figure above, the two HDFS clusters and their clients are distinguished by orange and purple, one being the running cluster and the other the replica cluster. When making a replica, the client of the running cluster reads a file and streams it to the client of the replica cluster, which writes it to the replica cluster.
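The per-file copy step of such a service can be sketched with the standard Hadoop FileSystem API; the cluster URIs here are illustrative assumptions:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Hedged sketch: one Hadoop FileSystem client per cluster, streaming each
 * referenced file from the running cluster to the replica cluster.
 */
class CheckpointFileCopier {

    private final FileSystem sourceFs;
    private final FileSystem targetFs;

    CheckpointFileCopier(Configuration conf) throws Exception {
        this.sourceFs = FileSystem.get(URI.create("hdfs://running-cluster"), conf);
        this.targetFs = FileSystem.get(URI.create("hdfs://replica-cluster"), conf);
    }

    void copyFile(String sourcePath, String targetPath) throws Exception {
        try (FSDataInputStream in = sourceFs.open(new Path(sourcePath));
             FSDataOutputStream out = targetFs.create(new Path(targetPath), true)) {
            IOUtils.copyBytes(in, out, 4096);   // stream the file between the two clusters
        }
    }
}
```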
To replicate Checkpoint5, we first read its metadata file and parse out all referenced files to obtain referencedFiles, then add the metadata file itself; these are all the files that must be copied to the target cluster. We copy them to the corresponding location on the target cluster with the Replicate Service, and thanks to the self-contained & relocatable property described earlier, we obtain a usable copy of Checkpoint5 on the target cluster.
The next question is whether subsequent replicas can be made incrementally, just as Checkpoints themselves are made incrementally during the run. The answer is yes.
As shown in the figure, suppose that when replicating Checkpoint5 a replica of Checkpoint3 already exists at the corresponding location on the target cluster. We can then make an incremental replica based on the Checkpoint3 replica: read Checkpoint3's metadata file, parse out its reference list referenceFiles3, and perform set operations on the two file lists to determine how to replicate incrementally (see the sketch after this list):
- First, the files that exist only in Checkpoint5 are new files and need to be copied to the target cluster.
- Second, the files that exist only in Checkpoint3 were deleted while the newer Checkpoints were produced. Since the replica cluster only needs to keep the latest Checkpoint, these files can be deleted directly.
- Finally, the intersection contains files that Checkpoint5 needs but that have already been copied, so they can be ignored. In this way we can make incremental replicas, just like incremental Checkpoints.
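A minimal sketch of these set operations, with illustrative names:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hedged sketch: given the reference file lists of the new checkpoint and of the
 * copy already present on the replica cluster, compute what to copy and delete.
 */
class IncrementalReplicaPlan {

    final Set<String> filesToCopy;    // only in the new checkpoint -> copy to the replica cluster
    final Set<String> filesToDelete;  // only in the old replica    -> delete (asynchronously)

    IncrementalReplicaPlan(Set<String> newCheckpointRefs, Set<String> existingReplicaRefs) {
        filesToCopy = new HashSet<>(newCheckpointRefs);
        filesToCopy.removeAll(existingReplicaRefs);

        filesToDelete = new HashSet<>(existingReplicaRefs);
        filesToDelete.removeAll(newCheckpointRefs);
        // the intersection already exists on the replica cluster and can be ignored
    }
}
```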
We also gained a lot of experience from the engineering work:
- First, the metadata parsing code of the Flink engine needs a small modification. The current implementation accesses the HDFS where the metadata resides while parsing; since that cluster is not necessarily reachable from the Replicate Service by default, parsing may fail without the right HDFS client. This access is not actually required, so we removed it in the parsing used by the service.
- Second, consider caching the metadata parsing results. For jobs with large state, the metadata file can be tens of megabytes or larger, it can reference hundreds of thousands of files, and parsing can take minutes; since incremental replication parses the same metadata several times, caching the results pays off.
- Third, the copying and deletion of referenced files can be split into batches and dispatched to multiple nodes for parallel execution, because for large-state jobs a single Checkpoint can require copying 10 TB+ of files, which easily saturates the network of a single machine.
Two final tips:
- First, there is no need to retry when replication of a running job's Checkpoint fails, because new Checkpoints keep arriving while the job runs, and successfully replicating a newer Checkpoint matters more than an older one.
- Second, the deletion of files-to-delete can be performed asynchronously; even if it fails, the only consequence is some leftover useless files, which do not affect the usability of the replica. Just make sure there is a fallback cleanup strategy that removes them eventually.
4. Other state stability optimizations
Regarding state stability, we made three further optimizations.
First, we fixed a memory leak in RocksDBStateBackend. It was triggered when a job restarted and reused TaskManagers that had not exited, and it showed up when TaskManager heap memory was abundant and full GCs were infrequent. The figure above shows the memory curve of a TaskManager we investigated: after two restarts its memory grew by about 4 GB, and another restart would have pushed it over its memory limit.
The cause is a bug in RocksDBStateBackend's cleanup path: a RocksObject was not released, which prevented the native memory of the pre-restart RocksDB instance from being freed.
Second, the first incremental Checkpoint after a Savepoint degenerates into a full Checkpoint that uploads all RocksDB files. In the figure above, the red box is a Savepoint and the yellow box the Checkpoint that follows it; that Checkpoint uploaded nearly 800 GB of files, far more than a normal Checkpoint.
The cause is that the previous-sst-list was incorrectly cleared after the Savepoint was produced. We have contributed the fix to the community, and anyone affected can upgrade to the corresponding version.
Finally, we added support for specifying a separate timeout when triggering a Savepoint. We did this because for large-state jobs a Savepoint usually takes much longer to produce than an incremental Checkpoint: as shown above, a Savepoint takes almost 7 minutes while an incremental Checkpoint takes only a second or two, yet Savepoints reuse the Checkpoint timeout configuration. As a result, the Checkpoint timeout has to be set large enough to cover Savepoints, which makes it harder to expose problems early.
5. Future plans
In the future, we will continue to explore and improve in the following three areas:
- In terms of stability, we will continue to reduce job downtime, improve job stability, and explore Kubernetes for better resource isolation and scaling.
- In terms of runtime performance, we will optimize the state backend so that large-state jobs run better, and improve backpressure handling so that jobs run better during traffic peaks and recovery.
- Finally, in terms of resource efficiency, we will evaluate and optimize the resource utilization of jobs to save both resources and labor costs.