
I don’t know why, but lately I’ve been going downhill…

1. The beginning of the story

I looked up at the fellow sitting opposite me: plaid shirt, medium height, a pair of black-rimmed glasses resting on the bridge of a slightly high nose. His eyes were narrowed and tired, and he was staring at me, unblinking.

I thought to myself: what is there to look at? To him I’m just something traded for bread and a car. Even though I’ve kept him company for five years, so it seems ~

Speaking of which, I forgot to introduce myself. My name is Flink, though of course I prefer to be called by my full name, Apache Flink, because it sounds more technical. I am currently one of the most popular real-time computing engines in big data.

I can say this because I am currently leading the real-time field, as you can see from the following statistics:

Here I need to @ my big brother: Apache Spark. I heard there was a debate about whether Spark is becoming obsolete with the advent of Flink. I dare not say and dare not ask; one stays respectful and rational toward a senior.

“Cough” ~ a cough brought me back to reality, and Mr. A began to debug the code ~

2. I’m starting to feel the pressure

Actually, I only met Mr. A last week, after he had spent a week with my good friend Kafka, apparently preparing for something big.

When he came to me, I learned that his company was going to build a real-time data warehouse, which would require me and the Kafka brothers to handle gigabytes of real-time data.

I know real-time data warehouses well. I looked at the architecture plan put forward by Mr. A’s boss and thought to myself, “This is exactly my field of expertise.”

The overall architecture is straightforward and easy to understand:

  • The program ingests source data in real time and writes it into the Kafka ODS layer for storage
  • The ODS -> DWD -> DWS layers are computed in real time, with results written back to Kafka (a minimal job sketch follows this list)
  • An offline pipeline is added as a backup
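To make this concrete, here is a minimal sketch of one hop of that pipeline: a Flink job that reads the Kafka ODS layer and writes a cleansed stream to the DWD layer. The topic names, broker address, and cleansing logic are illustrative assumptions, not the project’s actual code:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object OdsToDwdJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // illustrative broker
    props.setProperty("group.id", "ods-to-dwd")              // illustrative group id

    // Read raw events from the (hypothetical) ODS topic
    val ods = env.addSource(
      new FlinkKafkaConsumer[String]("ods_events", new SimpleStringSchema(), props))

    // A stand-in cleansing step for the real ODS -> DWD logic
    val dwd = ods.filter(_.nonEmpty).map(_.trim)

    // Write cleansed records to the (hypothetical) DWD topic
    dwd.addSink(
      new FlinkKafkaProducer[String]("dwd_events", new SimpleStringSchema(), props))

    env.execute("ods-to-dwd")
  }
}
```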

I looked at the Kafka brothers nearby and we nodded to each other. Let’s get started ~

As old partners, the Kafka brothers and I worked very well together, and Mr. A was an old hand, so we did a great job in the first week.

I can show you some of our results:

```
- src.main.scala.com.xxproject.xx
  |--handler
    |---FlinkODSHandler.scala
    |---FlinkDWHandler.scala
    |---FlinkADSHandler.scala
    ...
  |--service
    |---KafkaSchdulerService.scala
    |---SchdulerService.scala
    ...
  |--config/util/model
    |---KafkaUtils.scala
    |---XXDataModel.scala
    ...
```

“Riding high on the spring breeze, my horse’s hooves fly” ~ at that moment I was feeling great; the three of us were the perfect team.

But the good times did not last. By the second week, I gradually found myself slowing down.

The specific symptoms were as follows:

  1. A job starts out fine, then a large number of tasks end up in a waiting state
  2. A small number of tasks report checkpoint timeout problems
  3. Data piles up in Kafka and cannot be consumed in time

I panicked and ran a self-check, only to be shocked:

The buffer memory was full on both the input and output sides. Data could not be processed, barriers were flowing very slowly, and checkpoint generation times were growing long across a large number of checkpoints.

I had a backpressure problem!!

3. My backpressure mechanism

After a period of silent self-adjustment, the problem remained unresolved.

Meanwhile, alarms kept sounding around me, and memory kept running out. Suddenly my Task execution page was filled with red HIGH labels ~

Finally, I sent an alert to Mr. A.

Mr. A received the message, stared at me for a long time, and sighed. I felt a little embarrassed; I felt like I had messed things up.

He didn’t say much; he just asked me about my backpressure mechanism and said we should solve the problem at its source.

The following is a conversation between Mr. A and me.

1) Under what circumstances does backpressure generally occur?

In my experience, backpressure generally occurs when the downstream data processing speed cannot keep up with the upstream data production speed.

Two cases can be distinguished:

  • The current Task processes data slowly: for example, the task invokes complex logic such as algorithmic processing, so the upstream cannot obtain sufficient memory.
  • The downstream Task processes data slowly: for example, collect() output is sent downstream many times, so the current node cannot request sufficient memory.

2) What is the effect of frequent backpressure?

Frequent backpressure increases data latency in stream processing and affects Checkpoints.

Barrier alignment is required when a Task takes a Checkpoint. If a Task is under backpressure, the barrier flow rate drops, causing Checkpoints to slow down or even time out, and the whole job slows down with them.

Long-term or frequent backpressure needs to be dealt with; occasional backpressure caused by network fluctuation or GC does not.
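As an aside, while the root cause is being located, a job can be given more checkpoint headroom so that brief backpressure does not immediately time checkpoints out. A minimal sketch; the interval and timeout values are illustrative assumptions, not the project’s settings:

```scala
import org.apache.flink.streaming.api.scala._

object CheckpointHeadroom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Illustrative values: checkpoint every 60 s, and allow up to 10 min
    // before a checkpoint is considered timed out, so short bursts of
    // backpressure do not immediately fail it.
    env.enableCheckpointing(60000L)
    env.getCheckpointConfig.setCheckpointTimeout(600000L)
  }
}
```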

3) How do you locate backpressure?

On my Web UI, you investigate Tasks in reverse, from Sink back to Source, checking the BackPressure details one by one to find the first Task showing backpressure.

This is what the normal state looks like:

How my internal detection works:

The BackPressure page periodically samples the stack traces of Task threads, counts how often a thread is blocked while requesting a memory buffer, and uses that blocked ratio to determine whether a node is under backpressure (a small sketch of the rule follows the list):

  • A ratio below 0.1 is considered OK (normal)
  • A ratio in (0.1, 0.5] is LOW (slight backpressure)
  • A ratio above 0.5 is HIGH (backpressure that needs attention)
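A minimal sketch of that classification rule (the sampling itself is simplified here; the function and parameter names are illustrative):

```scala
object BackPressureLevel {
  // blockedSamples / totalSamples approximates how often a Task thread
  // was found blocked requesting a buffer during one sampling round.
  def classify(blockedSamples: Int, totalSamples: Int): String = {
    val ratio = blockedSamples.toDouble / totalSamples
    if (ratio < 0.1) "OK"         // normal
    else if (ratio <= 0.5) "LOW"  // slight backpressure
    else "HIGH"                   // backpressure that needs attention
  }
}
```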

At this time, I showed Mr. A the BackPressure page of the current project, which was obviously abnormal.

4) What is the principle behind the backpressure mechanism?

Mr. A paused and reminded me to be especially careful here. I organized my thoughts and decided to start from rate limiting:

  • The data flow

The whole process can be likened to a producer -> consumer system: the upstream Producer sends data (2M/s) into the Send Buffer, the data crosses the network (5M/s) into the Receive Buffer, and the downstream Consumer consumes it (<1M/s).

This obviously cannot work: the downstream is slower than the upstream, and data that keeps accumulating eventually becomes a chronic illness ~ some rate limiting is needed.

  • Rate limiting

That sounds reasonable: since the upstream is faster, add a rate-limiting mechanism to slow it down so that upstream and downstream run at roughly the same speed. Problem solved, right?

Well, not quite. There are a few problems:

  1. I cannot predict the actual downstream speed in advance (so what value should the rate limit be set to?)
  2. Network fluctuations and similar conditions are common, so the upstream and downstream rates change dynamically

For these reasons, I provide a powerful built-in backpressure mechanism:

Upstream and downstream give each other dynamic feedback: if the downstream is slow, the upstream is throttled; otherwise, the upstream speeds up. This achieves dynamic, automatic backpressure.

  • Schematic of backpressure mechanism

The upstream sends data through its own Network Buffer layer, then down into the Channel Buffer layer (the Netty Channel), and finally out over the network, passing layer by layer until it reaches the downstream.

The Network Buffer, Channel Buffer, and Socket Buffer reflect the familiar distinction between user mode and kernel mode: they reside in different memory spaces, managed by the application and the operating system respectively.

For the principles of kernel mode versus user mode, interested friends are welcome to add my personal WeChat (Youlong525) for discussion ~

  • Backpressure mechanism principle

Here I summarize the operation process of my backpressure mechanism for Mr. A:

  1. Each TaskManager maintains one shared Network BufferPool (a memory pool shared by its Tasks), which requests memory from Off-heap Memory at initialization.
  2. Each Task creates its own Local BufferPool (a Task-local memory pool) and exchanges memory with the Network BufferPool.
  3. The upstream Record Writer requests a buffer from the Local BufferPool to write data. If the Local BufferPool does not have enough memory, it requests more from the Network BufferPool; after use, the memory is returned to the pool.
  4. Netty copies the buffer and sends it out over the network through the Socket Buffer; the downstream receives and processes data through a similar mechanism in reverse.
  5. If the downstream fails to obtain a buffer, the current node has run out of memory. A backpressure signal is then sent upstream layer by layer, and the upstream gradually slows and stops sending data until the downstream recovers.

Therefore, my backpressure mechanism works much like a blocking queue in Java, as shown in the figure below.

Each Task works with the Local BufferPool and Network BufferPool to request and release memory, and downstream memory usage is fed back to the upstream in real time.
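To make the analogy concrete, here is a minimal sketch in which a bounded blocking queue plays the role of the buffer pool (the queue size and sleep time are illustrative). When the slow consumer lets the queue fill up, put() blocks and the producer is throttled automatically, which is exactly the dynamic feedback described above:

```scala
import java.util.concurrent.ArrayBlockingQueue

object BackpressureAnalogy {
  def main(args: Array[String]): Unit = {
    val bufferPool = new ArrayBlockingQueue[Int](10) // bounded "buffer pool"

    val producer = new Thread(() => {
      for (i <- 1 to 100) {
        bufferPool.put(i) // blocks while the "pool" is full: backpressure
        println(s"produced $i")
      }
    })

    val consumer = new Thread(() => {
      for (_ <- 1 to 100) {
        val v = bufferPool.take() // blocks while the "pool" is empty
        Thread.sleep(50)          // simulate a slow downstream Task
        println(s"consumed $v")
      }
    })

    producer.start(); consumer.start()
    producer.join(); consumer.join()
  }
}
```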

After listening to my answer, Mr. A was lost in thought.

4. I want to relieve the pressure

To be honest, I was confused too. I am very confident in my backpressure mechanism; could something else be aggravating the backpressure?

At this point, Mr. A opened my Web UI and mumbled a few words: data skew and parallelism.

4.1 First attempt

I suddenly understood and turned to the screen.

I checked the state size of each SubTask and found an anomaly: the state of one Checkpoint had reached about 10 GB!!

Compare that with the values in the other partitions (see figure):

Data skew had occurred ~

Together with Mr. A, I immediately tracked down these hot keys, pre-aggregated and split the data, and ran the job again.
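A minimal sketch of that key-splitting trick, assuming a stream of (key, count) pairs; the bucket count, window size, and names are illustrative, not the project’s actual code. Phase one salts each key into a random bucket and pre-aggregates; phase two strips the salt and merges the partial results:

```scala
import java.util.concurrent.ThreadLocalRandom

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SkewSplitSketch {
  // events: a stream of (key, count) pairs with a few very hot keys
  def mitigate(events: DataStream[(String, Long)]): DataStream[(String, Long)] = {
    events
      // Phase 1: salt each key into one of 8 buckets so a hot key's
      // load spreads across subtasks, then pre-aggregate per bucket
      .map(e => (s"${e._1}#${ThreadLocalRandom.current().nextInt(8)}", e._2))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .sum(1)
      // Phase 2: strip the salt and merge the partial aggregates
      // back under the original key
      .map(e => (e._1.split("#")(0), e._2))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .sum(1)
  }
}
```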

It seemed to help a little, but there were still plenty of spikes.

4.2 Second attempt

Now we were at an impasse.

I had to increase my memory a little. On second thought, we also increased the operators’ parallelism; after all, more threads should relieve some of the computational pressure.

After reluctantly adjusting the parameters, the results still did not improve much.

4.3 Third attempt

Mr. A started reworking my overall calculation process and changed one parameter.

I looked: he had changed the parallelism. I was not convinced; I had just tried that myself.

Huh? Something was different this time.

“This is what I wanted!!” I could not help shouting.

He smiled and told me it was thanks to my operator chain mechanism.

Operator chain: when a downstream operator is set to the same parallelism as its upstream operator, the two can automatically be fused into an operator chain.

The benefits of this are:

  • It effectively reduces thread-switching and data-buffering overhead
  • It improves throughput and reduces latency

Multiple operator chains formed across the whole job, lowering thread overhead and memory usage. Naturally, my backpressure also eased.
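A minimal sketch of the effect (the pipeline itself is illustrative): because the map and filter below share the same parallelism and a forward connection, Flink fuses them into one operator chain, so records pass between them within a single thread and without serialization:

```scala
import org.apache.flink.streaming.api.scala._

object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4) // one shared parallelism across the job

    env.fromElements("a", "bb", "ccc")
      .map(_.toUpperCase)   // same parallelism as the filter below,
      .filter(_.length > 1) // so the two operators are chained
      .print()

    env.execute("chaining-sketch")
  }
}
```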

I couldn’t help but be amazed.

5. Seeing all the flowers of Chang’an in one day

Finally, with Mr. A’s help, my speed came back. Those days of high pressure were finally over; now everything runs silky smooth ~

I slowly exhaled, a little relieved to see the final result:

I looked up at Mr. A without thinking; he wore a smile I had not seen in a long time.

I’m Flink. No pressure now

This article ends here.

For more good articles, please follow my WeChat public account: Big Data Arsenal.