This article is a set of reading notes on the book “Java Concurrent Programming”.

What is a blocking queue

A blocking queue (BlockingQueue) is a queue that supports two additional operations: a blocking insert and a blocking remove.

  • Blocking insertion: When the queue is full, the queue blocks the thread inserting the element until the queue is no longer full.
  • Blocking removal: When the queue is empty, the thread fetching the element waits until the queue becomes non-empty.

Blocking queues are often used in producer and consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that fetches elements from the queue. Blocking queues are containers that producers use to store elements and consumers use to get them.
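
The producer and consumer roles described above can be sketched as follows. This is a minimal example, not code from the book: the class name ProducerConsumerDemo, the method run, and the capacity of 2 are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; only the BlockingQueue calls are JDK API.
class ProducerConsumerDemo {

    // Runs one producer and one consumer over a small bounded queue and
    // returns the elements in the order the consumer received them.
    static List<Integer> run(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the queue holds at most two elements, the producer is repeatedly blocked in put until the consumer catches up, yet neither thread needs any explicit locking.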

When a blocking queue cannot satisfy an operation immediately (it is full on insertion or empty on removal), the insert and remove operations offer four ways of handling the situation:

  • Throw an exception: When the queue is full, inserting an element with add(e) throws an IllegalStateException ("Queue full"). When the queue is empty, fetching an element with remove() or element() throws a NoSuchElementException.

  • Return a special value: offer(e) reports whether the element was inserted, returning true on success and false if the queue is full. poll() retrieves and removes an element from the queue, or returns null if the queue is empty.

  • Block indefinitely: When the queue is full, put(e) blocks the producer thread until space becomes available or the thread is interrupted. When the queue is empty, take() blocks the consumer thread until the queue is no longer empty or the thread is interrupted.

  • Time out: offer(e, time, unit) and poll(time, unit) block for at most the given time. If the operation still cannot proceed when the timeout elapses, offer returns false and poll returns null.

Tip: An unbounded blocking queue can never be full, so its put and offer methods never block, and offer always returns true.
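
To make the non-blocking handling modes concrete, here is a small sketch against a queue of capacity 1. The class QueueModesDemo and its helper methods are hypothetical names; the queue methods themselves (add, remove, offer, poll) are standard BlockingQueue API.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the BlockingQueue calls are JDK API.
class QueueModesDemo {

    // Returns a queue of capacity 1 that is already full.
    static BlockingQueue<String> fullQueue() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("a");
        return q;
    }

    // Throw an exception: add() on a full queue.
    static boolean addThrowsWhenFull() {
        try {
            fullQueue().add("b");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Throw an exception: remove() on an empty queue.
    static boolean removeThrowsWhenEmpty() {
        try {
            new ArrayBlockingQueue<String>(1).remove();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    // Return a special value: offer() gives false on a full queue.
    static boolean offerWhenFull() {
        return fullQueue().offer("b");
    }

    // Return a special value: poll() gives null on an empty queue.
    static String pollWhenEmpty() {
        return new ArrayBlockingQueue<String>(1).poll();
    }

    // Time out: the timed offer() waits up to 50 ms, then gives up.
    static boolean timedOfferWhenFull() {
        try {
            return fullQueue().offer("b", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(addThrowsWhenFull());     // true
        System.out.println(removeThrowsWhenEmpty()); // true
        System.out.println(offerWhenFull());         // false
        System.out.println(pollWhenEmpty());         // null
        System.out.println(timedOfferWhenFull());    // false
    }
}
```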

Blocking queues provided by the JDK

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue backed by an array. This queue orders elements on a first-in, first-out (FIFO) basis. Its constructors are as follows:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

The fair argument controls whether threads get fair access to the queue. Fair access means that blocked threads gain access in the order in which they blocked, so the thread that blocked first accesses the queue first. The default, non-fair mode gives no such guarantee: when the queue becomes available, all blocked threads compete for access, and the thread that blocked first may be the last to get in. Fairness usually comes at the cost of lower throughput.
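
As a usage sketch for the three-argument constructor, the following example creates a fair queue from an initial collection and shows the IllegalArgumentException raised when the collection does not fit. The class and method names are assumptions made for this illustration; the effect of fairness on thread ordering is not demonstrated here because it depends on scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class for the ArrayBlockingQueue constructors.
class ArrayBlockingQueueCtorDemo {

    // Builds a fair queue pre-filled from a collection and drains it in FIFO order.
    static List<Integer> drainFairQueue() {
        BlockingQueue<Integer> q =
                new ArrayBlockingQueue<>(3, true, Arrays.asList(1, 2, 3));
        List<Integer> out = new ArrayList<>();
        q.drainTo(out);
        return out;
    }

    // A collection larger than the capacity is rejected by the constructor.
    static boolean rejectsOversizedCollection() {
        try {
            BlockingQueue<Integer> q =
                    new ArrayBlockingQueue<>(2, true, Arrays.asList(1, 2, 3));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(drainFairQueue());             // prints [1, 2, 3]
        System.out.println(rejectsOversizedCollection()); // prints true
    }
}
```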

LinkedBlockingQueue

LinkedBlockingQueue is an optionally bounded blocking queue backed by a linked list. If no capacity is given, the capacity defaults to Integer.MAX_VALUE, which is also the maximum. This queue orders elements on a first-in, first-out basis.
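
The difference between the default capacity and an explicit bound can be checked with a short sketch. The class name LinkedBlockingQueueCapacityDemo and its methods are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; only the LinkedBlockingQueue calls are JDK API.
class LinkedBlockingQueueCapacityDemo {

    // With the no-argument constructor the capacity is Integer.MAX_VALUE.
    static int defaultRemainingCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // With an explicit capacity of 1, the second offer() is rejected.
    static boolean secondOfferAccepted() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.offer("a");
        return q.offer("b");
    }

    public static void main(String[] args) {
        System.out.println(defaultRemainingCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(secondOfferAccepted());                           // prints false
    }
}
```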

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that supports priorities. By default, elements are ordered by their natural ascending order. You can also have the element class implement compareTo() to define its own ordering, or pass a Comparator to the PriorityBlockingQueue constructor. Note that the order of elements with equal priority is not guaranteed.
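
The two ordering options can be compared side by side. In this sketch the class PriorityOrderDemo and the helper drain are hypothetical; the initial capacity of 11 is simply the value the JDK uses by default.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only the PriorityBlockingQueue calls are JDK API.
class PriorityOrderDemo {

    // Adds 3, 1, 2 and then polls until the queue is empty.
    static List<Integer> drain(PriorityBlockingQueue<Integer> q) {
        q.add(3);
        q.add(1);
        q.add(2);
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = q.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    // Natural (ascending) ordering via Comparable.
    static List<Integer> naturalOrder() {
        return drain(new PriorityBlockingQueue<Integer>());
    }

    // Custom ordering via a Comparator passed to the constructor.
    static List<Integer> reverseOrder() {
        return drain(new PriorityBlockingQueue<Integer>(
                11, Comparator.<Integer>reverseOrder()));
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder()); // prints [1, 2, 3]
        System.out.println(reverseOrder()); // prints [3, 2, 1]
    }
}
```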

DelayQueue

DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements. It is implemented on top of PriorityQueue. Elements must implement the Delayed interface, and each element specifies at creation time how long it must wait before it can be retrieved. An element can be taken from the queue only after its delay has expired.

DelayQueue is used in the following scenarios:

  • Cache design: a DelayQueue can hold cached elements together with their expiry times, while a dedicated thread keeps taking from the DelayQueue. Whenever an element can be retrieved, its cache entry has expired.
  • Task timeout handling: for example, if an order is not paid within 15 minutes of being placed, it is closed automatically.

How to implement the Delayed interface

Elements of a DelayQueue must implement the Delayed interface. You can use the ScheduledFutureTask class inside ScheduledThreadPoolExecutor as a reference. The steps are as follows:

  1. When the object is created, initialize its basic data. Use time to record the point in time at which the object becomes available, and sequenceNumber to record the order in which elements were added to the queue. The code is as follows:

    private static final AtomicLong sequencer = new AtomicLong();
    
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
  2. Implement the getDelay method, which returns how much longer the current element must be delayed, expressed in the unit the caller requests (time itself is stored in nanoseconds). The code is as follows:

     public long getDelay(TimeUnit unit) {
         return unit.convert(time - now(), NANOSECONDS);
     }

    As the constructor shows, the delay parameter ns is in nanoseconds, and it is best to store the time in nanoseconds in your own design as well. getDelay() accepts any TimeUnit and converts the remaining delay to it. Once time is earlier than the current time, getDelay returns a negative value.

  3. Implement the compareTo method to define the order of elements, for example placing the element with the longest delay at the end of the queue. The code is as follows:

    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
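
Putting the three steps together, a self-contained element type for the order-timeout scenario might look like the sketch below. DelayedOrder, DelayQueueDemo, the orderId field and the chosen delays are all hypothetical; the structure follows the ScheduledFutureTask steps above but is not JDK code.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical element type modelled on the three steps above.
class DelayedOrder implements Delayed {
    private static final AtomicLong SEQUENCER = new AtomicLong();

    final String orderId;
    private final long time;           // absolute expiry time, in nanoseconds
    private final long sequenceNumber; // tie-breaker for equal expiry times

    DelayedOrder(String orderId, long delay, TimeUnit unit) {
        this.orderId = orderId;
        this.time = System.nanoTime() + unit.toNanos(delay);
        this.sequenceNumber = SEQUENCER.getAndIncrement();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay, converted to the unit the caller asked for.
        return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        if (other instanceof DelayedOrder) {
            DelayedOrder x = (DelayedOrder) other;
            long diff = time - x.time;
            if (diff != 0)
                return diff < 0 ? -1 : 1;
            return Long.compare(sequenceNumber, x.sequenceNumber);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// Hypothetical demo class showing how the element behaves in a DelayQueue.
class DelayQueueDemo {

    // take() returns elements in expiry order, not insertion order.
    static String expiryOrder() {
        DelayQueue<DelayedOrder> queue = new DelayQueue<>();
        queue.put(new DelayedOrder("slow", 300, TimeUnit.MILLISECONDS));
        queue.put(new DelayedOrder("fast", 100, TimeUnit.MILLISECONDS));
        try {
            String first = queue.take().orderId;  // blocks about 100 ms
            String second = queue.take().orderId; // blocks until "slow" expires
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // poll() does not block and returns null while nothing has expired.
    static DelayedOrder pollBeforeExpiry() {
        DelayQueue<DelayedOrder> queue = new DelayQueue<>();
        queue.put(new DelayedOrder("later", 1, TimeUnit.HOURS));
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(expiryOrder());      // prints fast,slow
        System.out.println(pollBeforeExpiry()); // prints null
    }
}
```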

How to implement delayed blocking queue

The implementation of a delayed blocking queue is straightforward: when a consumer tries to take an element from the queue and the delay of the head element has not yet expired, the current thread is blocked.

// Thread designated to wait for the element at the head of the queue
private Thread leader = null;

private final Condition available = lock.newCondition();

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await(); // queue is empty: wait for an element
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll(); // head has expired: hand it out
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await(); // another thread is leader: wait indefinitely
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay); // leader waits only until the head expires
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // No leader and elements remain: wake another waiting thread
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

SynchronousQueue

SynchronousQueue is a blocking queue that stores no elements. Each put operation must wait for a corresponding take operation; until then, no further element can be added. The constructors are as follows:

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue can be thought of as a hand-off point that passes data produced by the producer thread directly to the consumer thread. The queue itself stores no elements, which makes it well suited to hand-off scenarios. In such scenarios, SynchronousQueue has higher throughput than LinkedBlockingQueue and ArrayBlockingQueue.
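
The hand-off behaviour can be observed with a short sketch. The class SynchronousQueueDemo and its methods are hypothetical names; offer, put and take are the standard SynchronousQueue methods.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; only the SynchronousQueue calls are JDK API.
class SynchronousQueueDemo {

    // With no consumer waiting, the non-blocking offer() cannot hand off.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    // put() in the producer blocks until the main thread calls take().
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String value = queue.take();
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff());              // prints hello
    }
}
```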

LinkedTransferQueue

LinkedTransferQueue is an unbounded blocking TransferQueue backed by a linked list. Compared with other blocking queues, LinkedTransferQueue adds the tryTransfer and transfer methods.

  • transfer method

    If a consumer is currently waiting to receive an element (in take() or a timed poll()), transfer hands the element passed in by the producer to that consumer immediately. If no consumer is waiting, transfer stores the element at the tail node of the queue and does not return until a consumer has consumed it.

  • tryTransfer method

    tryTransfer tests whether the element passed in by the producer can be handed directly to a consumer. If no consumer is waiting to receive it, the method returns false. The difference from transfer is that tryTransfer returns immediately whether or not a consumer receives the element, whereas transfer returns only after a consumer has consumed it.
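
The contrast between the two methods can be sketched as follows. TransferDemo and its method names are assumptions made for this example; the queue calls are standard LinkedTransferQueue API.

```java
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical demo class; only the LinkedTransferQueue calls are JDK API.
class TransferDemo {

    // No consumer is waiting, so tryTransfer() returns false immediately
    // and the element is not enqueued.
    static boolean tryTransferWithoutConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean transferred = queue.tryTransfer("x");
        return transferred || !queue.isEmpty();
    }

    // transfer() returns only after the consumer thread has taken the element.
    static String transferToConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer("hello"); // blocks until consumed
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(tryTransferWithoutConsumer()); // prints false
        System.out.println(transferToConsumer());         // prints hello
    }
}
```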

LinkedBlockingDeque

LinkedBlockingDeque is a double-ended blocking queue backed by a linked list. A double-ended queue (deque) lets you insert and remove elements at both ends. Because the deque has an extra entry point, contention is in principle reduced by up to half when multiple threads enqueue at the same time. Compared with other blocking queues, LinkedBlockingDeque adds methods such as addFirst, addLast, offerFirst, offerLast, peekFirst and peekLast. Methods ending in First insert, get (peek) or remove the first element of the deque; methods ending in Last insert, get or remove the last element.
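
A brief sketch of the First and Last method families. The class DequeDemo and the method run are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; only the LinkedBlockingDeque calls are JDK API.
class DequeDemo {

    static String run() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.addLast("b");
        deque.addFirst("a"); // inserted in front of "b"
        deque.addLast("c");  // inserted behind "b"

        String first = deque.peekFirst(); // look at the head without removing it
        String last = deque.peekLast();   // look at the tail without removing it

        deque.pollFirst(); // remove the head
        deque.pollLast();  // remove the tail

        return first + "," + last + "," + deque;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints a,c,[b]
    }
}
```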

Blocking queue principle

If the queue is empty and a consumer is waiting, how does the consumer learn that the queue has elements once a producer adds one? The JDK uses a notification pattern: when a producer adds an element to a full queue, the producer is blocked, and when a consumer removes an element, it notifies the producer that the queue is available again. The same mechanism works in the other direction for consumers waiting on an empty queue.

ArrayBlockingQueue uses Condition to implement this:

private final Condition notEmpty;

private final Condition notFull;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) // block the current consumer while the queue is empty
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) // block the current producer while the queue is full
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal(); // the queue is no longer empty: wake a waiting consumer
}
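
The JDK excerpt above cannot be compiled on its own, so here is a simplified, runnable sketch of the same notification pattern with both signalling directions visible. SimpleBoundedBuffer and SimpleBoundedBufferDemo are hypothetical classes, and the ArrayDeque storage is a simplification; the real ArrayBlockingQueue uses a circular array.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified buffer; not the JDK implementation.
class SimpleBoundedBuffer<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    SimpleBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity)
                notFull.await();   // producer waits for free space
            items.add(e);
            notEmpty.signal();     // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty())
                notEmpty.await();  // consumer waits for an element
            E e = items.remove();
            notFull.signal();      // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}

// Hypothetical driver: one producer thread, the calling thread consumes.
class SimpleBoundedBufferDemo {

    static List<Integer> run(int itemCount) {
        SimpleBoundedBuffer<Integer> buffer = new SimpleBoundedBuffer<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++)
                    buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < itemCount; i++)
                received.add(buffer.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(4)); // prints [0, 1, 2, 3]
    }
}
```

The await calls sit inside while loops rather than if statements so that a thread rechecks the condition after waking, which guards against spurious wakeups and against another thread having changed the queue in the meantime.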
