Preface

We often need delayed or timed tasks. In network-connection scenarios, for instance, some form of timeout control is common, and as the number of connections grows, so does the number of timeout tasks. Managing timeouts for a large number of tasks is not easy.

Several implementations of scheduled tasks

java.util.Timer

The JDK introduced the Timer class in version 1.3 to implement timed tasks. Timer's design is relatively simple; it has two main internal fields:

  • TaskQueue: a queue of TimerTask objects representing the scheduled tasks.

  • TimerThread: the thread that executes the scheduled tasks.

    private final TaskQueue queue = new TaskQueue();  
    private final TimerThread thread = new TimerThread(queue);

Timer also defines an abstract class, TimerTask, which implements the Runnable interface. Business code subclasses it and implements the run method to supply the concrete delayed-task logic.

TaskQueue internally uses a min-heap to order tasks by their trigger time. TimerThread, in turn, runs an infinite loop: it reads the task at the head of the TaskQueue, waits until that task's deadline, then fires the task and removes it from the queue.

Timer's data structure and algorithm are easy to understand: all timeout tasks first enter the delay queue, and the background thread continually fetches tasks from it, waiting until each task's deadline before executing it. The delay queue is sorted as a min-heap. In the delayed-task scenario there are three operations: adding a task, extracting the head task, and peeking at the head task.

Peeking at the head of the queue is O(1); adding a task and extracting the head are O(log n). When the number of tasks is large, the cost of additions and removals becomes significant. Moreover, because Timer has only one processing thread, a delayed task that takes a long time to run delays the processing of every subsequent task.

The code is as follows:

    public static void main(String[] args) {
        java.util.Timer timer = new java.util.Timer();
        timer.schedule(new java.util.TimerTask() {
            @Override
            public void run() {
                System.out.println("Delay 1 second " + System.currentTimeMillis());
            }
        }, 1000);
        timer.schedule(new java.util.TimerTask() {
            @Override
            public void run() {
                System.out.println("Delay 2 seconds " + System.currentTimeMillis());
            }
        }, 2000);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        timer.cancel();
    }

ScheduledThreadPoolExecutor

Since Timer has only one thread handling delayed tasks, it clearly falls short when tasks are numerous. The thread-pool interfaces introduced in JDK 1.5 include ScheduledExecutorService, a sub-interface for handling delayed tasks. Internally it too uses a delay queue to hold tasks, sorted with a min-heap; threads in the pool wait on this queue until a task is due to be taken.

On the whole, ScheduledExecutorService differs from Timer in that it relies on a thread pool to execute tasks, and each task records its own type: a periodic task re-enqueues itself into the task queue after it finishes running.

Timer, by contrast, relies on a single thread to repeatedly take the task at the head of the queue and execute it, which is neither as efficient nor as robust as the thread-pool approach.

The implementation of ScheduledExecutorService is special in that only one thread at a time may take the head of the delay queue and wait on that task's deadline; while it waits, no other thread can acquire tasks. This prevents several threads from grabbing tasks simultaneously, which could fire tasks before their deadline or miss an earlier task added during the wait.

Because ScheduledExecutorService can use multiple threads, it also reduces the chance that a single long-running task blocks subsequent ones. However, the delay queue is still a min-heap, so adding and removing tasks remain O(log n); with a very large number of tasks, the overhead is significant.

The code is as follows:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ScheduledThreadPoolServiceTest {
        // The argument is the number of threads that can execute periodic tasks concurrently
        private ScheduledExecutorService service = Executors.newScheduledThreadPool(3);

        /**
         * schedule: run the task once, 2 seconds from now.
         */
        public void task0() {
            service.schedule(() -> {
                System.out.println("task0-start");
                sleep(2);
                System.out.println("task0-end");
            }, 2, TimeUnit.SECONDS);
        }

        /**
         * scheduleAtFixedRate: after 2 seconds, run the task every 4 seconds.
         * Note: if a run takes longer than the interval (say 6 seconds),
         * the next run starts as soon as the previous one finishes.
         */
        public void task1() {
            service.scheduleAtFixedRate(() -> {
                System.out.println("task1-start");
                sleep(2);
                System.out.println("task1-end");
            }, 2, 4, TimeUnit.SECONDS);
        }

        /**
         * scheduleWithFixedDelay: after 2 seconds, run the task with a 4-second delay between runs.
         * Note: the delay is counted from the completion of the previous run,
         * so the next run starts a fixed time after the previous one ends.
         */
        public void task2() {
            service.scheduleWithFixedDelay(() -> {
                System.out.println("task2-start");
                sleep(2);
                System.out.println("task2-end");
            }, 2, 4, TimeUnit.SECONDS);
        }

        private void sleep(long time) {
            try {
                TimeUnit.SECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            ScheduledThreadPoolServiceTest test = new ScheduledThreadPoolServiceTest();
            System.out.println("main start");
            test.task0();
            // test.task1();
            // test.task2();
            test.sleep(10);
            System.out.println("main end");
        }
    }

DelayQueue

Java also provides DelayQueue, an unbounded blocking queue in which every element must implement the Delayed interface. Internally it is backed by a PriorityQueue; since Delayed extends Comparable, the priority queue keeps elements ordered by their remaining delay.
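A minimal sketch of using DelayQueue (the DelayedTask class, its fields, and the chosen delays are illustrative, not part of the JDK):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // A minimal Delayed element: becomes available once its deadline passes.
    static class DelayedTask implements Delayed {
        final String name;
        final long deadline; // absolute trigger time in milliseconds

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.deadline = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.offer(new DelayedTask("task-2", 200));
        queue.offer(new DelayedTask("task-1", 100));
        // take() blocks until the head's delay has expired, so tasks
        // come out in deadline order regardless of insertion order.
        System.out.println(queue.take().name); // task-1
        System.out.println(queue.take().name); // task-2
    }
}
```

Note that take() parks the calling thread until the head element is due, which is exactly the waiting behavior Timer and ScheduledThreadPoolExecutor implement by hand.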


Redis sorted set

The Redis sorted set (ZSET) can also act as a delay queue, mainly through its score attribute: Redis sorts the members of the set by score in ascending order. Internally, the ZSET is implemented with a skip list.

A schematic diagram of the skip list data structure:

Overall, skip-list insertion and deletion run in O(log n) time.
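Java's ConcurrentSkipListMap happens to be skip-list based as well, so the ZSET pattern (score = trigger time, poll members whose score is at or below "now", then remove them) can be sketched without a Redis server. This is a stand-in for illustration, not Redis client code; the class and method names are invented, and a real ZSET also allows different members to share one score, which this map does not:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class ZsetStyleDelayQueue {
    // score -> member, mirroring ZADD semantics; the skip-list map keeps keys sorted.
    // Simplification: equal scores collide here, unlike a real ZSET.
    private final ConcurrentSkipListMap<Long, String> zset = new ConcurrentSkipListMap<>();

    // Analogue of: ZADD delay-queue <triggerTime> <taskId>
    public void add(String taskId, long triggerTimeMillis) {
        zset.put(triggerTimeMillis, taskId);
    }

    // Analogue of: ZRANGEBYSCORE delay-queue 0 <now> LIMIT 0 1, then ZREM on success
    public String pollDue(long nowMillis) {
        Map.Entry<Long, String> head = zset.firstEntry();
        if (head == null || head.getKey() > nowMillis) {
            return null; // nothing is due yet
        }
        // remove() returns null if another poller already claimed the entry
        return zset.remove(head.getKey());
    }

    public static void main(String[] args) {
        ZsetStyleDelayQueue q = new ZsetStyleDelayQueue();
        q.add("order-timeout-1001", 1_000L);
        q.add("order-timeout-1002", 2_000L);
        System.out.println(q.pollDue(500L));   // null: nothing due
        System.out.println(q.pollDue(1_500L)); // order-timeout-1001
    }
}
```

With real Redis, a consumer would run the range-then-remove loop against the server, typically inside a Lua script so that claiming a task is atomic.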

Are there more efficient data structures?

To sum up, Timer, ScheduledThreadPoolExecutor and DelayQueue all obtain the earliest pending task through a priority queue, so inserting and removing tasks costs O(log n). In addition, Timer and ScheduledThreadPoolExecutor implement periodic tasks by resetting a task's next execution time after each run.

However, because adding and extracting tasks is O(log n), the performance cost becomes substantial when tasks number in the tens or hundreds of thousands.

The problem is precisely that time complexity: with insertion and deletion both O(log n), performing m such operations costs O(m log n) in total.

Is there a data structure with lower complexity than O(log n) for adding and extracting tasks? There is. The paper Hashed and Hierarchical Timing Wheels presents a data structure called the timing wheel that reduces the time complexity of adding and removing delayed tasks to O(1).

Time wheel algorithm

Basic principle

As the name suggests, the time wheel's data structure closely resembles the hands of a clock.

The time wheel is implemented as a circular array; each element of the array is called a slot, much like a bucket in a HashMap.

Each slot holds a doubly linked list of tasks to be executed; adding to and removing from such a list is O(1). A slot also represents the wheel's time precision.

In other words, with a precision of 1 second, a task delayed 1.2 seconds and a task delayed 1.5 seconds are added to the same slot, and when that second arrives the list in the slot is traversed and its tasks executed.

Task insertion

When a delayed task is inserted into the time wheel, first divide its delay by the unit time and take the remainder modulo the number of slots; counting that many slots forward from the slot the pointer currently points to gives the slot into which the task should be inserted.

For example, suppose the time wheel has eight slots numbered 0 to 7 and the pointer currently points to slot 2. Adding a task with a 4-second delay gives 4 % 8 = 4, so the task is inserted into the delay-task queue of slot 6 (2 + 4 = 6).

Time slot implementation

The slots of the time wheel are implemented as a circular array: once the pointer crosses the end of the array, it wraps back to the starting index. In summary, the timing-wheel algorithm can be described as follows:

  • Queues store the delayed tasks; tasks in the same queue fall into the same slot. A circular array holds the slots, with each element pointing to one delay-task queue.

  • A pointer marks the current slot in the array and moves to the next slot every unit of time. When the pointer reaches a slot, all delayed tasks in that slot's queue are triggered.

  • To add a delayed task, divide its delay by the unit time, take the remainder modulo the number of slots, and count that many slots forward from the pointer: that slot is where the task is placed.

With this data structure, inserting a delayed task is O(1). When the pointer reaches a slot, all the delayed tasks in the queue attached to that slot are triggered.
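The algorithm above can be sketched as a minimal single-level wheel (class and method names are hypothetical; a real implementation would drive tick() from a dedicated timer thread):

```java
import java.util.LinkedList;
import java.util.List;

public class SimpleTimingWheel {
    private final List<Runnable>[] slots;
    private int pointer = 0; // index of the current slot

    @SuppressWarnings("unchecked")
    public SimpleTimingWheel(int slotCount) {
        slots = new List[slotCount];
        for (int i = 0; i < slotCount; i++) {
            slots[i] = new LinkedList<>();
        }
    }

    // O(1): compute the slot by modulo and append to its task list.
    // For a single-level wheel, delayTicks must be less than slots.length.
    public void add(Runnable task, int delayTicks) {
        int index = (pointer + delayTicks) % slots.length;
        slots[index].add(task);
    }

    // Called once per unit of time: advance the pointer and fire that slot's tasks.
    public void tick() {
        pointer = (pointer + 1) % slots.length;
        List<Runnable> due = slots[pointer];
        slots[pointer] = new LinkedList<>(); // fresh queue for later tasks
        due.forEach(Runnable::run); // in practice, hand off to a worker pool
    }

    public static void main(String[] args) {
        SimpleTimingWheel wheel = new SimpleTimingWheel(8);
        wheel.add(() -> System.out.println("fired after 3 ticks"), 3);
        for (int i = 0; i < 3; i++) {
            wheel.tick();
        }
    }
}
```

Here tick() runs the tasks inline only to keep the sketch short; as the next paragraph explains, execution should be delegated so the pointer keeps moving on time.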

Triggering and executing delayed tasks must not affect the timing accuracy of the pointer's advance. Therefore, the thread that moves the pointer is generally responsible only for triggering tasks, leaving their execution to other threads: for example, a slot's delay-task queue can be handed off wholesale to a separate thread pool, with a fresh queue created in the slot for tasks added later.

About the capacity

What if you need to add a task that runs 50 seconds from now? Eight slots are not enough, are they? Add more slots? Expand the array the way a HashMap does?

Assume a precision of 1 second: supporting delays of up to one day would require 86,400 slots (60 × 60 × 24), which consumes considerable memory. Clearly, simply increasing the slot count is not a good solution.

There are two common ways:

By adding a rounds counter. 50 % 8 = 2, so the task belongs in the slot at index 2 (the third slot, assuming the pointer is at index 0), and 50 / 8 = 6, so its rounds counter is 6: the task fires only after the pointer has swept past that slot 6 more times. Netty's HashedWheelTimer uses this approach.
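The rounds bookkeeping amounts to one modulo and one division; a small sketch of the arithmetic (slot count and pointer position are assumptions for illustration, and this is not Netty's actual code):

```java
public class RoundsCalc {
    public static void main(String[] args) {
        int slotCount = 8;
        int currentSlot = 0; // where the pointer is now
        int delayTicks = 50; // delay expressed in wheel ticks

        // The target slot is the delay modulo the wheel size...
        int slot = (currentSlot + delayTicks) % slotCount;
        // ...and the rounds counter records how many full revolutions
        // the pointer must complete before the task in that slot may fire.
        int rounds = delayTicks / slotCount;

        System.out.println("slot=" + slot + ", rounds=" + rounds); // slot=2, rounds=6
    }
}
```

On each pass, a task whose rounds counter is still positive is decremented and skipped; it is only executed when the pointer reaches its slot with the counter at zero.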

By adding levels. This is more like a watch: the second hand completes a revolution and the minute hand advances one tick; the minute hand completes a revolution and the hour hand advances one tick.

This is how the multi-level time wheel works. If the wheel described above is the first level, then one full revolution of the first level advances the second level by one slot.

If the second level also has eight slots, then one full revolution of the second level equals one slot of the third level: 64 seconds.

A three-level wheel with eight slots per level, 24 slots in total, can then handle tasks with a maximum delay of 512 seconds.
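The capacity arithmetic above is just the slot span multiplying by the slot count at each level; a tiny sketch (the slot count and precision are the example's assumptions):

```java
public class WheelCapacity {
    public static void main(String[] args) {
        int slotsPerLevel = 8;
        long tickSeconds = 1; // precision of the lowest level

        long span = tickSeconds;
        for (int level = 1; level <= 3; level++) {
            // Each level's tick equals the full span of the level below it.
            span *= slotsPerLevel;
            System.out.println("level " + level + " covers " + span + " seconds");
        }
        // level 1 covers 8, level 2 covers 64, level 3 covers 512 seconds
    }
}
```

This is why three small wheels of 8 slots (24 slots total) replace a flat wheel that would need 512 slots at the same precision.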

If a task is scheduled with a 500-second delay, it must initially be placed in the third level. After 436 seconds have elapsed, 64 seconds remain before it should fire; it is now effectively a 64-second task, so it is demoted to the second level (the first level still cannot hold it).

After another 56 seconds, it becomes an 8-second task, so it is demoted once more, to the first level, where it waits to be executed.

Demotion keeps the timing precision consistent. Kafka uses this multi-level time-wheel algorithm internally.


The next article will cover the implementation of the time wheel in Netty and Kafka.