This article is compiled from the talk "Practice and Optimization of Apache Flink in JD" shared by Fu Haitao, a senior technical expert at JD, at Flink Forward Asia 2020. The content includes:
- Business evolution and scale
- Containerization practice
- Flink optimization and improvement
- Future planning
I. Business evolution and scale
1. Business evolution
JD built its first-generation stream processing platform on Storm in 2014, and it served most real-time data processing needs well. However, Storm has limitations and falls short in scenarios with particularly large data volumes that are not especially latency-sensitive. So in 2017 we introduced Spark Streaming and used its micro-batch model to cover those scenarios.
As the business grew in scale, we urgently needed a computing engine with both low latency and high throughput that also supported window computation, state, and exactly-once semantics.
- So in 2018 we introduced Flink and began the containerization upgrade of real-time computing on K8s.
- By 2019, all of our real-time computing tasks were running on K8s. In the same year, we built a new SQL platform on Flink 1.8 to make it easier for business teams to develop real-time computing applications.
- By 2020, the new real-time computing platform based on Flink and K8s had matured. We unified the computing engine and added intelligent diagnosis, lowering the cost and difficulty of developing, operating, and maintaining applications. Having previously focused on stream processing, we began supporting batch processing that same year, and the whole platform started evolving toward unified batch and stream processing.
2. Business scenarios
Flink at JD serves a large number of business lines. Its main application scenarios include real-time data warehousing, real-time dashboards, real-time recommendation, real-time reporting, real-time risk control, and real-time monitoring, among others. In short, real-time computing requirements across the business are generally built with Flink.
3. Business scale
At present, our K8s cluster consists of more than 5,000 machines and serves more than 20 first-level departments within JD. There are currently more than 3,000 stream computing tasks online, with peak stream processing throughput reaching 500 million records per second.
II. Containerization practice
Let’s share some containerization practices.
In 2017, most of the tasks inside JD were still Storm tasks running on physical machines, with a small share of Spark Streaming jobs running on YARN. The different runtime environments led to high deployment and operations costs and wasted resources, so we urgently needed a unified cluster resource management and scheduling system to solve this problem.
After a series of trials, comparisons, and optimizations, we chose K8s. It not only addresses the problems of deployment, operations, and resource utilization, but also offers the cloud-native advantages of elasticity and self-healing, natural container isolation, and easy scaling and migration. So in early 2018 we started the containerization upgrade.
During the June 18 promotion in 2018, only 20% of our tasks ran on K8s; by February 2019, all real-time computing tasks were running on K8s. Since containerization, the real-time computing platform has been through several June 18 and November 11 promotions, withstanding peak traffic and running very stably.
However, our earlier Flink containerization solution statically pre-allocated resources and could not satisfy many business scenarios, so in 2020 we upgraded the containerization solution, which is described in detail later.
**Containerization brings many benefits.** Here are the three main points:
- First, it makes mixed deployment of services easy, greatly improving resource sharing and saving machine resources.
- Second, it provides natural elastic scaling and a degree of self-healing, and achieves more complete resource isolation, better guaranteeing business stability.
- Third, containerization gives development, testing, and production a consistent environment, improves deployment and operations automation, and cuts management and operations costs by half.
Our earlier containerization solution was a Standalone Session cluster deployed with K8s Deployments. It required the user to estimate the cluster's resource needs, such as the specifications and counts of JobManager and TaskManager instances, when the platform created the cluster. The platform then sent a request to the K8s master through the K8s client to create the Deployment for the JobManager and the Deployment for the TaskManagers.
In this setup, high availability for the whole cluster is implemented with ZooKeeper; state is stored mainly in HDFS, with a small portion in OSS. Monitoring metrics (container, JVM, and task metrics) are reported to Prometheus and visualized with Grafana, and logs are collected, stored, and queried through JD's internal Logbook system.
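To make this concrete, here is a minimal sketch of how a platform component might create the JobManager Deployment through a K8s client. It uses the fabric8 Kubernetes client for illustration; the image, namespace, and names are placeholders, not our platform's actual code.

```java
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class StandaloneSessionCreator {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Pre-allocated JobManager Deployment; in the static scheme a TaskManager
            // Deployment with a fixed replica count would be created the same way.
            Deployment jobManager = new DeploymentBuilder()
                    .withNewMetadata().withName("flink-session-jobmanager").endMetadata()
                    .withNewSpec()
                        .withReplicas(1)
                        .withNewSelector().addToMatchLabels("app", "flink-session-jobmanager").endSelector()
                        .withNewTemplate()
                            .withNewMetadata().addToLabels("app", "flink-session-jobmanager").endMetadata()
                            .withNewSpec()
                                .addNewContainer()
                                    .withName("jobmanager")
                                    .withImage("flink:1.8")   // placeholder image
                                    .withArgs("jobmanager")
                                .endContainer()
                            .endSpec()
                        .endTemplate()
                    .endSpec()
                    .build();
            client.apps().deployments().inNamespace("realtime").resource(jobManager).create();
        }
    }
}
```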
In practice, this scheme has two shortcomings:
- First, resources must be allocated in advance and cannot be allocated on demand, which fails to meet flexible business needs.
- Second, in extreme scenarios Pods cannot be started normally, which affects task recovery.
So we upgraded the containerization solution to support dynamic resource allocation on K8s. When creating a cluster, we first create the JobManager Deployment according to the number of JobManagers specified by the user. When the user submits a task, the cluster dynamically requests resources from the platform and creates TaskManagers according to the resources the task requires.
If a task needs to scale up while running, the JobManager interacts with the platform to dynamically add capacity; when resources are found to be wasted, capacity is scaled down. This solves the problems caused by static pre-allocation and improves resource utilization.
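For the dynamic path, a similarly minimal sketch is shown below: creating a single TaskManager Pod on demand with the fabric8 client. Again, the names, image, and namespace are placeholders, and in the real solution this kind of call is driven by the platform when the JobManager requests more resources.

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class TaskManagerLauncher {
    /** Creates one TaskManager Pod; invoked whenever the task needs one more TaskManager. */
    public static void launchTaskManager(KubernetesClient client, String clusterId, int index) {
        Pod taskManager = new PodBuilder()
                .withNewMetadata()
                    .withName(clusterId + "-taskmanager-" + index)
                    .addToLabels("app", clusterId)
                .endMetadata()
                .withNewSpec()
                    .addNewContainer()
                        .withName("taskmanager")
                        .withImage("flink:1.8")   // placeholder image
                        .withArgs("taskmanager")
                    .endContainer()
                    .withRestartPolicy("Never")   // the platform, not K8s, decides on replacements
                .endSpec()
                .build();
        client.pods().inNamespace("realtime").resource(taskManager).create();
    }

    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            launchTaskManager(client, "flink-session", 0);
        }
    }
}
```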
Here, having the **platform interact with K8s to create and destroy resources** is based mainly on four considerations:
- Ensuring that the computing platform retains oversight of resources.
- Avoiding the impact of platform and cluster configuration or logic changes on the image.
- Masking the differences between container platforms.
- Reusing the platform's existing K8s interaction code.
In addition, to remain compatible with the existing slot allocation policy (distributed by slot), the system estimates the resources a task needs and requests them all at once when the task is submitted, then waits according to a certain policy and performs slot allocation only when enough resources are available to run the task. This keeps it largely compatible with the original slot allocation strategy. A rough sketch of such an estimate appears below.
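The sketch below illustrates, under simplifying assumptions, what a one-shot estimate at submit time could look like. It assumes default slot sharing, where the required number of slots roughly equals the maximum operator parallelism; the class, helper names, and values are illustrative rather than the platform's actual logic.

```java
/** Hypothetical sketch of estimating the resources to request in one batch at submit time. */
public class ResourceEstimator {

    /** With default slot sharing, required slots roughly equal the max operator parallelism. */
    static int estimateRequiredSlots(int maxOperatorParallelism) {
        return maxOperatorParallelism;
    }

    /** Number of TaskManagers to request, given how many slots each TaskManager provides. */
    static int estimateTaskManagers(int requiredSlots, int slotsPerTaskManager) {
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager; // ceiling division
    }

    public static void main(String[] args) {
        int maxParallelism = 32;      // illustrative job-wide maximum operator parallelism
        int slotsPerTaskManager = 4;  // illustrative TaskManager configuration

        int slots = estimateRequiredSlots(maxParallelism);
        int taskManagers = estimateTaskManagers(slots, slotsPerTaskManager);
        System.out.println("Request " + taskManagers + " TaskManagers for " + slots + " slots");
        // The platform would then wait, per its policy, until these resources are available
        // before performing slot allocation.
    }
}
```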
III. Flink optimization and improvement
Here are the optimizations and improvements we made to Flink.
1. Preview the topology
As businesses used the platform, we found several pain points:
- First, task tuning is tedious. Adjusting a task's parallelism, slot grouping, or chaining strategy after it has been submitted and run on the platform means modifying the program again or tuning it through command-line parameters, which is very cumbersome.
- Second, SQL tasks have no flexible way to specify per-operator configurations.
- Third, it is not clear before a task is submitted to the cluster how many resources it will require or how many slots it will need.
- Fourth, network buffers can become insufficient after parallelism is adjusted.
To solve these problems, **we developed the ability to preview the topology**:
- First, topology configuration. After a user submits a task to the platform, we preview its topology and let the user flexibly configure the parallelism of its operators.
- Second, slot group preview. We clearly show the task's slot grouping and how many slots it requires.
- Third, network buffer estimation. In this way, users can adjust and tune jobs on the platform as much as possible.
The workflow of the topology preview is as follows. A user submits a SQL or Jar job on the platform; after submission, the operator configuration information is generated and returned to the platform, which renders a preview of the whole topology. The user can then adjust the operator configuration online and submit the adjusted configuration back to the platform. This can be repeated as often as needed; when the user is satisfied with the adjustments, they submit the task, and all of the online adjustments take effect.
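The settings that the preview adjusts correspond to standard per-operator configuration in Flink's DataStream API. Below is a minimal sketch of a simple source-map-sink job that sets them explicitly; the operator names and values are only illustrative.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .uid("source")                      // stable ID used to match operators across submissions
                .slotSharingGroup("source-group")   // slot group shown in the slot group preview
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("upper-map")
                .setParallelism(4)                  // per-operator parallelism adjustable in the preview
                .disableChaining()                  // chaining strategy adjustable in the preview
                .print()
                .uid("print-sink")
                .setParallelism(2);

        env.execute("operator-config-example");
    }
}
```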
Since a task can be submitted multiple times, how do we keep a stable mapping between the operators generated by two consecutive submissions? Our strategy is: if a uidHash or uid is specified, we use it as the key for the mapping; if not, we traverse the topology in breadth-first order and generate a unique ID for each operator based on its position in the topology. With this unique ID, the mapping between submissions is deterministic. A simplified sketch of this ID assignment is shown below.
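Below is a simplified, hypothetical sketch of that ID assignment: operators with a user-specified uid keep it, and the rest get an ID derived from their position in a breadth-first traversal of the topology.

```java
import java.util.*;

/** Hypothetical sketch: assign deterministic IDs to operators that lack a uid,
 *  based on their position in a breadth-first traversal of the topology. */
public class TopologyIdAssigner {

    static class Node {
        final String name;
        final String uid;              // user-specified uid, may be null
        final List<Node> downstream = new ArrayList<>();
        Node(String name, String uid) { this.name = name; this.uid = uid; }
    }

    static Map<Node, String> assignIds(List<Node> sources) {
        Map<Node, String> ids = new LinkedHashMap<>();
        Deque<Node> queue = new ArrayDeque<>(sources);
        Set<Node> visited = new HashSet<>(sources);
        int position = 0;
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            // Prefer the user-specified uid; otherwise derive an ID from the BFS position.
            String id = node.uid != null ? node.uid : "op-" + position;
            ids.put(node, id);
            position++;
            for (Node next : node.downstream) {
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        Node source = new Node("source", null);
        Node map = new Node("map", "my-map-uid");
        Node sink = new Node("sink", null);
        source.downstream.add(map);
        map.downstream.add(sink);
        assignIds(List.of(source)).forEach((n, id) -> System.out.println(n.name + " -> " + id));
    }
}
```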
2. Back pressure quantification
The second improvement is back pressure quantification. There are two common ways to observe back pressure:
The first is **the back pressure panel in the Flink UI**, which gives a very intuitive view of the current back pressure situation. But it has some problems:
- First, back pressure cannot be collected in some scenarios.
- Second, historical back pressure cannot be tracked.
- Third, the impact of back pressure is not intuitive.
- Fourth, collecting back pressure puts some extra load on the job when parallelism is high.
The second is **based on Flink Task Metrics**. For example, reporting metrics such as inPoolUsage and outPoolUsage to Prometheus for querying solves the problem of tracking back pressure history. But it has other problems:
- First, the meaning of the back pressure metrics differs somewhat across Flink versions.
- Second, analyzing back pressure has a learning curve; it requires a deep understanding of all the back pressure related metrics and joint analysis of them.
- Third, the impact of back pressure is still not intuitive, making it hard to measure its effect on the business.
Our solution is to collect and report the location, time, and frequency of back pressure occurrences. By combining these quantified back pressure metrics with the runtime topology, the impact of back pressure (which tasks are affected, for how long, and how often) can be seen intuitively.
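Our implementation is internal to the platform, but as a rough approximation of the idea, one could periodically poll Flink's REST back pressure endpoint for a job vertex and log each sample with a timestamp, from which the location, duration, and frequency of back pressure can be derived. In the sketch below, the JobManager address, job ID, and vertex ID are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BackPressureProbe {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with the real JobManager address, job ID, and vertex ID.
        String jobId = "0000000000000000";
        String vertexId = "0000000000000000";
        String url = "http://flink-jobmanager:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();

        while (true) {
            // Each sample reports a per-subtask back pressure level; logging it with a
            // timestamp lets us later derive where back pressure occurred, for how long,
            // and how often.
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(System.currentTimeMillis() + " " + response.body());
            Thread.sleep(30_000);
        }
    }
}
```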
3. File system support for multiple configurations
The following describes multi-configuration support for file systems.
When a URI is passed to FileSystem.get, Flink uses scheme + authority as the key to look up a cached FileSystem; if none is cached, it creates one through the FileSystemFactory and returns it, after which file operations can be performed. However, in platform practice we often run into problems like these:
- First, how do we write checkpoint data to a shared HDFS cluster and business data to another HDFS cluster? For example, the platform manages state uniformly, and users only read and write their own HDFS data without caring about state storage. How can such a scenario be supported?
- One option is to merge the configurations of the multiple HDFS clusters, but if the clusters have conflicting configuration values, merging them can cause problems.
- Federation mechanisms such as ViewFs could also be considered, but they are somewhat heavyweight. Is there a better solution?
- Second, how can data be read from one OSS store, processed, and written to another OSS store?
Both issues come down to **how to make the same type of Flink file system support multiple configurations**. Our solution is to specify and isolate the different configurations through user-defined schemes. Taking HDFS as an example, multi-configuration support works as follows (a minimal sketch follows the steps below):
- First, bind the user-defined scheme (aaHDFS) to the real scheme (HDFS) and to the path of the HDFS configuration to use.
- Second, when FileSystem.get is called, load the Hadoop configuration from the path bound to aaHDFS.
- Third, when reading and writing HDFS, use HadoopFileSystemWrapper to convert the user-defined scheme path (aaHDFS://) into a real Hadoop path (HDFS://).
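A minimal sketch of the scheme translation idea, using only the public Hadoop API, is shown below. The binding of aaHDFS to a configuration directory is hypothetical, and the real HadoopFileSystemWrapper hooks into Flink's FileSystem loading, which is not reproduced here.

```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AaHdfsResolver {

    // Hypothetical binding: the aahdfs scheme maps to hdfs plus its own config directory.
    private static final String CONF_DIR = "/etc/flink/aahdfs-conf";

    /** Resolves an aahdfs:// path to a Hadoop FileSystem configured from the bound directory. */
    public static FileSystem resolve(String aaHdfsPath) throws IOException {
        // Step 2: load the Hadoop configuration bound to the user-defined scheme.
        Configuration conf = new Configuration(false);
        conf.addResource(new Path(CONF_DIR + "/core-site.xml"));
        conf.addResource(new Path(CONF_DIR + "/hdfs-site.xml"));

        // Step 3: rewrite the user-defined scheme back to a real hdfs:// URI.
        URI realUri = URI.create(aaHdfsPath.replaceFirst("^aahdfs://", "hdfs://"));
        return FileSystem.get(realUri, conf);
    }

    public static void main(String[] args) throws IOException {
        FileSystem fs = resolve("aahdfs://checkpoint-cluster/user/flink/checkpoints");
        System.out.println("Resolved file system: " + fs.getUri());
    }
}
```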
We also made a number of other **optimizations and extensions**, in three major areas:
- The first is performance optimization, including HDFS optimizations (merging small files, reducing RPC calls), load-based dynamic rebalancing, extended slot allocation strategies (sequential, random, distributed by slot), and more.
- The second is stability optimization, including ZK anti-jitter, JobManager failover optimization, using the last checkpoint as a savepoint, and so on.
- The third is usability optimization, including log enhancements (log separation, dynamic log-level configuration), SQL extensions (incremental computation for windows, window offset support), intelligent diagnosis, and so on.
IV. Future planning
Finally, our future plans, which can be summarized in four points:
- First, continuously enhance and improve the SQL platform to drive more jobs to be developed in SQL.
- Second, intelligent diagnosis and automatic adjustment: automated intelligent diagnosis, adaptive tuning of runtime parameters, and job autonomy.
- Third, unified batch and stream processing: batch-stream unification at the SQL layer, providing both low-latency stream processing and highly stable batch processing.
- Fourth, AI exploration and practice: exploring and practicing artificial intelligence scenarios on top of batch-stream unification and real-time AI.