PriorityBlockingQueue profile

PriorityBlockingQueue is a blocking queue with a priority. Similar blocking queues are convenient for implementing production consumption models, and the most commonly used apis are Offer, poll, and take.

  • offer: Inserts data into the queue without blocking, returns Boolean indicating success or failure
  • poll: Pops the first element in the queue, non-blocking
  • poll(timeout): Blocks the maximum timeout if the queue is empty
  • take: blocks until data is ejected.

PriorityBlockingQueue composition

   // The default capacity is 11
   private static final int DEFAULT_INITIAL_CAPACITY = 11;
   /** * Priority queue represented as a balanced binary heap: the two * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The * priority queue is ordered by comparator, or by the elements' * natural ordering, if comparator is null: For each node n in the * heap and each descendant d of n, n <= d. The element with the * lowest value is in queue[0], assuming the queue is nonempty. */
    private transient Object[] queue;
    // Number of data
    private transient int size;
    / / the comparator
    private transient Comparator<? super E> comparator;
    / / lock
    private final ReentrantLock lock = new ReentrantLock();
    / / semaphore
    private final Condition notEmpty = lock.newCondition();
    /** * Spinlock for allocation, acquired via CAS. */
    private transient volatile int allocationSpinLock;
    // Used for serialization and deserialization, usually null
    private PriorityQueue<E> q;
Copy the code

PriorityBlockingQueue uses the minimum heap (the top of the heap is the minimum) and a comparator to sort. By default, natural sort is used. As you can see, queue is transient and cannot be serialized. When serialized, data from queue will be copied to Q to fit the old PriorityBlockingQueue.

offer

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // Lock, thread safe
    lock.lock();
    int n, cap;
    Object[] es;
    // The number of elements has reached the queue capacity
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null)
            siftUpComparable(n, e, es);// The default sort
        else
            siftUpUsingComparator(n, e, es, cmp);// Customize the sorting mode according to the CMP Settings
        size = n + 1;
        // The notification is not empty
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
Copy the code

Expansion tryGrow

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // Release the lock first
    Object[] newArray = null;
    // The CAS is successfully expanded
    if (allocationSpinLock == 0 &&
        ALLOCATIONSPINLOCK.compareAndSet(this.0.1)) {
        try {
            If the original capacity is smaller than 64, the capacity is increased to 2 oldCap + 2. If the original capacity is greater than or equal to 64, the capacity is equal to 1.5 oldCap. The maximum value is integer.max_value
            int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
            int newCap = ArraysSupport.newLength(oldCap, 1, growth);
            if (queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0; }}if (newArray == null) // If the CAS fails, other threads succeed in the CAS process
        Thread.yield();
    lock.lock();/ / lock
    if(newArray ! =null && queue == array) {
        // If the cas thread acquires successfully, it will end up here
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap); }}Copy the code

TryGrow is unlocked first because expansion is a time-consuming operation, so cas is used to improve performance. If the tryGrow is locked, new data cannot be passed in at this time. If the lock is acquired by the thread whose CAS failed, it spins until the thread whose CAS succeeded acquires the lock. Because tryGrow is in the loop of the Offer method, it will again determine if the capacity is sufficient.

Insert data into the minimum heap siftUpComparable

The same goes for custom Comparator inserts

private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    //k starts with the number of inserted elements, assuming k = 3, as an array of elements [a,b,c]
    // Binary heap form:
    // a
    //b c
    while (k > 0) {
        // Find the parent node of the last element, equivalent to finding a by the coordinates of C, i.e. (2-1) >>> 1 = 0
        int parent = (k - 1) > > >1;
        Object e = es[parent];
        if (key.compareTo((T) e) >= 0)// It is larger than the parent node, i.e. the right child of the parent node, and is inserted directly into position k
            break;
        // Switch places with the parent node and continue to compare with the current parent node until it is larger than the parent node or is already root
        es[k] = e;
        k = parent;
    }
    es[k] = key;
}
Copy the code

Classical minimal heap algorithm.

poll(timeout)

Look directly at the poll(timeout) method, because this method actually contains the principles of the poll and take methods

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null && nanos > 0)// the poll method has no loop and returns null if no data is retrieved
            nanos = notEmpty.awaitNanos(nanos);// block until timeout nanos or other thread calls notempty.signal (), no timeout in take, use notempty.await ()
    } finally {
        lock.unlock();
    }
    return result;
}
Copy the code

The small top makes a dequeue

private E dequeue(a) {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;
    // The header is queued out, and then the heap is adjusted to keep the top heap small
    if ((result = (E) ((es = queue)[0))! =null) {
        final int n;
        final E x = (E) es[(n = --size)];// Heap tail element
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp); }}return result;
}

private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
        // assert n > 0;
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // Top-down until there are no children
        while (k < half) {
            int child = (k << 1) + 1; // Start with the left child
            Object c = es[child];
            int right = child + 1;/ / right child
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
                c = es[child = right];// let c be the smaller node
            if (key.compareTo((T) c) <= 0) // The last node is compared with the smaller node. The node less than or equal to the smaller node exits the loop, and the last node is assigned to the parent node of the currently compared child node
                break;
            // Assign the smaller of the two child nodes to the parent node
            es[k] = c;
            // Go to the lower level of the subtree and compare it with the last node. Imagine that the current node is at the lower level of the subtree
            k = child;
        }
        es[k] = key;
  }
Copy the code

The removal of the small top heap is not easy to understand from the notes, so let’s draw an example

Compare it with the tail node from top to bottom, choose the subtree of the smaller node to continue the comparison, and assign the smallest node to the parent node each time. Although the last node still has this, it is not referenced because the size has been reduced by 1.

A special case

In this example we will first define a Comparable, and that Comparable will assume that all objects are the same.

Special Comparable classes:

	private static class TestComparable implements Comparable<TestComparable> {
		int value = 0;
		public TestComparable(int value) {
			this.value = value;
		}
		@Override
		public int compareTo(TestComparable o) {
			return 0; }}Copy the code

The main program:

	public static void main(String[] args) {
		PriorityBlockingQueue<TestComparable> queue = new PriorityBlockingQueue<>(10);
		for (int i = 0; i < 10; i++) {
			queue.add(new TestComparable(i));
		}
        System.out.print("[");
		for (int i = 0; i < 10; i++) {
			System.out.print(queue.poll().value);
        }
        System.out.println("]");
	}
Copy the code

The final print-out result of the above program is [0987654321]. It can be seen that the first print-out is the first inserted node, and the subsequent print-out order is reversed with the insertion order. After carefully recalling the dequeue of the small top stack, you can better understand the logic of PriorityBlocking.