What is PriorityBlockingQueue?
PriorityBlockingQueue is an unbounded, priority-ordered blocking queue: it can grow until system resources are exhausted rather than blocking writers on a fixed capacity. By default, elements are arranged in natural ascending order. You can also have the element class implement Comparable's compareTo() method to define the ordering, or supply a Comparator as a constructor argument when creating the PriorityBlockingQueue. Note, however, that the relative order of elements with equal priority is not guaranteed. PriorityBlockingQueue is implemented on top of a min binary heap, and it uses a CAS-based **spin lock** to coordinate dynamic expansion of the backing array, ensuring that the expansion operation does not block concurrent take operations.
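A quick sketch of the two ordering options (the element values are arbitrary and for illustration only):

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PbqOrderingDemo {
    public static void main(String[] args) {
        // Natural ascending order: Integer implements Comparable
        PriorityBlockingQueue<Integer> natural = new PriorityBlockingQueue<>();
        natural.add(3);
        natural.add(1);
        natural.add(2);
        System.out.println(natural.poll()); // 1 -- smallest first

        // Custom order: pass a Comparator at construction time
        PriorityBlockingQueue<Integer> reversed =
                new PriorityBlockingQueue<>(11, Comparator.reverseOrder());
        reversed.add(3);
        reversed.add(1);
        reversed.add(2);
        System.out.println(reversed.poll()); // 3 -- largest first
    }
}
```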
Binary heap
A binary heap is a complete binary tree, which makes it well suited to array storage. It has the following two characteristics:
- For the element a[i] in the array, the left child is a[2i + 1], the right child is a[2i + 2], and the parent is a[(i - 1) / 2].
- The heap-order property: the value of each node is no greater than the values of its left and right children. The smallest value in a min binary heap is therefore at the root, but removing the root is the expensive operation, because the tree must be re-adjusted afterwards.
A typical min binary heap is illustrated here.

Back to our topic, PriorityBlockingQueue. We will describe its implementation and principles from initialization, enqueueing, dequeueing, expansion, and other aspects.
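The index arithmetic above can be sketched as a few one-line helpers (the class and method names are illustrative, not from the JDK):

```java
public class HeapIndexDemo {
    static int left(int i)   { return 2 * i + 1; }
    static int right(int i)  { return 2 * i + 2; }
    static int parent(int i) { return (i - 1) / 2; }

    public static void main(String[] args) {
        // The node at index 1 has children at indices 3 and 4,
        // and its parent is the root at index 0.
        System.out.println(left(1) + " " + right(1) + " " + parent(1)); // 3 4 0
    }
}
```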
Initialization process
Member variables
PriorityBlockingQueue member variables:
// Initial capacity 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// Maximum capacity
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// An array of queue elements
private transient Object[] queue;
// The number of elements in the queue
private transient int size;
// The comparator (null means natural ordering)
private transient Comparator<? super E> comparator;
// Lock guarding all public operations
private final ReentrantLock lock;
// Condition signalled when the queue becomes non-empty
private final Condition notEmpty;
// The spin lock used during expansion, acquired via CAS; ensures only one thread resizes at a time
private transient volatile int allocationSpinLock;
// Normal priority queue for serialization only
// Maintain compatibility with previous versions
private PriorityQueue<E> q;
Constructor
PriorityBlockingQueue(Collection<? extends E> c)
public PriorityBlockingQueue(Collection<? extends E> c) {
// Initialize the global lock
this.lock = new ReentrantLock();
// Non-null condition initialization
this.notEmpty = lock.newCondition();
// true if the elements are not known to already be in heap order
boolean heapify = true;
// true if we must screen the elements for nulls
boolean screen = true;
if (c instanceof SortedSet<?>) {
    // A SortedSet is already ordered: record its comparator and skip heapify
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
    // A PriorityBlockingQueue never holds nulls, so the null screen can be skipped
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
// Convert the collection to an array
Object[] a = c.toArray();
// n indicates the actual data length
int n = a.length;
// toArray() may not return Object[] for some collections, so copy defensively
if (c.getClass() != java.util.ArrayList.class)
    a = Arrays.copyOf(a, n, Object[].class);
// If the collection is neither a SortedSet nor an exact PriorityBlockingQueue
// and n == 1 or a comparator is supplied, check for null elements explicitly
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// Assign data to this.queue
this.queue = a;
// The queue length is n
this.size = n;
// re-establish heap order if needed
if (heapify)
heapify();
}
The heapify() method
The heapify() method performs a bottom-up heap construction over the elements at indices 0 through n/2 - 1 of the array, i.e. all non-leaf nodes. The core of each step is to compare a parent with its left and right children and let the smallest of the three float up. The process starts at the last non-leaf node (index n/2 - 1) and works backwards to the root, so the lower levels of the tree are put in heap order first and the ordering then propagates upward.
private void heapify() {
    // the backing array
Object[] array = queue;
// Data length
int n = size;
// Maximum index of the first half
int half = (n >>> 1) - 1;
// the comparator (null means natural ordering)
Comparator<? super E> cmp = comparator;
if (cmp == null) { // no comparator: use natural ordering
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
    else { // use the supplied comparator
        for (int i = half; i >= 0; i--)
            siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}
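To make the sift-down logic concrete, here is a simplified int-array version of the same algorithm (a sketch mirroring siftDownComparable(), not the JDK code itself):

```java
public class HeapifySketch {
    // Sift a[k] down until the min-heap property holds beneath it.
    static void siftDown(int[] a, int k, int n) {
        int key = a[k];
        int half = n >>> 1;            // nodes at index >= half are leaves
        while (k < half) {
            int child = 2 * k + 1;     // assume the left child is smaller
            int right = child + 1;
            if (right < n && a[right] < a[child])
                child = right;         // the right child is smaller
            if (key <= a[child])
                break;                 // heap order already holds
            a[k] = a[child];           // move the smaller child up
            k = child;
        }
        a[k] = key;
    }

    // heapify(): sift down every non-leaf, from the last one back to the root.
    static void heapify(int[] a) {
        for (int i = (a.length >>> 1) - 1; i >= 0; i--)
            siftDown(a, i, a.length);
    }

    public static void main(String[] args) {
        int[] a = {9, 4, 7, 1, 3};
        heapify(a);
        System.out.println(a[0]); // 1 -- the minimum floats to the root
    }
}
```

Note that after heapify() only the heap property holds, not full sorted order; the guarantee is that the minimum sits at index 0.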
Enqueueing
add() and offer()
add(E e) and offer(E e) serve the same purpose: both insert an element into the priority queue. The Queue interface specifies that add(E e) and offer(E e) differ in how they handle an insertion failure (add() throws an exception, offer() returns false), but since PriorityBlockingQueue is unbounded, insertion never fails and the two methods behave identically; in fact add() simply delegates to offer(). Let's take offer() as an example and see how it works.
public boolean offer(E e) {
// null elements are not permitted: throw NullPointerException
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// acquire the lock
lock.lock();
int n, cap;
Object[] array;
// When the queue is full, extend the capacity
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// The comparator is empty
siftUpComparable(n, e, array);
else
// Add the element to the heap
siftUpUsingComparator(n, e, array, cmp);
// increment the element count
size = n + 1;
// Wake up the reader thread
notEmpty.signal();
} finally {
// release the lock
lock.unlock();
}
return true;
}
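Because the queue is unbounded, offer() never reports failure: a full array triggers tryGrow() rather than a false return. A small sketch illustrating this (the tiny initial capacity is deliberate, to force repeated expansion):

```java
import java.util.concurrent.PriorityBlockingQueue;

public class OfferDemo {
    public static void main(String[] args) {
        // Start with the smallest legal capacity so the array must grow many times
        PriorityBlockingQueue<String> q = new PriorityBlockingQueue<>(1);
        boolean allAccepted = true;
        for (int i = 0; i < 100; i++)
            allAccepted &= q.offer("task-" + i); // grows instead of failing
        System.out.println(allAccepted + " " + q.size()); // true 100
    }
}
```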
tryGrow(): expanding the array
As analyzed above, when the backing array is full, the writing thread calls tryGrow() to expand it, and inserts the new element once expansion succeeds. Let's look at the implementation of tryGrow():
private void tryGrow(Object[] array, int oldCap) {
// release the main lock first
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// try to acquire the allocation spin lock via CAS
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
    // we hold the spin lock: compute the new capacity
try {
// Compute the new length
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
(oldCap >> 1));
// the computed capacity may exceed MAX_ARRAY_SIZE (possible int overflow)
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
// already at the maximum capacity: cannot grow any further
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// allocate only if the capacity increased and no other thread has replaced the array
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
        } finally {
            // release the allocation spin lock
            allocationSpinLock = 0;
        }
    }
if (newArray == null) // back off if another thread is allocating
Thread.yield();
    // re-acquire the main lock
    lock.lock();
    // if we allocated the new array and no other thread swapped it in,
    // install it and copy the old elements over
    if (newArray != null && queue == array) {
queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
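The growth formula can be checked in isolation (newCap() is an illustrative helper mirroring the computation in tryGrow(), not a JDK method):

```java
public class GrowthDemo {
    // Mirrors the capacity computation in tryGrow()
    static int newCap(int oldCap) {
        return oldCap + ((oldCap < 64) ?
                         (oldCap + 2) :   // small arrays roughly double (+2)
                         (oldCap >> 1));  // large arrays grow by 50%
    }

    public static void main(String[] args) {
        System.out.println(newCap(11)); // 24 -- the default capacity more than doubles
        System.out.println(newCap(64)); // 96 -- beyond 64, growth slows to 1.5x
    }
}
```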
siftUpComparable() and siftUpUsingComparator() add an element to the heap; the two variants handle natural ordering and comparator-based ordering respectively.
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
        int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
        int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
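An int-array sketch of the sift-up step (mirroring siftUpComparable(), not the JDK code itself): the new element starts at the first free slot and swaps upward until its parent is no larger.

```java
public class SiftUpSketch {
    // Insert x at index k (the current size) and bubble it up toward the root.
    static void siftUp(int[] a, int k, int x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= a[parent])
                break;            // heap order already holds
            a[k] = a[parent];     // move the larger parent down
            k = parent;
        }
        a[k] = x;
    }

    public static void main(String[] args) {
        int[] heap = {2, 5, 3, 0, 0}; // a valid min-heap of size 3, with spare slots
        siftUp(heap, 3, 1);           // insert 1 as the fourth element
        // 1 bubbles past 5 and 2 to become the new root
        System.out.println(heap[0] + " " + heap[1] + " " + heap[3]); // 1 2 5
    }
}
```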
Dequeueing
Elements are removed from the queue via the take(), poll(), or remove() methods; the following is the implementation of take():
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// if no element is available, wait on notEmpty
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
// return the dequeued element
return result;
}
As you can see above, the core of dequeueing is the dequeue() method.
private E dequeue() {
int n = size - 1;
// the queue is empty
if (n < 0)
return null;
else {
// Fetch data from non-empty heap top
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// sift the last element down from the root to restore heap order
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
        return result;
    }
}
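Putting take() and dequeue() together: because every removal takes the heap root and then re-sifts, draining the queue yields the elements in priority order. A minimal sketch:

```java
import java.util.concurrent.PriorityBlockingQueue;

public class TakeDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.add(5);
        q.add(1);
        q.add(3);
        // take() would block on an empty queue; here it always finds an element
        StringBuilder out = new StringBuilder();
        while (!q.isEmpty())
            out.append(q.take()).append(' ');
        System.out.println(out.toString().trim()); // 1 3 5
    }
}
```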
Conclusion
PriorityBlockingQueue never blocks the writing thread: when the backing array is full, the writer expands it, and the CAS-based spin lock ensures the expansion does not block concurrent take operations. The reading thread, by contrast, blocks when the queue is empty; the non-blocking poll() method is also available. This makes the queue well suited to scenarios where reads dominate writes. Because the elements are stored as a binary heap, each element fetched from the queue is always the smallest (or, with a reversed comparator, the largest). Heap ordering requires either a Comparator supplied at construction time or elements that implement the Comparable interface; otherwise the program throws a ClassCastException.
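The ClassCastException mentioned above is easy to reproduce: offer elements that neither implement Comparable nor come with a Comparator (Task here is a hypothetical class for illustration):

```java
import java.util.concurrent.PriorityBlockingQueue;

public class NotComparableDemo {
    static class Task { } // deliberately does NOT implement Comparable

    public static void main(String[] args) {
        PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<>(); // no Comparator
        try {
            q.offer(new Task());
            q.offer(new Task()); // by now a comparison is unavoidable
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as expected");
        }
    }
}
```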