Welcome to github.com/hsfxuebao/j… , hope to help you, if you feel ok, please click on the Star

preface

When a thread pool is running, it constantly fetches tasks from the task queue and executes them. If we want to implement delayed or timed task execution, an important point is that the task queue will be sorted according to the different delay time of the task. The shorter the delay time is, the task queue will be ranked in front of the queue and the task will be executed first.

A queue is a first-in, first-out data structure, that is, the data that enters the queue first is retrieved first. However, there is a special queue called priority queue, which sorts the inserted data preferentially and ensures that the data with higher priority is retrieved first, regardless of the insertion order.

A common way to make priority queues efficient is to use the heap. For heap implementation, see Heap and binary Heap Implementation and Features.

ScheduledThreadPoolExecutor thread pool

ScheduledThreadPoolExecutor inherited from ThreadPoolExecutor, so its internal data structure and basic same ThreadPoolExecutor, and based on its increasing the function of scheduling tasks according to time, It can be divided into delayed task execution and periodic task execution.

ScheduledThreadPoolExecutor constructor can only pass three parameters corePoolSize, ThreadFactory, RejectedExecutionHandler, The default maximumPoolSize is integer.max_value.

DelayedWorkQueue is a highly customized DelayedWorkQueue. Its implementation principle is basically the same as DelayQueue. Its core data structure is the priority queue of the binary minimum heap. MaximumPoolSize is no longer needed, so the thread pool will always have at most corePoolSize worker threads running.

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } Duplicate codeCopy the code

DelayedWorkQueue Delay blocking queue

DelayedWorkQueue is also a DelayQueue designed for scheduled tasks. The implementation of DelayedWorkQueue is the same as DelayQueue, but the priority queue and the implementation process of DelayQueue are migrated to its own method body, so that it can flexibly add method calls specific to scheduled tasks in the process.

The working principle of

ScheduledThreadPoolExecutor is to achieve a blocking his work queue, because ScheduleThreadPoolExecutor requires some special work queue.

DelayedWorkQueue is a heap-based data structure similar to DelayQueue and PriorityQueue. When executing scheduled tasks, the execution time of each task is different, so DelayedWorkQueue works in ascending order of execution time, with tasks whose execution time is closer to the current time at the front of the queue (note: The order here is not absolute, sorting in the heap only guarantees that the next execution time of the child node is larger than the next execution time of the parent node, and the leaf nodes are not necessarily sequential).

The heap structure is shown as follows:As you can see, the DelayedWorkQueue is a queue based on the minimal heap structure. The heap structure can be represented by arrays, which can be converted to arrays like this:If the index of the “first element” in the array is 0, then the position of the parent node and the child node is as follows:

  • The index of the child to the left of whose index is (2∗ I +1);
  • The index of the child to the right of whose index is (2∗ I +2);
  • The index of the parent of index is floor((I −1)/2);

Why use DelayedWorkQueue?

  • When a scheduled task is executed, the latest task to be executed must be displayed. Therefore, the task whose execution time is earlier in the queue must be displayed each time. Therefore, the priority queue is used naturally.
  • DelayedWorkQueue is a priority queue that guarantees that each queue is the first in the queue. Because it is a heap-based queue, the worst time complexity of the heap structure is O(logN) when performing insert and delete operations.

Source code analysis

define

The class inheritance of DelayedWorkQueue is as follows:It contains methods defined as follows:

Member attribute

// The initial size of the array. private static final int INITIAL_CAPACITY = 16; Private RunnableScheduledFuture<? Private RunnableScheduledFuture<? >[] queue = new RunnableScheduledFuture<? >[INITIAL_CAPACITY]; // Use lock to ensure multi-threaded concurrency safety. private final ReentrantLock lock = new ReentrantLock(); Private int size = 0; Private Thread leader = null; Private final Condition available = lock.newCondition(); private final Condition available = lock.newCondition(); Copy the codeCopy the code

DelayedWorkQueue is an array to store the elements in the queue. The core data structure is the priority queue of the binary minimum heap, which is automatically expanded when the queue is full.

Note the leader here, which is a variation of the leader-follower pattern to reduce unnecessary timed waits. What does that mean?

For a multi-threaded network model, all threads will have one of three identities: leader and follower, and a working state: Proccesser. Its basic principle is that there is always at most one leader. All followers are waiting to become the leader. When the thread pool is started, a Leader will be automatically generated to wait for the NETWORK I/O event. When an event occurs, the Leader will first notify a Follower thread to promote him to the new Leader and then go to work to handle the network event. After processing the event, the Leader thread will join the Follower thread waiting queue. Wait to become the Leader next time. This approach enhances CPU cache similarity and eliminates dynamic memory allocation and data exchange between threads.

The constructor

