PriorityBlockingQueue profile
PriorityBlockingQueue is a blocking queue that orders its elements by priority. Blocking queues like this one are convenient for implementing the producer-consumer pattern, and the most commonly used APIs are offer, poll, and take.
offer: inserts an element without blocking. Because the queue is unbounded, it always returns true.
poll: removes and returns the head of the queue without blocking, or returns null if the queue is empty.
poll(timeout): waits for at most the given timeout if the queue is empty.
take: blocks until an element is available.
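As a quick illustration of the four calls above, here is a minimal single-threaded sketch. The class name ApiDemo and the method run are made up for this example; only the PriorityBlockingQueue calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ApiDemo {
    // Offers three values, then removes them with take, poll and poll(timeout).
    static List<Integer> run() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);

        List<Integer> out = new ArrayList<>();
        try {
            out.add(queue.take());                           // smallest element first
            out.add(queue.poll());                           // never blocks
            out.add(queue.poll(100, TimeUnit.MILLISECONDS)); // waits only if empty
            out.add(queue.poll());                           // queue is empty: null
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, null]
    }
}
```

Elements come out in priority order, not insertion order, and the final poll returns null because the queue has been emptied.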
PriorityBlockingQueue composition
// The default capacity is 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d. The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;
// Number of elements in the queue
private transient int size;
// The comparator, or null to use natural ordering
private transient Comparator<? super E> comparator;
// Lock guarding all public operations
private final ReentrantLock lock = new ReentrantLock();
// Condition that consumers wait on while the queue is empty
private final Condition notEmpty = lock.newCondition();
/**
 * Spinlock for allocation, acquired via CAS.
 */
private transient volatile int allocationSpinLock;
// Used for serialization and deserialization, usually null
private PriorityQueue<E> q;
PriorityBlockingQueue stores its elements in a min-heap (the smallest element is at the top) ordered by a comparator; when no comparator is given, the elements' natural ordering is used. Note that queue is transient, so the array itself is not serialized. During serialization the elements are copied into q, a plain PriorityQueue, to stay compatible with the serialized form of earlier versions of the class.
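To make the ordering rule concrete, the sketch below drains one queue that uses natural ordering and one that uses a reversed comparator. OrderingDemo and its methods are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class OrderingDemo {
    // Removes every element with poll() and returns them in removal order.
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    // No comparator: natural ordering, so the smallest element is the head.
    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.addAll(Arrays.asList(5, 1, 4));
        return drain(queue);
    }

    // Reversed comparator: the largest element becomes the head.
    static List<Integer> reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.addAll(Arrays.asList(5, 1, 4));
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // [1, 4, 5]
        System.out.println(reverseOrder()); // [5, 4, 1]
    }
}
```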
offer
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // Take the lock for thread safety
    lock.lock();
    int n, cap;
    Object[] es;
    // The array is full: grow it, then re-check
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null)
            siftUpComparable(n, e, es); // natural ordering
        else
            siftUpUsingComparator(n, e, es, cmp); // ordering defined by the supplied comparator
        size = n + 1;
        // Wake up one consumer waiting for the queue to become non-empty
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
Expansion tryGrow
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // Release the main lock first
    Object[] newArray = null;
    // Only the thread that wins the CAS allocates the new array
    if (allocationSpinLock == 0 &&
        ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
        try {
            // If the old capacity is below 64, grow to 2 * oldCap + 2;
            // otherwise grow to 1.5 * oldCap. ArraysSupport.newLength keeps
            // the result within the maximum array size.
            int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
            int newCap = ArraysSupport.newLength(oldCap, 1, growth);
            if (queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // CAS lost: another thread is allocating, so back off
        Thread.yield();
    lock.lock(); // Re-acquire the main lock
    if (newArray != null && queue == array) {
        // Only the thread that allocated the array gets here
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
tryGrow releases the main lock before allocating because creating the new array is comparatively slow; while the lock is released, other threads can still take or poll elements. A separate spinlock, acquired by a CAS on allocationSpinLock, ensures that only one thread allocates the new array. A thread that loses the CAS calls Thread.yield() and then re-acquires the main lock without having grown anything. Because tryGrow is called from the while loop in offer, that thread re-checks the capacity and retries until the array has been replaced, normally by the thread that won the CAS.
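The growth rule is easier to see with numbers. The sketch below reimplements only the capacity arithmetic; nextCapacity and firstCapacities are hypothetical helpers, and the overflow handling that the JDK delegates to ArraysSupport.newLength (an internal class) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the JDK.
class GrowthDemo {
    // Simplified stand-in for the capacity computation in tryGrow.
    // Assumption: no overflow handling, so only use it with small capacities.
    static int nextCapacity(int oldCap) {
        int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
        return oldCap + Math.max(1, growth);
    }

    // Lists the first `steps` capacities, starting from `start`.
    static List<Integer> firstCapacities(int start, int steps) {
        List<Integer> caps = new ArrayList<>();
        int cap = start;
        for (int i = 0; i < steps; i++) {
            caps.add(cap);
            cap = nextCapacity(cap);
        }
        return caps;
    }

    public static void main(String[] args) {
        System.out.println(firstCapacities(11, 5)); // [11, 24, 50, 102, 153]
    }
}
```

Starting from the default capacity of 11, the array roughly doubles until it passes 64 and then grows by half each time.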
Inserting into the min-heap: siftUpComparable
siftUpUsingComparator works the same way, except that it compares elements with the custom Comparator.
private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    // k starts at the insertion index, i.e. the current number of elements.
    // Example: the heap holds [a, b] and c is being inserted, so k = 2.
    // Binary heap form:
    //   a
    //  b c
    while (k > 0) {
        // Find the parent of index k. For c at index 2 this is (2 - 1) >>> 1 = 0, i.e. a
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        if (key.compareTo((T) e) >= 0) // key is not smaller than its parent: the heap property holds, stop here
            break;
        // Move the parent down into slot k and continue from the parent's slot,
        // until key is not smaller than its parent or the root is reached
        es[k] = e;
        k = parent;
    }
    es[k] = key;
}
This is the classic sift-up step of a min-heap.
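For readers who want to trace the sift-up by hand, here is a minimal sketch of the same loop specialised to an int array. SiftUpDemo, siftUp and build are hypothetical names; the JDK version works on Object[] and Comparable instead.

```java
import java.util.Arrays;

// Hypothetical demo class, not part of the JDK.
class SiftUpDemo {
    // Places key at index k, moving larger parents down until the
    // min-heap property holds (same loop shape as siftUpComparable).
    static void siftUp(int[] heap, int k, int key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (key >= heap[parent]) {
                break;               // key is not smaller than its parent
            }
            heap[k] = heap[parent];  // move the parent down
            k = parent;
        }
        heap[k] = key;
    }

    // Builds a heap by inserting the values one at a time.
    static int[] build(int... values) {
        int[] heap = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            siftUp(heap, i, values[i]);
        }
        return heap;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(build(5, 3, 8, 1))); // [1, 3, 8, 5]
    }
}
```

Inserting 1 last shows the full walk: it moves past 5 and then 3 before settling at the root.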
poll(timeout)
We go straight to poll(timeout), because it covers the logic of both poll and take.
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // Retry while the queue is empty and time remains.
        // The untimed poll() has no loop: it returns null at once if the queue is empty.
        while ((result = dequeue()) == null && nanos > 0)
            // Wait until the timeout elapses or another thread calls notEmpty.signal().
            // take() does the same without a timeout, using notEmpty.await().
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}
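From the caller's side, the two outcomes of poll(timeout) look like the sketch below: a timeout on an empty queue, and a wake-up when a producer offers an element. TimedPollDemo and its methods are hypothetical names, and the timeouts are arbitrary values picked for the example.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class TimedPollDemo {
    // Nothing is ever offered, so poll gives up after the timeout and returns null.
    static String pollEmpty() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // A producer thread offers a value; the consumer either finds it at once
    // or is woken by notEmpty.signal() well before the timeout.
    static String pollWithProducer() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> queue.offer("job"));
        producer.start();
        try {
            String result = queue.poll(5, TimeUnit.SECONDS);
            producer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty());
        System.out.println(pollWithProducer());
    }
}
```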
Removing from the min-heap: dequeue
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;
    // Take the head element, then re-adjust the array so it remains a min-heap
    if ((result = (E) ((es = queue)[0])) != null) {
        final int n;
        final E x = (E) es[(n = --size)]; // tail element of the heap
        es[n] = null;
        if (n > 0) {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
    // assert n > 0;
    Comparable<? super T> key = (Comparable<? super T>) x;
    int half = n >>> 1; // Walk top-down while node k still has a child
    while (k < half) {
        int child = (k << 1) + 1; // Start with the left child
        Object c = es[child];
        int right = child + 1; // Right child
        if (right < n &&
            ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
            c = es[child = right]; // Let c be the smaller of the two children
        // Compare the former tail element with the smaller child. If it is not
        // larger, stop: the tail element belongs in slot k
        if (key.compareTo((T) c) <= 0)
            break;
        // Otherwise move the smaller child up into the parent slot
        es[k] = c;
        // Descend into that child's subtree and compare again
        k = child;
    }
    es[k] = key;
}
Removal from the min-heap is hard to follow from the comments alone, so it helps to walk through an example.
Starting from the root, the former tail element is compared with the smaller child at each level. While that child is smaller than the tail element, the child is moved up into the parent slot and the walk continues into its subtree; the tail element is finally stored in the last slot that was vacated. The old tail slot is set to null and size has already been reduced by 1, so the tail element is no longer referenced from its original position.
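The walk described above can be traced with a small int-array version. This is only a sketch: SiftDownDemo and removeMin are hypothetical names, the caller tracks the size, and because an int slot cannot be set to null the old tail slot simply keeps a stale value.

```java
import java.util.Arrays;

// Hypothetical demo class, not part of the JDK.
class SiftDownDemo {
    // Removes and returns the root of a min-heap stored in heap[0..size).
    // Afterwards the heap occupies heap[0..size-1).
    static int removeMin(int[] heap, int size) {
        int result = heap[0];
        int n = size - 1;
        int key = heap[n];            // former tail element
        int k = 0;
        int half = n >>> 1;           // nodes at index >= half have no children
        while (k < half) {
            int child = (k << 1) + 1; // left child
            int right = child + 1;
            if (right < n && heap[child] > heap[right]) {
                child = right;        // pick the smaller child
            }
            if (key <= heap[child]) {
                break;                // tail element belongs in slot k
            }
            heap[k] = heap[child];    // move the smaller child up
            k = child;
        }
        heap[k] = key;
        return result;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 8, 5, 4};
        int min = removeMin(heap, 5);
        System.out.println(min);                                     // 1
        System.out.println(Arrays.toString(Arrays.copyOf(heap, 4))); // [3, 4, 8, 5]
    }
}
```

Here the tail element 4 is larger than the smaller child 3, so 3 moves up to the root; one level down, 4 is not larger than 5, so it settles in the slot that 3 left.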
A special case
This example first defines a Comparable whose compareTo always returns 0, so every object is treated as equal to every other.
Special Comparable classes:
private static class TestComparable implements Comparable<TestComparable> {
    int value = 0;

    public TestComparable(int value) {
        this.value = value;
    }

    @Override
    public int compareTo(TestComparable o) {
        return 0; // every element compares as equal
    }
}
The main program:
public static void main(String[] args) {
    PriorityBlockingQueue<TestComparable> queue = new PriorityBlockingQueue<>(10);
    for (int i = 0; i < 10; i++) {
        queue.add(new TestComparable(i));
    }
    System.out.print("[");
    for (int i = 0; i < 10; i++) {
        System.out.print(queue.poll().value);
    }
    System.out.println("]");
}
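The program prints [0987654321]: the first element inserted comes out first, and the rest come out in the reverse of their insertion order. Because compareTo always returns 0, siftUpComparable never moves an element, so the array keeps insertion order. Each dequeue then removes the root, and since the tail element never compares as greater than a child, siftDownComparable places the tail element directly at the root. Tracing dequeue this way is a good check of your understanding of PriorityBlockingQueue.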
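The same trace shows that the queue makes no promise about the order of elements that compare as equal. If first-in, first-out order among ties matters, one common approach is to add an insertion sequence number as a secondary key. The sketch below assumes a hypothetical Entry wrapper with priority, value and seq fields; none of these names come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the JDK.
class FifoTieBreakDemo {
    // Orders by priority first, then by insertion sequence.
    static final class Entry implements Comparable<Entry> {
        private static final AtomicLong SEQ = new AtomicLong();
        final int priority;
        final int value;
        final long seq = SEQ.getAndIncrement();

        Entry(int priority, int value) {
            this.priority = priority;
            this.value = value;
        }

        @Override
        public int compareTo(Entry other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    // Inserts ten entries with the same priority and drains the queue.
    static List<Integer> run() {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>(10);
        for (int i = 0; i < 10; i++) {
            queue.add(new Entry(0, i));
        }
        List<Integer> out = new ArrayList<>();
        Entry e;
        while ((e = queue.poll()) != null) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With the sequence number as a tie-breaker no two entries compare as equal, so entries with the same priority come out in insertion order.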