Abstract: This article is based on a talk given by MAO Yan Can, a PhD student in the Department of Computer Science at the National University of Singapore, at Flink Forward Asia 2021. The main contents include:
- Background: Dynamic reconfiguration of stream jobs
- The challenge: Balancing generality, efficiency, and ease of use
- Design: Task-centric system design
- Implementation: Barrier mechanism based on Flink
- Evaluation: Performance comparison of Trisk with existing systems
I. Background: Dynamic reconfiguration of stream jobs
Stream processing is an important data processing paradigm with a wide range of applications in fields such as machine learning, data analytics, real-time event processing, and real-time trading. It offers low latency and high throughput. A stream job is deployed at scale as a set of parallel stream task instances that process the input data stream in parallel; these task instances are connected by intermediate streams and form a directed acyclic graph.
Data-parallel stream processing partitions the input data stream among the parallel tasks, and each task processes its assigned partition independently and in real time. Because stream jobs are long-running, their workload fluctuates over time, and different stream jobs have different performance requirements: real-time trading is latency-sensitive, while some data analytics jobs demand high throughput. To meet the performance requirements of different stream jobs, techniques for dynamically reconfiguring stream tasks are critical.
Common types of workload fluctuation are as follows:
- First, changes in the input rate. A stream job runs for a long time, and the input rate of the data stream changes dynamically and unpredictably, so statically allocated resources cannot always process the stream with low latency and high throughput.
- Second, data skew. The distribution of the stream data changes dynamically; for example, an increase in the frequency of a particular key leads to a larger workload and higher latency for the corresponding stream task.
- Third, emerging events. The stream may contain new events or data that the current execution logic cannot handle correctly; new kinds of fraudulent transactions, for example, need to be detected with new rules.
Different types of fluctuation call for different reconfiguration techniques to optimize the stream job, so as to maintain resource efficiency while processing the stream data with high throughput and low latency.
- For changes in the input rate, scaling can dynamically adjust resources to improve throughput and reduce latency.
- For data skew, load balancing can redistribute the workload among the parallel stream tasks.
- For emerging events, change of logic can update the execution logic of a stream task so that new events and data are processed correctly.
With these types of fluctuation and reconfiguration techniques in hand, the next question is how to detect fluctuations dynamically and choose an appropriate method to adjust the stream tasks. To solve this problem, a controller is usually designed to reconfigure tasks dynamically. The controller monitors stream jobs in real time to analyze symptoms, and then modifies the stream job's configuration according to the symptom to optimize performance.
This process is divided into three steps: monitor, diagnose, and reconfigure.
- First, the controller monitors stream tasks in real time. Controllers for stream jobs mainly monitor system-level metrics, such as CPU utilization, or application-level metrics, such as end-to-end processing latency, throughput, and backlog, for modeling, analysis, and policy decisions.
- Next, the controller diagnoses symptoms through its control policy, either with predefined rules, such as scaling out when CPU utilization exceeds a threshold, or with model-based analysis, such as predicting the resource allocation that should be reached.
- Finally, the controller selects an appropriate reconfiguration method to optimize the stream job dynamically.
To reduce the engineering overhead of implementing controllers for different stream jobs, a control platform is needed to host them. The control platform encapsulates metrics and reconfiguration methods and provides an API; once a stream job is deployed, developers can manage it by submitting a controller to the control platform. The controller contains a customized control policy and directly uses the platform API to collect metrics and perform reconfiguration, which hides the underlying system logic and simplifies the design and development of controllers.
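As a purely illustrative sketch of this monitor-diagnose-reconfigure loop (the `ControlPlatform` and `Metrics` interfaces and the `scaleOut` call below are hypothetical names, not part of Trisk or Flink), a rule-based controller hosted on such a platform could look roughly like this:

```java
// Hypothetical sketch of a monitor-diagnose-reconfigure controller loop.
// ControlPlatform, Metrics, and scaleOut(...) are illustrative names only,
// not actual Trisk or Flink APIs.
interface Metrics {
    double cpuUtilization();
}

interface ControlPlatform {
    Metrics collectMetrics();        // monitor: system- and application-level metrics
    void scaleOut(int extraTasks);   // reconfigure: one of several possible actions
}

class SimpleController implements Runnable {
    private static final double CPU_THRESHOLD = 0.8;
    private final ControlPlatform platform;

    SimpleController(ControlPlatform platform) {
        this.platform = platform;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Metrics metrics = platform.collectMetrics();      // 1. monitor
            if (metrics.cpuUtilization() > CPU_THRESHOLD) {   // 2. diagnose (rule-based)
                platform.scaleOut(1);                         // 3. reconfigure
            }
            try {
                Thread.sleep(1000);                           // poll once per second
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```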
Most stream processing systems ship with mature metrics systems, so the control platform can collect metrics through the original system APIs. However, support for dynamic reconfiguration is still a big challenge.
II. The challenge: Balancing generality, efficiency, and ease of use
A control platform for dynamic reconfiguration should have three properties:
- Generality: different control policies require different types of reconfiguration methods;
- Efficiency: reconfiguration should complete in a short time and block the original data processing as little as possible;
- Ease of use: the API should be easy to use, so that users do not need to understand the underlying system logic.
However, existing solutions satisfy only some of these properties. For example, Flink supports dynamic reconfiguration of stream jobs and provides an easy-to-use interface through which a controller can reconfigure a running job. Because reconfiguration is realized by modifying the job and redeploying it as a whole, Flink's native support is very general and widely applicable. However, redeployment is also costly, involving resource reallocation and global state restoration.
Flink's reconfiguration process is as follows: the JobManager first triggers a savepoint across the pipeline, which snapshots the state of all tasks. After receiving all snapshots, the JobManager terminates the current pipeline, redeploys the stream job with the new configuration, and restores state from the savepoint before restarting.
III. Design: Task-centric system design
To satisfy these three properties of reconfiguration, we introduce Trisk, a task-centric control platform for stream jobs.
The figure above shows the Trisk system architecture. Trisk supports defining and executing reconfigurations of stream jobs. It provides a task-centric configuration abstraction that captures three dimensions of a running stream job's configuration, and encapsulates atomic operations on this abstraction so that reconfiguration methods can be defined by composing atomic operations. To improve efficiency, Trisk performs reconfiguration with a partial pause-and-resume technique, as opposed to Flink's native stop-and-restart mechanism, and it further leverages Flink's checkpoint mechanism for consistency. Trisk also provides an easy-to-use programming API: common reconfigurations are predefined as APIs, and the atomic operations are also exposed as APIs so that users can customize their own reconfigurations.
Trisk works as follows:
The Trisk Runtime maintains a RESTful API through which users can submit control logic code. The Trisk Runtime then compiles the code and instantiates the corresponding control policy, which makes diagnosis and reconfiguration decisions based on metrics of the current stream job. Once the control policy detects a fluctuation in the running stream job, it reconfigures the job by interacting with the Trisk Runtime.
The process is as follows: the control policy first obtains the Trisk configuration abstraction from the Trisk Runtime, which describes the configuration of each task in the current stream job, and then updates the abstraction with different types of atomic operations based on the diagnosis. For example, if an increase in the input rate is identified, the control policy increases the throughput of the stream job by allocating more resources to deploy new tasks and redistributing the workload among tasks. Finally, the control policy submits the updated Trisk abstraction back to the Trisk Runtime, which reconfigures and optimizes the stream job according to the updated configuration.
Trisk performs the reconfiguration by interacting with the underlying stream system, using a partial pause-and-resume approach: instead of terminating the entire stream job to maintain consistency, it pauses and updates only the affected tasks, which reduces the time overhead. The whole process is divided into three steps: prepare, sync, and update.
The process is as follows: in the prepare phase, the system determines which tasks are updated or affected based on the updated Trisk abstraction and prepares their new physical configurations. In the sync phase, to ensure data consistency, the stream job is synchronized globally and the affected tasks are suspended, while unaffected tasks continue to execute; this synchronization is achieved with Flink's checkpoint barrier mechanism. In the update phase, the affected tasks are updated independently and resume execution once the update is complete.
Trisk's three-dimensional abstraction is derived from the three steps of executing a stream job:
- First, when a stream job is submitted to the stream system, it is encapsulated as a logical graph that contains the execution logic of the stream tasks. The vertices are operators containing user-defined functions (UDFs), and the edges represent the intermediate data streams between operators. Each operator uses its UDF to process the incoming data stream and produce an output stream that flows to subsequent operators.
- Second, each operator of the logical graph runs as a certain number of parallel stream tasks, and the input data stream is partitioned among these tasks for parallel execution. The portion of the input stream assigned to each stream task is called that task's workload configuration.
- Third, these parallel stream tasks are deployed to servers for physical execution. Each stream task is allocated a certain amount of resources on a machine, such as CPU and memory; this allocation is the task's resource configuration.
Thus, Trisk's three-dimensional abstraction consists of the task-centric execution logic, workload, and resource configurations, forming a directed acyclic graph stored in the Trisk Runtime.
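As a rough illustration of this abstraction (the field names and types below are assumptions for exposition, not Trisk's actual data structures), the task-centric abstraction can be pictured as a DAG of per-task configurations:

```java
import java.util.List;
import java.util.Set;

// Rough illustration of the task-centric, three-dimensional abstraction.
// Field names and types are assumptions, not Trisk's actual data structures.
class TaskConfig {
    String udfClassName;        // execution logic dimension: the operator's UDF
    Set<Integer> keyGroups;     // workload dimension: input partitions assigned to this task
    int cpuShares;              // resource dimension: CPU allocated on its machine...
    long memoryBytes;           // ...and memory
    List<Integer> downstream;   // edges of the DAG: IDs of downstream tasks
}

class StreamJobAbstraction {
    List<TaskConfig> tasks;     // one entry per parallel stream task; together they form a DAG
}
```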
We encapsulate the update of each dimension of the abstraction as an atomic operation; by applying atomic operations to the dimensions of the three-dimensional abstraction, a stream job can be reconfigured at a fine granularity, which provides the generality of reconfiguration. For example, scaling can be achieved by allocating resources to deploy new tasks and redistributing the workload among the parallel tasks.
The figure above shows a scaling-out example: because of an uneven increase in the input rate, task 2's load and latency increase, and task 3's utilization is also high. We therefore perform scaling out to allocate a new task, task 5, and transfer part of task 2's workload to task 5, so that the stream job can continue to process the input stream with low latency and high throughput.
Trisk provides predefined reconfiguration APIs corresponding to the three reconfiguration methods mentioned earlier: scaling, load balancing, and change of logic. Users can use these APIs to implement control policies on Trisk, which are compiled into threads running in the Trisk Runtime to manage stream jobs dynamically.
The example above shows an implementation of a load-balancing control policy that performs dynamic load balancing for a stream job. Every second it checks the workload of each task, for example the distribution of data processed by each task, and redistributes the workload when the distribution across tasks changes.
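The original figure is not reproduced here; the following is only an illustrative sketch of such a policy, assuming a hypothetical platform interface with a `workloadDistribution()` metric and a predefined `loadBalancing(...)` reconfiguration call:

```java
import java.util.Map;

// Illustrative sketch of a periodic load-balancing policy.
// The PlatformApi interface and its methods are assumptions for illustration,
// not the actual Trisk API.
interface PlatformApi {
    Map<Integer, Long> workloadDistribution();      // taskId -> records processed in the last second
    void loadBalancing(Map<Integer, Long> current); // predefined reconfiguration call
}

class LoadBalancePolicy implements Runnable {
    private static final double IMBALANCE_THRESHOLD = 2.0;  // max/avg ratio that triggers rebalancing
    private final PlatformApi api;

    LoadBalancePolicy(PlatformApi api) { this.api = api; }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Map<Integer, Long> dist = api.workloadDistribution();
            long max = dist.values().stream().mapToLong(Long::longValue).max().orElse(0);
            double avg = dist.values().stream().mapToLong(Long::longValue).average().orElse(0);

            // Rebalance when one task processes far more data than the average task.
            if (avg > 0 && max / avg > IMBALANCE_THRESHOLD) {
                api.loadBalancing(dist);
            }
            try { Thread.sleep(1000); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```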
Users can also define new reconfiguration methods by applying atomic operations to the three-dimensional abstraction. We package the three kinds of atomic operations into three APIs: assignLogic, assignWorkload, and assignResource.
The figure above shows the scaling reconfiguration method implemented as code that performs atomic operations on the abstraction: assignResource allocates resources to the newly created tasks, and assignWorkload redistributes the workload among the parallel tasks.
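Since the figure is not reproduced here, the following is only an illustrative sketch of how a scaling reconfiguration might be composed from the atomic operations; the `Abstraction` interface and its method signatures are assumptions based on the API names above:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of composing a scaling-out reconfiguration from atomic
// operations. The Abstraction interface and method signatures are assumptions
// based on the API names in the text (assignResource, assignWorkload).
interface Abstraction {
    int createTask(int operatorId);                        // add a new parallel task for an operator
    void assignResource(int taskId, int slots);            // atomic op on the resource dimension
    void assignWorkload(int taskId, Set<Integer> keys);    // atomic op on the workload dimension
    Set<Integer> workloadOf(int taskId);
}

class ScalingReconfiguration {
    /** Scale out an operator by adding one task and moving half of a hot task's keys to it. */
    static void scaleOut(Abstraction abstraction, int operatorId, int hotTaskId) {
        int newTaskId = abstraction.createTask(operatorId);
        abstraction.assignResource(newTaskId, 1);            // give the new task one slot (assumption)

        List<Integer> keys = new ArrayList<>(abstraction.workloadOf(hotTaskId));
        Set<Integer> moved = new HashSet<>(keys.subList(0, keys.size() / 2));
        abstraction.assignWorkload(newTaskId, moved);         // move half of the hot task's keys

        Set<Integer> remaining = new HashSet<>(keys);
        remaining.removeAll(moved);
        abstraction.assignWorkload(hotTaskId, remaining);     // the hot task keeps the rest
    }
}
```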
IV. Implementation: Barrier mechanism based on Flink
The Trisk control platform runs as a separate back-end service that encapsulates the reconfiguration APIs. New components are also added to the Flink system layer to interact with the Trisk Runtime and perform reconfiguration of stream jobs efficiently. In the runtime layer, controllers hold the user-defined control policies and reconfiguration methods. The StreamManager is the heart of Trisk: it exposes the API to users and maintains a web service for receiving new controllers. At the system layer, the JobReconfigCoordinator maintains the mapping from the Trisk abstraction to Flink's physical configuration and coordinates reconfiguration to ensure data consistency for the stream job before and after reconfiguration.
Each StreamTask maintains a TaskConfigManager, which manages and updates the configuration of that StreamTask during reconfiguration.
The figure above shows the internal component architecture within Flink. The JobReconfigCoordinator lives in Flink's JobManager, and each StreamTask maintains a TaskConfigManager. The JobReconfigCoordinator and the TaskConfigManagers interact remotely through Flink's network layer to carry out the control logic.
The figure above shows an overview of reconfiguration execution on Flink.
In the prepare phase, the Coordinator receives the abstraction analyzed by the Trisk Runtime layer and prepares the new configurations of the StreamTasks. For example, for scaling, new tasks are allocated by obtaining new resource slots, and workload redistribution is achieved by updating the result partitions of the upstream tasks and the input gates of the downstream tasks. For stateful tasks, workload reassignment also needs to update the task's state backend.
In the synchronize phase, the Coordinator uses Flink's existing checkpoint barrier mechanism to synchronize and suspend the affected tasks to ensure data consistency. This is done by injecting a barrier into the pipeline starting from the source tasks. After receiving the barrier, an affected StreamTask pauses and waits for an update from the Coordinator.
After synchronization is complete, the Coordinator enters the update phase and notifies all affected tasks to update their configurations in parallel. Each StreamTask reconnects to its upstream and downstream tasks and automatically resumes execution once its configuration has been updated.
Specific implementation details are as follows:
First, the configuration inside the Trisk abstraction is mapped to Flink's JobGraph and ExecutionGraph. In the prepare phase, the Coordinator updates the corresponding JobGraph and ExecutionGraph, and then uses Flink's barrier mechanism to synchronize during reconfiguration and ensure data consistency.
Second, the atomic operations modify each StreamTask dynamically, reusing Flink's native mechanisms as much as possible. For example, assignWorkload is implemented by re-initializing the state backend and updating the result partitions of the upstream tasks and the input gates of the current task.
The reconfiguration process is as follows:
First, during the prepare phase, the JobReconfigCoordinator updates the JobGraph and ExecutionGraph, and the affected StreamTasks are marked based on the updates. After preparation, the Coordinator synchronizes the whole pipeline with the barrier mechanism: a barrier is injected at the source tasks and propagated through the pipeline. After receiving barriers from all of its upstream tasks, an affected task suspends, acknowledges to the Coordinator, and forwards the barrier to its downstream tasks; downstream tasks do the same after receiving the barrier. Affected tasks pause after acknowledging, while unaffected tasks continue to execute. Once all tasks have acknowledged to the Coordinator, synchronization ends.
The Coordinator then enters the update phase, in which it instructs each TaskConfigManager to update its StreamTask's configuration; the task then reconnects to its upstream and downstream tasks and continues execution.
At this point, the reconfiguration process ends.
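As a simplified illustration of this prepare-sync-update coordination (not Trisk's or Flink's actual classes; barrier injection and acknowledgements are abstracted into plain method calls), the coordinator-side bookkeeping could look roughly like this:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Simplified illustration of the prepare-sync-update coordination.
// Tasks, barrier injection, and acknowledgements are modeled abstractly;
// these are not Trisk's or Flink's actual classes.
class ReconfigCoordinator {
    private final Set<Integer> affectedTasks = ConcurrentHashMap.newKeySet();
    private CountDownLatch allAcked;

    // Prepare: mark the tasks affected by the updated abstraction.
    void prepare(Set<Integer> affected, int totalTasks) {
        affectedTasks.addAll(affected);
        allAcked = new CountDownLatch(totalTasks);   // wait for every task's acknowledgement
    }

    // Sync: inject a barrier at the sources (abstracted here), then wait until
    // every task has acknowledged. Affected tasks pause after acknowledging;
    // unaffected tasks keep processing.
    void synchronize(Runnable injectBarrierAtSources) throws InterruptedException {
        injectBarrierAtSources.run();
        allAcked.await();
    }

    // Called (e.g. via RPC) by each task once it has received the barrier
    // from all of its upstream tasks.
    void ack(int taskId) {
        allAcked.countDown();
    }

    // Update: notify the affected tasks to apply their new configurations in
    // parallel; each task resumes on its own once its update completes.
    void update(TaskUpdater updater) {
        affectedTasks.parallelStream().forEach(updater::applyNewConfig);
    }

    interface TaskUpdater {
        void applyNewConfig(int taskId);
    }
}
```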
V. Evaluation: Performance comparison between Trisk and existing systems
We conducted small-scale experiments with the following two objectives:
- First, what is the overall effect of a controller implemented on Trisk? Can it meet the controller's optimization objectives, such as latency control?
- Second, how efficient is Trisk compared with existing reconfiguration execution techniques, such as Flink's native support and the state-of-the-art Megaphone mechanism?
The environment is as follows: we implemented Trisk on Flink 1.10.0 and configured a Flink standalone cluster on four nodes with eight slots per node. We use one real application, stock-exchange, and one synthetic application, word-count. Stock-exchange is a real-time stock trading job that must process stock trading orders in real time to avoid affecting users' trading decisions. Word-count is a job commonly used in data analytics; it counts the occurrences of each key in the input stream.
We implemented a simple but representative latency-aware controller for stock-exchange. The initial stock-exchange job deploys 10 tasks, and the input stream consists of stock trading orders, as shown by the input curve on the left. The controller keeps job latency under control by using scaling and load balancing, making decisions primarily based on the input rate and workloads. For example, at the 100th second it scales out because the input rate increases, and at the 400th second it scales in because the input rate decreases.
The controllers implemented on Trisk and Flink each require about 100 lines of code, mainly containing the logic of the control policy.
The experimental results are shown on the right. To demonstrate the controller's optimization effect, we mainly compare the latency of the stock-exchange job under Trisk, Flink's native support, and a static configuration. The red line is the statically configured stock-exchange job, the green line is the job managed by the controller on Flink, and the blue line is the job managed by the controller on Trisk.
The red line shows that while the static configuration works fine at the beginning, after 100 seconds it can no longer process the data in real time because of the increased input rate, and latency rises by two orders of magnitude. By contrast, the controller implemented with Flink's native reconfiguration adapts to workload changes, but causes high latency spikes during reconfiguration, roughly one to two orders of magnitude higher than usual. The controller on Trisk completes each reconfiguration within milliseconds, with a negligible latency increase, largely thanks to Trisk's partial pause-and-resume technique.
We also compare the performance of Trisk's reconfiguration execution with two existing approaches: Flink's stop-and-restart mechanism and Megaphone's fluid state migration mechanism, which synchronizes and updates the reconfiguration at the key level. In the experiment, load balancing is applied to word-count with an initial configuration of 20 tasks and is triggered at the 50th second; the reconfiguration redistributes the workload among all parallel tasks.
To understand their behavior, we compared latency and throughput during reconfiguration.
The latency graph shows that Trisk has lower latency during reconfiguration than Flink; compared with Megaphone, Trisk has the shortest completion time but a relatively higher peak latency. The throughput graph shows that Trisk's throughput drops during the reconfiguration but recovers faster than Flink's. Megaphone's fluid state migration takes longer to complete the reconfiguration, but shows lower peak latency and higher throughput during the reconfiguration phase.
In summary, we propose Trisk, a task-centric control platform that supports reconfiguration methods in a general, efficient, and easy-to-use manner. In future work, we will continue to explore implementing more control policies on Trisk to better leverage its reconfiguration capabilities.