preface

Before introducing PriorityBlockingQueue and DelayQueue, we need to understand a data structure: binary heap. Because PriorityBlockingQueue uses the least binary heap algorithm internally to ensure that every popup is the smallest element, DelayQueue relies on PriorityBlockingQueue.

Binary heap

The data structure of the heap is a complete binary tree. A complete binary tree means that all layers except the last layer have left and right child nodes. The binary heap can be divided into the maximum binary heap and the minimum binary heap.

The maximum heap has a node that is at most as large as its parent, and the minimum heap has a node that is at most as small as its parent. So the largest node in the largest heap is always the root, and the smallest node in the smallest heap is always the root.

Using arrays (subscripts starting with 1) to store binary heaps has the following properties:

  • The NTH child node is at index[2n] and index[2n+1]. For example, the child nodes at index[1] are at index[2] and index[3], and the child nodes at index[2] are at Index [4] and [5], and so on.
  • The subscripts of leaf nodes are index[n/2+1] to index[n]. For example, in an array of length 9, the elements from index[5] to index[9] belong to leaf nodes.
  • The parent of the NTH position (non-root node) is: n/2

Here is a binary heap (the circle numbers indicate array subscripts, not the values of real elements) :For example, at position 4, its left and right child nodes are 24 and 2So 4 plus 1 is 8 and 9, and n over 2 plus 1 is 5, which means we have leaves from 5 to n, which is 9.

There are three basic operations for a heap: initialization, sinking, and floating (using the minimum binary heap as an example below) :

  • Initialization: Initializes a heap from an unordered array. Starting with the last non-leaf node, compare the parent node to the child node. If the parent node is larger than the child node, replace the parent node with the child node, ensuring that the parent node <= the child node
  • Sink: Used to insert elements. A new element is inserted at the end of the array, compared to the parent node, if larger, the insertion is complete, if smaller, the swap, and so on.
  • Float down: Used to remove elements. If you remove a head node element, the end of the array is taken and placed on the head node, then compared to the child node, and if larger than the child node, the position is switched, and so on.

Note: Unlike binary trees, binary heaps do not guarantee the size of the left and right nodes

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue (size limited by memory) that supports a priority. The main difference between unbounded queues and the previous three types of bounded queues is that even if the length is specified, the queue will automatically expand when the queue reaches the maximum number of elements, so the PriorityBlockingQueue will not block when adding elements. If the size of the queue exceeds the memory limit, the PriorityBlockingQueue will not block. OutOfMemoryError is thrown.

By default, the PriorityBlockingQueue queue elements are sorted in a natural ascending order. You can also customize the class to implement the compareTo() method to specify element collation, or at initialization, you can specify the construction parameter Comparator to sort the elements. Note: PriorityBlockingQueue does not guarantee the order of elements of the same priority (that is, if two values are sorted idenally, the order is not guaranteed).

PriorityBlockingQueue class: PriorityBlockingQueueYou can see that four constructors are provided:

  • PriorityBlockingQueue() : Initializes a queue of default size (11) length and uses the default natural sort.
  • PriorityBlockingQueue(int) : Initializes a queue of specified length, using the default natural sort.
  • PriorityBlockingQueue(int,Comparator) : Initializes a queue of the specified size and sorts by the specified Comparator.
  • PriorityBlockingQueue(Collection) : Initializes and heaps the incoming Collection. If the current Collection is of type SortedSet or PriorityBlockingQueue, keep the original order. Otherwise, heaps using natural sort.

Initialize the

The first two constructors eventually call the third constructor to initialize a queue:We see that there is only one Condition queue, which is used to block the unqueued thread. The enqueued thread will not be blocked. Let’s focus on the fourth constructor, which initializes a queue:

public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; //true indicates that heapification is required, i.e. reordering is required
        boolean screen = true;  //true: null values need to be filtered
        if (c instanceofSortedSet<? >) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;// If the comparator is of type SortedSet, no heap is required
        }
        else if (c instanceofPriorityBlockingQueue<? >) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;If the comparator is of type PriorityBlockingQueue, no null values need to be filtered
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;// If pQ is a PriorityBlockingQueue, there is no need to heap
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if(a.getClass() ! = Object[].class) a = Arrays.copyOf(a, n, Object[].class);// If c.torray() fails, duplicate an array
        if (screen && (n == 1 || this.comparator ! =null)) {/ /??
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();// heap (sort)
    }
