1. The introduction
Let’s start with two interview questions. The first question is: If there are 10W scheduled tasks on a machine, how can we effectively trigger them?
Specific scenarios are as follows:
There is an APP real-time message channel system. Each user maintains a TCP connection from APP to server for sending and receiving messages in real time. For this TCP connection, there is the following requirement: If there is no request packet (such as login, message, keepalive packet) for 30 seconds, the server will set the status of the user to offline. Among them, the concurrent online volume of single-machine TCP is about 10W level, keepalive request packets are scattered about 30s, and the throughput is about 3000qps. How to do?
In the common solution, the time periodic task is used to scan the collection of all connections Map
every second and search for connections whose connection time difference (the connection time of each new request) is 30 seconds longer than the current time.
Another scheme uses the ring queue method:
Three important data structures:
Create a ring queue with index from 0 to 30 (essentially an array)
Each slot on the ring is a Set
, a collection of tasks
There is also a Map
that records which slot on the ring the UID falls into
So when a user uid has a request packet arriving:
In which slot is the UID stored
Remove the UID from the slot’s Set structure
Add the UID to the new slot => the previous slot to which the Current Index pointer points, because this slot will be scanned by the timer 30 seconds later
Update the Map. The uid corresponds to the slot index
Which elements will be timed out?
Current Index moves a slot every second. All uids in the Set corresponding to this slot should be collectively timed out! If a request packet arrived in the last 30s, it must be placed in the slot before the Current Index. All elements in the Set corresponding to the Current Index did not arrive in the last 30s.
Therefore, when there is no timeout, there should be no element in the Set of each slot swept by the Current Index.
Comparison of the two schemes:
Scheme 1 polls all data each time, while Scheme 2 uses ring queue to poll only the data that needs to expire at this moment. If there is no data expiration, there is no data to be processed, and it is batch timeout. In addition, since the ring structure saves more space, this is suitable for high-performance scenarios.
Second question: What do I do if I need to execute a task that is delayed for a certain amount of time?
Instead of repeating the wheel, the options are of course delay queues or timers.
The data structure of the smallest heap represented by array is used to realize the delay queue and the addition of delayed tasks in Timer. The time complexity of adding new elements and removing the first element in the queue is O(nlog(n)).
2. Time round
The ring queue used in Scheme 2 is the underlying data structure of the time wheel. It allows the data to be processed (the abstraction of the task) to be concentrated. In Kafka, there are a lot of delayed operations, such as delayed production, delayed pull and delayed deletion. Kafka does not use the JDK’s built-in Timer or DelayQueue to implement the delay function. Kafka uses a custom Timer based on the time wheel to implement the delay function. The average time complexity of JDK Timer and DelayQueue insert and delete operations is O(nlog(n))O(nlog(n))O(nlog(n)), which cannot meet the performance requirements of Kafka. Based on the time wheel, the time complexity of both insert and delete operations can be reduced to O(1). The time wheel is not unique to Kafka. It can be used in many different ways, including Netty, Akka, Quartz, and Zookeeper.
2.1 Data structure of time wheel
The TimingWheel in Kafka is a circular queue that stores scheduled tasks. The underlying implementation is an array. Each element in the array can store a TimerTaskList. The TimerTaskList is a circular bidirectional linked list. Each item in the linked list represents the TimerTaskEntry, which encapsulates the real TimerTask. The TimeTaskList is represented by buckets in The Kafka source code, so the TimerTaskList will probably be called buckets as well.
A quick explanation of some of the nouns above:
TickMs: A time wheel consists of multiple time cells. Each time cell is tickMs, which represents the basic time span of the current time wheel.
WheelSize: represents the number of time wheels in each layer
Interval: indicates the total time span of the current time wheel, interval=tickMs x wheelSize
StartMs: Round the structure when the time of the current time, first time round startMs is TimeUnit NANOSECONDS. ToMillis (NANOSECONDS ()), the upper time round startMs currentTime for lower time round.
CurrentTime: CurrentTime =startMs – (startMs % tickMs); This is analogous to a clock with a 65 second minute that still points to one minute. CurrentTime can be used to divide the entire time round into expired and unexpired parts. CurrentTime refers to the time frame that is expired and needs to process all tasks of the TimerTaskList corresponding to this time frame.
2.2 Task storage in the time wheel
If tickMs of the time wheel =1ms and wheelSize=20, the interval can be calculated as 20ms. In the initial case, the dial pointer currentTime points to lattice 0, and a task with a timing of 2ms is inserted into the TimerTaskList with a timing of 2. As time goes on, the pointer currentTime keeps moving forward. After 2ms, when time grid 2 is reached, the task in TimeTaskList corresponding to time grid 2 needs to perform the corresponding expiration operation. If another 8ms timed task is inserted, it will be stored in grid 10. CurrentTime will point to grid 10 after 8ms. What if a timed 19ms task is inserted at the same time? The new TimerTaskEntry will reuse the original TimerTaskList, so it will be inserted into the expired TimerTaskList 1. In short, the overall span of the entire time wheel is unchanged. As the pointer currentTime continues to advance, the time period that the currentTime wheel can process is also moving backward. The overall time range is between currentTime and currentTime+interval.
2.3 Upgrade and demotion of time wheel
What if there is a task whose timing is 350ms? Will wheelSize be directly expanded? Kafka has timed tasks of tens or even hundreds of thousands of milliseconds. There is no limit to how far wheelSize can be extended. Even if the expiration time of all timed tasks is set to an upper limit, such as 1 million milliseconds, then the wheelSize of 1 million milliseconds not only takes up a lot of memory space, And efficiency will be reduced. For this, Kafka introduced the concept of a hierarchical time wheel, in which tasks are attempted to be added to an upper time wheel when their due time exceeds the time range represented by the current time wheel.
Refer to the above figure and reuse the previous case. TickMs =1ms, wheelSize=20, interval=20ms for the first layer. The tickMs of the layer 2 time wheel is the interval of the layer 1 time wheel, that is, 20ms. The wheelSize of each time wheel is fixed and is 20, so the overall time span interval of the second time wheel is 400ms. Similarly, this 400ms is also the size of tickMs of layer 3, and the total time span of the time wheel of layer 3 is 8000ms.
The 350ms task mentioned above will not be inserted into the layer 1 time wheel, but into the layer 2 time wheel whose interval=20*20. Which bucket will be inserted into the time wheel? With 350 / first tickMs (20) = virtualId (17), 350 / tickMs (20) = virtualId (17), 350 / tickMs (20) = virtualId (17), Then virtualId(17)virtualId(17) %wheelSize (20) = 17virtualId(17), so 350 will be placed in the 17th bucket. If there is a task that will be executed 450ms later, it will be placed in the third time wheel and, according to the formula, at the 0th bucket. The 0th bucket will contain [400,800] ms tasks. As time goes by, when 400ms passes, the task to be executed after 450ms will still have 50ms to be executed. At this time, there will be a time wheel degradation operation, and the 50ms task will be resubmitted to the hierarchical time wheel. Then the 50ms task will be put into the second bucket of the second time round according to the formula, and the time range of this bucket is [40,60] ms. After another 40ms, the 50ms task will be monitored again. At this point, there is 10ms before the task execution. Similarly, if a 10ms task is submitted to the layer time round, it will be added to the 10th bucket of the layer 1 time round. Therefore, after another 10ms, the task will expire and finally be executed.
Is the upgrade and downgrade operation of the entire time wheel very similar to our clock? The first time wheel tickMs=1s, wheelSize=60, interval=1min, which is second; Layer 2 tickMs=1min, wheelSize=60, interval=1hour, which is minutes; Layer 3 tickMs=1hour, wheelSize is 12, interval is 12hours, which is the clock. The clock pointer corresponds to currentTime in the program, which will be discussed later in the analysis of the code (understanding this is also the key and difficulty of understanding the time wheel).
2.4 Task adding and driving time wheel rolling core flow chart
3. Project address
Gitee.com/weylan/timi…