Author: Dong Tingting

Organized by: Jiang Xiaofeng

About the author: Tingting Dong leads the real-time computing engine team within Kuaishou's big data architecture group, and is currently responsible for the development and application of the Flink engine at Kuaishou and for building its surrounding subsystems. Dong graduated from Dalian University of Technology in 2013 and previously worked at Qihoo 360 and 58 Group. Research interests include distributed computing, scheduling systems, and distributed storage systems.

This talk covers three parts:

  1. The application scenarios and current scale of Flink at Kuaishou.
  2. The technical evolution of Flink as it was put into production at Kuaishou.
  3. Kuaishou's future plans for Flink.

I. Flink at Kuaishou: application scenarios and scale

1. Application scenarios of Flink at Kuaishou

At Kuaishou, data from DB/Binlog and WebService logs is written to Kafka in real time and then consumed by Flink for real-time computation, including real-time ETL, real-time analysis, Interval Join, and real-time training. The results are stored in Druid, ES, or HBase and served to downstream data application products. Meanwhile, the same Kafka data is dumped to the Hadoop cluster in real time and fed into offline computing.

Flink applications at Kuaishou fall into three main categories:

  • 80% statistical monitoring: real-time statistics, including various data metrics and alarms on monitored items, used to support real-time business analysis and monitoring;
  • 15% data processing: data cleaning, splitting, joining, and other logical processing, such as splitting and cleaning the data of large topics;
  • 5% business processing: real-time processing of specific business logic, such as real-time scheduling.

Typical Flink scenarios at Kuaishou include:

  • Quality monitoring: Kuaishou is a platform for sharing short videos and live streams, and the quality of both is monitored through real-time statistics in Flink, such as playback counts for live viewers and anchors, lag rate, broadcast failure rate, and other indicators of live-stream quality;
  • User growth analysis: real-time statistics on new users acquired through each channel, with the volume allocated to each channel adjusted in real time according to its effect;
  • Real-time data processing: real-time joins of the advertising display stream and the click stream, splitting of client logs, and so on;
  • Real-time CDN scheduling: the quality of each CDN vendor is monitored in real time, and real-time training in Flink adjusts the traffic ratio across CDN vendors.

2. Flink cluster scale

Kuaishou's Flink clusters currently have about 1,500 machines and run about 500 jobs. They process about 1.7 trillion records per day, with a peak of about 37 million records per second. All clusters are deployed on Yarn and are divided into offline clusters and real-time clusters. The offline clusters use mixed deployment, with machines physically isolated by labels. The real-time clusters are dedicated to Flink and serve jobs that require strong isolation and high stability.
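As a quick sanity check on these figures, a daily total of 1.7 trillion records works out to an average of roughly 20 million records per second, which is consistent with the peak of about 37 million being a per-second figure a little under twice the average. The calculation below only restates the two numbers from the text; the class and method names are illustrative.

```java
/** Back-of-the-envelope check of the throughput figures quoted above. */
final class ThroughputCheck {
    static final long SECONDS_PER_DAY = 24L * 60 * 60;

    /** Average records per second for a given daily total. */
    static double averagePerSecond(double recordsPerDay) {
        return recordsPerDay / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        double average = averagePerSecond(1.7e12); // 1.7 trillion records per day
        double peak = 37e6;                        // peak of about 37 million records
        System.out.printf("average per second: %.1f million%n", average / 1e6);
        System.out.printf("peak / average: %.2f%n", peak / average);
    }
}
```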

II. Technical evolution of Flink at Kuaishou

The technical evolution of Flink at Kuaishou falls into three parts:

  1. Scenario-specific optimization, including optimization of the Interval Join scenario;
  2. Stability improvements, including data source speed control, JobManager stability, and frequent job failures;
  3. Platform construction.

1. Scenario optimization

1.1 Interval Join Application Scenarios

One application of Interval Join at Kuaishou is the real-time join of the advertising display stream and the click stream. When a user opens the Kuaishou app, the advertising service may recommend ad videos, and the user sometimes clicks on a displayed ad. This produces two data streams at the back end: the ad display log and the client click log. The two streams must be joined in real time. The join results are used as sample data for model training, and the trained model is pushed to the online advertising service. In this scenario, a click within 20 minutes after a display is considered valid, so the real-time join logic joins each click with the display data of the preceding 20 minutes. The display stream is the larger of the two, with more than 1 TB of data in 20 minutes.

Originally, the business team implemented the real-time join itself: ad display logs were cached in Redis, and client click logs were consumed from Kafka with a delay to implement the join logic. The drawbacks were poor timeliness, the need to keep adding machines as the business grew, and a very high operation and maintenance cost. Flink's Interval Join fits this scenario well: it outputs join results in real time, and the maintenance cost for the business is very low because only one Flink job needs to be maintained.

1.2 Interval Join Scenario Optimization

1.2.1 Interval Join Principle:

Flink implements Interval Join as follows: the data of both streams is cached in internal state. When a record arrives on either stream, the records of the opposite stream within the corresponding time range are fetched and joinFunction is executed on each pair. As time advances, the data of both streams that falls outside the corresponding time range is cleared from state.

In the advertising scenario described above, the join covers the past 20 minutes of data. Assume that both streams arrive completely in order. Stream A, the display stream, caches the past 20 minutes of data, and each record of stream B, the click stream, is joined with stream A's data from the past 20 minutes.
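The buffer-and-probe behaviour described above can be sketched in plain Java without Flink. This is a simplified, single-threaded illustration and not Flink's implementation: the class `IntervalJoinSketch`, its method names, and the use of in-memory `TreeMap` buffers in place of keyed state are all assumptions made for the example. A right-stream record matches a left-stream record when its timestamp lies in [left.ts + lower, left.ts + upper].

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy interval join: a right record matches when right.ts is in [left.ts + lower, left.ts + upper]. */
final class IntervalJoinSketch {
    private final long lower;
    private final long upper;
    // Per join key: timestamp -> buffered values, kept sorted by timestamp.
    private final Map<String, TreeMap<Long, List<String>>> left = new HashMap<>();
    private final Map<String, TreeMap<Long, List<String>>> right = new HashMap<>();

    IntervalJoinSketch(long lowerMillis, long upperMillis) {
        if (lowerMillis > upperMillis) {
            throw new IllegalArgumentException("lower bound must not exceed upper bound");
        }
        this.lower = lowerMillis;
        this.upper = upperMillis;
    }

    /** Buffers a left (display) record and joins it with buffered right records in range. */
    List<String> onLeft(String key, long ts, String value) {
        buffer(left, key, ts, value);
        return probe(right, key, ts + lower, ts + upper, value, true);
    }

    /** Buffers a right (click) record and joins it with buffered left records in range. */
    List<String> onRight(String key, long ts, String value) {
        buffer(right, key, ts, value);
        return probe(left, key, ts - upper, ts - lower, value, false);
    }

    /** Drops buffered records that cannot match any record with a timestamp >= watermark. */
    void expire(long watermark) {
        left.values().forEach(m -> m.headMap(watermark - upper, false).clear());
        right.values().forEach(m -> m.headMap(watermark + lower, false).clear());
    }

    private static void buffer(Map<String, TreeMap<Long, List<String>>> side,
                               String key, long ts, String value) {
        side.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(ts, t -> new ArrayList<>())
            .add(value);
    }

    private static List<String> probe(Map<String, TreeMap<Long, List<String>>> other,
                                      String key, long from, long to,
                                      String value, boolean valueIsLeft) {
        List<String> out = new ArrayList<>();
        TreeMap<Long, List<String>> buffered = other.get(key);
        if (buffered == null) {
            return out;
        }
        for (List<String> matches : buffered.subMap(from, true, to, true).values()) {
            for (String m : matches) {
                out.add(valueIsLeft ? value + "|" + m : m + "|" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long twentyMinutes = 20 * 60 * 1000L;
        IntervalJoinSketch join = new IntervalJoinSketch(0, twentyMinutes);
        join.onLeft("ad-1", 0L, "display");
        System.out.println(join.onRight("ad-1", 5 * 60 * 1000L, "click"));       // within 20 minutes
        System.out.println(join.onRight("ad-1", 25 * 60 * 1000L, "late-click")); // outside the interval
    }
}
```

In this sketch the buffers grow until `expire` is called, which mirrors why the size of the cached display stream matters so much for the choice of state storage.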

In Flink's API, the Interval Join is written as follows:

// KeyedStreamA is the display stream and KeyedStreamB the click stream, both keyed by the join key
KeyedStreamA.intervalJoin(KeyedStreamB)
         .between(Time.minutes(0), Time.minutes(20))
         .process(joinFunction)

1.2.2 State Storage Policy Selection

In production, state can be stored in either of the following backends:

  1. FsStateBackend: state is held in memory and persisted to HDFS at checkpoint time.
  2. RocksDBStateBackend: state is stored in a RocksDB instance and can be checkpointed incrementally.

In the advertising scenario, the display stream carries more than 1 TB of data in 20 minutes. To save memory, among other considerations, Kuaishou chose RocksDBStateBackend.

In the Interval Join scenario, RocksDB stores the data of the two streams in two column families. Row keys are organized as keyGroupId + joinKey + ts.
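To make the key layout concrete, the sketch below encodes keyGroupId + joinKey + ts into a byte array and compares keys byte by byte, which is how a byte-sorted store such as RocksDB orders them by default. The field widths (2 bytes for keyGroupId, 8 bytes for ts), the UTF-8 join key, and the class name `JoinStateKey` are assumptions for illustration and are not Flink's actual serialization format. The ordering by time only holds for non-negative timestamps and for join keys of equal length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative composite key: keyGroupId (2 bytes) + joinKey (UTF-8) + ts (8 bytes, big-endian). */
final class JoinStateKey {

    static byte[] encode(int keyGroupId, String joinKey, long ts) {
        byte[] key = joinKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(2 + key.length + 8) // ByteBuffer is big-endian by default
                .putShort((short) keyGroupId)
                .put(key)
                .putLong(ts)
                .array();
    }

    /** Unsigned lexicographic comparison of two encoded keys. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        byte[] earlier = encode(3, "ad-1", 1_000L);
        byte[] later = encode(3, "ad-1", 2_000L);
        // Within one key group and join key, byte order follows timestamp order.
        System.out.println(compare(earlier, later) < 0);
    }
}
```

Because the timestamp is the last component, all records of one join key sit next to each other in time order, which is what makes a bounded scan over a time range possible.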

1.2.3 RocksDB Access Performance Problems

The first problem encountered after the Flink job went online was RocksDB access performance. The symptoms were:

  • After running for a while, the job experienced back pressure and throughput dropped.

  • jstack showed that the program was frequently inside RocksDB get requests.

  • top showed a single thread with its CPU continuously saturated.

Further analysis showed that, in this scenario, when Flink reads the data of a join key within a given time range from RocksDB state, it first obtains all entries with that join key prefix through a prefix scan and then checks which entries fall in the time range. The prefix scan reads a large amount of irrelevant data. Most of the scanned data is cached in PageCache, and a large amount of CPU is consumed decoding it to determine whether each entry has been deleted.

In the figure, the blue part is the target data and the red parts are the data outside the upper and lower boundaries. The prefix scan reads too much useless data in the red parts, and processing it saturates a single thread's CPU.

1.2.4 Optimization for RocksDB access performance

For the Interval Join scenario, Kuaishou optimized the RocksDB access pattern as follows:

  • In the Interval Join scenario, the boundaries of the data to be accessed are known exactly. Therefore, a full-key range scan replaces the prefix scan, with the upper and lower bounds of the query constructed exactly. The full key range is keyGroupId + joinKey + ts[lower, upper].

  • With a range query, RocksDB can seek directly to the lower and upper bounds, which avoids scanning and checking irrelevant data.

Optimization result: P99 query latency improved tenfold. Specifically, the P99 latency of fetching one record from RocksDB through nextKey dropped from 1000 ms to under 100 ms, which resolved the job's throughput and back pressure problems.
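The difference between the two access patterns can be illustrated with a sorted in-memory map standing in for RocksDB. This is a hedged sketch, not the actual Flink or RocksDB code path: the class `ScanComparison`, the string key format "joinKey|zero-padded ts", and the returned {scanned, matched} pair are assumptions for the example. It counts how many entries each approach touches for the same time range.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Compares how many entries a prefix scan and a bounded range scan touch in a sorted store. */
final class ScanComparison {

    /** Builds a sortable key; assumes 0 <= ts < 10^13 and no '|' in the join key. */
    static String key(String joinKey, long ts) {
        return String.format("%s|%013d", joinKey, ts);
    }

    /** Prefix scan: visits every entry of the join key and filters by time afterwards. */
    static int[] prefixScan(NavigableMap<String, String> store, String joinKey, long lower, long upper) {
        int scanned = 0;
        int matched = 0;
        String prefix = joinKey + "|";
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // left the prefix
            }
            scanned++;
            long ts = Long.parseLong(e.getKey().substring(prefix.length()));
            if (ts >= lower && ts <= upper) {
                matched++;
            }
        }
        return new int[] {scanned, matched};
    }

    /** Range scan: seeks to the lower bound and stops at the upper bound. */
    static int[] rangeScan(NavigableMap<String, String> store, String joinKey, long lower, long upper) {
        int n = store.subMap(key(joinKey, lower), true, key(joinKey, upper), true).size();
        return new int[] {n, n};
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (long ts = 0; ts < 1000; ts++) {
            store.put(key("ad-1", ts), "display");
        }
        int[] prefix = prefixScan(store, "ad-1", 400, 419);
        int[] range = rangeScan(store, "ad-1", 400, 419);
        System.out.println("prefix scan touched " + prefix[0] + ", matched " + prefix[1]);
        System.out.println("range scan touched " + range[0] + ", matched " + range[1]);
    }
}
```

Both scans return the same matches, but the prefix scan touches every buffered record of the join key while the range scan touches only those inside the bounds.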

1.2.5 RocksDB Disk Pressure Problem

The second problem encountered after the Flink job went online was that, as the business grew, disk pressure from RocksDB approached its limit. At peak times, disk util reached 90% and write throughput reached 150 MB/s. Detailed analysis identified the following causes:

  • The Flink machines are compute-oriented machines with large memory and a single HDD. When the cluster is not very large, a single machine hosts 4-5 containers of the job, all using the one HDD at the same time.

  • RocksDB compacts frequently in the background, and these compaction writes happen at the same time as checkpoints are being written to disk.

To relieve the RocksDB disk pressure, Kuaishou made the following internal optimizations:

  • Tuned RocksDB parameters to reduce compaction I/O. After the optimization, total I/O dropped by about half.
  • To make RocksDB parameters easier to adjust, added a Large State RocksDB configuration package at the Flink framework layer. RocksDBStateBackend also supports custom RocksDB parameters.
  • In the future, state is planned to be stored on shared storage to further reduce total I/O and to speed up checkpointing and recovery.

2. Stability improvement

First, some background on the video quality monitoring and scheduling application. Several Kafka topics store quality logs for short videos and live streams, including short video upload/download logs, logs from live viewers, and logs reported by anchors. A Flink job reads the corresponding topics and computes various metrics in real time, including playback volume, lag rate, black screen rate, and broadcast failure rate. The metrics are stored in Druid for subsequent alarm monitoring and multi-dimensional analysis. A second pipeline handles live CDN scheduling. It also uses a Flink job for real-time training and adjusts the traffic ratio across CDN vendors. The data of these Kafka topics is also delivered to the Hadoop cluster so that data can be backfilled offline. Real-time computation and offline backfill share the same Flink code and differ only in the data source, reading from Kafka or from HDFS respectively.

2.1 Data source speed control

The problem encountered in the video application scenario is as follows. The job DAG is complex and reads from multiple topics at the same time. When a job fails and recovers from an earlier state, it has to re-read some historical data. At that point, the read speeds of the different sources are uncontrolled, which causes state to pile up in window operators, degrades job performance, and ultimately makes the recovery fail. Similarly, when data is backfilled offline, the read speeds across different HDFS files are uncontrolled. Previously, the temporary workaround in the real-time scenario was to reset the GroupID and discard historical data so that consumption started from the latest offset.

To solve this problem, we wanted to control the read speed of multiple sources at the source itself, so we designed a source speed control strategy.

Source speed control strategy

The source speed control strategy is:

  • Each SourceTask shares its speed status <id, clocktime, watermark, speed> with the JobManager.
  • The JobManager introduces a SourceCoordinator. The coordinator has a global view of source speeds, works out a rate limiting policy, and sends it to the SourceTasks.
  • Each SourceTask executes the speed control logic based on the speed adjustment information sent by the JobManager.
  • One detail: if the DAG contains multiple subgraphs, the sources of different subgraphs do not affect each other.

Details of the source speed control strategy

SourceTask shared status

  • Each SourceTask periodically reports its status to the JobManager, every 10 seconds by default.
  • The report content is <id, clocktime, watermark, speed>.

Coordination center: SourceCoordinator

  • Speed limit threshold: fastest watermark - slowest watermark > ∆t (5 minutes by default). A rate limiting policy is made only when this threshold is reached.
  • Global prediction: for each parallel source, targetWatermark = base + speed * time. The coordinator predicts the watermark position that each parallel source can reach in the next interval.
  • Global decision: targetWatermark = predicted slowest watermark + ∆t/2. Based on the global prediction, the coordinator takes the slowest predicted watermark, adds a margin, and uses the result as the target of the global rate limiting decision for the next period.
  • Rate limiting information sent: <targetTime, targetWatermark>. The global decision is sent to all SourceTasks and contains the next target time and the target watermark position.

For example, at time A the four parallel sources have reached the positions shown in the figure, and the coordinator predicts their positions at time A + interval. The blue dotted line in the figure marks the predicted position of each parallel source. The slowest predicted watermark is selected and a margin of ∆t/2 is added. The resulting watermark is the rate limiting target, which is sent to the SourceTasks as the global decision.
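The threshold, prediction, and decision steps above can be sketched as a single function. This is a minimal illustration under stated assumptions and not Kuaishou's implementation: the class and field names are hypothetical, speed is assumed to be measured in event-time milliseconds advanced per wall-clock millisecond, and "time" in the prediction formula is taken to be the span from each report's clocktime to the next target time.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of the coordinator's decision for one reporting interval. */
final class SourceCoordinatorSketch {

    /** Status reported by one source subtask: <id, clocktime, watermark, speed>. */
    static final class SourceStatus {
        final int id;
        final long clockTime;  // wall-clock time of the report, in ms
        final long watermark;  // current event-time watermark, in ms
        final double speed;    // assumed unit: event-time ms advanced per wall-clock ms

        SourceStatus(int id, long clockTime, long watermark, double speed) {
            this.id = id;
            this.clockTime = clockTime;
            this.watermark = watermark;
            this.speed = speed;
        }
    }

    /** Decision sent to every source subtask: <targetTime, targetWatermark>. */
    static final class SpeedLimit {
        final long targetTime;
        final long targetWatermark;

        SpeedLimit(long targetTime, long targetWatermark) {
            this.targetTime = targetTime;
            this.targetWatermark = targetWatermark;
        }
    }

    /** Returns null when the watermark spread does not exceed deltaT, i.e. no limit is needed. */
    static SpeedLimit decide(List<SourceStatus> reports, long now, long intervalMs, long deltaT) {
        if (reports.isEmpty()) {
            return null;
        }
        long targetTime = now + intervalMs;
        long fastest = Long.MIN_VALUE;
        long slowest = Long.MAX_VALUE;
        long slowestPredicted = Long.MAX_VALUE;
        for (SourceStatus s : reports) {
            fastest = Math.max(fastest, s.watermark);
            slowest = Math.min(slowest, s.watermark);
            // Global prediction: targetWatermark = base + speed * time
            long predicted = s.watermark + (long) (s.speed * (targetTime - s.clockTime));
            slowestPredicted = Math.min(slowestPredicted, predicted);
        }
        if (fastest - slowest <= deltaT) {
            return null; // speed limit threshold not reached
        }
        // Global decision: slowest predicted watermark plus a margin of deltaT / 2
        return new SpeedLimit(targetTime, slowestPredicted + deltaT / 2);
    }

    public static void main(String[] args) {
        List<SourceStatus> reports = Arrays.asList(
                new SourceStatus(0, 0L, 1_000_000L, 2.0),
                new SourceStatus(1, 0L, 100_000L, 1.0));
        SpeedLimit limit = decide(reports, 0L, 10_000L, 300_000L);
        System.out.println(limit.targetTime + " " + limit.targetWatermark);
    }
}
```

Anchoring the target to the slowest predicted source means the fast sources are the ones that end up waiting, which is what keeps window state from piling up.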

SourceTask rate limiting

  • After receiving the rate limiting information <targetTime, targetWatermark>, the SourceTask applies the rate limit.
  • Take KafkaSource as an example. When KafkaFetcher fetches data, it checks its current progress against the rate limiting information to decide whether it needs to wait.
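One possible shape of the check on the SourceTask side is sketched below. The text does not spell out the exact waiting rule, so the rule used here is an assumption: a source pauses until targetTime only if its watermark has already reached targetWatermark. The class `SourceThrottleSketch` and its methods are hypothetical and are not part of Flink's KafkaFetcher.

```java
/** Hypothetical throttle check a source fetch loop could run before emitting more records. */
final class SourceThrottleSketch {
    private long targetTime = Long.MIN_VALUE;      // no limit received yet
    private long targetWatermark = Long.MAX_VALUE;

    /** Stores the latest <targetTime, targetWatermark> decision from the coordinator. */
    synchronized void onSpeedLimit(long targetTime, long targetWatermark) {
        this.targetTime = targetTime;
        this.targetWatermark = targetWatermark;
    }

    /** Returns how long to pause, in ms; 0 means the source may keep reading. */
    synchronized long pauseMillis(long currentWatermark, long now) {
        boolean aheadOfTarget = currentWatermark >= targetWatermark;
        boolean beforeTargetTime = now < targetTime;
        return (aheadOfTarget && beforeTargetTime) ? targetTime - now : 0L;
    }

    public static void main(String[] args) {
        SourceThrottleSketch throttle = new SourceThrottleSketch();
        throttle.onSpeedLimit(10_000L, 260_000L);
        System.out.println(throttle.pauseMillis(300_000L, 4_000L)); // already past the target watermark
        System.out.println(throttle.pauseMillis(200_000L, 4_000L)); // still behind the target watermark
    }
}
```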

The scheme also takes the following into account:

  • Time attribute: rate limiting applies only to EventTime jobs.
  • Switch control: a job-level switch controls whether the source rate limiting policy is enabled.
  • Sources in different DAG subgraphs do not affect each other.
  • Whether delivery of the CheckPoint Barrier is affected.
  • The sending speed of a data source is not constant, and the watermark may jump abruptly.

Source speed control result

An online job was started consuming from Kafka at the earliest offset (2 days earlier). As shown in the figure, without rate limiting, state keeps growing and the job eventually dies. With the rate limiting strategy, state grows slowly at first but its size stays under control, the job steadily catches up with the latest data, and state stays at about 40 GB.

2.2 JobManager Stability

Two types of JobManager stability problems were encountered, both with the same symptoms: for jobs with high parallelism, the JobManager WebUI becomes unresponsive and job scheduling times out. The causes in the two scenarios were analyzed further.

Scenario 1: the JobManager is under high memory pressure. The JobManager is responsible for deleting the HDFS paths of completed checkpoints. When the NameNode is under heavy load, deleting completed checkpoint paths is slow, so checkpoint paths accumulate in memory. The original deletion policy is: after each file is deleted, check whether its directory is empty and, if so, delete the directory. For a large checkpoint path, listing the directory is an expensive operation. The optimized logic calls HDFS delete(path, false) directly when a file is deleted, which has the same semantics at a much lower cost.
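The idea of replacing "list the directory, then delete it if empty" with a single non-recursive delete can be sketched on a local file system. The example uses `java.nio.file` as a stand-in for HDFS, so it illustrates the pattern only and is not the Flink or HDFS code: `Files.delete` on a directory fails when the directory still has entries, much as a non-recursive delete of a non-empty directory does. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Local-file-system sketch of deleting a parent directory without listing it first. */
final class CheckpointCleanupSketch {

    /** Deletes a file, then attempts a non-recursive delete of its parent directory. */
    static boolean deleteFileAndMaybeParent(Path file) throws IOException {
        Files.deleteIfExists(file);
        Path parent = file.getParent();
        if (parent == null) {
            return false;
        }
        try {
            Files.delete(parent); // non-recursive: fails if the directory still has entries
            return true;
        } catch (DirectoryNotEmptyException | NoSuchFileException e) {
            return false;         // other files remain, or the directory is already gone
        }
    }

    /** Creates a temp directory with two files and deletes them one by one. */
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("chk-");
            Path a = Files.createFile(dir.resolve("part-a"));
            Path b = Files.createFile(dir.resolve("part-b"));
            boolean afterFirst = deleteFileAndMaybeParent(a);  // directory still holds part-b
            boolean afterSecond = deleteFileAndMaybeParent(b); // directory is now empty
            return new boolean[] {afterFirst, afterSecond, Files.exists(dir)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("directory removed after first file: " + result[0]);
        System.out.println("directory removed after second file: " + result[1]);
    }
}
```

The directory is never listed: the store itself rejects the delete while entries remain, so each file deletion costs one extra call instead of a scan of the whole directory.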

Scenario 2: this case appeared after the Yarn Cgroup feature went online. G1 garbage collection in the JobManager became slow and blocked application threads. The number of CPUs requested by the AppMaster was hardcoded to 1, so the CPU available to it was limited once Cgroup was enabled. The fix was to make the number of CPUs requested by the AppMaster configurable.

2.3 Frequent Job Failures

Machine faults caused jobs to fail frequently in two scenarios:

Scenario 1: a disk fault causes job scheduling to fail repeatedly. Because of the disk problem, some buffer files cannot be found. The TaskManager is unaware of the disk's health, so tasks keep being scheduled to it and the job fails frequently.

Scenario 2: a fault on a particular machine causes the TaskManagers on it to core dump frequently, yet new TaskManagers keep being allocated to that machine, so the job fails frequently.

Solutions to machine failures:

  • For disk problems, a DiskChecker disk health check was added to the TaskManager. If the disk is faulty, the TaskManager exits automatically.
  • For machines on which TaskManagers fail frequently, the faulty machines are added to a blacklist according to certain policies, and a soft blacklist mechanism then tells Yarn not to schedule containers on them.
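The text does not specify the blacklisting policy, so the sketch below shows one plausible rule purely as an assumption: a host is blacklisted while it has at least a given number of TaskManager failures inside a sliding time window. The class `HostBlacklistSketch`, its thresholds, and its methods are hypothetical, and the step that passes the blacklist to Yarn is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sliding-window policy for deciding which hosts to blacklist. */
final class HostBlacklistSketch {
    private final int maxFailures;
    private final long windowMillis;
    private final Map<String, Deque<Long>> failuresByHost = new HashMap<>();

    HostBlacklistSketch(int maxFailures, long windowMillis) {
        this.maxFailures = maxFailures;
        this.windowMillis = windowMillis;
    }

    /** Records the time of a TaskManager failure on the given host. */
    void recordFailure(String host, long now) {
        failuresByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).addLast(now);
    }

    /** A host is blacklisted while it has at least maxFailures failures inside the window. */
    boolean isBlacklisted(String host, long now) {
        Deque<Long> failures = failuresByHost.get(host);
        if (failures == null) {
            return false;
        }
        while (!failures.isEmpty() && failures.peekFirst() <= now - windowMillis) {
            failures.removeFirst(); // forget failures older than the window
        }
        return failures.size() >= maxFailures;
    }

    public static void main(String[] args) {
        HostBlacklistSketch blacklist = new HostBlacklistSketch(3, 60_000L);
        blacklist.recordFailure("host-a", 0L);
        blacklist.recordFailure("host-a", 10_000L);
        blacklist.recordFailure("host-a", 20_000L);
        System.out.println(blacklist.isBlacklisted("host-a", 20_000L)); // three failures within a minute
        System.out.println(blacklist.isBlacklisted("host-a", 65_000L)); // oldest failure has aged out
    }
}
```

Letting old failures age out keeps the blacklist soft: a machine that has recovered becomes schedulable again without manual intervention.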

3. Platform construction

3.1 Platform Construction:

Kuaishou's platform work centers on the Ivy job hosting platform. Through it, users can operate jobs, manage jobs, and view job details. Job operations include submitting and stopping jobs. Job management includes keeping jobs alive, performance alarms, and automatic restart configuration. Job details include the job's metrics.

The figure shows some of the operation interfaces of the Ivy job hosting platform.

3.2 Optimization of the problem location process:

We often need to analyze performance problems for business teams and help them debug, which is a relatively tedious process. We have therefore done a lot of work to expose more information so that business teams can analyze and locate problems on their own. First, all metrics are written to Druid, and Superset is used to analyze job metrics along every dimension. Second, the Flink WebUI has been improved: jstack output can be printed from the web page in real time, each Vertex in the web DAG is given a serial number, and the SubtaskId of each parallel instance is added to the Subtask information. Third, exception messages have been enriched, with clear prompts for specific scenarios such as machine downtime. Fourth, new metrics have been added.

III. Future plans

Kuaishou's future plans fall into two parts:

First, Flink SQL work is currently under way. Flink SQL is an important part of our plans because SQL lowers the development cost for users, and it is also needed for connecting to the real-time data warehouse. Second, we want to optimize resource usage. At present, business teams may request too many resources or estimate parallelism inaccurately when submitting jobs, which wastes resources. How to improve the overall resource utilization of the cluster is also a question to be explored.