The HashedWheelTimer algorithm is widely used, for example in Netty, Dubbo, and even Linux, to manage and schedule large numbers of timers.
A HashedWheelTimer is a ring structure, similar to a clock face, divided into many slots, each of which represents a time interval. Each slot uses a doubly linked list to store scheduled tasks, and a pointer advances periodically. When the pointer reaches a slot, the tasks in that slot that are due are executed.
Implementation
The implementation of the timing wheel can be divided into three main roles according to their responsibilities: the clock engine, the clock slot, and the timed task. To make the implementation easier to understand, the description below deliberately leaves out the details of any particular implementation language.
Timed task — HashedWheelTimeout
In the concrete implementation, the timed task HashedWheelTimeout plays a dual role: it is both a node of the doubly linked list and the container of the actual scheduled task, TimerTask. At the start of a tick, the engine loads it into the corresponding clock slot, which it locates with a bitwise AND (&) hash.
Key attributes.
HashedWheelTimeout next, prev: the successor and predecessor of the current timed task in the linked list
TimerTask task: the actual scheduled task
long deadline: the expiry time relative to the engine's startTime, computed as currentTime + delay - startTime, in nanoseconds. Here delay is the delay requested when the task is submitted, and currentTime is the current system time
int state: the status of the timed task. INIT (0) is the initial state, CANCELLED (1) means cancelled, and EXPIRED (2) means expired
Note
The state is updated as soon as an expire or cancel operation is initiated.
HashedWheelTimeout itself supports only a few operations:
- remove: calls remove on the owning clock slot to take this task out of the slot. If the task has not yet been loaded into a slot, it only decrements pendingTimeouts on the owning timing wheel
- expire: runs the TimerTask
- cancel: the task is cancelled by its submitter. The cancelled task is placed into the cancelledTimeouts queue of the timing wheel, and the engine calls remove on it when the next tick begins
The clock slot – HashedWheelBucket
A clock slot is essentially a doubly linked list that caches and manages timed tasks. Each node is a timed task. The slot holds the head and tail nodes of the list, and because every node holds references to its predecessor and successor, the list supports the following operations:
- addTimeout: appends a task as the new tail node
- pollTimeout: removes the head node and returns its task
- remove: unlinks a node given its reference. A task being removed has either been processed or been cancelled, so pendingTimeouts of the owning timing wheel is decremented by 1
- clearTimeouts: calls pollTimeout in a loop to collect all tasks that have neither expired nor been cancelled
- expireTimeouts: walks all nodes in the slot starting from the head. Tasks that are due are removed and expired, cancelled tasks are simply removed, and the remaining rounds of every other task are decremented by 1
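The list operations above can be illustrated with a minimal sketch. MiniBucket and Node are hypothetical names chosen for this example, not the actual Netty or Dubbo classes, and the pendingTimeouts bookkeeping is left out to keep the focus on the linked-list mechanics.

```java
// Illustrative sketch only: MiniBucket and Node are hypothetical names.
final class MiniBucket {
    static final class Node {
        final String task;   // stands in for the TimerTask payload
        Node prev, next;     // predecessor and successor in the slot's list
        Node(String task) { this.task = task; }
    }

    private Node head, tail;

    // addTimeout: append the node at the tail of the list.
    void addTimeout(Node n) {
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    // remove: unlink a node by reference in O(1) and return its successor.
    Node remove(Node n) {
        Node next = n.next;
        if (n.prev != null) n.prev.next = next; else head = next;
        if (next != null) next.prev = n.prev; else tail = n.prev;
        n.prev = null;
        n.next = null;
        return next;
    }

    // pollTimeout: detach and return the head node, or null if the slot is empty.
    Node pollTimeout() {
        Node h = head;
        if (h != null) {
            remove(h);
        }
        return h;
    }

    boolean isEmpty() { return head == null; }
}
```

Because every node carries its own prev and next references, remove needs no traversal, which is what makes cancelling an arbitrary task cheap.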
Clock engine — HashedWheelTimer
The clock engine runs periodically. On every tick it selects the clock slot that corresponds to the current tick and iterates from the head of its linked list, checking whether each task belongs to the current round of the wheel. If it does, the task is taken out and run; otherwise its number of remaining rounds is decremented.
In addition, the engine maintains two blocking queues that buffer timed tasks: one for tasks newly submitted from outside, the other for tasks that have been actively cancelled. Before each tick starts, the engine loads the former into, and removes the latter from, the corresponding clock slots.
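To make the "current round or later round" decision concrete, here is a minimal sketch of one way to derive a task's slot and its number of remaining rounds from its deadline. SlotMath and place are hypothetical names, and the arithmetic is an assumption about how such a placement can be done, not a quote from any library.

```java
// Illustrative sketch only: SlotMath.place is a hypothetical helper.
final class SlotMath {
    // Returns {slotIndex, remainingRounds}.
    // deadlineNanos is relative to the engine's startTime; currentTick is the engine's tick counter.
    // wheelSize must be a power of two.
    static long[] place(long deadlineNanos, long tickDurationNanos, long currentTick, int wheelSize) {
        long mask = wheelSize - 1;
        long dueTick = deadlineNanos / tickDurationNanos;           // tick at which the task becomes due
        long rounds = Math.max(0, (dueTick - currentTick) / wheelSize); // full revolutions still to wait
        long effectiveTick = Math.max(dueTick, currentTick);        // never place a task in the past
        return new long[] { effectiveTick & mask, rounds };
    }
}
```

For example, with 8 slots, a 100 ms tick, the engine at tick 3, and a deadline of 2.5 s, the task becomes due at tick 25: it lands in slot 1 and has to wait 2 full rounds, so it fires on the third visit to that slot.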
Key attributes.
Queue timeouts, cancelledTimeouts: queues that buffer tasks submitted or cancelled from outside
int workerState: the current status of the timing wheel
Note
Status values:
INIT (0): the initial state
STARTED (1): running
SHUTDOWN (2): stopped
startTime: the time at which the timing wheel started scheduling. The engine starts running when the first task is submitted
tick: the tick counter, maintained by the clock engine. It increases monotonically in steps of 1, i.e. tick += 1
tickDuration: the length of one tick, i.e. how long the pointer stays on one clock slot
pendingTimeouts: the number of tasks still pending on the timing wheel
n: the number of slots on the wheel. It may differ from the requested number, because the requested value is rounded up to the nearest power of two, i.e. n = 2^x
mask: mask = n - 1. Computing tick & mask locates the clock slot. The result is the same as tick % (mask + 1), which holds because n is a power of two
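The relationship between n, mask, and the slot index can be checked with a small sketch. WheelSize, normalize, and slotIndex are hypothetical names used only for illustration.

```java
// Illustrative sketch only: WheelSize is a hypothetical helper class.
final class WheelSize {
    // Smallest power of two that is >= requested.
    static int normalize(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("requested must be in [1, 2^30]");
        }
        int n = 1;
        while (n < requested) {
            n <<= 1;
        }
        return n;
    }

    // Slot index for a tick; equal to tick % n because n is a power of two.
    static int slotIndex(long tick, int n) {
        return (int) (tick & (n - 1));
    }
}
```

Requesting 60 slots, for instance, yields a wheel of 64 slots, and tick 130 maps to slot 2, the same value as 130 % 64. The bitwise AND avoids a division on every tick.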
Engine kernel — Worker
The clock engine actually consists of two parts: the external interface and the scheduling logic. The kernel can be thought of as the engine's pacemaker: it drives the timing wheel and carries out the scheduling of tasks, and it corresponds to a worker thread in the implementation. For ease of understanding, the kernel state that both parts depend on is described first.
The kernel state
The state machine is a key component of any engine, and controlling its state values is critical, which is why it is presented in a section of its own. The kernel state is maintained and managed by the timing wheel, and the external interface is implemented on top of it. The initial state is init. Since the engine is not designed to be restarted, there is no transition of the form init/started/shutdown → init.
Note
start()
init → started
This transition may occur only once in the lifetime of the engine, and it is guarded defensively with startTime until the whole start-up has completed
started → started
The engine is already running. The call is generally ignored, which is equivalent to doing nothing
shutdown → started
A timing wheel is generally designed so that it cannot be revived. This transition is therefore not allowed and counts as misuse by the external caller
Note
stop()
init → shutdown
This transition occurs when the timing wheel has been initialized, but no task has been submitted to it and start() has not been called. There is one more special case: when multiple threads operate on the same timing wheel, one thread may call stop() just as another thread begins start()
started → shutdown
The normal engine shutdown. Once entered, it continues until the engine has terminated completely, which in turn terminates the Worker thread
shutdown → shutdown
This transition has no real meaning and is generally skipped
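The transitions listed for start() and stop() can be sketched with a compare-and-set on an atomic integer. WorkerStateMachine is a hypothetical class written for this example. It covers only the state bookkeeping, not the worker thread or the startTime guard, and throwing on shutdown → started is one possible way to reject that misuse.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: WorkerStateMachine is a hypothetical class.
final class WorkerStateMachine {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;
    private final AtomicInteger state = new AtomicInteger(INIT);

    // Returns true only for the single init -> started transition.
    boolean start() {
        if (state.compareAndSet(INIT, STARTED)) {
            return true;                        // init -> started: at most once per engine
        }
        if (state.get() == SHUTDOWN) {          // shutdown -> started: caller misuse
            throw new IllegalStateException("cannot be started once stopped");
        }
        return false;                           // started -> started: no-op
    }

    // Returns true only if this call stopped a running engine.
    boolean stop() {
        for (;;) {
            int s = state.get();
            if (s == SHUTDOWN) {
                return false;                   // shutdown -> shutdown: no-op
            }
            if (state.compareAndSet(s, SHUTDOWN)) {
                return s == STARTED;            // init -> shutdown returns false
            }
            // Lost a race with a concurrent start() or stop(): re-read and retry.
        }
    }

    int state() { return state.get(); }
}
```

The retry loop in stop() addresses the contention case in which one thread stops the wheel while another is starting it: whichever compare-and-set wins decides the outcome, and the loser simply observes the new state.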
External interface
Much of this has already been covered in the kernel state section.
start: starts the timing wheel engine. Callers do not need to invoke this method themselves, because the timing wheel calls it every time newTimeout() is used to submit a task, to make sure the engine is running.
stop: stops the timing wheel engine and returns the scheduled tasks that have not been processed.
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit): submits a task to the engine. Before the task is actually added to the timeouts queue, (1) the timing wheel calls start() to make sure the engine has been started, and (2) it computes the deadline of the new Timeout.
Scheduling and execution
With the analysis above, the task scheduling of the timing wheel is not hard to understand. Put simply, it is a periodic tick operation that consists of the following steps:
- Wait until the next tick cycle begins
- The clock turns and the tick cycle begins:
  - Remove the tasks in the cancelledTimeouts queue one by one
  - Load the externally submitted tasks in the timeouts queue into their corresponding clock slots one by one
- Locate the clock slot for the current tick and execute the scheduled tasks that are due
- Check whether the engine kernel state has been terminated. If not, repeat the steps above; otherwise, proceed to the next step
- Add the unprocessed tasks to the unprocessedTimeouts queue:
  - Traverse the clock slots and call clearTimeouts() on each
  - Call poll() in a loop on the timeouts queue to collect tasks that were never added to a slot
- Remove the tasks that joined the cancelledTimeouts queue during the last tick cycle
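The per-tick part of these steps can be traced with a small, deterministic simulation. MiniWheel is a hypothetical class written for this example. It advances ticks by hand instead of sleeping on a worker thread, uses plain ArrayList slots instead of a hand-rolled linked list, and is single-threaded, so it illustrates the order of operations rather than a production design.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Illustrative sketch only: MiniWheel is hypothetical and not thread-safe.
final class MiniWheel {
    static final class Timeout {
        final Runnable task;
        final long dueTick;        // absolute tick at which the task should run
        long remainingRounds;
        boolean cancelled;
        List<Timeout> bucket;      // owning slot, null while still queued
        Timeout(Runnable task, long dueTick) { this.task = task; this.dueTick = dueTick; }
    }

    private final List<List<Timeout>> wheel = new ArrayList<>();
    private final Queue<Timeout> timeouts = new ArrayDeque<>();           // newly submitted
    private final Queue<Timeout> cancelledTimeouts = new ArrayDeque<>();  // cancelled from outside
    private final int mask;
    private long tick;

    MiniWheel(int slots) {         // slots must be a power of two
        for (int i = 0; i < slots; i++) wheel.add(new ArrayList<>());
        mask = slots - 1;
    }

    Timeout newTimeout(Runnable task, long delayTicks) {
        Timeout t = new Timeout(task, tick + delayTicks);
        timeouts.add(t);
        return t;
    }

    void cancel(Timeout t) {
        t.cancelled = true;
        cancelledTimeouts.add(t);
    }

    // One tick cycle: drop cancelled tasks, load new ones, then process the current slot.
    void advance() {
        for (Timeout t; (t = cancelledTimeouts.poll()) != null; ) {
            if (t.bucket != null) t.bucket.remove(t);
        }
        for (Timeout t; (t = timeouts.poll()) != null; ) {
            if (t.cancelled) continue;
            t.remainingRounds = Math.max(0, (t.dueTick - tick) / wheel.size());
            t.bucket = wheel.get((int) (Math.max(t.dueTick, tick) & mask));
            t.bucket.add(t);
        }
        Iterator<Timeout> it = wheel.get((int) (tick & mask)).iterator();
        while (it.hasNext()) {
            Timeout t = it.next();
            if (t.remainingRounds <= 0) {
                it.remove();       // due in this round: take it out and run it
                t.bucket = null;
                t.task.run();
            } else {
                t.remainingRounds--;   // belongs to a later round
            }
        }
        tick++;
    }
}
```

With a 4-slot wheel, a task submitted at tick 0 with a delay of 9 ticks is placed in slot 1 with 2 remaining rounds. It is skipped when the pointer passes slot 1 at ticks 1 and 5 and runs at tick 9.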
Important
The start times of adjacent tick cycles are, in theory, evenly spaced, but their end times vary with the number of tasks processed in each cycle. The time the engine still has to sleep should therefore be computed with the following formula:
tickDuration * (tick + 1) - (currentTime - startTime)
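As a quick check of the formula, the following sketch evaluates it in nanoseconds. TickSleep and sleepNanos are hypothetical names, and treating a non-positive result as "do not sleep" is an assumption made for this example.

```java
// Illustrative sketch only: TickSleep is a hypothetical helper class.
final class TickSleep {
    // Nanoseconds left until tick number (tick + 1) is due.
    // A result <= 0 means the engine is running late and should not sleep.
    static long sleepNanos(long tickDurationNanos, long tick, long currentTimeNanos, long startTimeNanos) {
        return tickDurationNanos * (tick + 1) - (currentTimeNanos - startTimeNanos);
    }
}
```

With a 100 ms tick, tick = 4, and 430 ms elapsed since startTime, the next cycle is due at 500 ms, so 70 ms of sleep remain. Because the target is computed from startTime rather than from the end of the previous cycle, a slow cycle shortens the following sleep instead of shifting all later ticks.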
Application of timing wheel in Dubbo
In fact, the timing wheel algorithm does not itself execute submitted tasks periodically: a submitted task runs only once, when it expires. In practice, each run of a task ends by calling newTimeout() on the Timer it belongs to, which resubmits the task so that it runs again after a certain number of time units. The advantage of this approach is that if a time-consuming task such as I/O is stuck for a long time, no further copies of the same task are submitted in the meantime, so tasks cannot pile up uncontrollably. As a consequence, these periodic tasks are not strictly fixed-rate tasks that run every x units of time.
The timing wheel is used in Dubbo mainly in the following areas:
- Failure to retry
  - Registration (Register)
  - Deregistration (Unregister)
  - Subscription (Subscribe)
  - Unsubscription (Unsubscribe)
- Periodic tasks
  - Heartbeat (Heartbeat)
  - Reconnection (Reconnect)
  - Taking a connection offline (CloseChannel)
Important
The timing wheel uses a single thread to manage and trigger tasks, so a Task must not let an exception escape while it runs. Otherwise, the timing wheel engine crashes and subsequent tasks cannot be executed. A Task should follow this pattern:
try {
    if (sthCheck()) {
        logger.warn("Sth happened");
        doBuz();
    }
} catch (Throwable t) {
    logger.warn("Exception when do sth ", t);
}
Periodic tasks
In Dubbo, each connection is represented as a Channel, and Dubbo nodes communicate with each other over these connections. A single node has to maintain its connections to many peer nodes: (1) keep a connection alive by continuously sending heartbeats; (2) take offline a connection that has been idle for longer than a certain period; (3) reconnect a Channel that has been disconnected but not closed.
The basic steps are as follows:
- When the Task runs at its tick, it obtains all Channels connected to the current node through a callback
- It performs the actual task operation, such as a heartbeat, on every Channel that is not closed
- Relying on the visibility guarantee of volatile, it checks whether the current task has been cancelled; if so it returns, otherwise it continues
- If the timing wheel is still running, it submits a new Task through the wheel's newTimeout()
The following analysis is based on the source code:
public abstract class AbstractTimerTask implements TimerTask {

    /**
     * The 'cancel()' action is invoked by the other thread submitting the task
     */
    protected volatile boolean cancel = false;

    public void cancel() {
        this.cancel = true;
    }

    ...

    private void reput(Timeout timeout, Long tick) {
        if (timeout == null || tick == null) {
            throw new IllegalArgumentException();
        }
        if (cancel) {
            return;
        }
        Timer timer = timeout.timer();
        if (timer.isStop() || timeout.isCancelled()) {
            return;
        }
        timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run(Timeout timeout) throws Exception {
        Collection<Channel> c = channelProvider.getChannels();
        for (Channel channel : c) {
            if (channel.isClosed()) {
                continue;
            }
            doTask(channel);
        }
        reput(timeout, tick);
    }

    protected abstract void doTask(Channel channel);

    interface ChannelProvider {
        Collection<Channel> getChannels();
    }
}
Note
As the code above shows, the pattern for implementing an interval task on the timing wheel is fairly fixed:
protected volatile boolean cancel = false;

public void run(Timeout timeout) throws Exception {
    // Perform the task's business logic
    doTask();
    // Resubmit the task
    reput(timeout, tick);
}

private void reput(Timeout timeout, Long tick) {
    if (cancel) {
        return;
    }
    // Check that the timing wheel is still running
    Timer timer = timeout.timer();
    if (timer.isStop() || timeout.isCancelled()) {
        return;
    }
    // Submit a new Task to the timing wheel, to run after the specified tick time
    timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}
What a periodic task does for each Channel is simple: essentially, it invokes a specific operation on the Channel when a condition is met.
- Heartbeat: channel.send(req), where req is built as follows:
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    // A response is expected
    req.setTwoWay(true);
    // Event type: heartbeat
    req.setEvent(Request.HEARTBEAT_EVENT);
- Reconnection: ((Client) channel).reconnect()
- Offline: channel.close()
Failure to retry
Networks are complex and unpredictable, so distributed applications often have to retry an operation several times to make sure it eventually succeeds. The implementation is almost the same as for the periodic tasks described above, except that the task is resubmitted only after an exception has been caught, and the number of retries is bounded. The pattern is as follows:
/**
 * times of retry.
 * retry task is execute in single thread so that the times is not need volatile.
 */
private int times = 1;

...

protected void reput(Timeout timeout, long tick) {
    if (timeout == null) {
        throw new IllegalArgumentException();
    }
    Timer timer = timeout.timer();
    if (timer.isStop() || timeout.isCancelled() || isCancel()) {
        return;
    }
    times++;
    timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

@Override
public void run(Timeout timeout) throws Exception {
    if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
        // other thread cancel this timeout or stop the timer.
        return;
    }
    if (times > retryTimes) {
        // reach the most times of retry.
        logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info(taskName + " : " + url);
    }
    try {
        doRetry(url, registry, timeout);
    } catch (Throwable t) {
        // Ignore all the exceptions and wait for the next retry
        logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
        // reput this task when catch exception.
        reput(timeout, retryPeriod);
    }
}

protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout);