1. Introduction to the time wheel
The delay queue service our company built earlier uses a time wheel, but we never really understood how it is implemented.
Recently we had a project that integrates with Alipay. The Alipay interface is rate limited: only N calls are allowed within a given period of time, and some of our business flows need to call the Alipay open platform interface frequently.
To stay within that limit, we load tasks into a time wheel according to certain delay rules and let the time wheel's scheduling make the interface calls asynchronously.
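The article does not spell out its delay rules, so the following is only a minimal sketch of one possible rule. It assumes a fixed-window limit and simply delays the k-th batch of calls by k windows. The class name `DelayPlanner` and its parameters are hypothetical, not taken from the project.

```java
import java.util.ArrayList;
import java.util.List;

final class DelayPlanner {

    /**
     * Splits taskCount calls into batches of maxCallsPerWindow and returns, for each call,
     * the delay in milliseconds to pass to the time wheel: batch k is delayed by k windows.
     */
    static List<Long> planDelays(int taskCount, int maxCallsPerWindow, long windowMillis) {
        if (maxCallsPerWindow <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("limit and window must be positive");
        }
        List<Long> delays = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            long batch = i / maxCallsPerWindow; // which window this call falls into
            delays.add(batch * windowMillis);
        }
        return delays;
    }

    public static void main(String[] args) {
        // 5 calls, at most 2 per 1000 ms window
        System.out.println(planDelays(5, 2, 1000)); // prints [0, 0, 1000, 1000, 2000]
    }
}
```

Each computed delay would then be used as the delay argument when the corresponding task is added to the time wheel.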
Many open source frameworks implement the time wheel algorithm. Here we take Netty as an example and look at how its HashedWheelTimer implements it.
1.1 Quick Start
Here is an example of API usage.
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class WheelTimerSamples {

    private static final HashedWheelTimerInstance INSTANCE = HashedWheelTimerInstance.INSTANCE;

    public static void main(String[] args) throws IOException {
        // Run PrintTimerTask once, 3 seconds from now
        INSTANCE.getWheelTimer().newTimeout(new PrintTimerTask(), 3, TimeUnit.SECONDS);
        System.in.read();
    }

    static class PrintTimerTask implements TimerTask {
        @Override
        public void run(Timeout timeout) {
            System.out.println("Hello world");
        }
    }

    enum HashedWheelTimerInstance {
        INSTANCE;

        private final HashedWheelTimer wheelTimer;

        HashedWheelTimerInstance() {
            // 100 ms per tick, 64 slots
            wheelTimer = new HashedWheelTimer(r -> {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((t1, e) -> System.out.println(t1.getName() + e.getMessage()));
                t.setName("-HashedTimerWheelInstance-");
                return t;
            }, 100, TimeUnit.MILLISECONDS, 64);
        }

        public HashedWheelTimer getWheelTimer() {
            return wheelTimer;
        }
    }
}
In the example above, we create a custom HashedWheelTimer and a custom TimerTask, add the task to the time wheel, and have it run after 3 s. The API is very simple to use.
We recommend defining one singleton time wheel per business type.
PS: Tasks run asynchronously on the time wheel's worker thread. The System.in.read(); line simply keeps the main thread blocked so that the demo stays alive until the task fires; if the worker thread were a daemon thread, the JVM would exit before the task runs without it.
1.2 Schematic Diagram
2. Principle analysis
2.1 Time wheel status
The time wheel has the following three states:
- WORKER_STATE_INIT: indicates the initialization state in which the worker thread in the time wheel is not started
- WORKER_STATE_STARTED: Indicates the running state. The worker thread in the time wheel has been started
- WORKER_STATE_SHUTDOWN: Indicates that the time wheel stops working
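The states change as follows: WORKER_STATE_INIT becomes WORKER_STATE_STARTED when the first task is added and start() launches the worker thread. Calling stop() moves the time wheel from either WORKER_STATE_INIT or WORKER_STATE_STARTED to WORKER_STATE_SHUTDOWN, after which it cannot be started again.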
2.2 Constructors
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Create the wheel array; its length is the smallest power of 2 that is >= ticksPerWheel, similar to HashMap
    wheel = createWheel(ticksPerWheel);
    // Bit mask used to locate a slot in the array
    mask = wheel.length - 1;

    // For precision, the time wheel works in nanoseconds internally
    long duration = unit.toNanos(tickDuration);

    // Guard against overflow: tickDuration * wheel.length must fit in a long
    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }

    // A tick shorter than 1 ms is raised to 1 ms
    if (duration < MILLISECOND_NANOS) {
        logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
                tickDuration, MILLISECOND_NANOS);
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }

    // Create the worker thread (it is not started here)
    workerThread = threadFactory.newThread(worker);

    // Track this instance for leak detection if leakDetection is true or the worker thread is not a daemon
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    // Maximum number of pending tasks
    this.maxPendingTimeouts = maxPendingTimeouts;

    // If more than 64 time wheel instances have been created, log a warning (only once)
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
The constructor parameters matter, and when customizing a time wheel we should choose values that suit the business scenario:
- threadFactory: the factory that creates the time wheel's worker thread; it lets us customize thread attributes (thread name, exception handling, and so on)
- tickDuration: how often the clock ticks. The smaller the value, the more precise the time wheel
- unit: the time unit of tickDuration
- ticksPerWheel: the size of the time wheel array
- leakDetection: whether to enable memory leak detection
- maxPendingTimeouts: the maximum number of pending tasks in the time wheel
The tick duration should be chosen based on the business. If it is too large, tasks may fire at inaccurate times. If it is too small, the time wheel ticks frequently; when there are few tasks, most ticks find nothing to run, so the wheel spins idle while still occupying a thread and CPU time.
To keep time wheels from using too many CPU resources, a log message is printed once more than 64 time wheel instances have been created.
The constructor only creates the worker thread; the thread is not started until the first task is added to the time wheel.
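The constructor rounds ticksPerWheel up to a power of two so that a slot can be located with a bit mask instead of a modulo. The sketch below illustrates that idea only; it is not Netty's createWheel, and the class and method names (`WheelSizing`, `normalize`, `slot`) are hypothetical.

```java
final class WheelSizing {

    /** Smallest power of two that is >= ticksPerWheel (assumes 1 <= ticksPerWheel <= 2^30). */
    static int normalize(int ticksPerWheel) {
        int length = 1;
        while (length < ticksPerWheel) {
            length <<= 1;
        }
        return length;
    }

    /** Slot for a tick; equal to tick % wheelLength because wheelLength is a power of two. */
    static int slot(long tick, int wheelLength) {
        int mask = wheelLength - 1;
        return (int) (tick & mask);
    }

    public static void main(String[] args) {
        int length = normalize(60);
        System.out.println(length);           // prints 64
        System.out.println(slot(70, length)); // prints 6, the same as 70 % 64
    }
}
```

Because the tick counter only ever grows, masking it this way makes the array behave like a ring.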
2.3 Adding tasks to a time wheel
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    // Number of pending tasks + 1
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    // If the number of pending tasks exceeds the maximum, reject the new task
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }

    // Start the worker thread of the time wheel (only the first call actually starts it)
    start();

    // Compute the execution time of the new task, relative to startTime
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }

    // Put the task in the queue
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
Tasks are first stored in a queue. On each tick of the time wheel, the worker thread moves the tasks waiting in the queue into the time wheel.
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            // Several threads may get here at once; the CAS guarantees that only one of them starts the worker thread
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    while (startTime == 0) {
        try {
            // startTimeInitialized is a CountDownLatch; wait until the worker thread has initialized startTime
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
A CAS operation is used here to guarantee thread safety and to make sure the worker thread is started only once.
After the worker thread is started, the calling thread blocks in start() until the worker thread has initialized startTime. Why must it wait for startTime? Because newTimeout computes the execution time of the newly added task right after start() returns, and that time is calculated relative to startTime.
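The start-once-and-wait pattern described above can be tried in isolation. This is a minimal sketch built on `AtomicInteger` and `CountDownLatch` from the JDK; Netty itself uses an `AtomicIntegerFieldUpdater`, and the class `LazyWorker` with its `launches` counter is a hypothetical stand-in for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyWorker {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final AtomicInteger launches = new AtomicInteger(); // counts how often the worker body ran
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime;

    private final Thread worker = new Thread(() -> {
        launches.incrementAndGet();
        long now = System.nanoTime();
        startTime = (now == 0) ? 1 : now; // 0 is reserved for "not initialized"
        startTimeInitialized.countDown();  // release every caller blocked in start()
    });

    void start() {
        switch (state.get()) {
            case INIT:
                // Only the thread that wins the CAS starts the worker
                if (state.compareAndSet(INIT, STARTED)) {
                    worker.start();
                }
                break;
            case STARTED:
                break;
            default:
                throw new IllegalStateException("cannot be started once stopped");
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // keep waiting, startTime will be set shortly
            }
        }
    }

    void shutdown() {
        state.set(SHUTDOWN);
    }

    long startTime() {
        return startTime;
    }

    int launches() {
        return launches.get();
    }
}
```

Calling start() any number of times, from any number of threads, runs the worker body once, and every call returns only after startTime is non-zero.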
2.4 Time scheduling
@Override
public void run() {
    // Initialize startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // 0 means "not initialized", so make sure startTime is never 0
        startTime = 1;
    }

    // Wake up the threads blocked in HashedWheelTimer#start() that are waiting for startTime
    startTimeInitialized.countDown();

    do {
        // Wait for the next tick of the clock
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            int idx = (int) (tick & mask);
            // Process cancelled tasks
            processCancelledTasks();
            HashedWheelBucket bucket =
                    wheel[idx];
            // Move tasks from the queue into the time wheel
            transferTimeoutsToBuckets();
            // Run the tasks in the current slot
            bucket.expireTimeouts(deadline);
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // The time wheel is shutting down: collect the unexecuted tasks into unprocessedTimeouts, which stop() returns
    // Unexecuted tasks can be in two places: the wheel array and the queue
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    // Process cancelled tasks
    processCancelledTasks();
}
Each time the time wheel ticks, tick is incremented by 1. A bitwise & of this value with mask (the wheel array length - 1) locates the slot in the wheel array. Because tick keeps increasing, the array behaves like an endless circle.
- Initialize startTime, because the execution time of every later task is calculated relative to startTime
- Tick the clock; if the next tick is not due yet, sleep for a while
- Process cancelled tasks
- Load tasks from the queue into the time wheel
- Run the tasks in the slot that corresponds to the current tick
- When the time wheel shuts down, collect all unexecuted tasks into the unprocessedTimeouts set, which the stop method returns
- Process cancelled tasks
That is a brief outline of the steps in the run method. The individual methods are analyzed in the following sections.
2.5 The clock ticks
If tickDuration is set to 100 ms, the clock ticks once every 100 ms. If the time of the next tick has not been reached yet, the worker thread sleeps and wakes up when the tick is due.
private long waitForNextTick() {
    // Relative time of the next tick
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        // Current time relative to startTime
        final long currentTime = System.nanoTime() - startTime;
        // Time remaining until the next tick, in milliseconds
        // 999999 is added before dividing by 1000000 so that the result is rounded up and the sleep is long enough
        // For example, if deadline - currentTime = 2000002 ns, plain division gives 2 ms,
        // and after sleeping 2 ms the deadline has still not been reached; adding 999999 yields 3 ms
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        // <= 0 means the tick is due
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        // Windows compatibility: the minimum scheduling unit on Windows is 10 ms,
        // so a sleep time that is not a multiple of 10 ms may be inaccurate
        // See https://github.com/Netty/Netty/issues/356
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            // Sleep until the next tick
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}
As the code above shows, Netty makes several adjustments to keep the clock accurate, such as rounding sleepTimeMs up and handling the Windows platform specially.
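The rounding in waitForNextTick is easy to check by hand. The sketch below isolates the two formulas from the method so they can be run on their own; the class and method names (`TickSleep`, `sleepMillis`, `alignForWindows`) are hypothetical.

```java
final class TickSleep {

    /** Remaining time until the deadline, rounded up to whole milliseconds. */
    static long sleepMillis(long deadlineNanos, long currentNanos) {
        return (deadlineNanos - currentNanos + 999_999) / 1_000_000;
    }

    /** The Windows adjustment: round down to a multiple of 10 ms. */
    static long alignForWindows(long sleepMillis) {
        return sleepMillis / 10 * 10;
    }

    public static void main(String[] args) {
        System.out.println(sleepMillis(2_000_002, 0)); // prints 3, plain division would give 2
        System.out.println(sleepMillis(2_000_000, 0)); // prints 2, already a whole millisecond
        System.out.println(alignForWindows(25));       // prints 20
    }
}
```

Rounding up means the worker never wakes before the tick is due, at the cost of sleeping up to 1 ms longer than strictly necessary.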
2.6 Loading a task from a queue into a time wheel
private void transferTimeoutsToBuckets() {
    // Transfer at most 100000 tasks from the queue per tick
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        // Skip cancelled tasks
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            continue;
        }

        // Tick number (counted from startTime) at which the task is due
        // For example, with 8 slots and a task due 12 ticks from now, the wheel must make one full
        // rotation plus 4 more ticks, so the number of rotations has to be computed as well
        long calculated = timeout.deadline / tickDuration;
        // Number of full rotations left before the task runs
        timeout.remainingRounds = (calculated - tick) / wheel.length;

        // A task may have waited in the queue past its deadline; never schedule it in the past
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // Slot of the task in the wheel array
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        // Several tasks may map to the same slot; they are chained in a linked list
        bucket.addTimeout(timeout);
    }
}

void addTimeout(HashedWheelTimeout timeout) {
    assert timeout.bucket == null;
    // Tasks form a doubly linked list
    timeout.bucket = this;
    if (head == null) {
        head = tail = timeout;
    } else {
        tail.next = timeout;
        timeout.prev = tail;
        tail = timeout;
    }
}
As mentioned above, a task is not added to the time wheel immediately; it is first stored in a queue. When the clock ticks, tasks are moved from the queue into the time wheel.
At most 100000 tasks are transferred per tick. Because the delay of each task is user-defined, the code computes how many ticks the task needs and how many full rotations of the wheel that amounts to, then places the task in the corresponding slot. Several tasks may map to the same slot; they are kept in a doubly linked list, much like the way HashMap handles collisions.
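The slot and rotation arithmetic can be worked through with concrete numbers. The sketch below copies the three formulas from transferTimeoutsToBuckets into standalone methods; the class `SlotCalc` and the sample values (100 ms ticks, 8 slots) are assumptions chosen for illustration.

```java
final class SlotCalc {

    /** Full rotations the wheel must complete before the task is due. */
    static long remainingRounds(long deadlineNanos, long tickDurationNanos, long currentTick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos;
        return (calculated - currentTick) / wheelLength;
    }

    /** Slot that receives the task; an overdue task goes into the current tick's slot. */
    static int slot(long deadlineNanos, long tickDurationNanos, long currentTick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos;
        long ticks = Math.max(calculated, currentTick);
        return (int) (ticks & (wheelLength - 1));
    }

    public static void main(String[] args) {
        long tick100ms = 100_000_000L; // 100 ms in nanoseconds
        // Task due 1.2 s after the wheel started, wheel is at tick 0: 12 ticks = 1 rotation + 4 ticks
        System.out.println(remainingRounds(1_200_000_000L, tick100ms, 0, 8)); // prints 1
        System.out.println(slot(1_200_000_000L, tick100ms, 0, 8));            // prints 4
        // Task due at 300 ms but the wheel is already at tick 5: it runs in the current slot
        System.out.println(remainingRounds(300_000_000L, tick100ms, 5, 8));   // prints 0
        System.out.println(slot(300_000_000L, tick100ms, 5, 8));              // prints 5
    }
}
```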
2.7 Executing Tasks
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // remainingRounds <= 0: the task is due in the current rotation
        if (timeout.remainingRounds <= 0) {
            // Remove the task from the linked list; remove() returns the next task
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // Run the task
                timeout.expire();
            } else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            // Drop cancelled tasks
            next = remove(timeout);
        } else {
            // remainingRounds > 0: the task must wait more rotations, so decrement the count
            timeout.remainingRounds--;
        }
        timeout = next;
    }
}

public void expire() {
    // Only a task that is still in its initial state may run
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }

    try {
        task.run(this);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
        }
    }
}
The tasks in a slot are stored in a linked list, and they may be due at different times: some on the current tick, others one or two rotations later. A task that runs on the current tick is removed from the linked list, and the links of its neighbours are updated.
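To see how tasks with different rotation counts share one slot, here is a simplified sketch of a single slot. It swaps Netty's intrusive doubly linked list for an `ArrayList` and leaves out cancellation and deadlines, so it shows the remainingRounds bookkeeping only. `BucketSketch` and `Entry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class BucketSketch {

    static final class Entry {
        final String name;
        long remainingRounds;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void add(String name, long remainingRounds) {
        entries.add(new Entry(name, remainingRounds));
    }

    /** One visit of this slot: fire entries whose rounds are used up, decrement the rest. */
    List<String> expire() {
        List<String> fired = new ArrayList<>();
        for (Iterator<Entry> it = entries.iterator(); it.hasNext();) {
            Entry entry = it.next();
            if (entry.remainingRounds <= 0) {
                it.remove();
                fired.add(entry.name);
            } else {
                entry.remainingRounds--;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        BucketSketch bucket = new BucketSketch();
        bucket.add("a", 0);
        bucket.add("b", 1);
        bucket.add("c", 2);
        System.out.println(bucket.expire()); // prints [a]
        System.out.println(bucket.expire()); // prints [b]
        System.out.println(bucket.expire()); // prints [c]
    }
}
```

Each call to expire() stands for the wheel pointer reaching this slot once, that is, once per full rotation.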
2.8 Stopping the time wheel
@Override
public Set<Timeout> stop() {
    // stop() must not be called from the time wheel's own worker thread
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }

    // Try to move the state from WORKER_STATE_STARTED to WORKER_STATE_SHUTDOWN. The CAS fails in two cases:
    // 1: the time wheel is still in WORKER_STATE_INIT, meaning no task was added between creation and stop
    // 2: the time wheel is already in WORKER_STATE_SHUTDOWN because another thread stopped it first
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // WORKER_STATE_INIT: there are no unprocessed tasks to return, but the instance counter must be decremented
        // WORKER_STATE_SHUTDOWN: nothing to do, the thread whose CAS succeeded handles the cleanup
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            // Time wheel instance count - 1
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        // In both cases, return an empty set
        return Collections.emptySet();
    }

    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            // Interrupt the worker thread
            workerThread.interrupt();
            try {
                // Wait up to 100 ms for the worker thread to exit; it uses this time to collect the unexecuted tasks
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    // Return the unprocessed tasks
    return worker.unprocessedTimeouts();
}
When stop is called, the time wheel can be in one of two states:
- WORKER_STATE_INIT: the time wheel has only been initialized. As mentioned earlier, the worker thread is not started when the time wheel object is created, only when the first task is added, so WORKER_STATE_INIT means the time wheel has not handled any task yet
- WORKER_STATE_STARTED: the time wheel is running. If multiple threads try to stop it, only one of them succeeds
When the time wheel stops, it returns the tasks that were not executed. How to handle them is up to the business side. This is similar to the shutdownNow method of a thread pool.
When a running time wheel is stopped, the worker thread saves the unexecuted, uncancelled tasks from the wheel array and the queue into the unprocessedTimeouts set. The stopping thread waits for this to finish through workerThread.join(100);.
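For readers less familiar with the shutdownNow comparison, the sketch below shows the JDK side of the analogy only: a scheduled thread pool hands back the tasks that never ran, much as stop() returns the unprocessed timeouts. It does not use Netty, and `ShutdownNowDemo` is a hypothetical class name.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownNowDemo {

    /** Schedules count far-future tasks, shuts the pool down, and returns how many tasks were handed back. */
    static int pendingAfterShutdown(int count) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        for (int i = 0; i < count; i++) {
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
        }
        // shutdownNow() returns the tasks that were still waiting to run
        List<Runnable> neverRun = pool.shutdownNow();
        return neverRun.size();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterShutdown(3)); // prints 3
    }
}
```

In both cases the caller decides what to do with the returned tasks, for example persisting them or running them immediately.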
Cancelling a task in the time wheel is relatively simple, so it is not covered here; interested readers can check the source.
That is how the time wheel works.
3. Summary
The summary below takes the form of questions and answers. Try each question yourself first and see whether you can answer it.
3.1 Is the time wheel started after initialization?
No. Once initialization completes, the time wheel is in the WORKER_STATE_INIT state and its worker thread is not running. The worker thread is started only when the first task is added to the time wheel. startTime is initialized after the worker thread starts, and task execution times are calculated from this field, so time inside the time wheel is relative.
3.2 What if there are still tasks in the time wheel and the service is restarted?
All tasks in the time wheel live in memory, so they are lost when the service restarts. The business side therefore has to handle restarts itself.
3.3 How to customize appropriate time wheel parameters?
There are two important parameters to note when customizing the time wheel:
- tickDuration: how often the clock ticks. If a task should run 10 seconds from now but tickDuration is set to 3 minutes, the task cannot fire on time. The smaller the value, the more precisely tasks fire, but when there are no tasks the worker thread keeps waking up to poll the queue, which consumes CPU resources
- ticksPerWheel: the size of the time wheel array. Suppose 10000 tasks are pending when the clock ticks but the array has only 8 slots, so each slot holds 1250 tasks on average. If all 1250 tasks are due on the current tick, they run synchronously one after another, and because each takes time, later tasks may fire late. Conversely, if the array is too large, many slots stay empty when there are few tasks
So evaluate your business before setting the parameters of a custom time wheel.
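Both examples can be turned into numbers. The sketch below is back-of-the-envelope arithmetic derived from the code in section 2: a task placed at tick number deadline / tickDuration runs when the worker finishes waiting for that tick, which is at (deadline / tickDuration + 1) * tickDuration. It ignores sleep rounding and the running time of other tasks, and the names (`TickMath`, `firingTimeMillis`, `averageTasksPerSlot`) are hypothetical.

```java
final class TickMath {

    /** Approximate wheel time (ms since the wheel started) at which a task with this deadline fires. */
    static long firingTimeMillis(long deadlineMillis, long tickMillis) {
        return (deadlineMillis / tickMillis + 1) * tickMillis;
    }

    /** Average number of tasks per slot, rounded up. */
    static long averageTasksPerSlot(long tasks, int wheelLength) {
        return (tasks + wheelLength - 1) / wheelLength;
    }

    public static void main(String[] args) {
        // 10 s task with a 3 minute tick: it cannot run before the first tick at 180 s
        System.out.println(firingTimeMillis(10_000, 180_000)); // prints 180000
        // The same task with a 100 ms tick fires about 100 ms after its deadline
        System.out.println(firingTimeMillis(10_000, 100));     // prints 10100
        // 10000 pending tasks spread over 8 slots
        System.out.println(averageTasksPerSlot(10_000, 8));    // prints 1250
    }
}
```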
3.4 What are the drawbacks of Netty’s time wheel?
In Netty, the time wheel runs tasks on a single thread, so a task that blocks delays all the tasks behind it. Netty's time wheel is also a poor fit for tasks with very long delays, such as adding hundreds of tasks that should run in 10 days. That produces long linked lists with large remainingRounds values, and the tasks occupy memory until they run.
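To get a feel for how large the round count becomes, the sketch below applies the remainingRounds formula from section 2.6 to a 10-day delay. The 100 ms tick and 64 slots are the values from the quick start example; `LongDelay` is a hypothetical class, and the wheel is assumed to be at tick 0.

```java
import java.util.concurrent.TimeUnit;

final class LongDelay {

    /** Full rotations a task has to wait, assuming the wheel is currently at tick 0. */
    static long remainingRounds(long delay, TimeUnit unit, long tickMillis, int wheelLength) {
        long ticks = unit.toMillis(delay) / tickMillis;
        return ticks / wheelLength;
    }

    public static void main(String[] args) {
        // 10 days = 864,000,000 ms = 8,640,000 ticks of 100 ms = 135,000 rotations of a 64-slot wheel
        System.out.println(remainingRounds(10, TimeUnit.DAYS, 100, 64)); // prints 135000
    }
}
```

Such a task is visited and decremented on every one of those rotations before it finally runs.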
3.5 Should the time wheel be set as a singleton?
It is strongly recommended to split by business module, with each module creating one singleton time wheel. As the code above shows, a log message is printed once more than 64 time wheel instances exist. If a new time wheel is created for every use instead of being shared, each instance gets its own worker thread, and the time wheel algorithm loses its benefit.
3.6 What is the difference between a time wheel and ScheduledExecutorService?
The JDK implementation of ScheduledExecutorService (ScheduledThreadPoolExecutor) keeps its tasks in a heap. With a large number of tasks, the heap has to be readjusted on every insertion and removal, which degrades performance. The time wheel is driven by the clock, and adding a task does not depend on how many tasks are already pending.
When there are few tasks, the time wheel still wakes up on every tick, so compared with ScheduledExecutorService it wastes some CPU resources.
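For comparison with the quick start, the same one-shot delayed task can be written against ScheduledExecutorService. This is a minimal sketch using only the JDK; the class name `SchedulerDemo` and the 50 ms delay are assumptions for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SchedulerDemo {

    /** Runs a task once after delayMillis and returns its result. */
    static String runAfter(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "Hello world", delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAfter(50)); // prints Hello world
    }
}
```

Unlike the time wheel's worker, the scheduler thread sleeps until the earliest task is due instead of waking on a fixed tick.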