Kafka has many delayed operations. For example, time-consuming network requests (such as a produce request waiting for the ISR replicas to finish replicating) are encapsulated as a DelayedOperation and processed later, so that they do not block Kafka’s request-processing threads.
Kafka does not schedule these tasks directly with the JDK’s built-in Timer or DelayQueue. Insertion and deletion in both are O(log n), which does not meet Kafka’s performance requirements.
Side note: the JDK’s Timer and DelayQueue are both backed by a priority queue, that is, a min-heap in which the task due soonest sits at the head. The difference is that Timer has its own thread that pulls tasks and executes them, whereas DelayQueue is only a container and needs other threads to consume it. ScheduledThreadPoolExecutor, another way to run scheduled tasks in the JDK, is in effect a delay queue combined with a thread pool.
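To make the "DelayQueue is only a container" point concrete, here is a minimal sketch using the JDK’s real `DelayQueue` and `Delayed` types. `DelayedJob` and `DelayQueueDemo` are hypothetical names invented for this illustration. The jobs are inserted out of order and are all already due, so the calling thread can pull them out without waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type for illustration; not a JDK or Kafka class.
class DelayedJob implements Delayed {
    final String name;
    final long expirationMs; // absolute time at which the job becomes due

    DelayedJob(String name, long expirationMs) {
        this.name = name;
        this.expirationMs = expirationMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMs = expirationMs - System.currentTimeMillis();
        return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Safe here because the queue below only ever holds DelayedJob instances.
        return Long.compare(expirationMs, ((DelayedJob) other).expirationMs);
    }
}

class DelayQueueDemo {
    // Inserts jobs out of order and returns the order in which they come out.
    static List<String> drainOrder() {
        DelayQueue<DelayedJob> queue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        // All three jobs are already due, so poll() never has to wait.
        queue.put(new DelayedJob("third", now - 10));
        queue.put(new DelayedJob("first", now - 30));
        queue.put(new DelayedJob("second", now - 20));

        List<String> order = new ArrayList<>();
        DelayedJob job;
        // DelayQueue has no thread of its own: the caller must pull from it.
        while ((job = queue.poll()) != null) {
            order.add(job.name);
        }
        return order;
    }
}
```

Calling `DelayQueueDemo.drainOrder()` yields the jobs ordered by expiration time regardless of insertion order, because the queue keeps the job due soonest at its head.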
Kafka implements delayed operations with a time wheel (timing wheel). Inserting and deleting a task in a time wheel is O(1), which meets Kafka’s performance requirements. Besides Kafka, open source projects such as Netty, ZooKeeper, and Dubbo also have time wheel implementations.
So what is the time wheel algorithm, and what is the idea behind it? How is it implemented in Kafka?
Kafka time wheel algorithm
The idea behind the time wheel can be understood through the clocks we use in daily life.
The TimingWheel in Kafka is a circular queue that stores scheduled tasks. It is backed by an array, and each array element holds a TimerTaskList. A TimerTaskList is a circular doubly linked list whose items are TimerTaskEntry objects, each of which wraps the actual TimerTask.
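The structure just described can be sketched roughly as follows. This is a simplified illustration that borrows the names from the text; it is not Kafka’s source code, and the sentinel-node layout and the `size` counter are assumptions made for the example. The point it shows is why adding and removing an entry are both O(1): each entry knows its own neighbours.

```java
// Simplified illustration; not Kafka's actual classes.
class TimerTaskEntry {
    final Runnable task;      // stands in for the wrapped TimerTask
    TimerTaskList list;       // bucket currently holding this entry, or null
    TimerTaskEntry prev;
    TimerTaskEntry next;

    TimerTaskEntry(Runnable task) {
        this.task = task;
    }
}

class TimerTaskList {
    // Sentinel node: root.next is the first entry, root.prev is the last.
    private final TimerTaskEntry root = new TimerTaskEntry(null);
    private int size = 0;

    TimerTaskList() {
        root.next = root;
        root.prev = root;
    }

    // O(1): splice the entry in at the tail, just before the sentinel.
    void add(TimerTaskEntry entry) {
        TimerTaskEntry tail = root.prev;
        entry.prev = tail;
        entry.next = root;
        tail.next = entry;
        root.prev = entry;
        entry.list = this;
        size++;
    }

    // O(1): unlink the entry through its own neighbour references.
    void remove(TimerTaskEntry entry) {
        if (entry.list != this) {
            return; // not in this bucket, nothing to do
        }
        entry.prev.next = entry.next;
        entry.next.prev = entry.prev;
        entry.prev = null;
        entry.next = null;
        entry.list = null;
        size--;
    }

    int size() {
        return size;
    }
}

class TimingWheelBuckets {
    final TimerTaskList[] buckets; // the array behind the circular queue

    TimingWheelBuckets(int wheelSize) {
        buckets = new TimerTaskList[wheelSize];
        for (int i = 0; i < wheelSize; i++) {
            buckets[i] = new TimerTaskList();
        }
    }
}
```

Because cancellation only needs the entry itself, a task can be removed without searching the bucket, which is what a heap-based queue cannot offer.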
The time wheel has several parameters:
- tickMs: the time span of one slot (bucket)
- wheelSize: the number of slots in one time wheel
- startMs: the start time
- interval: the overall time span of the time wheel, tickMs * wheelSize
- currentTime: the wheel’s current time, always an integer multiple of tickMs
currentTime divides the wheel into an expired part and an unexpired part. The slot that currentTime points to belongs to the expired part: it has just come due, and all tasks in the TimerTaskList of that slot need to be processed.
The overall span of the time wheel never changes. As the pointer currentTime advances, the window of time the wheel can handle moves forward with it, always covering the range from currentTime to currentTime + interval.
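The relationship between these parameters can be written down as a few small formulas. The sketch below uses a hypothetical helper class, `WheelMath`. The rule that anything earlier than currentTime + tickMs counts as already expired is an assumption that follows from the current slot being treated as expired.

```java
// Hypothetical helper class; the formulas follow the parameter definitions in the text.
class WheelMath {
    // currentTime is kept as a multiple of tickMs by rounding down.
    static long truncate(long timeMs, long tickMs) {
        return timeMs - (timeMs % tickMs);
    }

    // Overall span of one wheel.
    static long interval(long tickMs, int wheelSize) {
        return tickMs * wheelSize;
    }

    // True if the expiration falls inside the window this wheel can hold.
    // Assumption: anything before currentTime + tickMs is already expired.
    static boolean fits(long expirationMs, long currentTime, long tickMs, int wheelSize) {
        return expirationMs >= currentTime + tickMs
            && expirationMs < currentTime + interval(tickMs, wheelSize);
    }

    // Slot of the circular array that covers the given expiration time.
    static int bucketIndex(long expirationMs, long tickMs, int wheelSize) {
        return (int) ((expirationMs / tickMs) % wheelSize);
    }
}
```

For example, with tickMs = 10 and wheelSize = 8 the interval is 80 ms. A timestamp of 1234 truncates to currentTime = 1230, and a task expiring at 1257 fits the window and lands in slot (1257 / 10) % 8 = 5, while a task expiring at 1310 is outside the window.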
You may be wondering how this abstract currentTime actually moves forward. That question is answered further below, after long-delay tasks.
So how are scheduled tasks with a long delay supported?
To support delays of hundreds of thousands of milliseconds, do we really have to enlarge the time wheel’s array? There are actually two solutions:
- Add the concept of rounds (laps), as in Netty’s HashedWheelTimer
  - For example, with 8 slots numbered 0-7 and a task due at tick 41: 41 % 8 = 1, so the task goes into the slot at index 1 (the second slot), and (41 - 1) / 8 = 5, so its round count is 5. The task fires only after the pointer has scanned past slot 1 five times.
  - The implementation details are not covered here.
- Use multiple, hierarchical time wheels, as in Kafka’s TimingWheel
  - Compared with the round-based scheme, hierarchical time wheels give finer control over time granularity, handle more complex scheduling scenarios, and apply to a wider range of use cases.
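The arithmetic in the round-based example above can be checked with a short sketch. It only mirrors the numbers in the text, under the assumption that the pointer starts at tick 0; it is not Netty’s implementation, and `RoundCounting` is a hypothetical name.

```java
// Mirrors the worked example in the text; not Netty's HashedWheelTimer code.
class RoundCounting {
    // Slot of the wheel that a task due at the given tick falls into.
    static int slotIndex(long dueTick, int wheelSize) {
        return (int) (dueTick % wheelSize);
    }

    // Full revolutions the pointer must complete before the task may fire.
    static long rounds(long dueTick, int wheelSize) {
        return (dueTick - slotIndex(dueTick, wheelSize)) / wheelSize;
    }

    // Simulates the pointer scanning one slot per tick, starting at tick 0,
    // and returns the tick at which the task actually fires.
    static long fireTick(long dueTick, int wheelSize) {
        int slot = slotIndex(dueTick, wheelSize);
        long remainingRounds = rounds(dueTick, wheelSize);
        for (long tick = 0; ; tick++) {
            if (tick % wheelSize == slot) {
                if (remainingRounds == 0) {
                    return tick;
                }
                remainingRounds--; // pointer passed the slot, one round used up
            }
        }
    }
}
```

For a task due at tick 41 on an 8-slot wheel, `slotIndex` gives 1 and `rounds` gives 5. The simulation passes slot 1 five times (ticks 1, 9, 17, 25, 33) and fires the task on the sixth visit, at tick 41.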
A multi-level time wheel is even closer to an actual clock: one revolution of the second hand, one of the minute hand, and one of the hour hand relate to each other just like the levels of a hierarchical time wheel.
One full revolution of the level-N wheel equals one tick of the level-(N+1) wheel. In other words, the tickMs of the higher-level wheel equals the interval (overall span) of the wheel below it.
When a task is inserted and its delay exceeds what the first-level wheel can cover, Kafka tries the next higher wheel, and so on.
As time passes, tasks are also demoted. A task with a long delay is taken out of its higher-level wheel, resubmitted to the time wheel, and placed in the appropriate lower-level wheel.
How are the time wheels linked to one another in Kafka, and how is the higher-level wheel represented?
Quite simply: each wheel holds a reference, in an internal field, to the wheel one level above it.
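The following sketch puts the last few paragraphs together: insertion that overflows to a higher wheel, the parent reference, and demotion as time passes. It is a deliberately reduced model, not Kafka’s code. Tasks are represented only by their expiration time, `LevelWheel` and `DemotionDemo` are hypothetical names, and buckets are drained by hand at chosen times instead of by a reaper thread.

```java
import java.util.ArrayList;
import java.util.List;

// Reduced model of one level of a hierarchical time wheel.
class LevelWheel {
    final long tickMs;
    final int wheelSize;
    final long interval;      // tickMs * wheelSize
    final int level;          // 1 = finest-grained wheel
    long currentTime;         // always a multiple of tickMs
    final List<List<Long>> buckets = new ArrayList<>();
    LevelWheel overflowWheel; // reference to the next-higher wheel, created lazily

    LevelWheel(long tickMs, int wheelSize, long startMs, int level) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.level = level;
        this.currentTime = startMs - (startMs % tickMs);
        for (int i = 0; i < wheelSize; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    // Returns the level that accepted the task, or 0 if it is already expired.
    int add(long expirationMs) {
        if (expirationMs < currentTime + tickMs) {
            return 0;
        }
        if (expirationMs < currentTime + interval) {
            int index = (int) ((expirationMs / tickMs) % wheelSize);
            buckets.get(index).add(expirationMs);
            return level;
        }
        if (overflowWheel == null) {
            // One tick of the parent equals this wheel's whole interval.
            overflowWheel = new LevelWheel(interval, wheelSize, currentTime, level + 1);
        }
        return overflowWheel.add(expirationMs);
    }

    // Moves this wheel, and then its parents, forward to timeMs.
    void advanceClock(long timeMs) {
        if (timeMs >= currentTime + tickMs) {
            currentTime = timeMs - (timeMs % tickMs);
            if (overflowWheel != null) {
                overflowWheel.advanceClock(currentTime);
            }
        }
    }

    // Empties the bucket covering timeMs on the given level and returns its tasks.
    List<Long> drain(int targetLevel, long timeMs) {
        if (targetLevel != level) {
            return overflowWheel == null
                ? new ArrayList<>()
                : overflowWheel.drain(targetLevel, timeMs);
        }
        int index = (int) ((timeMs / tickMs) % wheelSize);
        List<Long> drained = new ArrayList<>(buckets.get(index));
        buckets.get(index).clear();
        return drained;
    }
}

class DemotionDemo {
    // Levels visited by a task expiring at 450 ms (1 ms base tick, 20 slots per wheel).
    static List<Integer> trace() {
        LevelWheel wheel = new LevelWheel(1, 20, 0, 1);
        List<Integer> levels = new ArrayList<>();
        int level = wheel.add(450);
        levels.add(level);
        long[] bucketTimes = {400, 440, 450}; // times at which the task's bucket comes due
        for (long t : bucketTimes) {
            wheel.advanceClock(t);
            for (long expiration : wheel.drain(level, t)) {
                level = wheel.add(expiration); // resubmit through the lowest wheel
                levels.add(level);
            }
        }
        return levels;
    }
}
```

With a 1 ms tick and 20 slots, the wheels span 20 ms, 400 ms, and 8000 ms. The task expiring at 450 ms therefore starts on level 3, is demoted to level 2 at 400 ms and to level 1 at 440 ms, and is reported as expired (level 0) at 450 ms, so `DemotionDemo.trace()` returns `[3, 2, 1, 0]`.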
There is also the question of how to push the time wheel forward, that is, how to make its time advance.
- In Netty, a worker thread advances the time wheel at a fixed interval, tickDuration
  - If no task comes due for a long time, this scheme keeps advancing the wheel over empty slots ("empty advancing"), which wastes some performance.
- Kafka advances the wheel through a DelayQueue, trading space for time
  - The DelayQueue holds the TimerTaskList objects that contain tasks, ordered by expiration time, so the list that expires soonest is at the head.
  - An external thread (ExpiredOperationReaper) takes expired TimerTaskLists from the DelayQueue and advances the time wheel to each list’s expiration time, so no empty advancing occurs.
Kafka makes a deliberate tradeoff here and uses DelayQueue where it fits. The DelayQueue stores only TimerTaskLists, not individual TimerTasks, so it contains few elements, and the cost of maintaining it is small compared with the cost of empty advancing.
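A minimal sketch of queue-driven advancing, using the JDK’s real `DelayQueue`, might look like this. `TimerBucket` is a hypothetical stand-in for a TimerTaskList with an expiration time, and `QueueDrivenClock` is an invented name. The example only moves the clock and leaves out running the tasks.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a TimerTaskList that has an expiration time.
class TimerBucket implements Delayed {
    final long expirationMs; // absolute wall-clock time at which the bucket is due

    TimerBucket(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Safe here because the queue only ever holds TimerBucket instances.
        return Long.compare(expirationMs, ((TimerBucket) other).expirationMs);
    }
}

class QueueDrivenClock {
    final DelayQueue<TimerBucket> queue = new DelayQueue<>();
    long currentTime;  // the wheel's clock, moved only when a bucket expires
    int wakeUps = 0;   // how often the advancing thread had to do work

    // Sleeps until the next bucket is due (or the timeout passes), then
    // jumps the clock straight to that bucket's expiration time.
    boolean advanceClock(long timeoutMs) {
        try {
            TimerBucket bucket = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (bucket == null) {
                return false;
            }
            wakeUps++;
            currentTime = bucket.expirationMs;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns {number of wake-ups, how far the clock moved in ms}.
    static long[] demo() {
        QueueDrivenClock clock = new QueueDrivenClock();
        long start = System.currentTimeMillis();
        clock.currentTime = start;
        clock.queue.put(new TimerBucket(start + 60));
        clock.queue.put(new TimerBucket(start + 20));
        while (!clock.queue.isEmpty()) {
            if (!clock.advanceClock(1000)) {
                break; // timed out or interrupted
            }
        }
        return new long[] {clock.wakeUps, clock.currentTime - start};
    }
}
```

In `demo()` the clock moves 60 ms forward with only two wake-ups, one per bucket. A thread ticking every 1 ms would have woken about 60 times to cover the same span, most of them over empty slots.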
Conclusion
- Kafka implements its delay queue with a time wheel. Tasks are added to and removed from linked lists, so both operations are O(1), which meets Kafka’s high performance requirements.
- For tasks with long delays, Kafka introduces hierarchical time wheels, which give finer control over time granularity and handle more complex scheduling scenarios.
- Kafka trades space for time by using a DelayQueue to advance the time wheel, a classic tradeoff.
Source: juejin.cn/post/704740…