Zhang Youliang (Apache Flink Community Volunteer)

This article contains 4745 words and is expected to take 15 minutes to read.
This article is compiled from an episode of the Apache Flink live-broadcast series, shared by Zhang Jun, Apache Flink Contributor and head of the OPPO big data platform. The main contents are as follows:

  • The concept and background of network flow control
  • TCP flow control mechanism
  • Flink TCP-based backpressure mechanism (before V1.5)
  • Flink Credit-based Backpressure Mechanism (since V1.5)
  • Summary and Reflection

The concept and background of network flow control

Why is network flow control needed





First, consider the simplest picture of network flow control: the Producer's throughput is 2MB/s and the Consumer's is 1MB/s, so in network communication the Producer is faster than the Consumer. Suppose there is a Buffer at each end, a Send Buffer on the Producer side and a Receive Buffer on the Consumer side, and the throughput on the network is 2MB/s. After 5s the Receive Buffer may no longer be able to hold the incoming data, and we face one of two situations:

  • 1. If the Receive Buffer is bounded, incoming data is discarded.
  • 2. If the Receive Buffer is unbounded, it will keep growing and eventually cause the Consumer to run out of memory.

Network flow control implementation: static speed limit





To solve this problem, we need network flow control to bridge the speed difference between upstream and downstream. Traditionally, a static rate limit similar to a Rate Limiter can be implemented on the Producer side: the Producer still generates data at 2MB/s, but after passing through the rate-limiting layer only 1MB/s is written into the Send Buffer, so that the Producer's sending rate matches the Consumer's processing rate (a minimal sketch follows the list below). However, this solution has two limitations:

  • 1. It is impossible to predict in advance what rate the Consumer can sustain
  • 2. The Consumer's processing capacity usually fluctuates dynamically
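
As a concrete illustration of this approach, the sketch below throttles a producer with Guava's RateLimiter. The 1MB/s figure and the send() method are placeholders taken from the example above, not part of any Flink API.

    import com.google.common.util.concurrent.RateLimiter;

    public class ThrottledProducer {
        // Static limit chosen up front: 1MB/s, expressed as one permit per byte.
        private static final RateLimiter LIMITER = RateLimiter.create(1024 * 1024);

        // Hypothetical send path: block until enough permits are available, so the
        // effective rate into the Send Buffer never exceeds 1MB/s, no matter how
        // fast the Producer generates data.
        static void send(byte[] chunk) {
            LIMITER.acquire(chunk.length);
            // ... write chunk into the Send Buffer / socket here ...
        }
    }

The drawback is visible in the code itself: the 1MB/s constant must be chosen in advance, which is exactly the limitation listed above.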

Network flow control implementation: dynamic feedback / automatic backpressure





To address the limitations of static rate limiting, we evolve to a dynamic feedback (automatic backpressure) mechanism: the Consumer needs to give timely feedback to the Producer, telling the Producer what rate it can currently tolerate. There are two kinds of dynamic feedback:

  • 1. Negative feedback: when the receiving rate is lower than the sending rate, the Producer is told to reduce its sending rate
  • 2. Positive feedback: when the sending rate is lower than the receiving rate, the Producer is told to increase its sending rate

Let's look at some classic cases.

Case 1: Storm backpressure implementation





This is the backpressure mechanism implemented in Storm. Each Bolt has a Backpressure Thread; when this thread detects that the Bolt's receive queue is severely congested, it writes this information to ZooKeeper. The Spout watches the ZooKeeper node and stops sending data as soon as it detects backpressure. In this way, the upstream and downstream send and receive rates are matched.

Case 2: Spark Streaming backpressure implementation





Spark Streaming has a similar feedback mechanism. In the figure above, the Fetcher collects metrics such as Buffer and Processing from the nodes in real time, and the Controller then feeds the rate information back to the Receiver to achieve rate matching.

Question: Why does Flink (before V1.5) not implement its feedback mechanism in a similar way?

Before answering this question, we need to understand the architecture of Flink's network transmission.





This figure shows the basic data flow of Flink's network transmission. Before data goes out on the network, the sender goes through its own internal pipeline: it has its own Network Buffer and uses Netty for communication at the bottom, so the Netty layer also has its own ChannelOutbound Buffer. Because network requests are ultimately sent through a Socket, the Socket has its own Send Buffer as well, and the receiving end has the corresponding three levels of Buffers. If you have studied computer networking, you know that TCP comes with its own flow control; in fact, Flink (before V1.5) implements feedback precisely through TCP's flow control mechanism.

TCP flow control mechanism

The following figure briefly reviews the structure of a TCP packet. First, it has a Sequence number to number each packet and an ACK number to guarantee the reliability of TCP data transmission. Another important field is the Window Size: when replying to a message, the receiver uses the Window Size to tell the sender how much more data it can accept.





Let’s take a quick look at the process.

TCP flow control: sliding window





TCP flow control is based on the sliding window mechanism. Suppose we have a Socket sender and a Socket receiver, and the sender is currently 3 times as fast as the receiver. What will happen? Assume the send window size is initially 3 and the receive window (buffer) size is fixed at 5.





First, the sender sends 3 packets to the receiver, and the receiver puts these 3 packets into its Buffer after receiving them.





The receiver consumes 1 packet at a time; here packet 1 is consumed, and the receiver's sliding window slides forward by one slot. At this point, packets 2 and 3 are still in the Buffer while slots 4, 5, and 6 are free, so the receiver replies ACK = 4 to the sender, meaning the sender can start sending from packet 4, with the window set to 3 (Buffer size 5 minus the 2 packets already stored). On receiving this reply, the sender moves its sliding window forward to cover 4, 5, and 6.





The sender then sends 4, 5, and 6, and the receiver successfully stores them in its Buffer.





Next, the receiver consumes packet 2 and its window slides forward by one slot. Only one free Buffer slot remains, so it replies ACK = 7, window = 1 to the sender. On receiving this, the sender also moves its sliding window forward, but it cannot move 3 slots this time: although the sender is fast enough to send 3 packets, the window tells it that only one can be received, so its sliding window can only move forward one slot to 7. This achieves the throttling effect: the sending rate drops from 3 to 1.





The sender now sends packet 7 and the receiver receives it, but because of its consumption problem the receiver does not take anything out of the Buffer. It therefore replies ACK = 8, window = 0 to the sender, and the sender can no longer send any data, so its sending rate drops to 0. At this point the sender sends nothing and the receiver gives no further feedback, so how does the sender know when the consumer starts consuming again?





TCP has a ZeroWindowProbe mechanism for this: the sender periodically sends a 1-byte probe message, and the receiver replies with its current window size. Once the window becomes non-zero again, the feedback reaches the sender and the whole process resumes. This sliding window mechanism is how TCP implements feedback.
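
To make the feedback loop concrete, here is a hedged, highly simplified simulation of the window announcement: the receiver advertises window = free buffer slots, and the sender never sends more than that. The names are illustrative only, and unlike the walkthrough above the receiver here keeps consuming one packet per round, so the output shows the steady-state throttling to rate 1 rather than the zero-window stall.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Simplified model: receive buffer of size 5, sender able to emit up to 3 packets per round.
    public class SlidingWindowDemo {
        public static void main(String[] args) {
            Deque<Integer> receiveBuffer = new ArrayDeque<>();
            final int bufferCapacity = 5;
            int nextPacket = 1;
            int advertisedWindow = 3;   // initial send window, as in the example above

            for (int round = 1; round <= 5; round++) {
                // Sender: send at most 'advertisedWindow' packets.
                for (int i = 0; i < advertisedWindow; i++) {
                    receiveBuffer.addLast(nextPacket++);
                }
                // Receiver: consumes only 1 packet per round (3x slower than the sender).
                if (!receiveBuffer.isEmpty()) {
                    receiveBuffer.removeFirst();
                }
                // Receiver: ACK the next expected packet and advertise the free slots as the window.
                advertisedWindow = bufferCapacity - receiveBuffer.size();
                System.out.printf("round %d: ACK=%d window=%d%n", round, nextPacket, advertisedWindow);
            }
        }
    }

The first two rounds print ACK=4 window=3 and ACK=7 window=1, matching the first steps of the walkthrough above.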

Flink TCP-based backpressure mechanism (before V1.5)

Example: WindowWordCount





The general logic is to receive data from a Socket and perform a WordCount every 5 seconds; after the code is submitted, it enters the compile phase.
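
The code from the slide is not reproduced in this compilation, but a minimal sketch of such a WindowWordCount job on the DataStream API of that era (roughly Flink 1.5) could look like the following; the host and port are placeholders, not values taken from the talk.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class WindowWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.socketTextStream("localhost", 9999)              // read lines from a socket
               .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                   @Override
                   public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                       for (String word : line.split("\\s+")) {
                           out.collect(new Tuple2<>(word, 1));
                       }
                   }
               })
               .keyBy(0)                                         // group by word
               .timeWindow(Time.seconds(5))                      // 5-second tumbling window
               .sum(1)                                           // count occurrences per window
               .print();

            env.execute("WindowWordCount");
        }
    }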

Compile phase: Generate JobGraph





At this point the StreamGraph has not yet been submitted to the cluster; the Client generates a JobGraph, which is the basic unit submitted to the cluster. During the generation of the JobGraph some optimization is performed, merging nodes that have no Shuffle between them. Once the JobGraph is ready, it is submitted to the cluster and enters the run phase.

Run phase: Schedule ExecutionGraph





After the JobGraph is submitted to the cluster, an ExecutionGraph is generated; at this point each task has been broken down into subtasks and the job is basically executable. The Intermediate Result Partition in the ExecutionGraph above is the module used for sending data. The ExecutionGraph is finally handed to the JobManager's scheduler, which schedules the whole graph. We can then conceptualize a physical execution graph: each Task receives data through an InputGate, which can be thought of as the component responsible for receiving data, and has a ResultPartition responsible for sending data. The ResultPartition is partitioned to match the downstream Tasks, forming a mapping between ResultSubPartitions and InputChannels. This is the network transmission channel at the logical level; based on this concept we can break down the backpressure problem.

Problem breakdown: Two stages of backpressure propagation





The propagation of backpressure is actually divided into two stages. In the execution figure above, three TaskManagers are involved. Each TaskManager contains the executing Task, an InputGate responsible for receiving data, and a ResultPartition responsible for sending data; together these form the basic data transfer channel. Now suppose the most downstream Task (the Sink) runs into trouble and its processing slows down. How is the pressure propagated back? There are two cases:

  • Across TaskManagers: how does backpressure propagate from the downstream InputGate back to the upstream ResultPartition?
  • Within a TaskManager: how does backpressure propagate from the ResultPartition back to the InputGate?

Data transfer across TaskManagers





As mentioned earlier, sending data requires a ResultPartition, and each ResultPartition is divided into ResultSubPartitions, with Buffers in between for memory management. For a TaskManager there is a single Network BufferPool shared by all Tasks; its memory is allocated from off-heap memory at initialization, and the allocated memory is subsequently managed by the Network BufferPool itself rather than relying on the JVM GC to release it. On top of the Network BufferPool, a Local BufferPool is created for each ResultSubPartition. When the Task's Record Writer wants to write the two records <1,2>, the ResultSubPartition is initially empty and has no Buffer to receive them, so it requests memory from the Local BufferPool; the Local BufferPool does not have enough memory either, so the request goes up to the Network BufferPool. The Buffer obtained is finally handed back along the original path to the ResultSubPartition, and the two records <1,2> can be written. The Buffer in the ResultSubPartition is then copied into Netty's Buffer and finally into the Socket's Buffer, and the message is sent out. The receiver consumes the message through a similar mechanism. Next, let's simulate a mismatch of upstream and downstream processing speeds, with a sending rate of 2 and a receiving rate of 1, and see what the backpressure process looks like.
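
Before moving on, the allocation chain just described (ResultSubPartition → Local BufferPool → Network BufferPool) can be sketched roughly as below. This is a simplified illustrative model with assumed names, not Flink's actual buffer-pool classes or method signatures.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative only: a local pool that falls back to a shared network-wide pool when empty.
    class NetworkBufferPool {
        private final Deque<byte[]> segments = new ArrayDeque<>();

        NetworkBufferPool(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                segments.add(new byte[segmentSize]);   // allocated once, reused forever (no GC churn)
            }
        }

        synchronized byte[] request() {
            return segments.pollFirst();               // null when the global pool is exhausted
        }
    }

    class LocalBufferPool {
        private final Deque<byte[]> available = new ArrayDeque<>();
        private final NetworkBufferPool globalPool;
        private final int maxBuffers;                  // cap so one pool cannot drain the global pool
        private int acquired = 0;

        LocalBufferPool(NetworkBufferPool globalPool, int maxBuffers) {
            this.globalPool = globalPool;
            this.maxBuffers = maxBuffers;
        }

        byte[] requestBuffer() {
            byte[] buffer = available.pollFirst();     // 1) try the local pool first
            if (buffer == null && acquired < maxBuffers) {
                buffer = globalPool.request();         // 2) fall back to the network pool
                if (buffer != null) acquired++;
            }
            return buffer;                             // null means no buffer: the writer has to wait
        }

        void recycle(byte[] buffer) {
            available.addLast(buffer);                 // recycled buffers are reused, not freed by GC
        }
    }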

Backpressure process across TaskManagers





Because of the speed mismatch, the InputChannel will exhaust its Buffers after some time, so it requests a new Buffer from the Local BufferPool. You can see that one Buffer in the Local BufferPool is now marked as Used.





The sender keeps sending data at the mismatched rate, so eventually the InputChannel finds no Buffer available in the Local BufferPool and has to request one from the Network BufferPool. Of course, each Local BufferPool has a maximum number of Buffers it may use, so that a single Local BufferPool cannot exhaust the Network BufferPool. At this point, the Network BufferPool still has Buffers available.





After a while, either the Network BufferPool has no Buffers left, or the Local BufferPool has reached its maximum and cannot request new Buffers from the Network BufferPool. At this point Netty's AutoRead is disabled, and Netty stops reading data from the Socket's Buffer.
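
In Netty terms, "AutoRead is disabled" corresponds to turning off the channel's auto-read flag so that Netty stops pulling bytes out of the Socket receive buffer. A hedged sketch of the pattern (hasAvailableBuffer() and resumeReading() are placeholders for the buffer-pool bookkeeping, not Flink methods) might look like this:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    // Illustrative handler: stop reading from the socket while no receive Buffer is available.
    public class BackpressureAwareHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!hasAvailableBuffer()) {
                // No Buffer obtainable from the Local/Network BufferPool: stop auto-reading, so
                // data piles up in the socket receive buffer and TCP's window shrinks toward 0.
                ctx.channel().config().setAutoRead(false);
            }
            ctx.fireChannelRead(msg);
        }

        // Placeholder for "can the InputChannel still get a Buffer?"
        private boolean hasAvailableBuffer() {
            return true;   // a real system would query the buffer pool here
        }

        // Would be called once a Buffer has been recycled and reading may resume.
        public void resumeReading(ChannelHandlerContext ctx) {
            ctx.channel().config().setAutoRead(true);
            ctx.read();
        }
    }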





Obviously, after a while the Socket's receive Buffer fills up, and window = 0 is sent to the sender (the zero-window case described above). The sender's Socket then stops sending.





Soon the sender's Socket Buffer is also exhausted, and when Netty detects that the Socket can no longer be written to, it stops writing data to the Socket.





Once Netty stops writing, all data backs up in Netty's Buffer. Netty's Buffer is unbounded, but its size is effectively bounded by Netty's watermark mechanism: when the number of bytes queued in the channel exceeds the high watermark, Netty marks the channel as unwritable. The ResultSubPartition checks whether the channel is writable before handing data to Netty, and stops writing once it finds the channel is not writable.
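
This check maps onto Netty's standard writability API: the watermarks are set on the channel, and the writer consults isWritable() before handing over data. The sketch below shows the general pattern; the watermark values and the tryWrite() helper are made-up illustrations, not Flink's actual configuration.

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.WriteBufferWaterMark;

    public class WritabilityExample {

        // Configure low/high watermarks on the client bootstrap (the values here are arbitrary).
        static void configure(Bootstrap bootstrap) {
            bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                    new WriteBufferWaterMark(32 * 1024, 64 * 1024));
        }

        // The writer hands data to Netty only while the channel reports itself writable; once the
        // queued bytes exceed the high watermark, isWritable() returns false and the writer backs off.
        static boolean tryWrite(Channel channel, Object record) {
            if (!channel.isWritable()) {
                return false;          // back off: pressure now accumulates in the ResultSubPartition
            }
            channel.writeAndFlush(record);
            return true;
        }
    }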





At this point all the pressure falls on the ResultSubPartition, which, just like the receiving side, keeps requesting memory from the Local BufferPool and the Network BufferPool.





When both the Local BufferPool and the Network BufferPool are exhausted, the whole Operator stops writing data, and backpressure across TaskManagers has been achieved.

Backpressure process inside the TaskManager

Cross-TaskManager backpressure causes the ResultSubPartition in this TaskManager to be unable to keep writing data, so the Record Writer's writes are blocked. Because an Operator needs input in order to produce output, and input and output are executed in the same thread, once the Record Writer is blocked the Record Reader also stops reading data from the InputChannel. Meanwhile the upstream TaskManager keeps sending data, eventually exhausting this TaskManager's Buffers. The following figure shows the backpressure process inside a TaskManager in detail.





Flink Credit-based Backpressure Mechanism (since V1.5)

Disadvantages of TCP-based backpressure





Before introducing the credit-based backpressure mechanism, let's analyze the disadvantages of TCP-based backpressure.

  • A single TaskManager may run multiple Tasks. If the data of several Tasks is sent to the same downstream TaskManager, they are multiplexed over the same Socket. If one Task is backpressured, the shared Socket is blocked, the other Tasks can no longer transmit, and checkpoint barriers cannot be sent downstream either, so downstream checkpoint latency grows.
  • Relying on the bottom-most TCP layer for flow control makes the backpressure propagation path too long, so backpressure takes a long time to take effect.

Introducing credit-based backpressure

This mechanism can be understood simply as implementing a TCP-flow-control-like backpressure mechanism at the Flink level to address the drawbacks above, with Credit playing the role of TCP's Window.

Credit-based backpressure process





As shown in the figure, the backpressure mechanism is implemented at the Flink level: every time a ResultSubPartition sends a message to an InputChannel, it also sends a backlog size telling the downstream how many messages are waiting to be sent. The downstream calculates how many Buffers it has available to receive them, and if there are enough Buffers it returns a Credit to the upstream, telling it that it may send the messages. (The dashed line between the ResultSubPartition and the InputChannel is there because the communication ultimately still goes through Netty and the Socket.) Let's look at a concrete example.
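
Before walking through the example, here is a rough model of the backlog/credit handshake. All names are assumptions for illustration; this is not Flink's actual implementation.

    // Illustrative model of the backlog/credit handshake; names are assumptions, not Flink classes.
    class CreditBasedReceiver {
        private int freeBuffers;                       // receive buffers currently free downstream

        CreditBasedReceiver(int initialBuffers) {
            this.freeBuffers = initialBuffers;
        }

        // A batch arrives together with the sender's backlog announcement; the credit granted
        // back is bounded both by the free buffers and by the announced backlog.
        int onDataReceived(int batchSize, int announcedBacklog) {
            freeBuffers -= batchSize;                  // these buffers now hold unconsumed data
            return Math.max(0, Math.min(freeBuffers, announcedBacklog));
        }

        void onBufferRecycled() {
            freeBuffers++;                             // the operator consumed a record
        }
    }

    class CreditBasedSender {
        private int credit = 0;                        // granted by the downstream receiver
        private int backlog = 0;                       // records queued in the ResultSubPartition

        void enqueue(Object record) { backlog++; }

        void onCreditReceived(int granted) { credit += granted; }

        // Data is handed to Netty only while credit remains; credit == 0 means backpressure is
        // felt directly at the ResultSubPartition, without waiting for Netty/TCP buffers to fill.
        boolean canSend() { return credit > 0 && backlog > 0; }

        void sendOne() { credit--; backlog--; /* hand one buffer to Netty here */ }
    }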





Assume the upstream and downstream speeds do not match: the upstream sends at rate 2 and the downstream receives at rate 1. As the figure shows, two messages, 10 and 11, have accumulated in the ResultSubPartition, so the data <8,9> being sent is accompanied by backlog = 2. On receiving it, the downstream checks whether it has two Buffers available to accept the backlog. The InputChannel is running out of Buffers, so it requests more from the Local BufferPool and the Network BufferPool; fortunately, Buffers are still available at this point.





After a while, because the upstream sending rate is higher than the downstream receiving rate, the downstream TaskManager's Buffers reach their limit, and the downstream returns Credit = 0 to the upstream. On receiving Credit = 0, the ResultSubPartition stops handing data to Netty, and the upstream TaskManager's Buffers are quickly exhausted as well, achieving the backpressure effect. The ResultSubPartition layer can thus sense backpressure directly, without the feedback having to travel layer by layer through the Socket and Netty, which reduces the latency for backpressure to take effect. In addition, this does not block the Socket between the two TaskManagers, which eliminates the problem of one backpressured Task blocking the whole connection between TaskManagers.

Summary and Reflection

Conclusion

  • The purpose of network flow control is to prevent the downstream from being overwhelmed when the upstream and downstream speeds do not match
  • There are two approaches to network flow control: static rate limiting and dynamic backpressure
  • Before Flink 1.5, backpressure was implemented on top of TCP flow control plus bounded buffers
  • Since Flink 1.5, Flink implements its own credit-based flow control mechanism, simulating TCP flow control at the application layer

Reflection

With dynamic backpressure, is static rate limiting completely useless?





In fact, dynamic backpressure is not a panacea. The results of our stream computation are eventually written to external storage, and backpressure is not necessarily triggered at the Sink: whether it is depends on the implementation of the external storage. Message middleware such as Kafka, which implements rate limiting itself, can feed the pressure back to the Sink through its protocol, but a system like ES cannot propagate backpressure back to the Sink. In such cases, to prevent the external storage from being overwhelmed by a large volume of data, we can use static rate limiting at the Source to limit traffic. Therefore, dynamic backpressure cannot completely replace static rate limiting; the appropriate approach should be chosen for the scenario.








This article is original content of the Yunqi community and may not be reproduced without permission.