DelayedWorkQueue is ScheduledThreadPoolExecutor static class values, by default only a no arguments constructor.

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { // ... } Duplicate codeCopy the code

The method of entrance

DelayedWorkQueue provides three insert element methods: PUT, Add, and Offer (with time). We found that all three add methods call the Offer method compared to a normal blocking queue. That’s because it doesn’t have a queue full condition, which means you can keep adding elements to the DelayedWorkQueue, expanding the array as the number of elements exceeds the length of the array.

public void put(Runnable e) { offer(e); } public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); } Duplicate codeCopy the code

Add elements to offer

ScheduledThreadPoolExecutor submit task is invoked DelayedWorkQueue. The add, and the add, put some foreign provide add elements such as method calls the offer.

public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<? > e = (RunnableScheduledFuture<? >)x; // Final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; If (I >= queue.length) // Grow (); // Add one element to the queue size = I + 1; Queue [0] = e; if (I == 0) {queue[0] = e; setIndex(e, 0); } else {// Call the siftUp method to order the inserted elements. siftUp(i, e); } // the new element is the queue header, the queue header is replaced, and the thread waiting for the task is woken up. if (queue[0] == e) { leader = null; // Wake up the thread that is waiting to acquire the task. } } finally { lock.unlock(); } return true; } Duplicate codeCopy the code

The basic process is as follows:

  1. As the producer’s entry point, it first acquires the lock.
  2. Determine if the queue is about to fill (size >= queue.length), expand the capacity when it is fullgrow().
  3. If the queue is not full, size+1.
  4. Determine if the added element is the first; if so, no heap is required.
  5. If the added element is not the first, it needs to be heapedsiftUp.
  6. If the heap top element happens to be the one added at this point, the take thread is awakened for consumption.
  7. Finally release the lock.

The basic flow chart of offer is as follows:

Expansion turns ()

As you can see, when the queue is full, it does not block and wait, but continues to expand. NewCapacity newCapacity is expanded by 50% from oldCapacity (oldCapacity >> 1 is oldCapacity /2). Array.copyof creates a new empty array based on newCapacity and copies the data from the old array to the new array.

