Preface

The producer/consumer model is a very common resource-scheduling model. In this model there are at least two kinds of participants: producers and consumers. Producers are only responsible for creating resources, and consumers are only responsible for using them. It would be easy to implement a simple producer/consumer model yourself with nothing more than a queue, but this approach has many hidden pitfalls:

  1. You need to ensure thread-visibility of resources and manually implement thread synchronization
  2. Various boundary cases and rejection policies need to be considered
  3. A balance needs to be struck between throughput and thread safety

Fortunately, Java already provides the interface and implementations for us, so let's take a brief look at the BlockingQueue interface and one of its most common implementation classes, LinkedBlockingQueue

Blocking queue

Concept

BlockingQueue is, as its name suggests, a blocking queue. As we can see from the class definition, it extends the Queue interface and can therefore also be used as an ordinary Queue:

Since it is called a blocking queue, it means that the queue operations are performed in blocking mode, which is reflected in the following two aspects:

  • Blocking on insertion: when the queue is full, the thread performing the insert is blocked
  • Blocking on removal: when the queue is empty, the thread performing the removal is blocked

In this way, the relationship between producers and consumers can be coordinated easily.
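For example, a minimal sketch of the producer/consumer model built on LinkedBlockingQueue might look like this (the class name and capacity are made up for illustration):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        // Bounded queue: producers block when it is full, consumers block when it is empty
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    queue.put(i);                 // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    Integer value = queue.take(); // blocks while the queue is empty
                    System.out.println("consumed " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}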

Interface methods

BlockingQueue defines the following interface methods:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

These interface methods can be divided into three categories by function:

  • Add elements: add, offer, and put
  • Remove elements: remove, poll, take, and drainTo
  • Get/check elements: contains and remainingCapacity

In general, we also call adding an element a put operation (even when the offer method is used instead of put) and removing an element a take operation.

In terms of how failures are handled, the first two categories can be further divided as follows:

  • Throw an exception: add(e), remove()
  • Return a special value: offer(e), poll()
  • Block: put(e), take()
  • Time out: offer(e, time, unit), poll(time, unit)

I won't go into each of these in detail; the names speak for themselves.
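As a quick illustration, here is a hypothetical snippet showing the four behaviors with a queue of capacity 1 (the values and timeouts are made up):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertBehaviorDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");                                   // succeeds, the queue is now full

        System.out.println(queue.offer("b"));               // false: returns a special value
        System.out.println(queue.offer("c", 100, TimeUnit.MILLISECONDS)); // false after waiting 100 ms

        try {
            queue.add("d");                                 // throws IllegalStateException: Queue full
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // queue.put("e");                                  // would block until a consumer removes an element

        System.out.println(queue.poll());                   // "a": removes and returns the head
        System.out.println(queue.poll());                   // null: the queue is empty, special value
    }
}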

Implementation of blocking queues

JDK 8 provides a number of BlockingQueue implementation classes.

We commonly use the following basic types:

  • ArrayBlockingQueue: A bounded blocking queue backed by an array
  • LinkedBlockingQueue: A bounded blocking queue backed by a linked list
  • PriorityBlockingQueue: An unbounded blocking queue that supports priority ordering
  • DelayQueue: An unbounded blocking queue that supports delayed retrieval of elements

If you are interested in the other implementations, you can explore them yourself. Here, let's use LinkedBlockingQueue as an example to explain how Java implements a blocking queue.

Interface methods

In addition to the methods declared in BlockingQueue, LinkedBlockingQueue also implements peek (declared in Queue), which returns the head of the queue without removing it.

At this point, the common blocking queue methods have been explained. Here is a table to summarize them [1]:

Method / handling      Throws exception    Returns special value    Blocks      Times out
Insert element         add(e)              offer(e)                 put(e)      offer(e, timeout, unit)
Remove element         remove()            poll()                   take()      poll(timeout, unit)
Get element            element()           peek()                   /           /

The element and peek methods both return the head of the queue without removing it; the difference is that element throws an exception when the queue is empty, while peek returns null.

Attributes

BlockingQueue only defines the interface specification; the actual implementation is left to the concrete implementation classes. Let's skip AbstractQueue for a moment and go straight to LinkedBlockingQueue, which defines several important fields:

    /** Number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
    /** queue first node */
    transient Node<E> head;
    /** the end of the queue */
    private transient Node<E> last;

    /** Lock held by methods such as take and poll, called the take lock or out-lock */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** wait queue for the take method */
    private final Condition notEmpty = takeLock.newCondition();

    /** Locks held by methods such as put and offer, which are called put locks or in-locks */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for the put method */
    private final Condition notFull = putLock.newCondition();

Setting aside head and last, which play the same role as in LinkedList, we focus on the remaining four fields, which fall into two groups: those used for inserting elements and those used for removing elements. Each group consists of a ReentrantLock and a Condition. ReentrantLock is a reentrant lock based on AQS[2] (if you are unfamiliar with reentrancy, you can think of it as an ordinary lock), and Condition is a concrete implementation of the wait/notify pattern (which can be understood as a class providing more powerful wait and notify).

The count field speaks for itself, and head and last are obviously used to maintain the linked list that stores the elements. What distinguishes a blocking queue from an ordinary queue are the four ReentrantLock and Condition fields, whose meaning will be analyzed further in the following sections.
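For reference, head and last point to nodes of a static inner Node class, which in the JDK 8 source is roughly the following (simplified here):

    static class Node<E> {
        E item;

        /** The next node in the list; null if this is the last node */
        Node<E> next;

        Node(E x) { item = x; }
    }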

For the sake of the following discussion, let's take a quick look at Condition. Condition is actually an interface whose implementation lives in AQS. For the purposes of this article, you only need to know three of its methods: await(), signal(), and signalAll(). They are analogous to wait(), notify(), and notifyAll() on Object: the latter work with object and class monitors and manipulate the queues of threads waiting on those monitors, while await/signal work with AQS-based locks and manipulate AQS's thread wait queues.

So notEmpty here maintains the queue of threads waiting on the take lock, and notFull maintains the queue of threads waiting on the put lock. notEmpty means "the queue is not empty, so elements can be taken," and notFull means "the queue is not full, so elements can be inserted."
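To make the await/signal pattern concrete before reading the source, here is a minimal, hypothetical single-slot buffer that uses one ReentrantLock and two Conditions. LinkedBlockingQueue itself uses two separate locks, as we will see, but the way await and signal are called is the same:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OneSlotBuffer<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private E item; // null means the buffer is empty

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (item != null)       // buffer is full: wait on notFull
                notFull.await();
            item = e;
            notEmpty.signal();         // wake up one waiting taker
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (item == null)       // buffer is empty: wait on notEmpty
                notEmpty.await();
            E x = item;
            item = null;
            notFull.signal();          // wake up one waiting putter
            return x;
        } finally {
            lock.unlock();
        }
    }
}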

Insert elements

offer(e)

Let's start with offer(e):

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // If the capacity reaches the upper limit, false is returned
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // Get the put lock
        putLock.lock();
        try {
            if (count.get() < capacity) {
                // Enqueue the node and increment the element count
                enqueue(node);
                // Note that c is the value of count before the increment
                c = count.getAndIncrement();
                // If the capacity has still not been reached, wake up another put operation
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // Unlock
            putLock.unlock();
        }
        if (c == 0)
            // If the queue was empty before this insert, wake up a take operation
            signalNotEmpty();
        return c >= 0;
    }

The offer method returns false to the caller when the element cannot be added, similar to non-blocking communication. The thread safety of the offer method is guaranteed by the put lock.

One interesting detail is the check at the end: if c == 0, a take operation is woken up. Recall that c = count.getAndIncrement() returns the value of count before the increment, so c == 0 means the queue was empty before this element was inserted. In other words, as soon as the first element is added to an empty queue, a take operation is woken up immediately. [3]

So far, the whole method process can be summarized as follows:

  1. Acquire the put lock
  2. Enqueue the element and increment count
  3. If the capacity has not been reached, wake up one put operation
  4. If the queue was empty before the element was inserted, wake up one take operation at the end
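For completeness, the two helper methods used above, enqueue and signalNotEmpty, look roughly like this in the JDK 8 source:

    /** Links the node at the end of the queue; called while holding the put lock */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /** Wakes up a waiting take; called only when the queue was previously empty */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

Note that signalNotEmpty has to acquire the take lock first, because a Condition can only be signaled by a thread that holds the lock it was created from.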

offer(e, timeout, unit)

To strike while the iron is hot, let’s move on to the offer method with a timeout mechanism:

    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // The put lock can be acquired interruptively
        putLock.lockInterruptibly();
        try {
            // Loop until the queue is no longer full or the timeout expires
            while (count.get() == capacity) {
                // Return false after the timeout period
                if (nanos <= 0)
                    return false;
                // Add the current thread to the notFull wait queue,
                // Returns the remaining available wait time
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

The method is basically the same as offer(e), with two differences:

  1. The lock is acquired interruptibly, i.e. via putLock.lockInterruptibly()
  2. While the queue is full, notFull.awaitNanos(nanos) is executed in a loop, adding the current thread to the notFull wait queue (where it waits to perform its put operation)

The rest is exactly the same as offer(e) and will not be repeated here

add(e)

The add method, in contrast to the offer method, throws an exception instead of returning a special value when the operation is not allowed, as follows:

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

It simply wraps offer(e), so there is not much to say, except to note that it is actually implemented in AbstractQueue.

put(e)

The put(e) method blocks a thread when an operation is not allowed. Let’s see how this works:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        
        // Get the PUT lock in interruptible form
        putLock.lockInterruptibly();
        try {
            // Compared with offer(e, timeout, unit), here we wait indefinitely
            while (count.get() == capacity) {
                // A thread in the notFull queue is woken up by signal when the element removal operation is performed
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

The put(e) operation is similar to the offer(e, timeout, unit) operation. The only difference is that when the queue is full, the await operation no longer has a timeout; the thread can only be woken up when a take operation [4] calls the signal method.

Remove elements

poll()

The poll() method removes and returns the head element of the queue:

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        // Get the take lock
        takeLock.lock();
        try {
            if (count.get() > 0) {
                // Dequeue the head element
                x = dequeue();
                c = count.getAndDecrement();
                // If elements remain in the queue, wake up another take operation
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // If the queue was full before this removal, wake up a put operation
        if (c == capacity)
            signalNotFull();
        return x;
    }

If you have really worked through the offer(e) method, there is nothing new to say about poll(): it is essentially the mirror image of offer(e), using the take lock and the notEmpty condition instead (I wanted to say something more, but the poll() flow really is exactly the same as offer(e)...).
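As with the insert path, the helper methods dequeue and signalNotFull used here look roughly like this in the JDK 8 source:

    /** Removes a node from the head of the queue; called while holding the take lock */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /** Wakes up a waiting put; called only when the queue was previously full */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }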

Other

The poll(timeout, unit), take(), and remove() methods correspond to offer(e, timeout, unit), put(e), and add(e) respectively, and their implementations follow the same patterns.
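For instance, take(), which blocks while the queue is empty and mirrors put(e), looks roughly like this in the JDK 8 source:

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // Acquire the take lock interruptibly
        takeLock.lockInterruptibly();
        try {
            // Wait until the queue is no longer empty
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // If elements remain, wake up another take operation
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // If the queue was full before this removal, wake up a put operation
        if (c == capacity)
            signalNotFull();
        return x;
    }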

Access to elements

peek()

The peek() method returns the head element without removing it; it is implemented as follows:

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // Get the take lock
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

There is nothing special about the logic, but note that this method acquires the take lock, which means that no removal operation can run while peek() is executing.

element()

An implementation of the element() method is in AbstractQueue:

    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

It is the same kind of secondary wrapping as add(e).

Conclusion

This article was nominally about BlockingQueue but ended up mostly discussing LinkedBlockingQueue. Still, as a classic implementation of the blocking queue, the way LinkedBlockingQueue implements its methods is important for understanding blocking queues in general. The key is to understand the locks: LinkedBlockingQueue achieves thread safety through the producer-side put lock and the consumer-side take lock, together with the Condition object associated with each lock. Once you understand this, you understand the whole producer/consumer model.


  1. See the Art of Concurrent Programming in Java ↩︎

  2. See AQS at ↩︎

  3. The description “wake up a take operation” is somewhat inaccurate. It should actually be “wake up a thread waiting for a take lock”, but I think the former is more helpful for readers to understand, so I use the former description ↩︎

  4. This refers to the group of take-like methods, including take, poll, and remove; the same goes for the put operation ↩︎