Copy the code

Is it marked? If judgment, did not expect what scene will happen, if you know, please leave a message to inform, thank you very much!

This looks like a long piece of code, but it actually assigns the specified collection to the queue, verifies the collation, and calls the heapify() method if a sort is needed. This initialization sort is key:Here is the binary heap representation of an array [8,5,2,7,6,4,1,9,3]

First see code 437 lines, from the characteristics of the binary heap know, binary heap initialization can be started from the last leaf node, is n / 2, but because of this algorithm is based on the elements from 1 to calculate, and array from zero beginning, so it need to minus 1, from below the position of the three elements (7) began to cycle ahead.

Then we have two sort judgments, the logic is the same, and we go to the siftDownComparable method, which basically does the sinking of the elementThe main logic is:

  1. Compare the left and right children of the current loop node, making sure to get the subscript child of the smallest node
  2. The element corresponding to the child is then compared to the parent node, ensuring that the parent node is < the smallest node
  3. Finally, the current element is resized to the smallest (possibly left or right) child (if any) of the node, and so on, completing the element sinking.

First sink

First cycle the elements from the 7, compare elements 9 and 3, found 9 > 3, temporary variable substitution, and then compare element 7 and 3 element, found 7 > 3, so the parent node and right child nodes directly replace, completed the first cycle (because the child node is a leaf node does not meet the secondary loop condition).

Second sinking

The second loop reaches the position of subscript 2, that is, element 2, similar to the first loop. Because the child node is a leaf node, it also completes the loop once and directly completes the replacement of the parent node and the youngest node. The upgrade process is as follows:

Third sink

Third cycle is to position the subscript 1, five elements, this time because the left child node is less than the right child nodes, so don’t need a temporary replacement, to directly compare the left child node and parent-child, note here in figure 2 is a temporary process, because was that the left child node assignment to the parent node, and then found there child nodes under the left child node will conduct a cycle, 5 is not assigned to the left child until it breaks out of the loop, completing the substitution:

Fourth sink

The fourth loop gets to the 0 subscript, element 8, and completes the 1 and 3 substitution first, and then completes the 3 and 8 substitution:At this time because the boy node 2 subscript < half, so again cycle (note when again cycle or take the first element of 8 to compare and left and right child nodes), and then will be 8 and 2 to replace, the position of the element to assign values to the subscript 2, then this time does not satisfy the loop condition, end of cycle, This is when element 8 is formally assigned to subscript 6:As you can see in the two flows above, element 8 sinks all the way to the end.

Here has completed initialization sorting, final array by:,5,2,7,6,4,1,9,3 [8] into,3,2,5,6,4,8,9,7 [1].

Add element (producer)

The put(E) method calls the offer(E) method. The offer(E) method does not block, and the unbounded array does not block either, so call the offer(E) method directly:Here the logic is relatively simple, first to see if the boundary is crossed, the boundary is expanded first, expansion will be discussed later. SiftUpComparable is the default sorting method for floatation:Using the sorted binary heap above, if we now add an element 4, we get the following binary heap:To ensure that the newly added element is not smaller than the root node according to the sorting rules, the newly added element needs to be floated up.

Come up for the first time

Notice that 4 is less than 6, so we put 6 at the end of the queue. Notice that 4 is not assigned to the queue because it has to float up to see where it is

Ascend the second time

The second float will find that 4 is less than 3, so it will jump out of the loop and make sure that 4 is placed at index 4 to complete the insert

Acquisition elements (consumers)

Call the take() method to get the elementLook mainly at the dequeue() method:The main logic of this approach is: 1, the first to get the first element (need to return) and the last element 2, then the last element is empty 3, good use to save the value of the last element from began to sink Last step sinking and initialize the last step of the sinking operation is the same way, until he finished sinks will give birth to a minimum elements back into the head node

capacity

