Preface

Some time ago, in an interview, I was asked how to design a blocking queue in Java. I did not give a good answer. I just said that I would use a List and have the consumer check it in an endless loop, processing whatever it found there. In retrospect that was a silly design (and it is not a blocking queue anyway, since nothing ever blocks). I did not review the topic after the interview, and some time later an interviewer at an education company asked a similar question in a different form: "Please use the wait method and the notify method to implement the logic of producers and consumers." I was stumped again and regretted not having studied this part, so this time I am going to summarize it properly.

The specific implementation

If you only need a plain queue, a LinkedList can be used directly, since it implements the Queue interface, or you can write your own FIFO array. To make the queue blocking, two more behaviors are needed. When the queue is empty, a thread that tries to take data from it is blocked until new data arrives. When the queue is full (it has reached its maximum capacity), a thread that tries to add an element is blocked until data is removed from the queue.

A lock is required to ensure that only one thread enqueues or dequeues at any given time. The wait() and notifyAll() methods are used to block and wake up the dequeuing and enqueuing threads.

The code implementation is as follows:

import java.util.LinkedList;
import java.util.Queue;

public class MyBlockQueue {

    /** Maximum number of elements; the default is 10. */
    private int limit = 10;
    private final Queue<Object> queue = new LinkedList<>();

    /**
     * Initializes the queue capacity.
     * @param limit queue capacity
     */
    public MyBlockQueue(int limit) {
        this.limit = limit;
    }

    /**
     * Enqueues an element, blocking while the queue is full.
     * @param object the element to enqueue
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean push(Object object) throws InterruptedException {
        // If the queue is full, the enqueuing thread blocks and waits.
        while (this.queue.size() == this.limit) {
            wait();
        }
        // If the queue is empty, consumers may be waiting: wake up all blocked threads.
        // They cannot run until this method returns and releases the monitor.
        if (this.queue.size() == 0) {
            notifyAll();
        }
        // Enqueue.
        boolean add = this.queue.offer(object);
        return add;
    }

    /**
     * Dequeues an element, blocking while the queue is empty.
     * @return the element at the head of the queue
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Object pop() throws InterruptedException {
        // If the queue is empty, the dequeuing thread blocks and waits.
        while (this.queue.size() == 0) {
            wait();
        }
        // If the queue is full, producers may be waiting: wake up all blocked threads.
        if (this.queue.size() == this.limit) {
            notifyAll();
        }
        Object poll = this.queue.poll();
        return poll;
    }
}
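
The second interview question from the preface (producers and consumers with wait and notify) could be answered along the following lines. This is only a minimal sketch: the names WaitNotifyDemo, Buffer and run are made up for illustration, and the buffer is a compact restatement of the queue above so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class WaitNotifyDemo {

    /** Minimal bounded buffer guarded by its own monitor (hypothetical helper). */
    static final class Buffer {
        private final Queue<Integer> items = new LinkedList<>();
        private final int limit;

        Buffer(int limit) {
            this.limit = limit;
        }

        synchronized void put(int value) throws InterruptedException {
            while (items.size() == limit) {
                wait();          // full: release the monitor and sleep
            }
            items.offer(value);
            notifyAll();         // wake consumers waiting for data
        }

        synchronized int take() throws InterruptedException {
            while (items.isEmpty()) {
                wait();          // empty: release the monitor and sleep
            }
            int value = items.poll();
            notifyAll();         // wake producers waiting for space
            return value;
        }
    }

    /** Runs one producer and one consumer; returns what the consumer received. */
    static List<Integer> run(int count, int limit) throws InterruptedException {
        Buffer buffer = new Buffer(limit);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();         // after join, the consumer's writes are visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 2));   // prints [0, 1, 2, 3, 4]
    }
}
```

Both wait() calls sit inside while loops rather than if statements, so a thread that is woken up always re-checks the condition before it continues.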

Implementation of blocking queues in Java

Let's start by summarizing the blocking queues that Java already provides:

Queue                   Description
ArrayBlockingQueue      A bounded blocking queue backed by an array
LinkedBlockingQueue     An optionally bounded blocking queue backed by a linked list
PriorityBlockingQueue   An unbounded blocking queue that orders elements by priority
DelayQueue              An unbounded blocking queue of delayed elements, built on a priority queue
SynchronousQueue        A blocking queue that does not store elements
LinkedTransferQueue     An unbounded blocking queue backed by a linked list
LinkedBlockingDeque     A double-ended blocking queue backed by a linked list

This time, let's focus on ArrayBlockingQueue and LinkedBlockingQueue. Before introducing these two blocking queues, I will first introduce a few methods of ReentrantLock and Condition, because the locking in the JDK's blocking queues is basically implemented through these two APIs.

ReentrantLock

  • lock(): Acquires the lock. If the lock is contended, the thread blocks until the lock is acquired.
  • lockInterruptibly(): Acquires the lock, but responds to interruption while waiting by throwing InterruptedException.
  • tryLock(): Attempts to acquire the lock without waiting; returns true on success, false on failure.
  • tryLock(long timeout, TimeUnit unit): Attempts to acquire the lock, waiting up to the given timeout; returns true on success and false if the timeout elapses first.
  • unlock(): Releases the lock.
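
As a small illustration of these methods, the sketch below holds a lock on one thread and calls tryLock() from another, first while the lock is held and then after it has been released. The class name TryLockDemo and the 100 ms timeout are arbitrary choices for the example.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {

    /** Returns { tryLock() while another thread holds the lock, timed tryLock() after release }. */
    static boolean[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];

        lock.lock();                       // the calling thread owns the lock
        try {
            Thread contender = new Thread(() -> {
                boolean acquired = lock.tryLock();   // does not wait
                if (acquired) {
                    lock.unlock();
                }
                results[0] = acquired;
            });
            contender.start();
            contender.join();
        } finally {
            lock.unlock();                 // always release in finally
        }

        Thread later = new Thread(() -> {
            try {
                // Waits up to 100 ms; the lock is free, so this succeeds at once.
                if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                    try {
                        results[1] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        later.start();
        later.join();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run()));   // prints [false, true]
    }
}
```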

Condition

Commonly used with ReentrantLock

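  • await(): Blocks the current thread and releases the lock; the lock is reacquired before await() returns.
  • signal(): Wakes up one waiting thread. For a Condition created by a ReentrantLock, waiting threads are signalled in FIFO order, so this is the thread that has waited the longest.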
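
Put together, one lock with two conditions is enough for a bounded buffer. The following is a minimal sketch of that pattern, not the JDK implementation. The class name ConditionBuffer is hypothetical, and the ArrayDeque used for storage is an assumption made for brevity (it rejects null elements).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();    // producers wait here
    private final Condition notEmpty = lock.newCondition();   // consumers wait here

    ConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity) {
                notFull.await();           // releases the lock while waiting
            }
            items.addLast(e);
            notEmpty.signal();             // one consumer can proceed
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();          // releases the lock while waiting
            }
            E e = items.removeFirst();
            notFull.signal();              // one producer can proceed
            return e;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Compared with wait() and notifyAll(), the two conditions let a producer wake only a consumer and a consumer wake only a producer, instead of waking every blocked thread.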

ArrayBlockingQueue

Constructor

Let's first take a look at how ArrayBlockingQueue is initialized. ArrayBlockingQueue has three constructors, but they are all based on ArrayBlockingQueue(int capacity, boolean fair), so it is enough to understand this one. It mainly does the following:

  • Initializes the queue as an array, which fixes the queue length;
  • Creates a single global lock, which both enqueue and dequeue must acquire before operating;
  • Creates a "not empty" condition (notEmpty) on which dequeuing threads wait;
  • Creates a "not full" condition (notFull) on which enqueuing threads wait.

Enqueue

Now let's look at the enqueue operation. Both put() and offer() acquire the lock before enqueuing, and both eventually call enqueue(). The difference is that put() blocks: if the queue is full, the enqueuing thread waits until space becomes available. offer() does not block: if the queue is full, it returns false immediately.
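
The difference is easy to observe on a queue of capacity 1. This is just a small demonstration using the public BlockingQueue API; the class name OfferVsPutDemo and the 50 ms timeout are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class OfferVsPutDemo {

    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        boolean first = queue.offer("a");     // there is room: true
        boolean second = queue.offer("b");    // full: returns false without blocking
        // Timed variant: waits up to 50 ms for space, then gives up with false.
        boolean timed = queue.offer("b", 50, TimeUnit.MILLISECONDS);

        String head = queue.poll();           // "a"
        String empty = queue.poll();          // queue is empty: null, no blocking

        // put() would block if the queue were full; here there is room,
        // so it returns right away.
        queue.put("c");

        return first + " " + second + " " + timed + " " + head + " " + empty + " " + queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // prints true false false a null 1
    }
}
```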

Now for the enqueue() method itself, which is not complicated. Elements enter at the tail of the queue (putIndex) and leave from the head (takeIndex). Once putIndex reaches the end of the array, the next element has to be written at the start of the array again. That is why putIndex is reset to 0.

If that is not clear, think of the array as a ring. Normally, elements are enqueued toward the end of the array and dequeued from the front. When the last slot of the array has been written, enqueueing wraps around and continues at the start of the array, in the slots that dequeues have already freed.

Dequeue

Now let's look at how ArrayBlockingQueue dequeues. In the source code, both poll() and take() call dequeue(), but take() blocks: when the queue is empty, the dequeuing thread is blocked and placed on the wait queue of the notEmpty condition. poll() returns null instead.

So how does dequeue() remove an element? From the source code, the element to remove is determined by the index takeIndex. If takeIndex is at the last slot of the array, it has to wrap around and start again from 0. dequeue() then updates the data in any active iterators and wakes up a blocked enqueuing thread.
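
The index handling on both sides can be sketched without any locking. The class below is a simplified, single-threaded illustration of the wrap-around logic, not the JDK source; the name RingBuffer and the accessor methods are made up for the example, while putIndex, takeIndex and count mirror the field names used by ArrayBlockingQueue.

```java
class RingBuffer {
    private final Object[] items;
    private int takeIndex;   // slot of the next element to dequeue
    private int putIndex;    // slot where the next element is written
    private int count;       // number of elements currently stored

    RingBuffer(int capacity) {
        this.items = new Object[capacity];
    }

    /** Writes at putIndex and wraps to 0 after the last slot; false if full. */
    boolean enqueue(Object x) {
        if (count == items.length) {
            return false;
        }
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
        count++;
        return true;
    }

    /** Reads at takeIndex and wraps to 0 after the last slot; null if empty. */
    Object dequeue() {
        if (count == 0) {
            return null;
        }
        Object x = items[takeIndex];
        items[takeIndex] = null;          // clear the slot so the element can be collected
        if (++takeIndex == items.length) {
            takeIndex = 0;
        }
        count--;
        return x;
    }

    int putIndex() {
        return putIndex;
    }

    int takeIndex() {
        return takeIndex;
    }

    public static void main(String[] args) {
        RingBuffer ring = new RingBuffer(3);
        ring.enqueue("a");
        ring.enqueue("b");
        ring.enqueue("c");
        System.out.println(ring.putIndex());   // prints 0: wrapped after the last slot
        System.out.println(ring.dequeue());    // prints a
        ring.enqueue("d");                     // stored in slot 0, which "a" just freed
        System.out.println(ring.putIndex());   // prints 1
    }
}
```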

remove(Object o) and removeAt(final int removeIndex) are slightly more complicated, because the element to be removed must be located first and only then taken out of the queue. removeAt(final int removeIndex) does the actual removal. If the element sits at takeIndex, it is removed just like a normal dequeue. Otherwise, every element after it, up to putIndex, is shifted one slot toward the gap, and putIndex moves back by one.
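
The visible effect is that removing an interior element keeps the remaining elements in FIFO order and frees one slot. A short check against the public API (the class name RemoveDemo is arbitrary):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class RemoveDemo {

    static List<Integer> run() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        for (int i = 1; i <= 4; i++) {
            queue.add(i);                      // queue is now full: 1, 2, 3, 4
        }
        queue.remove(Integer.valueOf(3));      // remove(Object) on an interior element
        queue.add(5);                          // the freed slot can be used again
        return new ArrayList<>(queue);         // copies elements in queue order
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [1, 2, 4, 5]
    }
}
```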

LinkedBlockingQueue

Constructor

The LinkedBlockingQueue constructor initializes the queue's data (the capacity and the head node). The core pieces needed for blocking, namely the two locks and their conditions, are declared with field initializers, so they are created when the object is instantiated, before the constructor body runs.

Enqueue

As the initialization shows, LinkedBlockingQueue has two locks, one for enqueuing (putLock) and one for dequeuing (takeLock), which are two separate reentrant locks. Handling enqueue and dequeue independently in this way greatly improves the throughput of the queue. In LinkedBlockingQueue, put() blocks when the queue is full, while offer() does not block and returns true on success or false otherwise.

Both of these methods call enqueue() underneath, so let's see how it performs the enqueue. The logic of enqueue() is simple: it appends the element to the end of the linked list.

Dequeue

To dequeue, LinkedBlockingQueue first acquires takeLock and then performs the dequeue. take() blocks the dequeuing thread when the queue is empty, while poll() does not block when the queue is empty and returns null.

Both take() and poll() call dequeue() to do the actual work, so let's look at how dequeue() is implemented. I forgot to mention earlier that the source code discussed here is from JDK 1.8. dequeue() removes the first node after the head node from the queue and makes it the new head node.

The head node is a placeholder whose item is null (new Node<E>(null)), and its next points to the first real element in the queue.

  • In the first step, the old head node changes its next from pointing at the following node to pointing at itself. The old head then holds neither an item nor a reference into the list, which helps the GC reclaim it.
  • In the second step, head is set to the node that followed the old head, and the item of this new head node is read as the return value.
  • In the third step, the item of the new head node is set to null, so that the new head has the same form as the old one: a placeholder with a null item.
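
The two locks, the append at the tail, and the three dequeue steps above can be sketched together as follows. This is a simplified illustration modeled on the structure described in this section, not a copy of the JDK source: the class name TwoLockQueue and the signal helper are hypothetical, and only put() and take() are covered.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoLockQueue<E> {

    private static final class Node<E> {
        E item;
        Node<E> next;

        Node(E item) {
            this.item = item;
        }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();   // shared by both sides
    private Node<E> head = new Node<>(null);                   // placeholder, item is null
    private Node<E> last = head;

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        int c = -1;
        Node<E> node = new Node<>(e);
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            last = last.next = node;           // append at the tail
            c = count.getAndIncrement();       // c is the size before this put
            if (c + 1 < capacity) {
                notFull.signal();              // still room: let another producer in
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signal(takeLock, notEmpty);        // queue was empty: wake a consumer
        }
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h;                        // step 1: old head points to itself
            head = first;                      // step 2: next node becomes the head
            x = first.item;
            first.item = null;                 // step 3: new head is a placeholder again
            c = count.getAndDecrement();       // c is the size before this take
            if (c > 1) {
                notEmpty.signal();             // more data: let another consumer in
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signal(putLock, notFull);          // queue was full: wake a producer
        }
        return x;
    }

    /** A condition may only be signalled while its own lock is held. */
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    int size() {
        return count.get();
    }
}
```

Because producers only touch last and consumers only touch head, the shared state between the two sides is reduced to the atomic count, which is what lets an enqueue and a dequeue run at the same time.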

Let's take a look at another way of leaving the queue, remove(). remove() acquires both the dequeue lock and the enqueue lock, so it runs only after any in-progress enqueue or dequeue has finished, and no new one can start until it is done. This ensures that concurrent enqueue and dequeue operations cannot disrupt the traversal while remove() searches the queue for the specified element.

Then the unlink() method removes the node from the linked list. If the node is the last one, last is moved back to its predecessor.

Conclusion

  • To implement a blocking queue yourself, you first need a lock, so that enqueuing threads are blocked when the queue is full and dequeuing threads are blocked when it is empty. Then enqueuing threads are woken up when there is space in the queue, and dequeuing threads are woken up when there is data in the queue.
  • ArrayBlockingQueue and LinkedBlockingQueue are both bounded blocking queues (the default capacity of LinkedBlockingQueue is Integer.MAX_VALUE). ArrayBlockingQueue is implemented with an array and relies on ReentrantLock and Condition for locking. LinkedBlockingQueue is implemented with a linked list and also relies on ReentrantLock and Condition for locking.
  • ArrayBlockingQueue uses a single global lock, so only one enqueue or dequeue operation can run at a time. LinkedBlockingQueue uses separate reentrant locks for enqueuing and dequeuing, so the two can run concurrently, and its throughput is typically higher than that of ArrayBlockingQueue.
  • ArrayBlockingQueue stores its elements in an array that is not released while the queue is in use, so it needs a larger block of contiguous memory. LinkedBlockingQueue does not need a large amount of contiguous memory, but under concurrency it creates and discards a large number of node objects, so its performance depends on the efficiency of the GC.
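
The capacities mentioned above can be checked directly with remainingCapacity(). A tiny sketch (the class name CapacityDemo is arbitrary):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {

    static int[] run() {
        return new int[] {
            new LinkedBlockingQueue<Integer>().remainingCapacity(),    // default capacity
            new LinkedBlockingQueue<Integer>(10).remainingCapacity(),  // explicit bound
            new ArrayBlockingQueue<Integer>(10).remainingCapacity()    // capacity is mandatory
        };
    }

    public static void main(String[] args) {
        int[] capacities = run();
        System.out.println(capacities[0] == Integer.MAX_VALUE);   // prints true
        System.out.println(capacities[1] + " " + capacities[2]);  // prints 10 10
    }
}
```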