Previously, Flink SQL supported MiniBatch, which played an important role in high-throughput scenarios. One of the major improvements we made to Flink SQL performance this year was an upgrade of this micro-batching model, which we call MicroBatch, or MiniBatch 2.0.
When designing and implementing Flink’s stream operators, we generally take “state-oriented programming” as the first criterion. In stream computing, to guarantee state consistency, state data must be stored in a StateBackend, and the framework creates distributed snapshots of it. The state backends currently in use, such as RocksDB and Niagara, serialize and deserialize data on every read and write, and may even incur disk I/O. As a result, state-related operations are often the performance bottleneck of the whole job, and both the data structure of the state and every access to it require special attention.
The core idea of micro-batching is to cache a small batch of data so that, when the state is accessed, multiple records with the same key need only one state operation. When the key repetition rate within a batch is high, the number of state accesses drops significantly and throughput improves greatly. MicroBatch and MiniBatch share the same core mechanism: accumulate a batch, then trigger the computation. They differ only in the batching strategy. Let us first explain how state accesses are saved when the computation is triggered.
MicroBatch computation
A typical application scenario of MicroBatch is Group Aggregate, for example this simple summation query:
SELECT key, SUM(value) FROM T GROUP BY key
As shown in the figure above, when MicroBatch is disabled, Aggregate processes each incoming record by reading the state once, performing the aggregate calculation, and writing the state once. With N records, the state is accessed 2N times.
When MicroBatch is enabled, the N cached records are processed together when the batch is triggered, and records with the same key read and write the state only once. For example, the four records with key A cached in the figure above incur only one state read and one state write in total. Therefore, the higher the key repetition rate within a batch and the larger the batch, the fewer the state accesses and the higher the throughput.
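To make this concrete, below is a minimal, framework-independent Java sketch of the idea; the class and method names are illustrative and are not Blink internals. Records are first grouped by key in an in-memory buffer, and the state is read and written only once per distinct key when the batch is triggered.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the batched SUM above, not actual Blink operator code.
public class MicroBatchSumSketch {
    private final Map<String, Long> state = new HashMap<>();        // stand-in for the RocksDB state
    private final Map<String, List<Long>> buffer = new HashMap<>(); // in-memory micro-batch buffer

    // Per-record path: records are only buffered, the state is not touched.
    public void processElement(String key, long value) {
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Batch trigger: one state read and one state write per distinct key,
    // instead of 2 * N state accesses for N buffered records.
    public void finishBatch() {
        for (Map.Entry<String, List<Long>> entry : buffer.entrySet()) {
            long acc = state.getOrDefault(entry.getKey(), 0L);       // single read
            for (long v : entry.getValue()) {
                acc += v;
            }
            state.put(entry.getKey(), acc);                          // single write
            System.out.println(entry.getKey() + " -> " + acc);       // emit the updated result
        }
        buffer.clear();
    }
}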
Batching strategy
The batching strategy generally has two dimensions: latency and memory. The latency dimension controls how long a batch is accumulated before it is triggered, and is the key trade-off between throughput and latency. The memory dimension prevents the cached data from growing unbounded when TPS is high, avoiding Full GC and OOM. The following describes how the old MiniBatch and the new MicroBatch differ in these two dimensions.
MiniBatch batching policy
The latency dimension of the MiniBatch strategy is implemented by registering a separate timer on each aggregation node, and the allowed latency is simply divided evenly among the nodes. For example, with four Aggregate nodes and MiniBatch configured to 10 s, each node is allocated 2.5 s, as shown in the following figure:
But there are several problems with this strategy:
- The user can tolerate a 10 s delay, but each node only uses 2.5 s of it for batching, which is inefficient; the more complex the topology, the more pronounced the waste.
- Because the upstream and downstream timers fire completely asynchronously, the downstream may trigger its batch at the same time as the upstream; while a batch is being processed, network data is not consumed for a period of time, so the upstream is easily backpressured.
- The timers introduce additional threads, increasing the overhead of thread scheduling and lock contention.
On the memory dimension, the MiniBatch policy counts the number of buffered records and triggers the batch once the count exceeds blink.miniBatch.size, to prevent OOM. However, this size parameter is hard to estimate: if it is too large, it may no longer protect memory; if it is too small, batching efficiency drops.
MicroBatch batching policy
MicroBatch was proposed to solve the problems of MiniBatch described above. It introduces watermarks to control when aggregation nodes trigger: watermarks are inserted into the data stream as special events at fixed time intervals, dividing the stream into batches. The implementation principle is as follows:
MicroBatch inserts a MicroBatchAssigner node after the data source, which emits a watermark at the configured interval (for example, every 10 seconds). Every 10 seconds, a watermark carrying the current system timestamp is sent downstream regardless of whether the source has received any data. A node’s current watermark is the minimum watermark across all of its input channels, so when the watermark of an aggregation node advances, it means the upstream has finished accumulating a batch and this node can trigger its own batch. After the batch is processed, the node broadcasts its current watermark to all downstream tasks, and a downstream task triggers its batch once it has collected the upstream watermarks. In this way batches are triggered step by step from upstream to downstream.
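The triggering logic can be sketched in the same framework-independent style as above (the real MicroBatchAssigner and operator code in Blink differ; everything here is illustrative): each operator tracks the minimum watermark over its input channels, flushes its buffer when that minimum advances, and then forwards the watermark downstream.

import java.util.Arrays;

// Illustrative sketch of watermark-driven batch triggering.
public class MicroBatchTriggerSketch {
    private final long[] channelWatermarks;          // last watermark seen on each input channel
    private long currentWatermark = Long.MIN_VALUE;

    public MicroBatchTriggerSketch(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Called whenever a watermark arrives on one input channel.
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The node's watermark is the minimum over all input channels.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            finishBatch();                 // the upstream has finished a batch, so trigger ours
            broadcastDownstream(min);      // then let the downstream tasks trigger theirs
        }
    }

    private void finishBatch() {
        // flush the buffered records, as in the aggregation sketch above
    }

    private void broadcastDownstream(long watermark) {
        // forward the watermark to all downstream tasks
    }
}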
An interesting point here is that the watermark is used as a special event that divides batches. Watermarks are a powerful tool we normally use to measure the progress of business (event) time and to handle out-of-order event time. Viewed from another dimension, however, they can also measure the progress of the global system time, which elegantly solves the batching problem.
Therefore, compared with the MiniBatch policy, MicroBatch has the following advantages:
- Under the same latency, MicroBatch accumulates more data per batch and is therefore more efficient.
- Because MicroBatch batches are triggered by events, the downstream does not trigger at the same moment as the upstream, so it is less likely to cause backpressure than MiniBatch.
- It solves the data jitter problem (analyzed in the next section).
We compared performance with a DAU job, and with the same allowLatency (6 seconds) configuration, MicroBatch achieved higher throughput and the same end-to-end latency as MiniBatch!
In addition, the performance comparison above shows that after the job stabilizes, the average queue utilization of MicroBatch stays below 50%, while the queues of MiniBatch are almost constantly full. This indicates that MicroBatch runs more stably than MiniBatch and is less likely to cause backpressure.
In the memory dimension, MicroBatch is still the same as MiniBatch and uses the size parameter to control the number of cached records. In the future, however, the cache will be built on managed memory, storing the batched data in managed memory blocks (BytesHashMap) to reduce the space overhead of Java objects, relieve GC pressure, and prevent OOM.
Solving data jitter
The so-called data jitter problem occurs when there are two levels of aggregation: the update sent by the first-level AGG is split into two independent messages, a retract message and an accumulate message, for downstream consumption. When the second-level AGG consumes these two messages, it also emits two messages, so the result appears to jitter on the front end. For example, the following query counts the number of buyers with two levels of aggregation: the first level does the distinct (UV) count, split by MOD(buy_id, 1024), and the second level does the SUM.
SELECT day, SUM(cnt) total
FROM (
SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day
When the COUNT DISTINCT result at the first level goes from 100 to 101, it emits two messages: -100 and +101. When the second-level SUM, which is currently 900, receives and processes these two messages in sequence, the SUM first drops to 800 after -100 and then rises to 901 after +101. What the user sees is the number of buyers going from 900 to 800 and then to 901; we call this data jitter. In theory the number of buyers should only increase, so we kept thinking about how to solve this problem.
The essence of data jitter is that the retract and accumulate messages are two operations of one transaction, yet the intermediate result between the two operations is visible to the user. In terms of the isolation (I) in traditional database ACID, this is the weakest level, READ UNCOMMITTED. The fundamental solution is to process the retract and accumulate messages atomically. With the MicroBatch strategy described above, no watermark is ever inserted between a retract message and its accumulate message, so watermarks form natural transaction boundaries. By processing the data between watermarks as one batch, retract and accumulate are applied atomically, which solves the jitter problem.
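A tiny runnable illustration of the buyer-count example above, under the same batching assumption (a plain-Java sketch, not the actual operator code): the second-level SUM applies both deltas of the batch to its state before emitting, so the intermediate 800 is never visible downstream.

// The second-level SUM is 900 and receives -100 (retract) and +101 (accumulate) in one micro-batch.
public class JitterExample {
    public static void main(String[] args) {
        long state = 900L;                     // second-level SUM before the update
        long[] batch = {-100L, +101L};         // retract then accumulate from the first level

        // Without batching each delta would be applied AND emitted immediately,
        // exposing the intermediate 800 to the user (900 -> 800 -> 901).
        // Within one micro-batch, all deltas are applied before a single emit:
        for (long delta : batch) {
            state += delta;
        }
        System.out.println("buyers = " + state); // prints "buyers = 901"; 800 is never visible
    }
}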
Application scenarios and usage
MicroBatch is a strategy that trades a certain amount of latency for much higher throughput, so if a job requires ultra-low latency it is not recommended to enable MicroBatch. MicroBatch brings significant performance improvements to aggregation and Join on unbounded streams, so it is recommended to enable it; it is also recommended if you encounter the data jitter problem described above.
MicroBatch is disabled by default.
The following configuration is required when using the MicroBatch policy; it is recommended to keep it consistent with blink.miniBatch.allowLatencyMs:

blink.microBatch.allowLatencyMs=5000
# when using MicroBatch, keep the following two MiniBatch settings
blink.miniBatch.allowLatencyMs=5000
# prevent OOM: the maximum number of records cached per batch
blink.miniBatch.size=20000
Follow-up optimizations
Currently MicroBatch supports only aggregation and Join on unbounded streams, not Window Aggregate. Follow-up work will therefore mainly add MicroBatch support to Window Aggregate to improve its throughput. On the memory side, we also plan to manage the cached batch with binary data structures to improve memory utilization and reduce the impact of GC.