This article has participated in the third “topic writing” track of the Juejin Creators Training Camp.

What is BlockingQueue?

BlockingQueue is a thread-safe blocking queue interface in java.util.concurrent that extends the Queue interface.

A BlockingQueue additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available when storing one. BlockingQueue methods come in four forms, each handling an operation that cannot be satisfied immediately (but may be at some point in the future) differently: the first throws an exception, the second returns a special value (null or false, depending on the operation), the third blocks the current thread indefinitely until the operation succeeds, and the fourth blocks only for a given maximum time before giving up. The following table summarizes these methods:

|         | Throws exception | Special value | Blocks         | Times out            |
|---------|------------------|---------------|----------------|----------------------|
| Insert  | add(e)           | offer(e)      | put(e)         | offer(e, time, unit) |
| Remove  | remove()         | poll()        | take()         | poll(time, unit)     |
| Examine | element()        | peek()        | not applicable | not applicable       |
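As a small illustration of the four forms (the class name and element values here are made up for the example), the snippet below exercises each pair of methods against a capacity-1 ArrayBlockingQueue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FourFormsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        q.add("a");                        // form 1: a second add() on the full queue would throw IllegalStateException
        System.out.println(q.offer("b"));  // form 2: prints false instead of throwing
        System.out.println(q.offer("b", 50, TimeUnit.MILLISECONDS)); // form 4: waits up to 50 ms, then prints false
        System.out.println(q.take());      // form 3: prints "a"; would block if the queue were empty
    }
}
```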

Here is an inheritance and implementation diagram:

Several common queues

  1. ArrayBlockingQueue: a bounded queue backed by an array
  2. LinkedBlockingDeque: an optionally bounded deque backed by a linked list (unbounded by default)
  3. DelayQueue: an unbounded queue that releases elements based on time (delay scheduling)
  4. PriorityBlockingQueue: an unbounded queue backed by a priority heap

Usage scenarios

  1. Thread pools: ThreadPoolExecutor takes a BlockingQueue<Runnable> as its work queue
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}
  2. Eureka's three-level cache
  3. Netty
  4. Nacos
  5. RocketMQ
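As a sketch of the first scenario (the pool sizes, queue capacity, and rejection policy here are arbitrary choices for the example, not a recommendation), a ThreadPoolExecutor can be wired up with a bounded blocking queue as its work queue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // tasks beyond the 2 running threads wait in the blocking queue (up to 10);
        // once the queue is also full, CallerRunsPolicy runs the task in the caller
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 20; i++)
            pool.execute(done::incrementAndGet);

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(done.get()); // 20: every task ran somewhere
    }
}
```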

ArrayBlockingQueue

A bounded blocking queue backed by an array. The queue orders elements FIFO (first-in, first-out). The head of the queue is the element that has been on the queue the longest; the tail is the element that has been on the queue the shortest time. New elements are inserted at the tail, and retrieval operations obtain elements from the head.

This is a classic “bounded buffer,” in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Trying to put an element into a full queue blocks the operation; attempting to take an element from an empty queue also blocks.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this order is not guaranteed. However, queues with fairness set to true grant threads access in FIFO order. Fairness typically reduces throughput, but it reduces variability and avoids starvation.
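The bounded-buffer behavior can be sketched with one producer and one consumer (the class name and element values are made up for the example; the true flag simply demonstrates the optional fairness policy):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        // capacity 2, fairness enabled: waiting threads are granted access in FIFO order
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2, true);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++)
                    buffer.put(i); // blocks whenever the buffer already holds 2 elements
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 5; i++)
            sum += buffer.take(); // blocks whenever the buffer is empty
        producer.join();
        System.out.println(sum); // 1+2+3+4+5 = 15
    }
}
```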

The data structure

Its underlying data structure is an array form, constructed as follows:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity]; // initialize the backing array
    lock = new ReentrantLock(fair);    // create the (optionally fair) lock
    notEmpty = lock.newCondition();
    notFull  = lock.newCondition();
}

Enqueue and dequeue process

The enqueue and dequeue process is shown in the figure below (the flow chart covers the put/take methods). In essence, a single global ReentrantLock guards the queue, and boundary states are handled through Condition objects, i.e. Condition notification.
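The same mechanism can be sketched in a minimal hand-rolled bounded queue (an illustrative toy, not the actual JDK source): one ReentrantLock plus notFull/notEmpty Conditions guarding a fixed-size array.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TinyArrayQueue<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public TinyArrayQueue(int capacity) { items = new Object[capacity]; }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)   // full: wait on the notFull condition
                notFull.await();
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();              // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)              // empty: wait on the notEmpty condition
                notEmpty.await();
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();               // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```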

Usage scenarios

I usually use ArrayBlockingQueue as a bounded buffer queue when creating thread pools.

LinkedBlockingDeque

LinkedBlockingDeque is a double-ended blocking queue backed by a linked list; elements can be inserted and removed at both ends. Because a deque offers one more insertion point than a single-ended queue, contention is roughly halved when multiple threads enqueue at the same time. Compared with other blocking queues, LinkedBlockingDeque adds methods such as addFirst, addLast, peekFirst, and peekLast: methods ending in First insert, retrieve, or remove the first element of the deque, while methods ending in Last operate on the last element. LinkedBlockingDeque has an optional capacity that can be set at construction time to prevent excessive growth; if omitted, the default capacity is Integer.MAX_VALUE.

The data structure

The data structure is shown below. It is a doubly linked list with head and tail pointers.

How to use

Let’s briefly use the following test code:

public class LinkedBockingQueueTest {

    public static void main(String[] args) {
        BlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(1);
        // offer/poll: thread-safe and non-blocking; they return a special value on failure
        blockingDeque.offer("Add the first element");
        String item = blockingDeque.poll();
        System.out.println("poll item:" + item);

        // put/take: thread-safe and blocking until the operation can proceed
        try {
            blockingDeque.put("Add a second element");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            String take = blockingDeque.take();
            System.out.println("take item:" + take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // add/remove: thread-safe, but throw an exception on failure
        blockingDeque.add("Add a fourth element");
        blockingDeque.add("Add a fifth element"); // the capacity is 1, so this throws IllegalStateException
        item = blockingDeque.remove();
        System.out.println(item);
    }
}

The following output is displayed:

poll item:Add the first element
take item:Add a second element
Exception in thread "main" java.lang.IllegalStateException: Deque full
	at java.util.concurrent.LinkedBlockingDeque.addLast(LinkedBlockingDeque.java:335)
	at java.util.concurrent.LinkedBlockingDeque.add(LinkedBlockingDeque.java:633)
	at cn.zhengsh.queue.LinkedBockingQueueTest.main(LinkedBockingQueueTest.java:30)

Usage scenarios

I usually use LinkedBlockingDeque as a buffer queue when thread pools are created.

DelayQueue

DelayQueue is an unbounded blocking queue from which an element can be taken only once its delay has expired. Queue elements can carry a time-to-live, a removal time, a unique ID, and so on.
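A minimal usage sketch (the Task class, names, and delays are invented for the example): elements implement Delayed, and take() only returns them once getDelay() reaches zero.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    static class Task implements Delayed {
        final String name;
        final long fireTime; // absolute nanoTime at which the task becomes available

        Task(String name, long delayMs) {
            this.name = name;
            this.fireTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(fireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) { // order by remaining delay
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> q = new DelayQueue<>();
        q.offer(new Task("later", 200));
        q.offer(new Task("sooner", 50));

        System.out.println(q.poll());      // null: neither delay has expired yet
        System.out.println(q.take().name); // blocks ~50 ms, then prints "sooner"
        System.out.println(q.take().name); // blocks until ~200 ms, then prints "later"
    }
}
```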

Source code analysis

  • Add method offer
public boolean offer(E e) {
    // acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // add the element to the backing priority queue
        q.offer(e);
        // if the new element ended up at the head of the priority queue,
        // clear the leader and signal a waiting consumer
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        // release the lock
        lock.unlock();
    }
}
  • Dequeue method take: if the queue is empty, the current thread blocks
public E take() throws InterruptedException {
    // acquire the lock (interruptibly)
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // spin
        for (;;) {
            // peek at the head of the priority queue
            E first = q.peek();
            // the priority queue is empty
            if (first == null)
                // block until an element is offered
                available.await();
            else {
                // check whether the head element's remaining delay is <= 0
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // the delay has expired: dequeue from the priority queue
                    return q.poll();
                // the remaining delay is greater than 0:
                // clear the head reference while waiting
                first = null;
                // if another thread is already the leader, just wait
                if (leader != null)
                    available.await();
                else {
                    // make the current thread the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // sleep for the remaining delay
                        available.awaitNanos(delay);
                    } finally {
                        // when the wait ends, if this thread is still the leader,
                        // clear the leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // if there is no leader and the queue is non-empty,
        // wake another waiting thread to try again
        if (leader == null && q.peek() != null)
            available.signal();
        // release the lock
        lock.unlock();
    }
}

Usage scenarios

I rarely use it directly, but it is used heavily inside the frameworks we rely on.

PriorityBlockingQueue

In this data structure, elements are stored in priority order. Elements must implement the Comparable interface (via compareTo()), or a Comparator must be supplied. When you insert an element, it is compared against existing elements until its position in the heap is found.
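A quick usage sketch with natural ordering (element values invented for the example): integers come out smallest-first because the backing structure is a min-heap.

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        // insertion order does not matter; take() follows priority order
        q.offer(3);
        q.offer(1);
        q.offer(2);

        System.out.println(q.take()); // 1: smallest first (natural ordering, min-heap)
        System.out.println(q.take()); // 2
        System.out.println(q.take()); // 3
    }
}
```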

Source code analysis

Constructor PriorityBlockingQueue(Collection<? extends E> c) analysis, as shown below:

/** If the collection is a SortedSet or a PriorityBlockingQueue, keep the original element order */
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true;     // true if not known to be in heap order
    boolean screen = true;      // true if must screen for nulls
 
    if (c instanceof SortedSet<?>) {                    // if it is a sorted set
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    } else if (c instanceof PriorityBlockingQueue<?>) { // if it is a priority queue
        PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class)   // exact match
            heapify = false;
    }

    Object[] a = c.toArray();
    int n = a.length;
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {    // check for null elements
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)    // heap-sort the array
        heapify();
}
  • Insert element offer method analysis
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
 
    final ReentrantLock lock = this.lock;   // lock
    lock.lock();
 
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))    // If the queue is full, expand the queue
        tryGrow(array, cap);
 
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)    // If the comparator is empty, the heap is adjusted in the natural order of the elements
            siftUpComparable(n, e, array);
        else                // If the comparator is not empty, heap adjustments are made according to the comparator
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;       // Total number of queue elements +1
        notEmpty.signal();  // Wake up a queue thread that may be waiting
    } finally {
        lock.unlock();
    }
    return true;
}

The siftUpComparable and siftUpUsingComparator methods are essentially the same, except that the former compares elements by their natural order while the latter uses an external comparator.

/**
 * Inserts element x at position array[k], then "sifts up" according to the
 * elements' natural order to keep the heap invariant. The result is a min-heap.
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;     // parent index: (k - 1) / 2
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)  // if the inserted value is >= the parent, stop
            break;

        // otherwise, move the parent down and continue from its slot
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

siftUpComparable performs the heap's “sift up” adjustment. Picture the heap as a complete binary tree: each inserted element is first attached at the bottom-right of the tree, then repeatedly compared with its parent; if the parent is larger, the two are swapped, until no parent is larger than the inserted node. This guarantees that the top of the heap (the root of the binary tree) is the smallest element. (Note: the description above applies only to a min-heap.)
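The same sift-up move can be shown in a standalone sketch (hand-rolled on an int array for illustration, not the JDK code), which makes the “float up” visible:

```java
import java.util.Arrays;

public class SiftUpDemo {
    // insert x at index k of a min-heap stored in array, then sift it up
    static void siftUp(int k, int x, int[] array) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;   // parent index
            if (x >= array[parent])       // parent is not larger: heap order restored
                break;
            array[k] = array[parent];     // move the larger parent down
            k = parent;
        }
        array[k] = x;
    }

    public static void main(String[] args) {
        int[] heap = {1, 5, 3, 8, 0, 0, 0}; // the first 4 slots form a valid min-heap
        siftUp(4, 2, heap);                  // insert 2 as the 5th element
        System.out.println(Arrays.toString(heap)); // [1, 2, 3, 8, 5, 0, 0]
    }
}
```

The inserted 2 swaps past its parent 5 but stops below the root 1, preserving the min-heap invariant.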

  • Capacity expansion: the tryGrow method
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock();  // enqueue/dequeue may proceed while growing, so release the global lock first
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                    0, 1)) {    // setting allocationSpinLock to 1 marks an expansion in progress
        try {
            // compute the new array size
            int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) :
                    (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // overflow check
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];  // allocate the new array
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null)   // failed to expand (another thread may have won the allocationSpinLock race)
        Thread.yield();

    lock.lock();            // reacquire the global lock (the internal queue is about to change)
    if (newArray != null && queue == array) {
        queue = newArray;   // point to the new internal array
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
  • Analyze the take() method
/** If the queue is empty, the calling thread is blocked. */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   // acquire the global lock
    E result;
    try {
        while ((result = dequeue()) == null)    // the queue is empty
            notEmpty.await();                   // wait on the notEmpty condition queue
    } finally {
        lock.unlock();
    }
    return result;
}
 
private E dequeue() {
    int n = size - 1;   // n is the number of elements left after the dequeue
    if (n < 0)          // if the queue is empty, return null
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];    // array[0] is the heap top, removed on each dequeue
        E x = (E) array[n];         // array[n] is the last node of the heap (bottom-right of the tree)
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

Usage scenarios

I rarely use it directly, but it is used heavily inside the frameworks we rely on.
