Queues and blocking queues

The queue

A Queue is a commonly used collection. A Queue implements a FIFO (First In, First Out) ordered list. Like List and Set, it inherits from Collection. It differs from a List in that a List can add and remove elements anywhere, whereas a Queue has only two operations:

  • Add an element to the tail of the queue;
  • Retrieve an element from the head of the queue.

A supermarket checkout counter is a queue:

Our familiar LinkedList can be used as a queue, since it implements the Deque interface; so can ConcurrentLinkedQueue. Both are non-blocking queues.
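As a quick illustration (the class name here is ours, not from the source), both can be exercised through the Queue interface:

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDemo {
    public static void main(String[] args) {
        // LinkedList implements Deque (and therefore Queue): FIFO via offer/poll
        Queue<String> queue = new LinkedList<>();
        queue.offer("first");
        queue.offer("second");
        System.out.println(queue.poll()); // first in, first out
        System.out.println(queue.poll());

        // ConcurrentLinkedQueue: thread-safe and non-blocking (CAS-based)
        Queue<Integer> concurrent = new ConcurrentLinkedQueue<>();
        concurrent.offer(1);
        concurrent.offer(2);
        System.out.println(concurrent.poll());
    }
}
```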

Blocking queue

A blocking queue, as its name implies, is first and foremost a queue. As a data structure, a blocking queue plays roughly the following role:

Thread 1 adds elements to the blocking queue, while thread 2 removes elements from the blocking queue

  • When the blocking queue is empty, fetching elements from the queue is blocked.

  • When the blocking queue is full, adding elements to the queue is blocked.

A thread trying to take an element from an empty blocking queue is blocked until another thread inserts a new element. In the same way, a thread trying to add a new element to a full blocking queue may be blocked until another thread removes one or more elements, or clears the queue entirely, making space available.

When a Haidilao hotpot restaurant is full, the blocking queue is equivalent to the dining area: if the dining area is full, new customers block and wait in the waiting area; when a table frees up, a group is let in to eat, and they are "taken out" after eating.

Why use blocking queues and what are the benefits

In the multi-threaded world, "blocking" means that a thread is suspended under certain conditions; once the condition is met, the suspended thread is automatically woken up.

So why do we need a BlockingQueue?

The advantage is that we don't need to care about when to block or wake threads: the BlockingQueue does all of that for us.

Prior to the Concurrent package, in a multi-threaded environment, each of us programmers had to implement these details on our own, especially with regard to efficiency and thread safety, which added considerable complexity to our programs. Now with blocking queues, we switch from manual to automatic.

Blocking queues in Java

In addition to the familiar List and Set subclasses of Collection, there is also a Queue. BlockingQueue is derived from Queue.

BlockingQueue is an interface, so using it requires one of its implementations. java.util.concurrent provides the following implementation classes of the BlockingQueue interface:

The JDK provides seven blocking queues, respectively:

  • ArrayBlockingQueue: A bounded blocking queue composed of array structures
  • LinkedBlockingQueue: An optionally bounded blocking queue backed by a linked list
  • PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting
  • DelayQueue: an unbounded blocking queue implemented using a priority queue
  • SynchronousQueue: A blocking queue that does not store elements
  • LinkedTransferQueue: an unbounded BlockingQueue consisting of a linked list structure (implementing TransferQueue inherited from BlockingQueue)
  • LinkedBlockingDeque: A bidirectional blocking queue consisting of a linked list structure

BlockingQueue core method

Compared to the Queue interface, BlockingQueue has four forms of API.

| Method type | Throws exception | Returns special value | Blocks | Times out |
|---|---|---|---|---|
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove (take out) | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not applicable | not applicable |

Taking ArrayBlockingQueue as an example, let's look at the common methods provided by Java blocking queues.

  • Throw an exception:

    • When the blocking queue is full, add throws java.lang.IllegalStateException: Queue full;
    • When the queue is empty, remove throws NoSuchElementException;
    • element() returns the element at the head of the queue, or throws NoSuchElementException if the queue is empty.

  • Return special value:

    • offer(e): an insert method that returns true on success and false on failure;
    • poll(): removes and returns the head element, or null if the queue is empty;
    • peek(): returns the head element without removing it, or null if the queue is empty.

  • Always blocked:

    • When the blocking queue is full, if the producing thread continues to put elements, the queue blocks the producing thread until space becomes available or the thread exits in response to an interrupt.
    • When the blocking queue is empty, if the consuming thread tries to take elements, the queue blocks the consuming thread until data becomes available.

  • Timeout exit:

    • When the blocking queue is full, the queue blocks the producing thread for up to the specified time; if the time elapses without space becoming available, the producing thread gives up and the call returns false.
    • When the blocking queue is empty, the queue blocks the consuming thread for up to the specified time; if the time elapses without data arriving, the call returns null.
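A small sketch tying the four forms together (the capacity-1 queue and the 100 ms timeout are illustrative choices):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ApiFormsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        // Return-special-value form: offer/poll never block
        System.out.println(queue.offer("a")); // true
        System.out.println(queue.offer("b")); // false, queue is full

        // Timeout form: blocks up to the given time, then gives up
        System.out.println(queue.offer("b", 100, TimeUnit.MILLISECONDS));

        // Throw-exception form
        try {
            queue.add("b");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Queue full
        }

        // Blocking form: take() returns immediately here since an element exists
        System.out.println(queue.take());
        System.out.println(queue.poll()); // null, queue is now empty
    }
}
```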

BlockingQueue implementation class

Analyze the 7 blocking queues one by one, and explore the source code of several commonly used ones.

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue backed by an array. The queue orders elements on a first-in, first-out (FIFO) basis.

ArrayBlockingQueue is bounded and fixed in size: its capacity is set by the constructor at creation time and cannot be changed afterwards.

ArrayBlockingQueue supports an optional fairness policy for ordering waiting producer and consumer threads. Access is not fair by default, but fairness can be enabled at construction (fair = true). Fairness generally reduces throughput, but it reduces variability and avoids imbalance. (ArrayBlockingQueue is built internally on a ReentrantLock and Condition queues, which is why fair and unfair access to its elements can be distinguished.)

A fair-access queue means that when the queue becomes available, blocked producer or consumer threads access it in the order in which they blocked: the first producer thread to block is the first to insert an element, and the first consumer thread to block is the first to take one. This guarantees first-in, first-out ordering among waiting threads and avoids starvation.
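Enabling the fairness policy is just a constructor flag; a minimal sketch (values are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class FairQueueDemo {
    public static void main(String[] args) {
        // Capacity is fixed at construction; fair = true makes the internal
        // ReentrantLock fair, so blocked producers/consumers proceed in FIFO order
        ArrayBlockingQueue<Integer> fairQueue = new ArrayBlockingQueue<>(10, true);
        fairQueue.offer(42);
        System.out.println(fairQueue.remainingCapacity());
    }
}
```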

The source code interpretation

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // Queue through array
    final Object[] items;
    // Record the index of the first element in the queue
    int takeIndex;
    // Record the index of the element at the end of the queue
    int putIndex;
    // The number of elements in the queue
    int count;
    // Synchronize with ReentrantLock
    final ReentrantLock lock;
    // There are two conditional objects representing the condition that the queue is not empty and the queue is not full
    private final Condition notEmpty;
    private final Condition notFull;
    // The iterators
    transient Itrs itrs;

    // The offer method is used to add data to the queue
    public boolean offer(E e) {
        // The added data does not support null values
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Synchronize through reentrant lock
        lock.lock();
        try {
          // Return false if the queue is full and will not block the thread calling the offer method
            if (count == items.length)
                return false;
            else {
               // If the queue is not full, the enqueue method is called to add elements to the queue
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    // Offer overload that waits up to the given time
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Get an interruptible lock
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                // Wait for the set time
                nanos = notFull.awaitNanos(nanos);
            }
           // The enqueue method is called to add elements to the queue if space is available
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Add data to the underlying array
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
       // A queue implemented through a loop array whose subscript becomes 0 when the array is full
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
       // Activate threads that are blocked because of notEmpty conditions, such as the thread calling the take method
        notEmpty.signal();
    }

    // Fetch data from the queue
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // Set the array slot to null to release the reference
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // Wake a thread blocked on the notFull condition, e.g. one calling put
        notFull.signal();
        return x;
    }

    // The put method differs from the offer method in that if the queue is full, it blocks the thread calling the put method until there is space in the queue
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
       // lockInterruptibly() is used because the condition's await() is called below;
       // both the lock acquisition and the wait respond to interruption by
       // throwing InterruptedException
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // If the queue is full, block until notFull is signalled, i.e. the queue has space
                notFull.await();
            // The queue has space; perform the add
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    // The poll method fetches data from the queue without blocking the current thread
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Return null if the queue is empty, otherwise call dequeue to fetch data
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    // Poll overload with a waiting time
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    // The take method also fetches data from the queue, but unlike poll it may block the current thread
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // When the queue is empty, the current thread is blocked
            while (count == 0)
                notEmpty.await();
            // Once there is data in the queue, call dequeue to return it
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    // Return the first element of the queue without removing it
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

    // The element count is read under the lock, so the result is accurate
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // In addition, there are other methods:

    // Return the remaining capacity, i.e. how many more elements can still be added
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    // Check whether element o exists in the queue
    public boolean contains(Object o) {}

    // Return an array containing all of the elements in the queue, in proper order
    public Object[] toArray() {}

    // Atomically remove all elements from the queue
    public void clear() {}

    // Remove all available elements from the queue and add them to the given Collection
    public int drainTo(Collection<? super E> c) {}

    // Return an iterator over the elements in this queue, in proper order
    public Iterator<E> iterator() {}
}

LinkedBlockingQueue

LinkedBlockingQueue is an optionally bounded blocking queue backed by a singly linked list. Its default (and maximum) capacity is Integer.MAX_VALUE. The queue orders elements on a first-in, first-out basis.

It is advisable to pass a capacity, i.e. new LinkedBlockingQueue(capacity), to prevent the queue from growing excessively.
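A sketch of the difference between the default and an explicit capacity (the numbers are illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedQueueDemo {
    public static void main(String[] args) {
        // No-arg constructor: capacity defaults to Integer.MAX_VALUE (effectively unbounded)
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        System.out.println(unbounded.remainingCapacity());

        // Prefer an explicit capacity so a slow consumer cannot exhaust memory
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);
        bounded.offer("task");
        System.out.println(bounded.remainingCapacity());
    }
}
```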

The source code interpretation

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    // Backed by a linked list, so a node class is required: a typical singly linked structure
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    // Capacity
    private final int capacity;

    // Number of current queue elements
    private final AtomicInteger count = new AtomicInteger();

    // Head node, no data stored
    transient Node<E> head;

 		// End node, easy to join the queue
    private transient Node<E> last;

    // take lock, poll lock, only take, poll method will hold
    private final ReentrantLock takeLock = new ReentrantLock();

    // queue waiting conditions
		// When there is no element in the queue, the take lock blocks on the notEmpty condition, waiting for another thread to wake up
    private final Condition notEmpty = takeLock.newCondition();

    // Join lock, only put, offer will hold
    private final ReentrantLock putLock = new ReentrantLock();

    // Queue waiting conditions
	  // When the queue is full, the PUT lock blocks on notFull, waiting for another thread to wake up
    private final Condition notFull = putLock.newCondition();

    // Three constructors are also provided
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
       // Initialize the head and last Pointers to null nodes
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    
    public LinkedBlockingQueue() {
        // If no capacity is passed, the maximum int value is used to initialize its capacity
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {}
    
    // Enqueue
    public void put(E e) throws InterruptedException {
        // Null elements are not allowed
        if (e == null) throw new NullPointerException();
        // specify that a local variable is reserved for the current PUT method
        int c = -1;
        // Create a node
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // use put lock to lock
        putLock.lockInterruptibly();
        try {
					// If the queue is full, the notFull condition is blocked
        	// Wait to be woken up by another thread
            while (count.get() == capacity) {
                notFull.await();
            }
            // The queue is no longer full; enqueue the node
            enqueue(node);
            // The queue length is increased by 1
            c = count.getAndIncrement();
            // If the current queue length is still smaller than the capacity,
            // wake another thread blocking on the notFull condition.
            // Why signal here? Many threads may be blocked on notFull, but the
            // take side only signals notFull when the queue was full before the
            // take. Signalling while already holding putLock saves extra lock
            // acquisitions: a waiting producer is woken before this put even
            // finishes. Put bluntly, this is part of the cost of lock splitting.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // Release the lock
            putLock.unlock();
        }
        // If the queue was empty before this put, it now has one element; wake a consumer waiting on notEmpty
        if (c == 0)
            signalNotEmpty();
    }
    
    private void signalNotEmpty(a) {
        final ReentrantLock takeLock = this.takeLock;
        // Acquire the take lock
        takeLock.lock();
        try {
            // Wake up the notEmpty condition
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    private void enqueue(Node<E> node) {
        // Add it directly after last
        last = last.next = node;
    }

    public boolean offer(E e) {
        // Like put, but returns false immediately when the queue is full (body omitted)
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        // Convert to nanoseconds
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // Acquire the put lock interruptibly
        putLock.lockInterruptibly();
        try {
            // The queue is full
            while (count.get() == capacity) {
                if (nanos <= 0)
                    // Wait time out
                    return false;
                // To wait, awaitNanos(long nanos) is the method in AQS
                // If you wake up or time out while waiting, the current loop continues
                // Throw an interrupt exception if it is interrupted
                nanos = notFull.awaitNanos(nanos);
            }
            // Append to the tail of the queue
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            // Indicates that an additional element can be inserted after the current element
            // Wake up a blocked thread in the conditional queue
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c == 0 means the queue was empty before this element was added
        if (c == 0)
            // Wake up a thread that is blocked by an outqueued conditional queue
            signalNotEmpty();
        return true;
    }

    public E take(a) throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // If the queue has no elements, it blocks on the notEmpty condition
            while (count.get() == 0) {
                notEmpty.await();
            }
            // Otherwise, get out of line
            x = dequeue();
            // c is the queue length before this dequeue
            c = count.getAndDecrement();
            // Wake up notEmpty if the queue length is greater than 1
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // If the queue was full before this take,
        // wake a producer blocked on notFull
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    private E dequeue(a) {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                // The queue is empty and has timed out
                if (nanos <= 0)
                    return null;
                // May wake up, time out, interrupt while waiting
                nanos = notEmpty.awaitNanos(nanos);
            }
            // Perform queue exit operation
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // If the queue was full before this dequeue, wake a thread blocked in put()
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
        // Non-blocking variant: returns null immediately when the queue is empty (body omitted)
    }

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next; p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    public boolean contains(Object o) {}

    static final class LBQSpliterator<E> implements Spliterator<E> {}
}

LinkedBlockingQueue compared to ArrayBlockingQueue

  • ArrayBlockingQueue uses a single lock for both enqueue and dequeue, so the two operations contend with each other and efficiency is lower.
  • LinkedBlockingQueue uses two locks, one for enqueue and one for dequeue; they do not interfere with each other, so efficiency is higher.
  • Both can serve as bounded queues. At the same queue length, if dequeueing cannot keep up with enqueueing, a large number of producer threads will be blocked.
  • If LinkedBlockingQueue is initialized without a capacity, it defaults to Integer.MAX_VALUE; if dequeueing cannot keep up with enqueueing, the queue can grow very long and consume a great deal of memory.

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that supports priorities. (Although it is nominally unbounded, an add can still fail with OutOfMemoryError when resources are exhausted.)

By default, elements are arranged in natural ascending order. You can also have the element class implement compareTo() to define the ordering, or pass a Comparator to the PriorityBlockingQueue constructor to sort the elements. Note, however, that the order of elements with the same priority is not guaranteed. PriorityBlockingQueue is implemented as a minimum binary heap. It uses a CAS-based spin lock to control dynamic array expansion, ensuring that expansion does not block take operations.
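A small sketch of both orderings (the initial capacity 11 matches the JDK default; the values are illustrative):

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    public static void main(String[] args) {
        // Natural ordering: the smallest element is taken first (min binary heap)
        PriorityBlockingQueue<Integer> natural = new PriorityBlockingQueue<>();
        natural.offer(3);
        natural.offer(1);
        natural.offer(2);
        System.out.println(natural.poll());

        // Custom Comparator: reverse order, so the largest element comes out first
        PriorityBlockingQueue<Integer> reversed =
                new PriorityBlockingQueue<>(11, Comparator.reverseOrder());
        reversed.offer(3);
        reversed.offer(1);
        reversed.offer(2);
        System.out.println(reversed.poll());
    }
}
```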

DelayQueue

DelayQueue is a delayed unbounded blocking queue implemented using a priority queue.

The queue is backed by a PriorityQueue. Its elements must implement the Delayed interface, which specifies, at element creation time, how long must pass before the element can be taken from the queue. An element can only be extracted once its delay has expired. DelayQueue can be used in scenarios such as:

  • Cache system design: use a DelayQueue to hold cache entries keyed by their expiry time, with a thread polling the DelayQueue; as soon as an element can be taken, its cache entry has expired.
  • Scheduled task scheduling: use a DelayQueue to hold the day's tasks and their execution times; as soon as a task can be taken from the DelayQueue, it is executed. Timer, for example, is implemented this way.
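A minimal sketch of a Delayed element (DelayedTask and its fields are our illustrative names, not from the JDK; the 50/200 ms delays are arbitrary):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // Hypothetical task type: becomes available delayMillis after creation
    static class DelayedTask implements Delayed {
        final String name;
        private final long expireAt;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.expireAt = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAt - System.currentTimeMillis(),
                                TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("later", 200));
        queue.put(new DelayedTask("soon", 50));

        // take() blocks until the shortest remaining delay expires,
        // so "soon" comes out before "later" despite insertion order
        System.out.println(queue.take().name);
        System.out.println(queue.take().name);
    }
}
```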

SynchronousQueue

SynchronousQueue is a blocking queue that stores no elements: every insert must wait for a matching remove by another thread.

Each put operation must wait for a take operation, or no more elements can be added. SynchronousQueue can be thought of as a relay that hands data produced by one thread directly to a consumer thread. Because the queue itself stores nothing, it is well suited to hand-off scenarios, such as passing data used in one thread on to another; in such scenarios SynchronousQueue achieves higher throughput than LinkedBlockingQueue and ArrayBlockingQueue.

Coding

SynchronousQueue is a blocking queue with no data buffer: a producer thread's insert (put()) must wait for a consumer's remove (take()), and vice versa.

Accordingly, methods such as peek, contains, clear, and isEmpty are effectively no-ops (peek always returns null, isEmpty always returns true, and so on).

However, poll() and offer() do not block: offer() succeeds and returns true only if a consumer is already waiting to take the element; otherwise it returns false immediately. Likewise, poll() returns null immediately unless a producer is already waiting.

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
				
      	//System.out.println(queue.offer("aaa")); //false
        //System.out.println(queue.poll()); //null

        //System.out.println(queue.add("bbb"));  // would throw IllegalStateException: Queue full and abort the demo
      
        new Thread(()->{
            try {
                System.out.println("Thread 1 put a");
                queue.put("a");

                System.out.println("Thread 1 put b");
                queue.put("b");

                System.out.println("Thread 1 put c");
                queue.put("c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 get:"+queue.take());

                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 get:"+queue.take());

                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 get:"+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
Thread 1 put a
Thread 2 get:a
Thread 1 put b
Thread 2 get:b
Thread 1 put c
Thread 2 get:c

The source code interpretation

Unlike blocking queues such as ArrayBlockingQueue and LinkedBlockingDeque, which rely on AQS (through ReentrantLock) for concurrent operations, SynchronousQueue uses CAS directly for thread-safe access.

SynchronousQueue provides two constructors (fair and unfair); internally both are implemented by a Transferer, either TransferStack or TransferQueue.

TransferStack: The data structure used in unfair competition mode is LIFO Stack.

TransferQueue: FIFO Queue is used in fair competition mode.

In terms of performance, the two are comparable. In general, FIFO can support higher throughput, but LIFO can keep threads localized to a greater extent.

private transient volatile Transferer<E> transferer;

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

Let's analyze the implementation of TransferQueue.

// The constructor initializes a node that is out of the queue and points both to that node
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node
    head = h;
    tail = h;
}
// Queue node,
static final class QNode {
  volatile QNode next;          // next node in queue
  volatile Object item;         // CAS'ed to or from null
  volatile Thread waiter;       // to control park/unpark
  final boolean isData;

  QNode(Object item, boolean isData) {
    this.item = item;
    this.isData = isData;
  }
	// CAS helpers for updating next and item concurrently; CAS makes these updates lock-free
  boolean casNext(QNode cmp, QNode val) {
    return next == cmp &&
      UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  }

  boolean casItem(Object cmp, Object val) {
    return item == cmp &&
      UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  }

  void tryCancel(Object cmp) {
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
  }

  boolean isCancelled() {
    return item == this;
  }

  boolean isOffList() {
    return next == this;
  }

  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long itemOffset;
  private static final long nextOffset;

  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> k = QNode.class;
      itemOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("item"));
      nextOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("next"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

From the put() and take() methods, we can see that both ultimately call TransferQueue's transfer() method.

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

// The transfer method is used to hand off or retrieve data
E transfer(E e, boolean timed, long nanos) {

  QNode s = null; // constructed/reused as needed
  // If e is not null, this is a data (put) operation
  boolean isData = (e != null);

  for (;;) {
    QNode t = tail;
    QNode h = head;
    if (t == null || h == null)         // saw uninitialized value
      continue;                         // spin

    // The current operation has the same mode as the tail node,
    // or head == tail (the queue is empty)
    if (h == t || t.isData == isData) { // empty or same-mode
      QNode tn = t.next;
      // t differs from tail: tail was changed by another thread
      if (t != tail)                    // inconsistent read
        continue;
      // tail's next is not null: help swing tail forward to it
      if (tn != null) {                 // lagging tail
        // use CAS to advance tail from t to tn
        advanceTail(t, tn);
        continue;
      }
      // Timed out: return null, the insert/fetch failed
      if (timed && nanos <= 0)          // can't wait
        return null;
      if (s == null)
        s = new QNode(e, isData);
      if (!t.casNext(null, s))          // failed to link in
        continue;

      advanceTail(t, s);                // swing tail and wait
      Object x = awaitFulfill(s, e, timed, nanos);
      if (x == s) {                     // wait was cancelled
        clean(t, s);
        return null;
      }

      if (!s.isOffList()) {             // not already unlinked
        advanceHead(t, s);              // unlink if head
        if (x != null)                  // and forget fields
          s.item = s;
        s.waiter = null;
      }
      return (x != null) ? (E)x : e;

    } else {                            // complementary-mode
      QNode m = h.next;                 // node to fulfill
      if (t != tail || m == null || h != head)
        continue;                       // inconsistent read

      Object x = m.item;
      if (isData == (x != null) ||      // m already fulfilled
          x == m ||                     // m cancelled
          !m.casItem(x, e)) {           // lost CAS
        advanceHead(h, m);              // dequeue and retry
        continue;
      }

      advanceHead(h, m);                // successfully fulfilled
      LockSupport.unpark(m.waiter);
      return (x != null) ? (E)x : e;
    }
  }
}
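To make the hand-off concrete, here is a small usage sketch (a hypothetical demo, not part of the JDK source): in a SynchronousQueue, put() blocks until a take() arrives to pair with it, which is exactly what the transfer() method above implements.

```java
import java.util.concurrent.SynchronousQueue;

public class SyncQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                // Blocks here until a consumer calls take()
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Matches the waiting producer and completes the hand-off
        String value = queue.take();
        System.out.println(value);   // prints "hello"
        producer.join();
    }
}
```

Note that the queue itself never holds the element; every put() is paired directly with a take().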

LinkedTransferQueue

LinkedTransferQueue is an unbounded blocking TransferQueue backed by a linked list.

LinkedTransferQueue follows a reservation (pre-occupy) pattern. When a consumer thread takes an element, it takes the data directly if the queue is not empty. If the queue is empty, the consumer enqueues a node whose element is null and then waits on that node. When a producer thread later enqueues and finds a node with a null element, it does not enqueue a new node; instead, it fills its element directly into that node and wakes the thread waiting on it. The awakened consumer thread takes the element and returns it from the method it called. We call this node operation "matching".

The queue implements the TransferQueue interface and overrides the tryTransfer and transfer methods, which provide matching behavior similar to that of SynchronousQueue in fair mode.
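A brief sketch of that matching behavior (the demo class is hypothetical): tryTransfer() succeeds only if a consumer is already waiting, while transfer() blocks until some consumer receives the element.

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();

        // No consumer is waiting yet, so tryTransfer fails immediately
        // and does not enqueue the element
        System.out.println(queue.tryTransfer(1));   // false
        System.out.println(queue.size());           // 0

        Thread consumer = new Thread(() -> {
            try {
                queue.take();   // waits until matched with a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Blocks until the consumer above has received the element
        queue.transfer(2);
        consumer.join();
    }
}
```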

LinkedBlockingDeque

LinkedBlockingDeque is a double-ended blocking queue backed by a linked list.

A double-ended queue (deque) allows elements to be inserted and removed at both ends. Having one more entry point into the queue roughly halves the contention when multiple threads enqueue at the same time. LinkedBlockingDeque therefore has more methods than other blocking queues: addFirst, addLast, offerFirst, offerLast, peekFirst, peekLast, and so on. Methods ending in First insert, get (peek), or remove the first element of the deque; methods ending in Last insert, get, or remove the last element. In addition, the insert method add is equivalent to addLast, and the remove method remove is equivalent to removeFirst.

When initializing a LinkedBlockingDeque, you can set its capacity to prevent it from growing without bound; the default capacity is Integer.MAX_VALUE. Double-ended blocking queues are also well suited to the "work stealing" pattern.
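A short sketch of the First/Last naming with a bounded capacity (the demo class and values are illustrative):

```java
import java.util.concurrent.LinkedBlockingDeque;

public class DequeDemo {
    public static void main(String[] args) {
        // Bounded at 10 to prevent unbounded growth
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(10);

        deque.add("B");        // add is equivalent to addLast
        deque.addFirst("A");   // insert at the head
        deque.addLast("C");    // insert at the tail

        // Deque is now [A, B, C]
        System.out.println(deque.peekFirst());  // A
        System.out.println(deque.peekLast());   // C
        System.out.println(deque.remove());     // remove == removeFirst, returns A
    }
}
```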

Blocking queue usage scenarios

The common producer-consumer model can be implemented based on blocking queues;

When the number of active threads in the thread pool reaches corePoolSize, the thread pool submits subsequent tasks to BlockingQueue.

Producer-consumer model

The BlockingQueue entry in the JDK API documentation shows a typical producer-consumer application.

Interview question: given a variable with an initial value of 0, two threads operate on it alternately, one doing +1 and the other -1, for 5 rounds.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProdConsume_TraditionDemo {

    public static void main(String[] args) {

        ShareData shareData = new ShareData();

        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                shareData.increment();
            }
        }, "T1").start();

        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                shareData.decrement();
            }
        }, "T2").start();
    }
}

// The resource class the threads operate on
class ShareData {
    private int num = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() {
        lock.lock();
        try {
            while (num != 0) {
                // Wait; cannot produce
                condition.await();
            }
            // Do the work
            num++;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            // Notify and wake up
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() {
        lock.lock();
        try {
            while (num == 0) {
                // Wait; cannot consume
                condition.await();
            }
            // Do the work
            num--;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            // Notify and wake up
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
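For contrast, here is a minimal sketch of the producer-consumer pattern built directly on a blocking queue (class name and values are illustrative): put() and take() block by themselves, so there is no explicit await/signal code at all.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueProdConsDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 3: put() blocks when full, take() blocks when empty
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i);   // blocks automatically if the queue is full
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    int v = queue.take();   // blocks automatically if the queue is empty
                    System.out.println("consumed " + v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```

This is exactly the benefit stated earlier: the queue decides when each thread blocks and wakes, so the business code contains none of that logic.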

The thread pool

The core thread pool class, ThreadPoolExecutor, uses a BlockingQueue to hold tasks that have been submitted but not yet executed:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

A thread pool is in fact an internal producer-consumer model: it decouples threads from tasks so that tasks are buffered well and threads are reused.

Different thread pool implementations use different blocking queues: newFixedThreadPool and newSingleThreadExecutor use LinkedBlockingQueue, while newCachedThreadPool uses SynchronousQueue.
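A minimal sketch of plugging a bounded blocking queue into the constructor shown above (the pool sizes and queue capacity are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                             // corePoolSize
                4,                             // maximumPoolSize
                60L, TimeUnit.SECONDS,         // keepAliveTime for idle non-core threads
                new ArrayBlockingQueue<>(8));  // tasks wait here when core threads are busy

        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.execute(() -> System.out.println(
                    "task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With 10 tasks, the first 2 start core threads and the remaining 8 sit in the queue until a core thread frees up; only if the queue were full would extra threads be created up to maximumPoolSize.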

This article is continuously updated. Search for "JavaKeeper" on WeChat to read the latest version first and receive 500+ e-books and 30+ video tutorials with source code. It is also available on GitHub at github.com/JavaKeeper, an arsenal of must-have skills for Java development and interviews; there's something there for you.

Reference and thanks

www.liaoxuefeng.com/

SynchronousQueue source-code analysis: juejin.cn/post/684490…