private void grow() { int oldCapacity = queue.length; // Increase the number of arrays by half each time. // grow 50% int newCapacity = oldCapacity + (oldCapacity >> 1); if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; Array.copyof to copy a new array queue = array.copyof (queue, newCapacity); } Duplicate codeCopy the code

Heap siftUp up

The newly added element is first added to the bottom of the heap, and then compared step by step with the parent node above. If it is smaller than the parent node, it will switch places with the parent node, and the cycle will be compared until it is larger than the parent node. Loop through to find where the element key should be inserted in the heap binary tree, and interact with the parent node.

See heap and binary Heap Implementation and Features for a detailed process for upheaping siftUp.

private void siftUp(int k, RunnableScheduledFuture<? While (k > 0) {// int parent = (k - 1) / 2 int parent = (k - 1) >>> 1; RunnableScheduledFuture<? > e = queue[parent]; // If the key element is larger than the parent element, then the loop is broken because the order is from smallest to largest. if (key.compareTo(e) >= 0) break; Queue [k] = e; queue[k] = e; // This is only useful if the element is an instance of the ScheduledFutureTask object to quickly cancel the task. setIndex(e, k); // reassign k to find the node where the key should be inserted into the heap binary tree k = parent; Queue [k] = key; queue[k] = key; setIndex(key, k); } Duplicate codeCopy the code

If the execution time of the key node is shorter than that of the parent node, the two nodes are switched so that the node with the highest execution time is placed at the front of the queue.

Assume that the delay time (obtained by calling getDelay()) of the newly joined node is 5, and the execution process is as follows:

  1. The new node is added to the end of the array with an index k of 7

  1. Queue [7] = queue[3]; parent = (k-1) >>> 1; parent = 3;

  1. At this point, set k to 3 and continue the loop, again counting parent to 1 and queue[1] to 3, since 5 > 3, then exit the loop and finally k to 3

It can be seen that each new node is judged according to the parent node, but does not affect the sibling node.

A team approach

DelayedWorkQueue provides the following queue exit methods

  • Take (), waiting to fetch the queue header element
  • Poll (), which immediately gets the queue header element
  • Poll (long timeout, TimeUnit unit), timeout waiting for the queue header element

Take consumption element

Worker Worker threads can work queue circulation consumption since the start of elements, because ScheduledThreadPoolExecutor keepAliveTime = 0, so consumer tasks. Its just call DelayedWorkQueue take.

public RunnableScheduledFuture<? > take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<? > first = queue[0]; // If there is no task, let the thread wait in the available condition. if (first == null) available.await(); Else {// Obtain the remaining delay time of the task long delay = first.getdelay (NANOSECONDS); // If the delay time is up, the task is returned for execution. if (delay <= 0) return finishPoll(first); // Set first to null. When the thread waits, no reference to first is held. First = null; // Do not retain the same thread that is still waiting for the queue header task. // Do not retain the queue header task. if (leader ! = null) available.await(); ThisThread = thread.currentThread (); leader = thisThread; Try {// Automatically wake up when the task's delay time is up. available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] ! = null) // Wake up the waiting thread available.signal(); ock.unlock(); }}Copy the code

The basic process of take is as follows:

  • The interruptible lock is first acquired to determine whether the heap top element is empty, and if it is empty, it blocks and waitsavailable.await().
  • If the heap top element is not empty, its execution delay is obtaineddelay.delay <= 0When the execution time is up, the queue exitsfinishPoll.
  • delay > 0It’s not execution time yet. JudgeleaderIf the thread is empty, another take thread is waiting, and the current take will block the wait indefinitely.
  • leaderThe thread is empty and the current take thread is set toleaderAnd blocks the waitdelayThe length.
  • If the current leader thread is woken up automatically by another take thread, the leader thread will finally wake upleaderSet tonull.
  • Loop the judgment againdelay <= 0Out of the queue.
  • When the leader is empty and the heap top element is not empty, the other take threads wake up and lock.

The basic flow chart of take is as follows:

The take thread blocks and waits

You can see that the producer take thread blocks the wait in two ways:

  • The heap top element is empty.
  • Delay > 0 for the heaptop element.

When is the take method called?

In ThreadPoolExecutor, the getTask method, the worker thread loops to fetch tasks from the workQueue. Timed tasks, however, are different because if the task is executed as soon as getTask gets it, it may not be ready for execution, so in the take method, it is necessary to ensure that the task can only be picked up at the specified execution time.

Leader thread

Let’s talk about the role of leader, which is to reduce unnecessary timing wait. The leader thread design, a variation of the leader-follower pattern, is designed to wait for unnecessary time. When a take thread becomes the leader thread, it only needs to wait for the next delay, while other take threads need to wait for the leader thread to exit the queue before waking up other take threads.

For example, if there is no leader, available.awaitnanos (delay) is executed every time a take is performed, assuming that the current thread executes the code, but there is no signal, and the second thread executes the code, and the second thread also blocks. There is no point in executing this code because only one thread will return queue[0] from the take (because of the lock), and the other thread will return the queue[0] from which the for loop was executed, which is no longer the same queue[0], and then block again.

If the leader is not empty, it means that the first node in the queue is already waiting for the queue to exit. In this case, other threads will block all the time, reducing useless blocking. (Note that signal() is called in finally to wake up a thread. Instead of signalAll()).

FinishPoll out queue

Heap top element delay<=0, execution time, queue out is a downward heapdown process siftDown.

Private RunnableScheduledFuture<? > finishPoll(RunnableScheduledFuture<? < span style = "max-width: 100%; clear: both; // Get the last element x RunnableScheduledFuture<? > x = queue[s]; Queue [s] = null; if (s ! = 0) // The queue header element is removed, so the order is reordered. siftDown(0, x); setIndex(f, -1); return f; } Duplicate codeCopy the code

There are three steps to delete the heap:

  1. Subtract one from the number of elements in the queue;
  2. Set the end of the queue element to the header element, and set the end of the queue element to NULL;
  3. Call the setDown(O,x) method to ensure that the elements are sorted by priority.

SiftDown is heaped down

After the top element is out of the queue, the structure of the heap is destroyed. It needs to be organized, and the bottom element is moved to the top of the heap and then heaped downward:

  • Starting at the top of the heap, the father node is compared to the smaller child node of the left and right children (the left child is not necessarily smaller than the right child).
  • If the parent node is less than or equal to the smaller child node, the loop ends and no position is changed.
  • If the parent node is larger than the smaller child node, the position is switched.
  • Continue the loop to judge the relationship between the parent node and the child node until the father node is less than or equal to the child node.

For a detailed process of siftDown heapdown, see heap and binary Heap Implementation and Features.

private void siftDown(int k, RunnableScheduledFuture<? > key) {// unsigned right, equivalent to size/2 int half = size >>> 1; // Through the loop, ensure that the value of the parent node is not greater than that of the child node. While (k < half) {// left child = (k * 2) + 1 int child = (k << 1) + 1; RunnableScheduledFuture<? > c = queue[child]; Int right = child + 1; // If the left child element value is greater than the right child element value, then the right child is the smaller child. If (right < size &&c.com pareTo(queue[right]) > 0) c = queue[child = right]; If (key.compareTo(c) <= 0) break; Queue [k] = c; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); } Duplicate codeCopy the code

The siftDown method can be executed either with no children or with children (as judged by half).

For example, if there are no children:

Suppose the initial heap looks like this:

  1. If k = 3, then k = half and there are no children, siftDown sets the node with index 3 to the last node in the array:

Case with child nodes:

Assuming k = 0, perform the following steps:

  1. Child = 1, right = 2;

  1. C = queue[child]; c = queue[child];
  2. If the interval for key is less than the interval for C, go ahead and set k to c and then set k to child.

  1. Because half = 3, k = 1, continue the loop and the index becomes:

  1. At this time, after the above judgment, the value of k is 3, and the final result is as follows:

  1. Finally, if called in the finishPoll method, the index of the node whose index is 0 is set to -1, indicating that the node has been deleted and the size has been reduced by 1. The final result is as follows:

It can be seen that the siftdown method is not orderly after execution, but it can be found that the next execution time of the child node must be longer than the next execution time of the parent node. Since the node with the smallest next execution time of the left child node and the right child node is selected each time, orderly queue exit can still be guaranteed during take and poll.

poll()

Get the queue header element immediately. If the queue header task is null or the task delay time has not expired, the task cannot be returned. Therefore, null is returned directly. Otherwise, call the finishPoll method, remove the queue header element and return it.

public RunnableScheduledFuture<? > poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<? > first = queue[0]; / / queue task is null, or task is not to delay time, return null if (first = = null | | first. GetDelay (NANOSECONDS) > 0) return null; Return finishPoll(first); else return finishPoll(first); } finally { lock.unlock(); }} Copy the codeCopy the code

poll(long timeout, TimeUnit unit)

The timeout wait to fetch the queue header element, in contrast to the take method, takes into account the set timeout, and returns NULL if no useful task has been fetched after the timeout. The rest is the same logic as in the take method.

public RunnableScheduledFuture<? > poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<? > first = queue[0]; // If there is no task. If (first == null) {// If (nanos <= 0) return null; Nanos = available.awaitNanos(nanos); } else {// Obtain the remaining delay time of the task long delay = first.getdelay (NANOSECONDS); // If the delay time is up, the task is returned for execution. if (delay <= 0) return finishPoll(first); If (nanos <= 0) return null; // Set first to null. When the thread waits, no reference to first is held. First = null; // Don't retain ref while waiting // If the timeout is less than the remaining delay time of the task, then the task may not be retrieved. / / the thread here waiting timeout nanos if (nanos < delay | | leader! = null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; Try {// Automatically wake up when the task's delay time is up. long timeLeft = available.awaitNanos(delay); Nanos -= delay-timeleft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] ! = null) // Wake up the waiting thread available.signal(); lock.unlock(); }} Copy the codeCopy the code

Remove Removes the specified element

Deleting a specified element is usually used to cancel a task while it is still in a blocking queue. When the deleted element is not the last element of the heap, it needs to be heaped.

public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; HeapIndex setIndex(queue[I], -1); int s = --size; RunnableScheduledFuture<? > replacement = queue[s]; queue[s] = null; if (s ! // siftDown(I, replacement); // siftDown(I, replacement); If (queue[I] == replacement) // If (queue[I] == replacement) // If (queue[I] == replacement) // If (queue[I] == replacement) } return true; } finally { lock.unlock(); }} Copy the codeCopy the code

Suppose the initial heap structure is as follows:K = 1, key = last node:Based on the analysis of siftDown method above, the results of the implementation of siftDown method are as follows:The value of the last node is smaller than that of the parent node, so the siftUp method is executed to ensure that the next execution time of the child node is longer than that of the parent node, so the final result is as follows:

conclusion

The DelayedWorkQueue is used to ensure that the tasks added to the queue are sorted according to the delay time of the tasks. The tasks with less delay time are obtained first.

  1. The data structure of DelayedWorkQueue is heap-based;

  2. The DelayedWorkQueue uses an array to implement the heap. The root node is replaced by the last leaf node, and then pushed down until the heap condition is satisfied. Finally, leaf nodes join the queue, and then push up to meet the conditions of heap establishment;

  3. When the DelayedWorkQueue becomes full, it is automatically expanded by half of its original capacity, that is, it will never block, and can be expanded by up to integer. MAX_VALUE.

  4. DelayedWorkQueue consumes element take and blocks wait when the top element is empty and delay >0.

  5. DelayedWorkQueue is a producer-consumer pattern where production never blocks and consumption can block;

  6. DelayedWorkQueue has a leader thread variable, which is a variant of leader-follower mode. When a take thread becomes the leader thread, it only needs to wait for the next delay, while other take threads need to wait for the leader thread to exit the queue before waking up other take threads.

Reference:

Block queue – DelayedWorkQueue source code analysis