Questions

(1) How is PriorityBlockingQueue implemented?

(2) Does PriorityBlockingQueue need to grow its capacity?

(3) How does PriorityBlockingQueue ensure thread safety?

Introduction

PriorityBlockingQueue is the thread-safe priority blocking queue in the java.util.concurrent package.

Remember the PriorityQueue we covered earlier? See the earlier article on the PriorityQueue source code.

Remember what priority queues are usually implemented with? A heap, as covered in "Please, don't ask me about heap (sort) again!"

Source code analysis

Main fields

// The default capacity is 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// Maximum array size
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// Where to store elements
private transient Object[] queue;
// Number of elements
private transient int size;
// Comparator; null means natural ordering
private transient Comparator<? super E> comparator;
// Reentrant lock guarding all public operations
private final ReentrantLock lock;
// Condition that threads wait on while the queue is empty
private final Condition notEmpty;
// Spin lock for resizing, acquired via CAS: the thread that wins the CAS allocates the new array, other threads yield the CPU
private transient volatile int allocationSpinLock;
// A plain PriorityQueue used only for serialization/deserialization; elements are not stored here at runtime
private PriorityQueue<E> q;

(1) Elements are still stored in an array;

(2) A single lock with a notEmpty condition ensures thread safety;

(3) A CAS operation on a single variable controls resizing;
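To make point (2) concrete, here is a minimal sketch of the "one lock + notEmpty condition" pattern. The class `NotEmptySketch` and its methods are hypothetical names for illustration, and a plain `ArrayDeque` stands in for the heap array to keep the example short; it is not how the JDK class is written.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class, not part of the JDK.
class NotEmptySketch<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    // A FIFO buffer stands in for the heap array (assumption made for brevity).
    private final ArrayDeque<E> items = new ArrayDeque<>();

    // The buffer is unbounded, so put() never has to wait for space.
    public void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signal(); // wake one thread waiting in take()
        } finally {
            lock.unlock();
        }
    }

    // Blocks on notEmpty while the buffer is empty.
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty())
                notEmpty.await();
            return items.pollFirst();
        } finally {
            lock.unlock();
        }
    }
}
```

The `while` loop around `await()` matters: a woken thread re-checks the condition before proceeding, which is the same shape the real take() uses later in this article.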

Why is there no notFull condition?

Main constructors

// The default capacity is 11
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
// Pass in the initial capacity
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
// Pass in the initial capacity and comparator
// Initialize variables
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
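As a quick usage sketch of these constructors: the class `ConstructorDemo` and the method `drainDescending` are hypothetical names for illustration, and the capacity 16 is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class for illustration only.
class ConstructorDemo {
    // Builds a queue ordered from largest to smallest and drains it into an array.
    static int[] drainDescending(int... values) {
        PriorityBlockingQueue<Integer> q =
            new PriorityBlockingQueue<>(16, Comparator.<Integer>reverseOrder());
        for (int v : values)
            q.offer(v);
        int[] out = new int[values.length];
        for (int i = 0; i < out.length; i++)
            out[i] = q.poll(); // non-null: exactly values.length elements were added
        return out;
    }

    public static void main(String[] args) {
        // No-arg constructor: default capacity, natural ordering
        PriorityBlockingQueue<Integer> natural = new PriorityBlockingQueue<>();
        natural.offer(3);
        natural.offer(1);
        natural.offer(2);
        System.out.println(natural.poll()); // smallest element comes out first

        System.out.println(Arrays.toString(drainDescending(3, 1, 2)));
    }
}
```

Passing an initial capacity smaller than 1 throws IllegalArgumentException, as the constructor source above shows.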

Enqueue

Every blocking queue has four enqueue methods; we'll examine only offer(E e) here:


public boolean offer(E e) {
    // The element must not be null
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // Lock
    lock.lock();
    int n, cap;
    Object[] array;
    // Determine whether the array needs to be expanded, i.e. the number of elements reaches the array capacity
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // Select different methods depending on whether there is a comparator
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // Increase the element count by 1
        size = n + 1;
        // Signal notEmpty to wake a thread waiting to take an element
        notEmpty.signal();
    } finally {
        // Unlock
        lock.unlock();
    }
    return true;
}

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // Index of the parent node
        int parent = (k - 1) >>> 1;
        // Element at the parent node
        Object e = array[parent];
        // If the key is greater than or equal to the parent, the heap property holds and we are done
        if (key.compareTo((T) e) >= 0)
            break;
        // Otherwise, move the parent down and continue comparing one level up
        array[k] = e;
        k = parent;
    }
    // Put the element where it should be
    array[k] = key;
}

The entire operation is almost the same as PriorityQueue:

(1) Lock;

(2) Check whether the array needs to grow;

(3) Add the element and sift it up from the bottom of the heap;

(4) Increase the element count by 1 and signal the notEmpty condition to wake a thread waiting to take an element;

(5) Unlock;
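Step (3) is easier to follow on a plain int array. The sketch below is a simplified version of the sift-up loop, assuming a min-heap of ints; the class `SiftUpSketch` and the method `siftUp` are hypothetical names, not JDK code.

```java
// Hypothetical illustration of the sift-up step on a min-heap of ints.
class SiftUpSketch {
    // Inserts x at index k of a min-heap stored in heap[0..k-1],
    // moving larger parents down until x finds its place.
    static void siftUp(int[] heap, int k, int x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= heap[parent])
                break;              // heap property already holds
            heap[k] = heap[parent]; // move the parent down one level
            k = parent;
        }
        heap[k] = x;
    }

    public static void main(String[] args) {
        int[] heap = new int[8];
        int size = 0;
        for (int v : new int[]{5, 3, 8, 1})
            siftUp(heap, size++, v);
        System.out.println(heap[0]); // the minimum sits at the root
    }
}
```

Inserting 5, 3, 8, 1 in that order leaves the first four slots as 1, 3, 8, 5: the last insert moves 5 and then 3 down one level each before 1 settles at the root.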

Resizing

private void tryGrow(Object[] array, int oldCap) {
    // Release the main lock first (tryGrow() is called while offer() holds it)
    // and guard the allocation with allocationSpinLock instead,
    // so that other threads are not blocked while the new array is allocated
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // Only the thread that CASes allocationSpinLock from 0 to 1 may allocate
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // Below 64 the capacity grows to 2 * oldCap + 2; otherwise it grows by 50%
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // Check whether the new capacity overflows
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // Create the new array
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // Release the spin lock
            allocationSpinLock = 0;
        }
    }
    // newArray is still null if this thread did not win the CAS,
    // so yield the CPU to the thread that is allocating
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // Re-acquire the main lock
    lock.lock();
    // Proceed only if the new array was created and the old array has not been replaced
    if (newArray != null && queue == array) {
        // Install the new array
        queue = newArray;
        // Copy the old elements into the new array
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

(1) Release the lock acquired in the offer() method;

(2) Use a CAS operation on the allocationSpinLock variable to control the resizing process;

(3) If the old capacity is less than 64, the new capacity is 2 * oldCap + 2; otherwise the capacity grows by half;

(4) Create the new array;

(5) Set allocationSpinLock back to 0, which is equivalent to unlocking;

(6) Threads that did not win the CAS yield the CPU while another thread is resizing;

(7) Lock again;

(8) If the new array was created successfully, copy the old elements into it and return to the offer() method to continue adding the element;
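The growth rule in step (3) can be checked with a few lines of arithmetic. The sketch below copies only the capacity formula from tryGrow() and leaves out the overflow handling; `GrowthSketch` and `nextCapacity` are hypothetical names for illustration.

```java
// Hypothetical helper that mirrors the capacity formula in tryGrow(),
// without the MAX_ARRAY_SIZE overflow handling.
class GrowthSketch {
    static int nextCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
    }

    public static void main(String[] args) {
        int cap = 11; // DEFAULT_INITIAL_CAPACITY
        for (int i = 0; i < 4; i++) {
            cap = nextCapacity(cap);
            System.out.println(cap); // prints 24, 50, 102, 153
        }
    }
}
```

Starting from the default capacity of 11, the first three resizes slightly more than double the array (24, 50, 102); once the capacity reaches 64 or more, each resize adds 50% (102 becomes 153).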

Dequeue

A blocking queue also has four dequeue methods; we'll examine only take() here:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Lock (interruptibly)
    lock.lockInterruptibly();
    E result;
    try {
        // If the queue has no elements, block on the notEmpty condition;
        // the loop exits once an element has been dequeued
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        // Unlock
        lock.unlock();
    }
    // Return the dequeued element
    return result;
}

private E dequeue() {
    // Index of the last element (size minus 1)
    int n = size - 1;
    if (n < 0)
        // The queue is empty, return null
        return null;
    else {
        Object[] array = queue;
        // Take the element at the top of the heap
        E result = (E) array[0];
        // Take the last element; it will be re-inserted starting from the top
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // Sift down from the top to restore the heap
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // Update the size
        size = n;
        // Return the dequeued element
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        // Only non-leaf nodes need to be visited
        while (k < half) {
            // Left child node
            int child = (k << 1) + 1; // assume left child is least
            // Value of the left child
            Object c = array[child];
            // Right child node
            int right = child + 1;
            // Take the smallest of the left and right child nodes
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // If the key is less than or equal to the smaller child, the heap property holds and we are done
            if (key.compareTo((T) c) <= 0)
                break;
            // Otherwise, move the smaller child up and continue one level down
            array[k] = c;
            k = child;
        }
        // Place the element at its final position
        array[k] = key;
    }
}

The process is similar to PriorityQueue:

(1) Lock;

(2) Check whether dequeuing succeeded; if not, block on the notEmpty condition;

(3) Remove the top element of the heap, and move the last element to the top;

(4) Sift down from the top to restore the heap;

(5) Unlock;
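Steps (3) and (4) can be sketched on a plain int array as well. This is a simplified illustration assuming a non-empty min-heap of ints; `SiftDownSketch` and `pollMin` are hypothetical names, and unlike the real code the vacated last slot is not cleared because an int array has no null.

```java
// Hypothetical illustration of removing the root of a min-heap of ints.
class SiftDownSketch {
    // Removes and returns the root of a min-heap stored in heap[0..size-1].
    // The caller must pass size >= 1 and treat the heap as one element shorter afterwards.
    static int pollMin(int[] heap, int size) {
        int result = heap[0];
        int n = size - 1;
        int x = heap[n];      // last element, re-inserted starting from the root
        int k = 0;
        int half = n >>> 1;   // indices >= half are leaves
        while (k < half) {
            int child = (k << 1) + 1;          // assume the left child is smaller
            int right = child + 1;
            if (right < n && heap[child] > heap[right])
                child = right;                 // the right child is smaller
            if (x <= heap[child])
                break;                         // heap property holds
            heap[k] = heap[child];             // move the smaller child up
            k = child;
        }
        heap[k] = x;
        return result;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 8, 5};
        int size = 4;
        while (size > 0)
            System.out.println(pollMin(heap, size--)); // ascending order
    }
}
```

Repeatedly polling the heap 1, 3, 8, 5 yields 1, 3, 5, 8, which is exactly why a heap can serve as a priority queue.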

Summary

(1) The overall logic of PriorityBlockingQueue is basically the same as that of PriorityQueue;

(2) PriorityBlockingQueue uses one lock plus one notEmpty condition to ensure thread safety;

(3) PriorityBlockingQueue uses a CAS operation on a single variable so that only one thread performs the resize;

(4) Enqueuing sifts the new element up from the bottom of the heap;

(5) Dequeuing sifts down from the top of the heap;
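Point (3) is worth a small sketch. The version below swaps in `AtomicInteger` for the `UNSAFE.compareAndSwapInt` call seen in tryGrow(), since Unsafe is not meant for application code; `AllocationGuardSketch` and `tryRunExclusively` are hypothetical names for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a CAS-guarded critical section.
class AllocationGuardSketch {
    // 0 = free, 1 = some thread is working (plays the role of allocationSpinLock)
    private final AtomicInteger guard = new AtomicInteger(0);

    // Runs the task only if the calling thread wins the CAS; returns whether it ran.
    boolean tryRunExclusively(Runnable task) {
        if (guard.get() == 0 && guard.compareAndSet(0, 1)) {
            try {
                task.run();
            } finally {
                guard.set(0); // release, like allocationSpinLock = 0
            }
            return true;
        }
        // Another caller holds the guard; the caller may yield and retry
        return false;
    }
}
```

As with allocationSpinLock, a thread that loses the CAS does not block; it simply gets `false` back and can call `Thread.yield()` before trying again. The guard is also not reentrant: a nested call made while the guard is held returns `false`.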

Easter egg

Why doesn’t PriorityBlockingQueue require a notFull condition?

Because PriorityBlockingQueue grows automatically when it runs out of space, an enqueuing thread never has to wait for free space, so no notFull condition is needed.
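A short check of this behavior, as a sketch: the class `UnboundedDemo` and the method `fill` are hypothetical names, and the initial capacity of 2 is chosen only to force several resizes.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class for illustration only.
class UnboundedDemo {
    // Puts `count` elements into a queue created with initial capacity 2 and returns its size.
    static int fill(int count) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(2);
        for (int i = 0; i < count; i++)
            q.put(i); // does not block when the array is full; the array grows instead
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(1000));
        // remainingCapacity() is always Integer.MAX_VALUE for this class
        System.out.println(
            new PriorityBlockingQueue<Integer>().remainingCapacity() == Integer.MAX_VALUE);
    }
}
```

Note that put() on PriorityBlockingQueue does not declare InterruptedException, which is another hint that it never waits for space.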


Follow my official account "Tong Elder brother read source code" for more articles in the source code series, and tour the ocean of source code with Tong.