Finally, let’s analyze the expanded tryGrow method

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // Release the lock before capacity expansion (if capacity expansion may cost, release the lock first, let the queue thread can operate normally)
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0.1)) {// Use CAS to ensure that only one thread can be expanded
            try {
                int newCap = oldCap + ((oldCap < 64)? (oldCap +2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {// Overflow may occur if the current maximum capacity is greater
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)// Enlarging an element also overflows or exceeds the maximum capacity throws an exception
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;// If the capacity exceeds the maximum capacity, the system will only expand to the maximum capacity
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];// Initialize a new array based on the latest capacity
            } finally {
                allocationSpinLock = 0; }}if (newArray == null) // If the value is empty, the CAS fails and some threads are expanding the CPU capacity
            Thread.yield();
        lock.lock();// Only one thread can copy the array
        if(newArray ! =null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);// Copy the old elements into the new array}}Copy the code

DelayQueue

DelayQueue is an unbounded blocking queue that supports delayed fetching of elements. This is done using PriorityQueue. Elements in the queue must implement the Delayed interface:The interface defines a getDelay method to get the current remaining expiration time, as well as a compareTo method due to the implementation of the Comparable interface.

DelayQueue Example

1. Create a new object and implement Delayed and rewrite getDelay and compareTo

package com.zwx.concurrent.queue.block.model;

import java.sql.Time;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MyElement implements Delayed {
    private long expireTime;// Expiration time (milliseconds)
    private int id;

    public long getExpireTime(a) {
        return expireTime;
    }

    public void setExpireTime(long expireTime) {
        this.expireTime = expireTime;
    }

    public int getId(a) {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public MyElement(int id, long expireTime) {
        this.id = id;
        this.expireTime = System.currentTimeMillis() + expireTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // The class receives milliseconds, but the getDelay method in DelayQeue receives nanoseconds, so a unit conversion is required
        return unit.convert(expireTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // Note that the sort must ensure that the first one to expire is the first one, otherwise it will block the next ones that are not due
        returnLong.valueOf(expireTime).compareTo(((MyElement) o).expireTime); }}Copy the code
package com.zwx.concurrent.queue.block;

import com.zwx.concurrent.queue.block.model.MyElement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;

public class DelayQueueDemo {
    public static void main(String[] args) {
        List<MyElement> list = new ArrayList<>();
        for (int i=1; i<=5; i++){ MyElement myElement =new MyElement(i,i*1000);
            list.add(myElement);
        }
        DelayQueue delayQueue = new DelayQueue(list);

        while (true) {try {
                MyElement myElement = (MyElement) delayQueue.take();
                System.out.println(myElement.getId());
            } catch(InterruptedException e) { e.printStackTrace(); }}}}Copy the code

DelayQueue class diagram

Now look at the class diagramThere are only two constructors, the first is an empty constructor and the second is a collection initialized by default.

Initialize the

Add by looping through the add(e) method, which then calls the Offer (e) method:

Add elements (consumer)

The elements of the DelayQueue are stored in its internal PriorityQueue, so the q.ffer (e) method is called. Leader represents the thread that acquired the lock. Q.peeek ()==e means that the current first element is the one just added, so the leader needs to be set to empty to wake up the queue (consumer) thread to reclaim the lock.

The q.ffer (e) method is basically the same as PriorityBlockingQueue

Acquisition elements (consumers)

The take method takes the elements in turn and blocks until the first element expires:

    public E take(a) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();// If the queue is empty, it is blocked
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();// If it expires, the poll method is called to fetch the element and return it directly
                    first = null; // don't retain ref while waiting
                    if(leader ! =null)
                        available.await();// If the head node is not empty, a thread holds the lock and is waiting for the expiration date
                    else {//leader==null
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;// Set the head node to the current thread. There are threads waiting for the head node element to expire
                        try {
                            available.awaitNanos(delay);// block the specified time
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null&& q.peek() ! =null) available.signal(); lock.unlock(); }}Copy the code

Leader-follower thread model

In the leader-follower thread model, each thread has three modes:

  • Leader: Only one thread becomes the leader, such as DelayQueue. If one thread is waiting for an element to expire, the other threads will block the wait
  • Followers: they always try to compete with the leader before starting to work
  • Processing: indicates the thread being processed

The DelayQueue queue has a leader attribute: private Thread leader = null; The leader-follower thread model is used. When a thread holds the lock, sets the leader attribute, and is waiting for the element to expire, it becomes the leader, and the other threads block.

conclusion

PriorityBlockingQueue and DelayQueue are analyzed in this paper.