What is BlockingQueue?
BlockingQueue is a thread-safe Queue interface.
It is a queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available when storing one. BlockingQueue methods come in four forms, which differ in how they handle an operation that cannot be satisfied immediately but may be satisfied at some point in the future: the first throws an exception, the second returns a special value (null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks only for a given maximum time limit before giving up. The following table summarizes these methods:
Operation | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
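To make the table concrete, here is a minimal sketch that tries three of the insertion forms against a queue that is already full. The class name FourFormsDemo and the 50 ms timeout are my own choices for illustration; the queue methods are the standard java.util.concurrent ones.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class FourFormsDemo {
    /** Tries add, offer and timed offer on a full queue and reports each outcome. */
    static String[] insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // succeeds, and the queue is now full

        String[] outcomes = new String[3];

        // "Throws exception" form: add(e) throws IllegalStateException when full.
        try {
            queue.add("second");
            outcomes[0] = "add succeeded";
        } catch (IllegalStateException e) {
            outcomes[0] = "add threw IllegalStateException";
        }

        // "Special value" form: offer(e) returns false instead of throwing.
        outcomes[1] = "offer returned " + queue.offer("second");

        // "Times out" form: waits up to the timeout, then returns false.
        try {
            outcomes[2] = "timed offer returned "
                    + queue.offer("second", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            outcomes[2] = "interrupted";
        }

        // The "Blocks" form, put(e), is not called here: on a full queue it would
        // wait until some other thread removed an element.
        return outcomes;
    }

    public static void main(String[] args) {
        for (String outcome : insertIntoFullQueue()) {
            System.out.println(outcome);
        }
    }
}
```

The removal methods (remove, poll, take, timed poll) behave the same way against an empty queue.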
Here is an inheritance and implementation diagram:
Several common queues
- ArrayBlockingQueue: a bounded queue backed by an array
- LinkedBlockingDeque: a double-ended queue backed by a linked list, unbounded unless a capacity is given
- DelayQueue: an unbounded queue that releases elements based on a delay time
- PriorityBlockingQueue: an unbounded queue backed by a priority heap
Usage scenarios
- Thread pools: ThreadPoolExecutor takes a BlockingQueue as its work queue
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}
- Eureka's three-level cache
- Netty
- Nacos
- RocketMQ
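For the thread pool case, the sketch below passes a bounded ArrayBlockingQueue to the constructor shown above. The pool sizes, the queue capacity of 8, the CallerRunsPolicy handler, and the class name PoolWithBoundedQueueDemo are all assumptions chosen for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
final class PoolWithBoundedQueueDemo {
    /** Submits taskCount trivial tasks and returns how many of them ran. */
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                // corePoolSize
                4,                                // maximumPoolSize
                30, TimeUnit.SECONDS,             // keepAliveTime for extra idle threads
                new ArrayBlockingQueue<>(8),      // workQueue: at most 8 waiting tasks
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturated: caller runs the task
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(completed::incrementAndGet);
            }
        } finally {
            pool.shutdown(); // tasks that are already queued still run
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(50));
    }
}
```

With a bounded work queue, the pool only grows beyond corePoolSize once the queue is full, and the rejection handler only comes into play once both the queue and the pool are saturated.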
ArrayBlockingQueue
A bounded blocking queue backed by an array. The queue orders elements FIFO (first-in, first-out). The head of the queue is the element that has been on the queue the longest, and the tail is the element that has been on the queue the shortest time. New elements are inserted at the tail, and retrieval operations obtain elements from the head.
This is a classic “bounded buffer”, in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. An attempt to put an element into a full queue blocks, and so does an attempt to take an element from an empty queue.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. A queue constructed with fairness set to true, however, grants threads access in FIFO order. Fairness generally decreases throughput, but it reduces variability and avoids starvation.
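As a small illustration of the bounded buffer idea, here is a sketch with one producer thread and one consumer sharing a fair ArrayBlockingQueue of capacity 2. The class and method names (BoundedBufferDemo, transfer) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the JDK.
final class BoundedBufferDemo {
    /** Moves the numbers 0..count-1 through a small queue and returns them in arrival order. */
    static List<Integer> transfer(int count) {
        // Capacity 2, fairness enabled (second constructor argument).
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2, true);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // With a single producer, elements arrive in insertion (FIFO) order.
        System.out.println(transfer(5));
    }
}
```

Because the capacity is smaller than the number of elements, the producer is forced to wait for the consumer several times during the run.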
The data structure
Its underlying data structure is an array, which is allocated in the constructor as follows:
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity]; // Initialize the array
    lock = new ReentrantLock(fair); // Create the lock
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
Enqueue and dequeue process
The enqueue and dequeue process is shown in the figure below (the flow chart covers the put and take methods). In essence, the queue holds a single global lock, a ReentrantLock, and handles the boundary states (full and empty) through Condition objects: a thread that finds the queue full or empty waits on the matching Condition until another thread signals it.
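The lock-plus-conditions pattern can be sketched in a few lines. This is a simplified illustration of the idea, not the JDK implementation; the class name TinyBoundedQueue and its demo method are made up for the example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of a bounded queue guarded by one lock and two conditions.
final class TinyBoundedQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBoundedQueue(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();   // full: wait until a consumer frees a slot
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();     // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();  // empty: wait until a producer adds an element
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();      // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** Single-threaded smoke test: two puts, then two takes, on a capacity-2 queue. */
    static String demo() {
        TinyBoundedQueue<String> q = new TinyBoundedQueue<>(2);
        try {
            q.put("a");
            q.put("b");
            return q.take() + q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The await calls sit inside while loops so that a woken thread re-checks the condition before proceeding, which is the usual way to use Condition.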
Usage scenarios
I usually use LinkedBlockingDeque as a buffer queue when thread pools are created.
LinkedBlockingDeque
LinkedBlockingDeque is a double-ended blocking queue backed by a linked list; elements can be inserted and removed at both ends. Because a deque has one more entry point than a plain queue, it can reduce contention when multiple threads enqueue at the same time. Compared with other blocking queues, LinkedBlockingDeque adds methods such as addFirst, addLast, peekFirst, and peekLast. Methods ending in First insert, retrieve, or remove the first element of the deque; methods ending in Last do the same for the last element. The capacity of a LinkedBlockingDeque is optional: it can be set at construction time to prevent unbounded growth, and it defaults to Integer.MAX_VALUE if not set.
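A quick sketch of the First/Last methods on a deque with capacity 3. The class name DequeEndsDemo is hypothetical; the deque methods are the standard BlockingDeque ones.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, not part of the JDK.
final class DequeEndsDemo {
    static String demo() {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>(3); // capacity 3
        deque.addLast("b");
        deque.addFirst("a");                      // contents: a, b
        deque.addLast("c");                       // contents: a, b, c (full)
        boolean accepted = deque.offerFirst("z"); // no room: returns false, nothing thrown
        String first = deque.pollFirst();         // removes "a"
        String last = deque.pollLast();           // removes "c"
        String remaining = deque.peekFirst();     // "b" stays in the deque
        return first + "," + last + "," + remaining + "," + accepted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```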
The data structure
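The data structure is as follows. It is a doubly linked list: each node holds references to both its previous and next nodes, and the deque keeps pointers to the first and last nodes.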
How to use
Let’s briefly use the following test code:
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBockingQueueTest {
    public static void main(String[] args) {
        // Capacity 1: the deque holds at most one element
        BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(1);
        // offer/poll: thread-safe, non-blocking; return false/null on failure
        blockingDeque.offer("Add the first element");
        String item = blockingDeque.poll();
        System.out.println("poll item:" + item);
        // put/take: thread-safe; block until the operation can proceed
        try {
            blockingDeque.put("Add a second element");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            String take = blockingDeque.take();
            System.out.println("take item:" + take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // add/remove: thread-safe, but throw an exception on failure
        blockingDeque.add("Add a fourth element");
        blockingDeque.add("Add a fifth element"); // the deque is full, so this throws
        item = blockingDeque.remove();
        System.out.println(item);
    }
}
The output is shown below. The second add call fails with an IllegalStateException because the deque has a capacity of 1:
poll item:Add the first element
take item:Add a second element
Exception in thread "main" java.lang.IllegalStateException: Deque full
    at java.util.concurrent.LinkedBlockingDeque.addLast(LinkedBlockingDeque.java:335)
    at java.util.concurrent.LinkedBlockingDeque.add(LinkedBlockingDeque.java:633)
    at cn.zhengsh.queue.LinkedBockingQueueTest.main(LinkedBockingQueueTest.java:30)
Usage scenarios
I usually use LinkedBlockingDeque as a buffer queue when thread pools are created.
DelayQueue
DelayQueue is an unbounded blocking queue whose elements can be taken only once their delay has expired. Elements must implement the Delayed interface, and can carry information such as a lifetime, an expiry time, or a unique ID.
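Here is a minimal usage sketch. The element type DelayedTask, its fields, and the delays of 100 ms and 300 ms are assumptions for the example; only DelayQueue and the Delayed interface come from the JDK.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class DelayQueueDemo {
    /** Hypothetical element type: becomes available delayMillis after creation. */
    static final class DelayedTask implements Delayed {
        final String name;
        private final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element has expired.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static String demo() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 300));
        queue.put(new DelayedTask("fast", 100));
        try {
            // poll() does not wait: it returns null while no element has expired.
            boolean nothingReadyYet = queue.poll() == null;
            // take() blocks until the head element's delay has expired.
            String first = queue.take().name;
            String second = queue.take().name;
            return nothingReadyYet + "," + first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Elements come out in order of expiry rather than insertion order, so "fast" is taken before "slow" even though it was added second.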
Source code analysis
- The offer method, which adds an element
public boolean offer(E e) {
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Add the element to the priority queue
        q.offer(e);
        // If the new element is now the head of the priority queue,
        // clear the leader and wake a thread waiting to take
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        // Release the lock
        lock.unlock();
    }
}
- The take method, which removes the head element; the current thread blocks if no element is available yet
public E take() throws InterruptedException {
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Loop until an element can be returned
        for (;;) {
            // Peek at the head of the priority queue
            E first = q.peek();
            // The priority queue is empty
            if (first == null)
                // Block until an element is added
                available.await();
            else {
                // Check whether the head element's remaining delay is <= 0
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // The delay has expired: remove and return the head
                    return q.poll();
                // The remaining delay is greater than 0:
                // drop the head reference while waiting
                first = null;
                // If another thread is already the leader, wait until signalled
                if (leader != null)
                    available.await();
                else {
                    // Make the current thread the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait for the remaining delay
                        available.awaitNanos(delay);
                    } finally {
                        // After waiting, clear the leader if it is still the current thread
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // If there is no leader and the queue is not empty,
        // wake a waiting thread so that it can retry
        if (leader == null && q.peek() != null)
            available.signal();
        // Release the lock
        lock.unlock();
    }
}
Usage scenarios
I rarely use it directly, but it is used a lot inside the frameworks we rely on.
PriorityBlockingQueue
In this data structure, elements are kept in priority order. Elements must either implement the Comparable interface (its compareTo() method defines the order) or be ordered by a Comparator passed to the constructor. When an element is inserted, it is compared with existing elements until its position is found.
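A short sketch of both ordering options, natural order and an explicit Comparator. The class name PriorityOrderDemo and the sample values are made up; the initial capacity of 11 passed to the second constructor matches the class's default.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
final class PriorityOrderDemo {
    /** Removes elements with poll() until the queue is empty, keeping removal order. */
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    /** Natural ordering: Integer implements Comparable, smallest element first. */
    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(5);
        queue.add(1);
        queue.add(3);
        return drain(queue);
    }

    /** Explicit Comparator: largest element first. */
    static List<Integer> reverseOrder() {
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.add(5);
        queue.add(1);
        queue.add(3);
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // [1, 3, 5]
        System.out.println(reverseOrder()); // [5, 3, 1]
    }
}
```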
Source code analysis
- Analysis of the constructor PriorityBlockingQueue(Collection<? extends E> c)
/** If the collection is a SortedSet or a PriorityBlockingQueue, keep its original element order. */
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true; // true if must screen for nulls
    if (c instanceof SortedSet<?>) { // If it is a sorted set
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    } else if (c instanceof PriorityBlockingQueue<?>) { // If it is a priority queue
        PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] a = c.toArray();
    int n = a.length;
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) { // Check for null elements
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify) // Rearrange the array into heap order
        heapify();
}
- Analysis of the offer method, which inserts an element
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock; // Acquire the lock
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length)) // If the array is full, grow it
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null) // No comparator: adjust the heap by the elements' natural order
            siftUpComparable(n, e, array);
        else // Otherwise adjust the heap using the comparator
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1; // Total number of queue elements + 1
        notEmpty.signal(); // Wake up a thread that may be waiting to take
    } finally {
        lock.unlock();
    }
    return true;
}
The siftUpComparable and siftUpUsingComparator methods are essentially the same, except that the former compares elements by their natural order, while the latter compares them with an external comparator.
/**
 * Inserts element x at position array[k], then adjusts the heap by the
 * elements' natural order ("sifting up") to keep the heap ordered.
 * The end result is a min-heap.
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1; // Parent index: (k - 1) / 2
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0) // Inserted value is not smaller than its parent: stop
            break;
        // Otherwise move the parent down and continue from the parent's position
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}
The siftUpComparable method performs the heap's “sift-up” adjustment. Think of the heap as a complete binary tree: each new element is attached at the bottom-right position of the tree and then compared with its parent. If the parent is larger, the two are swapped, and this repeats until no parent is larger than the inserted element. This ensures that the top of the heap (the root of the binary tree) is always the smallest element. (Note: the above applies to a min-heap only.)
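To watch the sift-up step in isolation, here is a simplified version on a plain int array. It follows the same loop as siftUpComparable but is my own sketch, with hypothetical names (SiftUpDemo, insertAll) and no locking or generics.

```java
import java.util.Arrays;

// Simplified sketch of sift-up on an int min-heap; not the JDK code.
final class SiftUpDemo {
    /** Places value at index k, then moves it up while its parent is larger. */
    static void siftUp(int[] heap, int k, int value) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;   // parent index: (k - 1) / 2
            if (value >= heap[parent]) {
                break;                    // heap property already holds
            }
            heap[k] = heap[parent];       // move the parent down one level
            k = parent;
        }
        heap[k] = value;
    }

    /** Builds a min-heap by inserting the values one at a time. */
    static int[] insertAll(int... values) {
        int[] heap = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            siftUp(heap, i, values[i]);
        }
        return heap;
    }

    public static void main(String[] args) {
        // Inserting 5, 3, 8, 1 leaves the smallest value at index 0.
        System.out.println(Arrays.toString(insertAll(5, 3, 8, 1))); // [1, 3, 8, 5]
    }
}
```

Note that the array is only heap-ordered, not fully sorted: each parent is no larger than its children, which is enough to keep the minimum at the root.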
- The tryGrow method, which expands the capacity
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // Growing can run concurrently with enqueue/dequeue, so release the global lock first
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) { // CAS allocationSpinLock from 0 to 1; 1 means an expansion is in progress
        try {
            // Calculate the new array size
            int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) :
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) { // Overflow check
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap]; // Allocate a new array
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // Allocation did not happen (another thread may be expanding and won the allocationSpinLock race)
        Thread.yield();
    lock.lock(); // Re-acquire the global lock (the internal array is about to be modified)
    if (newArray != null && queue == array) {
        queue = newArray; // Point to the new internal array
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
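The capacity formula in tryGrow is easier to read with numbers plugged in. The sketch below copies only the arithmetic and ignores the MAX_ARRAY_SIZE clamp; the class name GrowthDemo is hypothetical, and the starting value 11 is the queue's default initial capacity.

```java
// Sketch of the growth arithmetic only; not the JDK method.
final class GrowthDemo {
    /** Below 64 the capacity roughly doubles (+2); from 64 upward it grows by 50%. */
    static int nextCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
    }

    public static void main(String[] args) {
        int cap = 11;
        for (int i = 0; i < 4; i++) {
            int next = nextCapacity(cap);
            System.out.println(cap + " -> " + next);
            cap = next;
        }
        // Steps printed: 11 -> 24, 24 -> 50, 50 -> 102, 102 -> 153
    }
}
```

Small arrays grow quickly so that early inserts do not trigger many reallocations, while large arrays grow more conservatively.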
- Analysis of the take() method
/** Removes and returns the head of the queue; if the queue is empty, the calling thread blocks. */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // Acquire the global lock
    E result;
    try {
        while ((result = dequeue()) == null) // The queue is empty
            notEmpty.await(); // The thread waits on the notEmpty condition
    } finally {
        lock.unlock();
    }
    return result;
}
private E dequeue() {
    int n = size - 1; // n is the number of elements left after this removal
    if (n < 0) // If the queue is empty, return null
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0]; // array[0] is the top of the heap, which is removed on every dequeue
        E x = (E) array[n]; // array[n] is the last node of the heap, the bottom-right node of the binary tree
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}
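The dequeue method relies on a sift-down step whose source is not shown above. As a rough sketch of what that step does, here is a simplified version on an int array: the last element is re-placed starting from the root and moves down past any smaller child. The names SiftDownDemo, removeMin, and drain are hypothetical, and this is an illustration of the technique rather than the JDK code.

```java
import java.util.Arrays;

// Simplified sketch of removing the minimum from an int min-heap; not the JDK code.
final class SiftDownDemo {
    /** Removes and returns the minimum of the first size elements of heap. */
    static int removeMin(int[] heap, int size) {
        int result = heap[0];
        int n = size - 1;       // number of elements left after the removal
        int x = heap[n];        // last element, re-placed starting from the root
        int k = 0;
        int half = n >>> 1;     // indices >= half have no children
        while (k < half) {
            int child = 2 * k + 1;                 // left child
            int right = child + 1;
            if (right < n && heap[right] < heap[child]) {
                child = right;                     // pick the smaller child
            }
            if (x <= heap[child]) {
                break;                             // x fits here
            }
            heap[k] = heap[child];                 // move the smaller child up
            k = child;
        }
        heap[k] = x;
        return result;
    }

    /** Repeatedly removes the minimum; the results come out in ascending order. */
    static int[] drain(int[] heap) {
        int[] out = new int[heap.length];
        for (int size = heap.length, i = 0; size > 0; size--, i++) {
            out[i] = removeMin(heap, size);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] heap = {1, 3, 8, 5}; // already in min-heap order
        System.out.println(Arrays.toString(drain(heap))); // [1, 3, 5, 8]
    }
}
```

Unlike the real queue, this sketch cannot null out the vacated slot in an int array, so the caller tracks the logical size separately.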
Usage scenarios
I rarely use it directly, but it is used a lot inside the frameworks we rely on.