Welcome to github.com/hsfxuebao/j… . I hope it helps you; if you find it useful, please give it a Star.
1. Definition
PriorityBlockingQueue is a thread-safe priority queue built on a binary heap, a ReentrantLock, and a Condition. Its main features are:
- Unbounded capacity (the backing array can grow up to Integer.MAX_VALUE - 8)
- Thread safety is provided by a ReentrantLock, and waiting threads are woken up through a Condition
- The data is stored in a binary heap backed by an array. Note that the array is never fully sorted; instead, every change to the data restores the heap so that the minimum (or maximum) value sits at the top node.
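To make these features concrete, here is a minimal sketch of the queue in use. The class name PriorityOrderSketch and the helper drain are made up for this illustration; only PriorityBlockingQueue, offer, and poll come from the JDK.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class PriorityOrderSketch {
    // Offers the values in the given order, then polls until the queue is empty.
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue, int... values) {
        for (int v : values) {
            queue.offer(v);
        }
        List<Integer> out = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) {   // poll returns null once the queue is empty
            out.add(head);
        }
        return out;
    }

    public static void main(String[] args) {
        // Natural ordering: the smallest element is always at the top of the heap.
        System.out.println(drain(new PriorityBlockingQueue<>(), 5, 1, 4, 2, 3));
        // A reversed comparator turns the same structure into a max-heap.
        System.out.println(drain(
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder()),
                5, 1, 4, 2, 3));
    }
}
```

The elements come out in priority order even though they were offered in arbitrary order, because each poll takes the top of the heap and then repairs the heap.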
2. The heapify initialization method
Starting from the last parent node of the heap, move the smallest (or largest) value of each parent and its children into the parent position, and continue until the topmost parent (the root) has been processed. Let's look at the code directly:
    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     */
    private void heapify() {
        Object[] array = queue;
        int n = size;                 // 1. n is the number of elements in the heap
        int half = (n >>> 1) - 1;     // 2. Index of the last parent node
        Comparator<? super E> cmp = comparator;
        if (cmp == null) {
            // 3. Walk from the last parent back to the root, sifting each one down
            for (int i = half; i >= 0; i--) {
                siftDownComparable(i, (E) array[i], array, n);
            }
        } else {
            for (int i = half; i >= 0; i--) {
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
            }
        }
        // 4. After heapify, the minimum value of the whole binary heap is at index 0
        //    (note: this does not guarantee that the left subtree is smaller than the right subtree).
        // 5. To fully sort the heap you would have to exclude index 0 and repeat the process
        //    on the remaining elements: put the minimum at the top, exclude that node,
        //    and repeat until the last element. heapify itself does not do this.
    }
Starting from the last parent, this method compares each parent with its children, puts the minimum (or maximum) value in the parent position, and works its way up to the topmost parent. The code relies on the siftDownComparable method, which is the key step in building the heap:
    /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
        /*
         * Example: the array contains 10 elements: 1,2,3,4,5,6,7,8,9,10
         *   half = n >>> 1 = 10 >>> 1 = 5
         *   the index of the last parent is (9 - 1) >>> 1 = 4, hence while (k < half)
         */
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>) x;
            int half = n >>> 1;               // 1. Indices >= half have no children
            while (k < half) {                // 2. Loop while k is still a parent
                int child = (k << 1) + 1;     // 3. Index of the left child
                Object c = array[child];      // 4. Assume the left child is the smaller one
                int right = child + 1;        // 5. Index of the right child
                // 6. Compare the left and right children and assign the smaller one to c
                if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) {
                    c = array[child = right];
                }
                if (key.compareTo((T) c) <= 0) {
                    break;                    // 7. key is already <= both children: done
                }
                array[k] = c;                 // 8. Move the smaller child up
                k = child;                    // 9. Continue from the child's position
            }
            array[k] = key;                   // 10. Place key at the last position reached
        }
    }
How it works:
1. Compute the middle index half = n >>> 1. Any index greater than or equal to half has no children. Example: for an array of 10 elements, half = 10 >>> 1 = 5 and the index of the last parent is (9 - 1) >>> 1 = 4, so the loop condition is while (k < half).
2. After a parent has been adjusted, k becomes the index of the chosen child. For the parent at index 4 that is index 9 (index 10 would be the right child, which does not exist when n = 10), and 9 >= 5, so the loop ends. In general, the loop keeps moving down from k until k >= half.
3. After this pass, the subtree rooted at the original position k satisfies the heap property.
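The steps above can be tried out on a plain int array. This is only a sketch of the same technique, not the JDK code: the class name HeapifySketch and the helper isMinHeap are invented for the example, and int comparisons replace Comparable.

```java
import java.util.Arrays;

// Hypothetical demo class: a min-heap over int[] using the same sift-down idea.
class HeapifySketch {
    // Sifts x down from index k in a heap of n elements.
    static void siftDown(int k, int x, int[] array, int n) {
        int half = n >>> 1;                 // indices >= half are leaves
        while (k < half) {
            int child = (k << 1) + 1;       // left child
            int right = child + 1;
            if (right < n && array[child] > array[right]) {
                child = right;              // pick the smaller of the two children
            }
            if (x <= array[child]) {
                break;                      // x is already in a valid position
            }
            array[k] = array[child];        // move the smaller child up
            k = child;
        }
        array[k] = x;
    }

    // Builds a min-heap in place, from the last parent back to the root.
    static void heapify(int[] array) {
        int n = array.length;
        for (int i = (n >>> 1) - 1; i >= 0; i--) {
            siftDown(i, array[i], array, n);
        }
    }

    // Checks that every element is >= its parent.
    static boolean isMinHeap(int[] array) {
        for (int i = 1; i < array.length; i++) {
            if (array[(i - 1) >>> 1] > array[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] data = {9, 4, 7, 1, 8, 2, 6, 3, 10, 5};
        heapify(data);
        System.out.println(Arrays.toString(data)); // heap order, not sorted order
        System.out.println(isMinHeap(data));
    }
}
```

After heapify the smallest value is at index 0 and every parent is less than or equal to its children, but the array as a whole is generally not sorted.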
3. Adding an element: the offer method
The main idea: place the new element at the end of the array, then call siftUp to move it up to its proper position.
    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock lock = this.lock;
        lock.lock();                          // 1. Acquire the queue's lock
        int n, cap;
        Object[] array;
        // 2. If the array is full, grow it (possibly retrying) before inserting
        while ((n = size) >= (cap = (array = queue).length)) {
            tryGrow(array, cap);
        }
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null) {
                siftUpComparable(n, e, array);          // 3. Natural ordering
            } else {
                siftUpUsingComparator(n, e, array, cmp); // 4. Custom comparator
            }
            size = n + 1;                     // 5. After the insert, the element count is + 1
            notEmpty.signal();                // 6. Signal a waiting thread that the queue has elements
        } finally {
            lock.unlock();
        }
        return true;
    }
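The notEmpty.signal() call at the end of offer is what wakes a consumer blocked waiting for data. A small sketch of that interaction, assuming a single consumer thread that calls take(); the class OfferSignalSketch and the method takeAfterOffer are hypothetical names for this example.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class OfferSignalSketch {
    // Starts a consumer that blocks in take(), offers one element, and
    // returns what the consumer received.
    static String takeAfterOffer() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take());   // waits until an element is available
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.offer("job");                   // inserts the element and signals waiters
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(takeAfterOffer()); // prints "job"
    }
}
```

Whether the consumer reaches take() before or after the offer, it ends up with the element: either it finds the queue non-empty, or it waits and is woken by the signal.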
In this code you can see the tryGrow method, which enlarges the heap's backing array. It first releases the main lock with lock.unlock(), and then uses a CAS on allocationSpinLock to determine whether another thread is already expanding the array.
    /**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     *
     * @param array the heap array
     * @param oldCap the length of the array
     */
    private void tryGrow(Object[] array, int oldCap) {
        // The whole method is entered while the ReentrantLock is held.
        // 1. Release the main lock so that other threads can keep working
        //    while the new array is being allocated
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // 2. The CAS on allocationSpinLock acts as an optimistic lock, so that
        //    expansion does not block the other concurrent operations on the queue
        if (allocationSpinLock == 0 &&
                unsafe.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
            try {
                // 3. If the capacity is below 64, grow to double + 2; otherwise grow by 50%
                int newCap = oldCap + ((oldCap < 64) ?
                        (oldCap + 2) : // grow faster if small
                        (oldCap >> 1));
                // 4. Handle the case where the new capacity exceeds the maximum
                if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {
                        throw new OutOfMemoryError();
                    }
                    newCap = MAX_ARRAY_SIZE;
                }
                // 5. queue == array means no other thread has replaced the array yet
                if (newCap > oldCap && queue == array) {
                    newArray = new Object[newCap];
                }
            } finally {
                allocationSpinLock = 0;
            }
        }
        // 6. newArray == null means another thread is doing the expansion
        if (newArray == null) { // back off if another thread is allocating
            Thread.yield();     // 7. Give up the CPU so the expanding thread can finish
        }
        lock.lock();            // 8. Re-acquire the main lock
        if (newArray != null && queue == array) {
            queue = newArray;   // 9. Switch to the new array and copy the old data over
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
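The capacity arithmetic in step 3 and step 4 is easy to check in isolation. The sketch below copies only that calculation into a standalone helper; the class GrowthPolicySketch and the method nextCapacity are names invented here, and the locking and CAS parts of tryGrow are deliberately left out.

```java
// Hypothetical demo class: only the capacity calculation from tryGrow.
class GrowthPolicySketch {
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    static int nextCapacity(int oldCap) {
        // Small arrays roughly double (+2); larger arrays grow by 50%.
        int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
        if (newCap - MAX_ARRAY_SIZE > 0) {      // overflow-aware comparison
            int minCap = oldCap + 1;
            if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {
                throw new OutOfMemoryError();
            }
            newCap = MAX_ARRAY_SIZE;            // clamp to the maximum array size
        }
        return newCap;
    }

    public static void main(String[] args) {
        System.out.println(nextCapacity(11));   // 11 + (11 + 2) = 24
        System.out.println(nextCapacity(64));   // 64 + (64 >> 1) = 96
        System.out.println(nextCapacity(MAX_ARRAY_SIZE - 1) == MAX_ARRAY_SIZE); // true
    }
}
```

The check is written as newCap - MAX_ARRAY_SIZE > 0 rather than newCap > MAX_ARRAY_SIZE so that it still fires when the addition has overflowed int and newCap has wrapped around to a negative number.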
offer also calls the siftUpComparable method. The idea: compare the new element with its parent and keep moving it up until the element is >= its parent (or it reaches the root).
    /**
     * Inserts item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root.
     *
     * To simplify and speed up coercions and comparisons, the
     * Comparable and Comparator versions are separated into different
     * methods that are otherwise identical. (Similarly for siftDown.)
     * These methods are static, with heap state as arguments, to
     * simplify use in light of possible comparator exceptions.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param array the heap array
     */
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        // siftUp: place element x at position k and compare it with the
        // parent of k until x >= parent
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {                       // 1. Stop once the root is reached
            int parent = (k - 1) >>> 1;       // 2. Index of the parent of k
            Object e = array[parent];         // 3. The parent's value
            if (key.compareTo((T) e) >= 0) {  // 4. key >= e: the position is found
                break;
            }
            array[k] = e;                     // 5. Move the parent down to position k
            k = parent;                       // 6. Continue from the parent's position
        }
        array[k] = key;                       // 7. Place key at the appropriate position
    }
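A worked example on an int array shows how a new minimum travels from the last slot to the root. This is a sketch under simplifying assumptions: SiftUpSketch and insert are hypothetical names, and the array is copied on every insert instead of being grown the way tryGrow does.

```java
import java.util.Arrays;

// Hypothetical demo class: sift-up on a min-heap stored in int[].
class SiftUpSketch {
    // Moves x up from index k until it is >= its parent or reaches the root.
    static void siftUp(int k, int x, int[] array) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            int e = array[parent];
            if (x >= e) {
                break;
            }
            array[k] = e;       // parent moves down one level
            k = parent;
        }
        array[k] = x;
    }

    // Returns a new heap that contains all elements of heap plus x.
    static int[] insert(int[] heap, int x) {
        int[] grown = Arrays.copyOf(heap, heap.length + 1);
        siftUp(heap.length, x, grown);   // start at the new last slot
        return grown;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 2, 7, 4};
        // 0 starts at index 5, passes its parent 2 (index 2), then the root 1.
        System.out.println(Arrays.toString(insert(heap, 0))); // [0, 3, 1, 7, 4, 2]
    }
}
```

Only the elements on the path from the new slot to the root are touched, which is why an insert costs at most one comparison per level of the tree.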
4. Removing the head element: the poll method
The main idea: take the element at index 0 as the result, move the last element of the array to the root, and call siftDown to move it down to its proper position.
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        int n = size - 1;
        if (n < 0) {                          // 1. The queue is empty
            return null;
        } else {
            Object[] array = queue;
            E result = (E) array[0];          // 2. The head of the heap is the result
            E x = (E) array[n];               // 3. Take the last element
            array[n] = null;                  // 4. Clear the last slot
            Comparator<? super E> cmp = comparator;
            if (cmp == null) {
                siftDownComparable(0, x, array, n);   // 5. Sift the last element down from the root
            } else {
                siftDownUsingComparator(0, x, array, n, cmp);
            }
            size = n;                         // 6. Reassign size
            return result;                    // 7. Return the value
        }
    }
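Repeating this dequeue step until the heap is empty yields the elements in ascending order, which is the heap-sort idea. Below is a minimal sketch on an int array; DequeueSketch and drain are hypothetical names, the input is assumed to already be a valid min-heap, and there is no locking.

```java
import java.util.Arrays;

// Hypothetical demo class: repeated dequeue on a min-heap stored in int[].
class DequeueSketch {
    static void siftDown(int k, int x, int[] array, int n) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            int right = child + 1;
            if (right < n && array[child] > array[right]) {
                child = right;
            }
            if (x <= array[child]) {
                break;
            }
            array[k] = array[child];
            k = child;
        }
        array[k] = x;
    }

    // Removes and returns the root of a heap holding `size` elements.
    static int dequeue(int[] heap, int size) {
        int n = size - 1;
        int result = heap[0];
        int x = heap[n];                 // last element replaces the root
        if (n > 0) {
            siftDown(0, x, heap, n);     // heap now has n elements
        }
        return result;
    }

    // Empties the heap (modifying it) and returns the elements in the order removed.
    static int[] drain(int[] heap) {
        int[] out = new int[heap.length];
        int i = 0;
        for (int size = heap.length; size > 0; size--) {
            out[i++] = dequeue(heap, size);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 2, 7, 4};    // already a valid min-heap
        System.out.println(Arrays.toString(drain(heap))); // [1, 2, 3, 4, 7]
    }
}
```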
5. Removing a specific element: the remove method
Ideas:
- Find the position in the array of the element to be deleted
- Remove that element, move the last element of the array into its position, call siftDown to find the appropriate place, and then, if the element did not move down, call siftUp to find the appropriate place
    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements. Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as
     * a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = indexOf(o);       // 1. Find the position of the element
            if (i == -1) {
                return false;
            }
            removeAt(i);              // 2. Call removeAt to remove the element
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the ith element from queue.
     *
     * @param i the index of the element to remove
     */
    private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) { // removed last element
            array[i] = null;          // 1. The last element can simply be cleared
        } else {
            E moved = (E) array[n];   // 2. Take the last element
            array[n] = null;          // 3. Null out the last slot
            Comparator<? super E> cmp = comparator;
            if (cmp == null) {        // 4. Try to sift the moved element down from position i
                siftDownComparable(i, moved, array, n);
            } else {
                siftDownUsingComparator(i, moved, array, n, cmp);
            }
            if (array[i] == moved) {  // 5. array[i] == moved: it did not move down, so try sifting up
                if (cmp == null) {
                    siftUpComparable(i, moved, array);
                } else {
                    siftUpUsingComparator(i, moved, array, cmp);
                }
            }
        }
        size = n;                     // 6. Reassign size
    }
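The siftUp step after siftDown is needed because the last element of the array may come from a different subtree and can be smaller than the parent of the removed slot. The sketch below reproduces such a case on an int array. RemoveAtSketch is a hypothetical class, and instead of the identity check array[i] == moved it uses the final index returned by siftDown, since ints have no identity.

```java
import java.util.Arrays;

// Hypothetical demo class: removing an arbitrary index from an int[] min-heap.
class RemoveAtSketch {
    // Sifts x down from k and returns the index where x ended up.
    static int siftDown(int k, int x, int[] a, int n) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            int right = child + 1;
            if (right < n && a[child] > a[right]) {
                child = right;
            }
            if (x <= a[child]) {
                break;
            }
            a[k] = a[child];
            k = child;
        }
        a[k] = x;
        return k;
    }

    static void siftUp(int k, int x, int[] a) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            if (x >= a[parent]) {
                break;
            }
            a[k] = a[parent];
            k = parent;
        }
        a[k] = x;
    }

    // Returns a new heap without the element at index i; the input is not modified.
    static int[] removeAt(int[] heap, int i) {
        int[] a = heap.clone();
        int n = a.length - 1;
        if (n != i) {
            int moved = a[n];                 // last element fills the hole
            int pos = siftDown(i, moved, a, n);
            if (pos == i) {                   // did not move down: it may need to go up
                siftUp(i, moved, a);
            }
        }
        return Arrays.copyOf(a, n);           // drop the now-unused last slot
    }

    public static void main(String[] args) {
        int[] heap = {1, 10, 2, 11, 12, 3, 4};
        // Removing 11 (index 3) moves 4 into the hole; 4 < 10, so it must sift up.
        System.out.println(Arrays.toString(removeAt(heap, 3))); // [1, 4, 2, 10, 12, 3]
    }
}
```

With siftDown alone the value 4 would stay below 10 and break the heap invariant; the extra siftUp swaps them.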
Summary: PriorityBlockingQueue stores its elements in an array-backed binary heap, protects every operation with a single ReentrantLock, and uses a Condition to wake up threads that are waiting for elements. offer sifts the new element up, poll sifts the moved element down, and remove sifts the moved element down and, if necessary, up.