Today we introduce the most popular synchronization class in Java concurrent programming — BlockingQueue, the flower of concurrency.

JDK version: Oracle Java 1.8.0_102

Before you continue, make sure you are familiar with locks and condition queues — especially condition queues — otherwise you may miss the subtleties, and even the basic correctness, of the source code below. That background is not covered in this article; readers should prepare it on their own.

The interface definition

BlockingQueue extends Queue, adding blocking enqueue and dequeue operations:

public interface BlockingQueue<E> extends Queue<E> {
  boolean add(E e);

  void put(E e) throws InterruptedException;

  // could simply be inherited from Queue; redeclared here,
  // apparently to document the blocking-queue semantics
  boolean offer(E e);

  boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException;

  E take() throws InterruptedException;

  // inherited from Queue
  // E poll();

  E poll(long timeout, TimeUnit unit)
      throws InterruptedException;

  int remainingCapacity();

  boolean remove(Object o);

  public boolean contains(Object o);

  int drainTo(Collection<? super E> c);

  int drainTo(Collection<? super E> c, int maxElements);
}

I’ve adjusted the order of some of the methods for ease of explanation, and added comments to help explain them.

There are two pairs of methods to focus on:

  • Blocking methods BlockingQueue#put() and BlockingQueue#take(): if the enqueue (or dequeue) cannot proceed — for example, put() on a full queue — the thread waits until the operation becomes possible and succeeds.
  • Non-blocking methods BlockingQueue#offer() and BlockingQueue#poll(), plus their timeout versions: the non-timeout version is an instantaneous action that returns failure immediately if the operation cannot proceed; the timeout version may additionally block for a bounded period, behaving like a time-limited put() or take().
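These behaviors can be sketched in a quick demo (a minimal example using ArrayBlockingQueue with capacity 1; any BlockingQueue implementation behaves the same way):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        System.out.println(q.offer("a")); // true: there was room
        System.out.println(q.offer("b")); // false: full, returns immediately
        // Timeout version: blocks up to 50 ms, then gives up
        System.out.println(q.offer("b", 50, TimeUnit.MILLISECONDS)); // false

        System.out.println(q.poll());     // "a": removes the head
        System.out.println(q.poll());     // null: empty, returns immediately
        // q.take() here would block until some other thread enqueues
    }
}
```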

The implementation class

BlockingQueue has a number of implementation classes. The most common, going by GitHub's code-search result counts, are LinkedBlockingQueue (253K) and ArrayBlockingQueue (95K). LinkedBlockingQueue performs better than ArrayBlockingQueue in most cases, so this article focuses on LinkedBlockingQueue, with a brief comparison at the end.

LinkedBlockingQueue

Blocking methods put() and take()

The two blocking methods are relatively simple and convey the core idea of LinkedBlockingQueue: one lock at the head of the queue and another at the tail, so enqueue and dequeue do not contend with each other.

A previous article on implementing the producer-consumer model in Java introduced BlockingQueue#put() and BlockingQueue#take() step by step; reviewing it helps explain why LinkedBlockingQueue is designed this way. Here is a more detailed walk-through.

Blocking enqueue operation put()

Elements are enqueued at the tail of the queue; putLock and the condition notFull cooperate to complete the synchronization.

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

Now walk through an enqueue, case by case.

Case 1: before the enqueue, the queue is neither empty nor full, and its length is at least 2 (length 1 is the special case 4 below).

Before enqueueing, the thread must acquire putLock. It checks that the queue is not full, so it enqueues without waiting on notFull. After the enqueue, it checks that the queue is still not full and signals one randomly chosen producer that notFull holds. Finally, since the queue was not empty before the enqueue, there is no need to signal notEmpty.

Note:

  • Before the enqueue, the queue is neither empty nor full (length at least 2). Then head and tail point to different nodes, so enqueue and dequeue never update the same node at the same time and there is no contention between them. Simultaneous enqueue and dequeue under two separate locks is therefore thread-safe. Further, since every call to enqueue() is already protected by putLock, its internal implementation needs no locking of its own.
  • The condition notFull signals only one randomly chosen producer thread waiting on it (signal() rather than signalAll()). This "single notification" reduces useless contention, and it cannot cause the "signal hijacking" problem, because only producers ever wait on notFull.
  • The notification method signal() is nearly "idempotent": if threads are waiting on the condition, one is chosen at random and woken; if none are waiting, nothing happens and nothing is harmed.
Case 2: the queue is full before the enqueue

Before enqueueing, the thread must acquire putLock. It finds the queue full and waits on notFull. The condition can be triggered either by a successful dequeue (necessary) or by a successful enqueue (also necessary, to avoid the "insufficient signal" problem). Once notFull holds, the thread enqueues. If the queue is full again after the enqueue (otherwise, same as case 1), there is no need to signal notFull. Finally, since the queue was not empty before the enqueue, there is no need to signal notEmpty.

Note:

  • The "insufficient signal" problem: suppose the queue is full and three producers P1-P3 (any number greater than one will do) are blocked in notFull.await() at the same time. Now five consumers C1-C5 (likewise, more than one will do) dequeue quickly in succession, but only one signal is sent at the end — the dual logic in take() calls signalNotFull() only when the queue was full before the dequeue. One signal wakes only one producer, P1, even though the queue is now five elements short; it is not enough to wake P2 and P3. That is why the "enqueue completed" notification in put() — the statement if (c + 1 < capacity) notFull.signal(); — is necessary: as long as the queue is not full after an enqueue, one more blocked producer is woken to wait for the lock to be released and compete for it. After P1 finishes enqueueing and finds the queue not full, it wakes one producer at random, say P2, which competes for putLock once P1 releases it and continues the enqueue; the same then happens for P3. Compared with signalAll() waking all producers, this cascade keeps at most one producer awake and competing for the lock at a time, a significant performance win.
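The cascading wake-up can be sketched with a hand-rolled bounded buffer. This is an illustrative class under a single lock, not the JDK source — LinkedBlockingQueue itself uses two locks, so its details differ:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal single-lock sketch of the cascading-notify idea.
class TinyBoundedBuffer<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TinyBoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();
            items.add(e);
            if (items.size() < capacity)
                notFull.signal();   // cascade: wake one more blocked producer
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();
            E x = items.poll();
            notFull.signal();
            if (!items.isEmpty())
                notEmpty.signal();  // cascade: wake one more blocked consumer
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

Each successful operation wakes at most one peer, which wakes the next after it finishes, so waiters drain one at a time instead of stampeding as with signalAll().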

Helper methods signalNotEmpty() and signalNotFull()

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
Case 3: the queue is empty before the enqueue

Before enqueueing, the thread must acquire putLock. The queue is empty, so it enqueues without waiting on notFull. After the enqueue, if the queue is not full, same as case 1; if full, same as case 2. Finally, since the queue was empty before the enqueue (otherwise, same as case 1), one randomly chosen consumer is signalled that notEmpty holds.

Note:

  • notEmpty needs to be signalled only when the queue was empty before the enqueue. This "conditional notification" is a measure to reduce useless notifications: if the queue was not empty, no dequeue can be blocked on notEmpty. Conversely, even though the producer has enqueued, consumers may empty the queue again between the producer releasing putLock and the signalled consumer running. That does not matter: the while loop in take() re-checks the condition after the thread wins the lock.
  • Checking the queue length in a while+await loop before every enqueue and dequeue implicitly guarantees that when the queue is empty, only enqueues proceed and no dequeue races with them.
Case 4: before the enqueue, the queue length is 1

Case 4 is special. The analysis resembles case 1, but enqueue and dequeue might contend; we analyze it later.

Blocking dequeue operation take()

Elements are dequeued at the head of the queue; takeLock and the condition notEmpty cooperate to complete the synchronization.

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

put() and take() are duals of each other, so take() is easy to analyze.

Case 4: the special case of queue length 1

If the queue length is 1, do enqueue and dequeue contend? That depends on the underlying data structure of LinkedBlockingQueue.

The simplest choice is a naive linked list, either hand-rolled or a non-thread-safe JDK collection such as LinkedList. But when the queue length is 1, the head and tail of a naive list point to the same node, so enqueue and dequeue contend to update that node.

Naive list: each node holds one element, with no extra control nodes or tricks. LinkedList is a typical example.

Adding a dummy node (also called a sentinel node) solves this problem. Define Node(item, next) and proceed as follows:

  • When initializing the linked list, create the dummy node:
    • dummy = new Node(null, null)
    • head = dummy.next // head == null <=> the queue is empty
    • tail = dummy // tail.item == null <=> the queue is empty
  • On enqueue, tail moves backward:
    • tail.next = new Node(newItem, null)
    • tail = tail.next
  • On dequeue, dummy moves backward and head is updated along with it:
    • oldItem = head.item
    • dummy = dummy.next
    • dummy.item = null
    • head = dummy.next
    • return oldItem

With this structure, updates happen on dummy and tail; head exists only as a hint and simply follows the dummy node. When the queue length is 1, head and tail still point to the same node, but dummy and tail point to different nodes, so updating dummy and tail does not contend.
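The scheme can be sketched as a minimal, deliberately non-thread-safe queue (names are illustrative, not the JDK's):

```java
// Minimal dummy-node queue: enqueue touches only tail, dequeue touches
// only the dummy at the front, so with length 1 they touch different nodes.
class DummyNodeQueue<E> {
    static final class Node<T> {
        T item;
        Node<T> next;
        Node(T item) { this.item = item; }
    }

    private Node<E> head = new Node<>(null); // dummy: head.item is always null
    private Node<E> tail = head;

    boolean isEmpty() { return head.next == null; }

    void enqueue(E e) {            // updates only tail
        tail = tail.next = new Node<>(e);
    }

    E dequeue() {                  // updates only the dummy at the head
        Node<E> first = head.next;
        if (first == null) return null;
        head = first;              // first becomes the new dummy
        E x = first.item;
        first.item = null;
        return x;
    }
}
```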

LinkedBlockingQueue folds dummy into head: head itself is the dummy node (head.item is always null), and the first real element is head.next:

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
...
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
...
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
...

The order of enqueue and the count increment

In put(), the count increment must execute after enqueue; otherwise the while-loop check in take() breaks.

In the simplest scenario, there is only one producer thread T1 and one consumer thread T2.

If count increments first and enqueue follows

Assuming that the current queue length is 0, the sequence of events is as follows:

  1. Thread T1: count increments
  2. Thread T2: the while check sees count > 0, so it does not wait on notEmpty
  3. Thread T2: dequeue executes
  4. Thread T1: enqueue executes

Clearly, after event 1 and before event 4 the queue contains no element even though count is greater than 0. The dequeue in event 3 therefore fails (a NullPointerException would be expected), and event 4 never gets its chance.

If enqueue executes first and count increments afterwards

This problem does not exist if enqueue is first followed by count increment.

Still assuming the current queue length is 0, the sequence of events is:

  1. Thread T1: enqueue executes
  2. Thread T2: the while check sees count == 0, so it waits on notEmpty
  3. Thread T1: count increments
  4. Thread T1: signals that notFull holds
  5. Thread T1: signals that notEmpty holds
  6. Thread T2: receives the notEmpty signal
  7. Thread T2: the while check sees count > 0, so it does not wait on notEmpty
  8. Thread T2: dequeue executes

Described instead as a state machine:

  • Before event E1 occurs, the queue is in state S1
  • Event E1 occurs: thread T1 enqueues an element, making the number of queue elements greater than count (1 > 0), and the queue transitions to state S2
  • From event E1 until event E3, the queue remains in state S2
  • Event E3 occurs: thread T1 increments count, making the number of queue elements equal to count (1 = 1), and the queue transitions back to state S1
  • From event E3 until event E8, the queue remains in state S1

Many readers are probably looking at concurrent programming through a state-machine lens for the first time, so monkey writes out the state-transition sequence first; once the sequence above is clear, we can abstract it further. A real state-machine definition is far more rigorous than the following, but this description suffices here.

Now add the following definitions, ignoring dequeue for the moment:

  • A state in which the number of queue elements equals count is state S1
  • A state in which the number of queue elements exceeds count is state S2
  • The enqueue operation is the transition S1 -> S2
  • The count increment is the transition S2 -> S1

The synchronization mechanism in LinkedBlockingQueue guarantees that no other thread ever observes state S2: the sequence S1 -> S2 -> S1 is executed contiguously by thread T1, and no other thread can interleave a state transition in between.

In monkey's understanding, concurrent programming is essentially a state machine: the maintenance of legal states and legal state transitions. The scenario above is simple enough to describe with a worked state-machine example; complex scenarios call for mathematical proofs over state machines, which is why describing concurrency this way is unpopular (though prose descriptions are not rigorous proofs either). Still, merely understanding the code ordering and the occasional surprising trick in an implementation is only "knowing that it works"; grasping the state-machine essence through a simple example shows why it is thread-safe and lets us write similar implementations ourselves — "knowing why it works". We will use state machines again when analyzing the source of ConcurrentLinkedQueue.

Non-blocking methods offer() and poll()

After analyzing the two blocking methods put() and take(), the non-blocking method is simple.

The instantaneous version

Take offer() as an example; poll() is its dual.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
Case 1: the queue is not full before the enqueue

Before enqueueing, the thread must acquire putLock. It checks that the queue is not full (there is no "wait on notFull" step at all) and enqueues directly. After the enqueue, if the queue is still not full, one randomly chosen producer (including producers using put(); likewise below) is signalled that notFull holds. Finally, since the queue was not empty before the enqueue, there is no need to signal notEmpty.

As you can see, when the queue is not full, the instantaneous offer() behaves the same as put().

Case 2: the queue is full before the enqueue

Before enqueueing, the thread must acquire putLock. It finds the queue full, skips the enqueue, and exits the try block; the rest is the same as case 1.

When the queue is full, the difference between offer() and put() shows: put() blocks in its while loop until notFull holds, while offer() returns immediately.

One small point:

c is initialized to -1 before putLock is acquired. If the enqueue succeeds, c = count.getAndIncrement() assigns the previous count, so c is at least 0 after the lock is released; otherwise c stays -1. Whether c >= 0 therefore doubles as the success flag of the enqueue. This trades readability for a trivial saving in performance or code size — don't write code like this in day-to-day development.
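The sentinel idiom can be isolated in a standalone sketch (illustrative names, not the JDK source):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Isolated illustration of the "c >= 0 as success flag" idiom:
// getAndIncrement() returns the PREVIOUS value, which is always >= 0,
// so c keeps its -1 sentinel only if the increment never ran.
class SentinelDemo {
    static boolean tryIncrement(AtomicInteger count, int capacity) {
        int c = -1; // sentinel: "nothing enqueued"
        if (count.get() < capacity) {
            c = count.getAndIncrement(); // previous count, >= 0
        }
        return c >= 0; // success iff the guarded branch ran
    }
}
```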

Timeout version

Again, take offer() as an example; poll() is analogous.

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

This method is similar to put(). The statement if (nanos <= 0) return false; bounds the wait (and silently swallows an illegal timeout argument), so the only real difference is that the blocking notFull.await() is replaced with the timed notFull.awaitNanos(nanos).

The implementation of awaitNanos() is interesting but not listed here. The Javadoc in its implementing class is succinct: "Block until signalled, interrupted, or timed out", returning the remaining time. A return value less than or equal to the argument nanos indicates one of:

  1. the condition notFull was satisfied (remaining time greater than 0), or
  2. the total waiting time exceeded timeout (remaining time less than or equal to 0).

nanos is first initialized from timeout. The waiting producer may then block and be signalled several times; each wake-up returns a remaining time greater than 0 and at most the nanos passed in, and that remaining time becomes the nanos for the next wait, until it drops to 0 or below. This bounds the total wait to at most timeout.

The others are the same as the put() method.
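The same timed-wait loop appears in any hand-rolled condition wait. A minimal sketch (hypothetical TimedGate class, not from the JDK):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Standard timed-wait loop: awaitNanos() returns the remaining time,
// which becomes the budget for the next wait, so spurious wake-ups
// never extend the total deadline.
class TimedGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean open;

    void open() {
        lock.lock();
        try {
            open = true;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }

    boolean awaitOpen(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!open) {
                if (nanos <= 0)
                    return false;          // budget exhausted
                nanos = opened.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```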

The nanos <= 0 check that returns false has an implementation problem: it may violate the interface contract.

According to the Javadoc, the return value true means the enqueue succeeded and false means it failed. But if the timeout passed in is negative, the initial nanos is also negative, and as soon as the while loop is entered, false is returned. This is really an invalid-argument case, yet returning false suggests the arguments were fine and the enqueue merely timed out. That violates the interface contract and is very hard to detect.

Invalid arguments should be checked at the top of the method, throwing IllegalArgumentException accordingly.
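A defensive wrapper along those lines might look like this (hypothetical helper, not part of the JDK):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that fails fast on a negative timeout instead of
// letting offer() silently return false when the queue happens to be full.
final class StrictOffer {
    static <E> boolean offer(BlockingQueue<E> q, E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (timeout < 0)
            throw new IllegalArgumentException("negative timeout: " + timeout);
        return q.offer(e, timeout, unit);
    }
}
```

With this wrapper, false unambiguously means "timed out while the queue stayed full".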

Differences between LinkedBlockingQueue and ArrayBlockingQueue

Both LinkedBlockingQueue and ArrayBlockingQueue are heavily used on GitHub. LinkedBlockingQueue is the recommended default, but knowing their similarities and differences helps you pick the better fit when optimizing a specific scenario.

Similarities:

  • Both support bounded capacity

Differences:

  • LinkedBlockingQueue is backed by a linked list; ArrayBlockingQueue is backed by an array
  • LinkedBlockingQueue supports unbounded use with no specified size (maximum length Integer.MAX_VALUE); ArrayBlockingQueue must be given a capacity and cannot grow
  • LinkedBlockingQueue allocates its nodes lazily; ArrayBlockingQueue preallocates its whole array up front
  • ArrayBlockingQueue creates no extra objects on enqueue; LinkedBlockingQueue creates a Node per element, which costs time and pressures the GC
  • LinkedBlockingQueue uses two locks, so enqueue and dequeue never contend with each other; in ArrayBlockingQueue they share one lock and always contend, with either side able to slow the other. Even setting aside allocation and GC, ArrayBlockingQueue has lower concurrency than LinkedBlockingQueue

Overall, LinkedBlockingQueue is superior to ArrayBlockingQueue. Unless you have a specific reason, prefer LinkedBlockingQueue.
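The capacity differences show up directly at construction time (a small sketch):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CapacityDemo {
    public static void main(String[] args) {
        // LinkedBlockingQueue: capacity is optional; default is Integer.MAX_VALUE
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(1024);

        // ArrayBlockingQueue: capacity is mandatory and fixed; the backing
        // array is allocated eagerly, whereas LinkedBlockingQueue allocates
        // a Node per element only as items arrive
        BlockingQueue<Integer> array = new ArrayBlockingQueue<>(1024);

        System.out.println(unbounded.remainingCapacity()); // 2147483647
        System.out.println(bounded.remainingCapacity());   // 1024
        System.out.println(array.remainingCapacity());     // 1024
    }
}
```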

This comparison may be incomplete; comments and additions are welcome.

Conclusion

None.


Original post: "Source analysis | BlockingQueue, the flower of concurrency". Author: monkey007. Reference: monkeysayhi.github.io. This article is published under the Creative Commons Attribution-ShareAlike 4.0 International License; you are welcome to republish, reproduce, or use it for commercial purposes.