Abstract: This article covers Flink’s checkpoint mechanism, backpressure mechanism, and memory model. Familiarity with these three parts is a prerequisite for tuning. The article is organized into the following parts:

  1. Principle analysis
  2. Locating performance problems
  3. Tuning classic scenarios
  4. Memory tuning

Checkpoint mechanism

1. What is a checkpoint

To put it simply, Flink periodically persists its state in order to achieve fault tolerance and exactly-once semantics. This persistence process is called a checkpoint: a snapshot of the global state of a Flink job at a certain moment.

When we want to retain global state in a distributed system, the traditional approach is to introduce a unified clock, which the master node broadcasts to every slave node. When a node receives the unified clock, it records its current state.

However, the unified-clock approach has problems. If one node has a long GC pause, or the network between the master and the slaves fluctuates at that moment, the clock signal is delayed or lost. That node’s data then becomes inconsistent with the other machines, which leads to split-brain. To solve this, we would have to make the master and the slaves highly available (HA), but the more complex a system is, the less stable it becomes and the more it costs to maintain.

Flink instead places the checkpoint markers into the data stream itself; these markers are called barriers.

In the example above, as the blue barrier flows from the first upstream task to the last downstream task, it triggers the snapshot-saving function of each task it passes through. Let’s use an example to illustrate this briefly.

2. Case analysis

This is a simple ETL process: we first read data from Kafka, apply a transformation, and then send the result to a downstream Kafka.

No operator chaining is involved in this example, so the forward strategy is used, that is, “the output of a task is sent to only one task as input”. This approach also has the advantage of avoiding unnecessary network overhead if both tasks are in the same JVM.

With the parallelism set to 2, the DAG is displayed as follows:

■ Checkpoint analysis process

Each Flink job has a JobManager, which in turn contains a checkpoint coordinator (CheckpointCoordinator) that manages the checkpoint process. An interval can be configured at which the checkpoint coordinator sends a checkpoint event to the source tasks (task1 and task2 in the parallel diagram) in each container.

When a source operator receives a barrier, it pauses its data processing, takes a snapshot of its current state, and saves it to the configured persistent storage. It then asynchronously sends an acknowledgement (ACK) to the CheckpointCoordinator, broadcasts the barrier to all of its downstream operators, and resumes its data processing.

Each operator repeats these steps, taking its snapshot and broadcasting the barrier downstream, until the barrier reaches the sink operator, at which point the snapshot is complete. Note that an operator may have multiple upstream inputs, and it triggers its checkpoint only once the barriers from all of them have arrived. A long checkpoint duration may therefore be caused by the time spent on barrier alignment.
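The alignment step can be sketched in a few lines of Python. This is a toy model, not Flink’s implementation: the class `AligningOperator`, its methods, and the "count" state are all hypothetical names chosen for illustration. Records that arrive on an input whose barrier has already been seen are held back until the barrier has arrived on every input.

```python
from collections import deque


class AligningOperator:
    """Toy operator with several inputs that aligns checkpoint barriers."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.count = 0            # operator state: number of records processed
        self.blocked = set()      # inputs on which the barrier has arrived
        self.held_back = deque()  # records buffered during alignment
        self.snapshots = {}       # checkpoint id -> saved state

    def on_record(self, channel, record):
        if channel in self.blocked:
            # This input is already past the barrier, so the record belongs
            # to the next checkpoint and must wait.
            self.held_back.append(record)
        else:
            self.count += 1

    def on_barrier(self, channel, checkpoint_id):
        """Return True once the barrier has arrived on every input."""
        self.blocked.add(channel)
        if len(self.blocked) < self.num_inputs:
            return False
        # Alignment finished: snapshot the state, then release held records.
        self.snapshots[checkpoint_id] = self.count
        self.blocked.clear()
        while self.held_back:
            self.held_back.popleft()
            self.count += 1
        return True


op = AligningOperator(num_inputs=2)
op.on_record(0, "a")
first_barrier_done = op.on_barrier(0, checkpoint_id=1)   # input 1 still pending
op.on_record(0, "b")                                      # held back
op.on_record(1, "c")                                      # processed normally
second_barrier_done = op.on_barrier(1, checkpoint_id=1)  # alignment complete
```

The longer the slowest input takes to deliver its barrier, the longer records sit in `held_back`, which is why alignment time shows up directly in the checkpoint duration.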

■ Snapshot and Recover

This is the initialization phase of our container. e1 and e2 are records that have just been consumed from Kafka. At the same time, the CheckpointCoordinator has sent a barrier to the source.

At this point, Task1 has completed its checkpoint, recording its offset as 2 (e1, e2), and has broadcast the barrier downstream. Task3’s input is the output of Task1. Task3’s checkpoint records a count of 2 (e1 and e2 are the records from Task1), and the barrier is then broadcast further down. When the barrier reaches the sink operator, the snapshot is finished.

Meanwhile, the source keeps producing data and new checkpoints keep being created. If the container is restarted, however, the data must be recovered. Suppose the last checkpoint recorded offset = 2 and count = 2; we restore from that state, and Task1 starts consuming again from e3. This is the Recover operation.
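A minimal sketch of this snapshot-and-recover cycle, assuming the state is just the pair (offset, count) from the example. The helper `process` and the dictionary layout are hypothetical and only mirror the walkthrough above; real Flink state lives in a state backend.

```python
events = ["e1", "e2", "e3", "e4"]   # records waiting in Kafka


def process(state, n):
    """Consume n events from the position stored in the state; return them."""
    consumed = []
    for _ in range(n):
        consumed.append(events[state["offset"]])
        state["offset"] += 1   # source state (Task1): next position to read
        state["count"] += 1    # downstream state (Task3): events counted
    return consumed


state = {"offset": 0, "count": 0}
process(state, 2)                  # e1 and e2 are processed
checkpoint = dict(state)           # snapshot: offset = 2, count = 2
process(state, 1)                  # e3 is processed, then the container fails

state = dict(checkpoint)           # recover: roll back to the snapshot
replayed = process(state, 1)       # consumption resumes from e3
```

Because both the offset and the count are rolled back together, e3 is counted once after recovery even though it had already been processed before the failure.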

■ What to watch out for with checkpoints

The following three points affect system throughput and need attention during actual development:

3. How backpressure arises and how Flink handles it

In distributed systems, it is common for multiple tasks and JVMs to exchange data. We use producers and consumers to illustrate this.

Suppose the producer stores its output in an unbounded buffer. When the producer produces much faster than the consumer consumes, data piles up on the producer side because of the consumer’s low capacity, which eventually leads to an OOM.

Even with a bounded buffer, the consumer’s capacity is still low. When the buffer is full, the producer stops producing, which does not completely solve the problem, so we need to adjust according to the situation.
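The bounded-buffer behavior can be illustrated with Python’s standard `queue.Queue`. The capacity of 3 and the helper `try_produce` are assumptions made for this sketch; it only shows that a full buffer forces the producer to stop until the consumer frees a slot.

```python
import queue

# Bounded buffer between a producer and a slower consumer (capacity is illustrative).
buffer = queue.Queue(maxsize=3)


def try_produce(item):
    """Return True if the item fit into the buffer, False if the producer must wait."""
    try:
        buffer.put_nowait(item)
        return True
    except queue.Full:
        return False


accepted = [try_produce(i) for i in range(5)]   # the 4th and 5th items do not fit
consumed = buffer.get_nowait()                  # the consumer frees one slot
accepted_after_consume = try_produce(99)        # the producer can continue
```

Memory stays bounded, but the producer’s rate is now capped by the consumer’s rate, which is exactly the backpressure effect discussed in this section.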

Flink also uses bounded buffers to exchange data between different TaskManagers. Flow control comes in two forms: static flow control and dynamic flow control.

To put it simply, with static flow control, when the producer’s TPS exceeds the consumer’s, the producer holds back the overflow, packs the data into batches, and sends it batch by batch. After each send it sleeps for a period of time, calculated as left (the remaining data) / TPS. With this approach, however, it is very difficult to predict how the system will behave.

Prior to Flink 1.5, flow control relied on TCP’s sliding window, which has been covered in previous lectures. Flink abandoned this mechanism from 1.5 onward, so I won’t expand on it here. In that network model, the producing node can only decide whether to send data by checking whether the current channel is writable; it does not know the real capacity of the downstream consumer. As a result, by the time the producing node finds the channel unwritable, the downstream consumer may already have a backlog of data.

Credit-based flow control: we use the following data exchange example to illustrate.

Data exchange in Flink falls roughly into three types: exchange within the same task, exchange between different tasks in the same JVM, and exchange between different tasks in different JVMs.

Data exchange within the same task uses the forward strategy we just mentioned, which mainly avoids serialization and network overhead.

In the second type, the data first goes through a RecordWriter, where it is serialized, and is then handed to a ResultPartition. From there it passes through a local channel to the InputGate of the other task, where it is deserialized and pushed to the RecordReader for processing.

The third type involves different JVMs, so there is some network overhead. The difference from the second type is that the data is first handed to Netty, which pushes it to the remote task.

■ Credit-based

At this point we can see that Event1 has been pushed to TaskB together with backlog = 1. The backlog simply lets the consumer know how things stand on the producer side.

After TaskB receives Event1, it returns an ACK to TaskA together with credit = 3, which tells TaskA how many more pieces of data it can receive. This is how Flink lets producers and consumers stay aware of each other’s state.

After a period of time, TaskB’s bounded buffer is full. TaskB replies credit = 0 to TaskA, the channel stops working, and TaskA no longer sends data to TaskB.

After a further period of time, the bounded buffer in TaskA also fills up with backlogged data. The drop in throughput and the processing delays we usually encounter arise because the whole system is effectively stalled at this point. As shown in Figure 2, all processes are marked with “X”, indicating that they have stopped working.
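The exchange between TaskA and TaskB can be modelled with a short simulation. This is a simplified sketch under stated assumptions: the classes `Sender` and `Receiver` are hypothetical, the receiver capacity of 4 is chosen so that the first reply carries credit = 3 as in the example, and the credit is reported as an absolute number of free slots rather than in the way Flink’s network stack actually announces it.

```python
from collections import deque


class Receiver:
    """TaskB: owns a bounded buffer and announces how many free slots remain."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque()

    def receive(self, event):
        if len(self.buffer) >= self.capacity:
            raise RuntimeError("sender violated the credit protocol")
        self.buffer.append(event)
        return self.capacity - len(self.buffer)   # credit sent back with the ACK

    def consume(self):
        self.buffer.popleft()
        return self.capacity - len(self.buffer)   # new credit after freeing a slot


class Sender:
    """TaskA: sends only while it holds credit; everything else is backlog."""

    def __init__(self, receiver, initial_credit):
        self.receiver = receiver
        self.credit = initial_credit
        self.backlog = deque()

    def emit(self, event):
        self.backlog.append(event)
        self.flush()

    def flush(self):
        while self.backlog and self.credit > 0:
            self.credit = self.receiver.receive(self.backlog.popleft())


task_b = Receiver(capacity=4)
task_a = Sender(task_b, initial_credit=4)

task_a.emit("event1")
credit_after_first = task_a.credit           # TaskB replied with credit = 3

for n in range(2, 7):                        # event2 .. event6
    task_a.emit(f"event{n}")
credit_when_full = task_a.credit             # TaskB is full: credit = 0
backlog_when_full = len(task_a.backlog)      # events now pile up in TaskA

task_a.credit = task_b.consume()             # TaskB frees one slot
task_a.flush()
backlog_after_consume = len(task_a.backlog)
```

The sender never writes into a full receiver; instead the backlog moves upstream into TaskA, which is how the stall propagates back toward the sources.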

The JVM is a very complex system; when it runs out of memory, it throws an OOM and the system crashes. After obtaining the memory we allocate to it, Flink sets aside a reserved cutoff memory to keep the system safe. The network buffers are the bounded buffers we have just mentioned. The memory manager is a memory pool, and this part of the memory can be configured as on-heap or off-heap memory. In streaming jobs we usually set it to off-heap memory. The free part is the memory left for user code.

Now let’s assume that the memory allocated to this TaskManager is 8GB.

  1. First we subtract the cutoff. The default ratio is 0.25, so the available memory is 8 GB × 0.75 = 6144 MB.
  2. The network buffers occupy 0.1 of the available memory, that is, 6144 MB × 0.1.
  3. The on-heap/off-heap managed memory is the available memory minus the network buffers, multiplied by 0.8.
  4. The memory given to the user is the remaining 0.2 of the heap memory.

What really happens is that Flink works backwards: it knows the size of the heap and derives the sizes of the other memory regions from it.
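The four steps above can be worked through as plain arithmetic. The variable names below are made up for this sketch; the ratios (0.25, 0.1, 0.8) and the 8 GB total are the ones quoted in the text.

```python
total_mb = 8 * 1024          # memory allocated to the TaskManager: 8 GB

cutoff_ratio = 0.25          # reserved cutoff
network_fraction = 0.1       # share of the available memory for network buffers
managed_fraction = 0.8       # share of the rest for the memory manager pool

available_mb = total_mb * (1 - cutoff_ratio)        # 6144 MB
network_mb = available_mb * network_fraction        # about 614.4 MB
rest_mb = available_mb - network_mb                 # about 5529.6 MB
managed_mb = rest_mb * managed_fraction             # about 4423.68 MB
free_mb = rest_mb - managed_mb                      # about 1105.92 MB for user code
```

Of the 8 GB requested, only a little over 1 GB ends up as free memory for user code under these ratios, which is why the fractions are the main tuning knobs.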

Locating problems in Flink jobs

1. A mnemonic for locating problems

“First backpressure, second checkpoints, third metrics; latency and throughput are the core. Always keep an eye on resource usage, and check GC first.”

“First” refers to backpressure. “Second” refers to checkpoints: whether data alignment takes a long time and whether the state is large, both of which are closely tied to system throughput. “Third” refers to the metrics shown in the Flink UI. Our main focus is latency and throughput, followed by system resources and GC logs.

  • Look at backpressure: usually the task directly downstream of the last backpressured subtask is one of the job’s bottlenecks.
  • Check the checkpoint duration: the checkpoint duration affects job throughput to a certain extent.
  • Look at the core metrics: metrics are the basis for accurately judging a job’s performance, and latency and throughput are the most critical among them.
  • Resource utilization: improving resource utilization is the ultimate goal.

■ Common performance problems

A quick explanation:

  1. When focusing on backpressure, people often overlook the performance problems caused by data serialization and deserialization.
  2. Some data structures, such as HashMap and HashSet, compute a hash for every key. With large data volumes, keyBy operations therefore have a significant impact on performance.
  3. Data skew is a classic problem, which we’ll expand on later.
  4. If the downstream system is MySQL or HBase, we write in batches: the data is held in a buffer and sent when certain conditions are met. The purpose is to reduce the number of interactions with the external system and the network overhead.
  5. Frequent GC, whether with CMS or G1, pauses the entire job while it runs, so the JobManager and TaskManager cannot exchange heartbeats on time. The JobManager then assumes that the TaskManager has been lost and starts a new TaskManager.
  6. A window is a means of slicing unbounded data into finite chunks. Sliding windows, for example, have a data overlap problem. Although size = 5min is not a large window, step = 1s means the data is processed once every second, which leads to heavy overlap and a large data volume.
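To make the overlap in item 6 concrete, the sketch below counts how many sliding windows a single record falls into. The function `windows_for` is a hypothetical helper written for this illustration; it assumes windows of the form [start, start + size) whose start times are multiples of the step.

```python
def windows_for(ts_ms, size_ms, slide_ms):
    """Start times of all sliding windows [start, start + size) that contain ts_ms."""
    start = ts_ms - ts_ms % slide_ms      # latest window start not after the record
    starts = []
    while start > ts_ms - size_ms:
        starts.append(start)
        start -= slide_ms
    return starts


FIVE_MINUTES = 5 * 60 * 1000
ONE_SECOND = 1000

sliding = windows_for(600_000, size_ms=FIVE_MINUTES, slide_ms=ONE_SECOND)
tumbling = windows_for(600_000, size_ms=FIVE_MINUTES, slide_ms=FIVE_MINUTES)

copies_sliding = len(sliding)     # 300 windows share this one record
copies_tumbling = len(tumbling)   # 1 window when the step equals the size
```

With size = 5min and step = 1s every record is assigned to size / step = 300 windows, so the state and the per-record work grow by the same factor compared with a non-overlapping window.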

2. Flink job optimization

We can combine Flink state with data structures such as Set or Map to deduplicate data. However, these deduplication schemes suffer dramatic performance degradation as the data volume grows: write performance problems caused by hash collisions, GC problems caused by excessive memory use, and the TaskManager disconnection problem we just analyzed.

Plan 2 and Plan 3 also rely on particular data structures. Interested readers can look into them on their own; they are not expanded on here.
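A minimal sketch of the Set-based approach, with a plain Python `set` standing in for Flink keyed state. The function `deduplicate` and the sample events are hypothetical; the point is that the set holds one entry per distinct key and never shrinks on its own.

```python
def deduplicate(events, key_of):
    """Keep the first event for each key; return the survivors and the state size."""
    seen = set()          # stands in for keyed state; grows with every new key
    unique = []
    for event in events:
        key = key_of(event)
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique, len(seen)


events = [("u1", "click"), ("u2", "click"), ("u1", "click"), ("u3", "view")]
unique, state_size = deduplicate(events, key_of=lambda e: e)
```

The state size equals the number of distinct keys seen so far, which is the source of the memory and GC pressure described above once the key space becomes large.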

■ Data skew

Data skew is a high-frequency problem that everyone runs into, and there are many solutions.

The first scenario is when the parallelism is set lower than the number of partitions, which results in uneven consumption as described above.

In the second scenario, the keys are not evenly distributed. You can add random prefixes to the keys to break up the distribution so that the data is not concentrated in a few tasks.
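One common way to apply random prefixes is a two-stage aggregation, sketched below in plain Python. This is an assumption about how the prefixes would be used, not something the talk spells out: stage 1 counts per salted key, and stage 2 strips the prefix and merges the partial counts. The function name, the "#" separator, and the number of salts are all illustrative.

```python
import random
from collections import Counter


def salted_count(keys, num_salts, seed=0):
    """Count keys in two stages: per salted key first, then per original key."""
    rng = random.Random(seed)

    # Stage 1: prefix each key with a random salt so a hot key is spread out.
    partial = Counter()
    for key in keys:
        salt = rng.randrange(num_salts)
        partial[f"{salt}#{key}"] += 1

    # Stage 2: strip the salt and merge the partial results.
    final = Counter()
    for salted_key, count in partial.items():
        original_key = salted_key.split("#", 1)[1]
        final[original_key] += count
    return partial, final


keys = ["hot"] * 1000 + ["cold"] * 10
partial, final = salted_count(keys, num_salts=8)
```

In stage 1 no single salted key carries all 1000 "hot" records, so the load can be distributed over several tasks; stage 2 only has to merge at most `num_salts` partial results per original key.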

Another option is local pre-aggregation: records with the same key are aggregated locally on each node, similar to the local combiner in MapReduce. After this map-side pre-aggregation, each node holds only one record per key, because the identical keys have been merged. When other nodes then pull the same key from all nodes, far less data has to be pulled, which reduces disk I/O and network transmission overhead.
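The effect of local pre-aggregation can be shown with a small count example. The two lists below stand in for the records held by two nodes, and `pre_aggregate` and `merge` are hypothetical helpers for this sketch.

```python
from collections import Counter


def pre_aggregate(partitions):
    """Combine records locally on each node; one partial count per key and node."""
    return [Counter(records) for records in partitions]


def merge(partials):
    """Global aggregation over the (much smaller) partial results."""
    total = Counter()
    for partial in partials:
        total.update(partial)     # Counter.update adds the counts together
    return total


partitions = [["a", "a", "a", "b"], ["a", "a", "b", "b"]]

partials = pre_aggregate(partitions)
records_without_combiner = sum(len(p) for p in partitions)   # 8 records shuffled
records_with_combiner = sum(len(p) for p in partials)        # 4 partial results
totals = merge(partials)
```

The final result is the same, but only one partial record per key and node crosses the network instead of every raw record.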

■ Memory tuning

We have just described Flink’s memory structure, so we know that tuning mainly targets the off-heap network buffers, the memory manager pool, and the heap memory, all of which are basically controlled by parameters.

We need to adjust these parameters according to our own situation; the following are only suggestions. Flink’s streaming jobs currently don’t use much of the memory manager’s memory, so we keep its fraction small, no more than 0.3.

The tuning of heap memory is a JVM matter and mainly concerns the garbage collector. G1 is the usual choice; the Parallel Scavenge collector performs part of its collection serially, so its garbage collection takes a long time. There is plenty of information about this online, so I will not expand on it here.

Summary

This article introduced Flink’s checkpoint mechanism, backpressure mechanism, and memory model, and analyzed some tuning strategies based on the memory model. I hope it is helpful to you. The recording of the original talk can be viewed at the link below:

Ververica. Cn/developers /…