The problem
(1) How to implement PriorityBlockingQueue?
(2) Does PriorityBlockingQueue need to be expanded?
(3) How does PriorityBlockingQueue control concurrency security?
Introduction to PriorityBlockingQueue
Remember the PriorityQueue we covered earlier in the PriorityQueue source-code analysis? And remember what priority queues are typically implemented with? That's right: a heap.
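Before diving into the source, here is a minimal usage sketch (the class name is illustrative) showing the key behavior: elements come out in priority order, not insertion order.

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PbqDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        // Insert out of order
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);
        // take() returns elements in priority (natural) order, not FIFO order
        System.out.println(queue.take()); // 1
        System.out.println(queue.take()); // 2
        System.out.println(queue.take()); // 3
    }
}
```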
Source code analysis
The main properties
// The default capacity is 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// Maximum array size
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// Where to store elements
private transient Object[] queue;
// Number of elements
private transient int size;
// The comparator
private transient Comparator<? super E> comparator;
// Reentrant lock
private final ReentrantLock lock;
// Condition signaled when the queue becomes non-empty
private final Condition notEmpty;
// Spin variable used during expansion: the thread that CAS-updates it from 0 to 1 performs the expansion; other threads yield the CPU
private transient volatile int allocationSpinLock;
// Non-blocking priority queue, not where elements are stored, only for serialization/deserialization
private PriorityQueue<E> q;
(1) It still uses an array to store elements;
(2) It uses a lock plus a notEmpty condition to ensure concurrency safety;
(3) It uses a CAS operation on a single variable to control expansion;
Why is there no notFull condition?
Main construction methods
// The default capacity is 11
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// Pass in the initial capacity
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// Pass in the initial capacity and comparator
// Initialize variables
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
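The third constructor is the interesting one: the comparator you pass in decides the ordering. A quick sketch (the class name is illustrative): with Comparator.reverseOrder() the queue behaves as a max-heap.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class MaxHeapDemo {
    public static void main(String[] args) {
        // With a reversed comparator, the largest element sits at the head
        PriorityBlockingQueue<Integer> maxQueue =
                new PriorityBlockingQueue<>(11, Comparator.reverseOrder());
        maxQueue.offer(1);
        maxQueue.offer(3);
        maxQueue.offer(2);
        System.out.println(maxQueue.poll()); // 3
    }
}
```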
Enqueue
Blocking queues provide four flavors of insertion methods; here we'll examine only offer(E e):
public boolean offer(E e) {
// The element cannot be empty
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// Lock
lock.lock();
int n, cap;
Object[] array;
// Determine whether the array needs to be expanded, i.e. the number of elements reaches the array capacity
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// Select different methods depending on whether there is a comparator
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// The number of elements is increased by 1
size = n + 1;
// Wake up the notEmpty condition
notEmpty.signal();
} finally {
// Unlock
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// Get the parent node
int parent = (k - 1) >>> 1;
// The element value of the parent node
Object e = array[parent];
// If the key is not smaller than its parent, the heap property holds
if (key.compareTo((T) e) >= 0)
break;
// Otherwise, switch the two positions and continue the next round of comparison
array[k] = e;
k = parent;
}
// Put the element where it should be
array[k] = key;
}
The whole operation is almost identical to PriorityQueue's:
(1) Lock;
(2) Check whether expansion is needed;
(3) Add the element and sift it up (bottom-to-top heapify);
(4) Increase the element count by 1 and signal the notEmpty condition to wake a thread waiting to take an element;
(5) Unlock;
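The enqueue steps above can be observed from the outside: after every offer() returns, the sift-up has already moved the smallest element to the head. A small trace (the class name is illustrative):

```java
import java.util.concurrent.PriorityBlockingQueue;

public class SiftUpTrace {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        int[] inserts = {5, 3, 8, 1};
        for (int i : inserts) {
            queue.offer(i);
            // After each offer, the minimum so far has been sifted up to the head
            System.out.println("offered " + i + ", head = " + queue.peek());
        }
    }
}
```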
Expansion
private void tryGrow(Object[] array, int oldCap) {
// Release the lock first because it is inside the lock of the offer() method
// Use the allocationSpinLock variable to control the expansion process
// Prevent too many blocked threads
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// The thread that CAS-updates allocationSpinLock from 0 to 1 wins the right to expand
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// If the old capacity is less than 64, roughly double it;
// otherwise grow it by half
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// Determine whether the new capacity overflows
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// Create a new array
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// Reset the spin variable, equivalent to unlocking
allocationSpinLock = 0;
}
}
// newArray is still null here if this thread did not perform the allocation
// Let other threads give up CPU
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// Lock again
lock.lock();
// Check that the new array was created successfully and the old array has not been replaced
if (newArray != null && queue == array) {
// The queue is assigned a new array
queue = newArray;
// Copy the old array elements into the new array
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
(1) Release the lock that was acquired in the offer() method;
(2) Use a CAS operation on the allocationSpinLock variable to control the expansion process;
(3) If the old capacity is less than 64, roughly double it; otherwise grow it by half;
(4) Create the new array;
(5) Set allocationSpinLock back to 0, which is equivalent to unlocking;
(6) Threads that lost the expansion race yield the CPU;
(7) Re-acquire the lock;
(8) If the new array was created successfully, copy over the old elements, then return to offer() and continue adding the element;
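The sizing rule in step (3) can be isolated into a one-line helper. This is a sketch: the real tryGrow() additionally clamps the result at MAX_ARRAY_SIZE and handles overflow.

```java
public class GrowthRule {
    // Small arrays (oldCap < 64) roughly double (oldCap * 2 + 2);
    // larger arrays grow by 50% (oldCap + oldCap / 2)
    static int newCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
    }

    public static void main(String[] args) {
        System.out.println(newCapacity(11)); // 24: the default capacity doubles plus 2
        System.out.println(newCapacity(64)); // 96: grows by half
    }
}
```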
Dequeue
Blocking queues likewise provide four flavors of removal methods; here we'll examine only take():
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// Lock
lock.lockInterruptibly();
E result;
try {
// If the queue is empty, block on the notEmpty condition
// and loop to retry dequeue() after being woken up
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// Unlock
lock.unlock();
}
// Return the element of the queue
return result;
}
private E dequeue() {
// The number of elements is reduced by 1
int n = size - 1;
if (n < 0)
// The queue is empty; return null
return null;
else {
Object[] array = queue;
// Pop up the top of the heap element
E result = (E) array[0];
// Move the last element to the top of the heap
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// And do top-down heap
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// Update size
size = n;
// Return the element of the queue
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
// Only need to walk down until reaching a leaf node
while (k < half) {
// Left child node
int child = (k << 1) + 1; // assume left child is least
// Value of the left child
Object c = array[child];
// Right child node
int right = child + 1;
// Take the smallest of the left and right child nodes
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// If the key is smaller than the left and right child nodes, the heap is complete
if (key.compareTo((T) c) <= 0)
break;
// Otherwise, swap the key with the smallest of the left and right child nodes
array[k] = c;
k = child;
}
// Put the element where it belongs
array[k] = key;
}
}
The process is similar to PriorityQueue's:
(1) Lock;
(2) Try to dequeue; if the queue is empty, block on the notEmpty condition;
(3) Pop the top element of the heap and move the last element to the top;
(4) Sift down from the top (top-to-bottom heapify);
(5) Unlock;
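The blocking half of step (2) is easy to see with two threads. A sketch (the class name and the 100 ms delay are illustrative): take() parks on notEmpty until a producer offers an element and signals the condition.

```java
import java.util.concurrent.PriorityBlockingQueue;

public class TakeBlocksDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // let the consumer block on notEmpty first
            } catch (InterruptedException ignored) {
            }
            queue.offer(42);       // enqueue signals notEmpty, waking the taker
        });
        producer.start();
        // Blocks until the producer offers, then returns the element
        System.out.println(queue.take()); // 42
        producer.join();
    }
}
```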
Conclusion
(1) PriorityBlockingQueue is basically the same as PriorityQueue;
(2) PriorityBlockingQueue uses one lock + one notEmpty condition to control concurrency security;
(3) PriorityBlockingQueue uses a single variable CAS operation to control the capacity expansion of only one thread;
(4) Enqueue uses bottom-up heapify (sift-up);
(5) Dequeue uses top-down heapify (sift-down);
Bonus
Why doesn’t PriorityBlockingQueue require a notFull condition?
Since PriorityBlockingQueue automatically expands when it runs out of space, there is no need to wait for the queue to be full, so there is no need for notFull conditions.
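A quick sketch confirming this (the class name is illustrative): offers far beyond the initial capacity never block and never fail, because the backing array grows on demand.

```java
import java.util.concurrent.PriorityBlockingQueue;

public class NoNotFullDemo {
    public static void main(String[] args) {
        // Start tiny; the backing array expands automatically as we add
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
        for (int i = 0; i < 100; i++) {
            boolean added = queue.offer(i); // never blocks
            assert added;                   // and never returns false
        }
        System.out.println(queue.size()); // 100
    }
}
```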
Welcome to follow my WeChat official account "Tong Elder Brother Reads Source Code" for more articles in this source-code series, and tour the ocean of source code with Tong Ge.