LinkedBlockingQueue (JDK1.8)

LinkedBlockingQueue is an unbounded first-in, first-out queue based on a linked list

Again, let’s start with the class diagram

As you can see from the class diagram, LinkedBlockingQueue inherits an abstract class and implements two interfaces, which are described briefly:

  • AbstractQueue: this section provides operations related to add, delete, and query
  • BlockingQueue: Allows you to add, delete, and check in more situations
  • Serializable: Enables its serialization operation

attribute

Attribute related source code


    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

Copy the code

private final int capacity; // The maximum capacity to store elements

private final AtomicInteger count = new AtomicInteger(); // The number of elements in the current blocking queue

transient Node head; // The first node of the list

private transient Node last; // The last node of the list

private final ReentrantLock takeLock = new ReentrantLock(); // The lock acquired by the thread when the element is queued

private final Condition notEmpty = takeLock.newCondition(); // Whether to make the thread wait while reading

private final ReentrantLock putLock = new ReentrantLock(); // The lock acquired by the thread when the element is enqueued

private final Condition notFull = putLock.newCondition(); // Whether to make the thread wait while adding operations

node

    static class Node<E> {
        E item;//存储自身节点的元素

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;//存储的下一个节点

        Node(E x) { item = x; }
    }
Copy the code

A constructor

public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); }}Copy the code

The default value of capacity is integer. MAX_VALUE. You can set capacity by yourself

Preliminary knowledge

AtomicInteger is an Integer class that provides atomic operations on addition and subtraction in a thread-safe manner. Provides atomic operations for use with Integer and is therefore ideal for use in high concurrency situations.

ReentrantLock is a reentrant fair and unfair lock that supports interrupt response, timeout, attempt to acquire the lock, can be associated with multiple conditional queues.

Condition is used to implement ReentrantLock’s wait and notify functions

Add and delete

The operation of entrance

Public void put(E E) throws InterruptedException {if (E == null) throw new NullPointerException(); if (E == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = - 1; // Create a new Node Node<E> Node = new Node<E>(E); Final ReentrantLock putLock = this.putLock; Final AtomicInteger count = this.count; final AtomicInteger count = this.count; // Performing interruptible lock acquisition means that if the thread is Blocked from acquiring the lock, it can be interrupted and no longer wait. This is also a way to avoid deadlocks without having to restart the application because the thread cannot be interrupted after a deadlock is found. putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can  * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of Count in other wait Guards. */ / If the element in the queue reaches its maximum capacity, the thread is in wait state, While (count.get() == capacity) {notfull.await (); } // enqueue(node); Unsafe.getandaddint (this, valueOffset, 1); // unsafe.getAndAddInt(this, valueOffset, 1); //c+1 is the number of new elements in the queue. This number is less than the maximum capacity, so it will wake up other threads waiting to join the queue. If (c + 1 < capacity) notfull.signal (); } finally {putlock. unlock(); } // if (c == 0) signalNotEmpty(); } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; // Last. Next = node; last=node; Last = last.next = node; last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; The lock (); / / locking takeLock. Try {// Wake up the thread waiting for the element notempty.signal (); } finally {takelock.unlock (); }}Copy the code

Put () = offer() = offer() = offer(

Public Boolean offer(E E, long timeout, TimeUnit unit) throws InterruptedException { If (e == null) throw new NullPointerException(); Long nanos = ununit.tonanos (timeout); int c = -1; Final ReentrantLock putLock = this.putLock; Final AtomicInteger count = this.count; final AtomicInteger count = this.count; // Performing interruptible lock acquisition means that if the thread is Blocked from acquiring the lock, it can be interrupted and no longer wait. This is also a way to avoid deadlocks without having to restart the application because the thread cannot be interrupted after a deadlock is found. putLock.lockInterruptibly(); While (count.get() == capacity) {// If the number of elements in the queue reaches its maximum capacity, the thread waits until the queue has free capacity. If (nanos <= 0) return false; // Suspends the current Condition thread until another thread calls the signal or signalAll methods of the current Condition. If another thread interrupts the current thread, InterruptedException is thrown and the current thread continues. The current thread is falsely awakened. The specified timeout period is up. The unit is nanosecond. In all four cases, nanos = notFull. AwaitNanos (nanos); } // enqueue(new Node<E>(E)); Unsafe.getandaddint (this, valueOffset, 1); // unsafe.getAndAddInt(this, valueOffset, 1); //c+1 is the number of new elements in the queue. This number is less than the maximum capacity, so it will wake up other threads waiting to join the queue. If (c + 1 < capacity) notfull.signal (); } finally {putlock. unlock(); } // if (c == 0) signalNotEmpty(); return true; } public Boolean offer(E E) {if (E == null) throw new NullPointerException(); Final AtomicInteger count = this.count; final AtomicInteger count = this.count; If (count.get() == capacity) return false; if (count. int c = -1; // New Node<E> Node = new Node<E>(E); Final ReentrantLock putLock = this.putLock; The lock (); / / locking putLock. If (count.get() < capacity) {// Enqueue (node); // Enqueue (node); Unsafe.getandaddint (this, valueOffset, 1); // unsafe.getAndAddInt(this, valueOffset, 1); //c+1 is the number of new elements in the queue. This number is less than the maximum capacity, so it will wake up other threads waiting to join the queue. If (c + 1 < capacity) notfull.signal (); }} finally {putlock.unlock (); } // if (c == 0) signalNotEmpty(); Return c >= 0; }Copy the code

To summarize enqueueing: lock is used to lock, then the list unlocks the new element, wakes up other PUT wait threads, and releases the lock.

Remove elements

Public Boolean remove(Object o) {public Boolean remove(Object o) {if (o == null) return false; //put lock, take lock fullyLock(); For (Node<E> trail = head, p = trail. Next; p ! = null; Trail = p, p = p.ext) {// find the same element, the current node p,trail is the previous node of the current node p if (o.eos (p.tem)) {unlink(p, trail); return true; } } return false; FullyUnlock (); fullyUnlock(); } } void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, To allow iterators that are // traversing p to maintain their weak-consistency guarantee null; Trail. next = p.ext; // Trail. next = p.ext; // Trail. next = p.ext; Trail if (last == p) last = trail; trail if (last == p) last = trail; // The code for getAndDecrement is unsafe.getAndAddInt(this, valueOffset, -1), which retrieves the current number of elements and -1, equal to the maximum number, If (count.getAnddecrement () == capacity) NotFull.signal (); } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }Copy the code

The team operation

public E take() throws InterruptedException { E x; int c = -1; Final AtomicInteger count = this.count; Final ReentrantLock takeLock = this.takeLock; // Performing interruptible lock acquisition means that if the thread is Blocked from acquiring the lock, it can be interrupted and no longer wait. This is also a way to avoid deadlocks without having to restart the application because the thread cannot be interrupted after a deadlock is found. takeLock.lockInterruptibly(); While (count.get() == 0) {notempty.await (); } // queue out and get the queue element x = dequeue(); // The code for getAndDecrement is unsafe.getAndAddInt(this, valueOffset, -1), which gets the current number of elements and -1 c = count.getAnddecrement (); If (c > 1) notempty.signal (); // if (c > 1) notempty.signal (); } finally {takelock.unlock (); } // When c is equal to the maximum capacity, that is, the queue is full before the current element is fetched. After the element is fetched, the queue will empty a space, so the current thread will wake up the thread that performed the insert to inform one of the others that the insert is available. if (c == capacity) signalNotFull(); return x; } / / remove the element on the list, the head node to remove private E to dequeue () {/ / assert takeLock isHeldByCurrentThread (); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); Try {// Wake up the put waiting lock notfull.signal (); } finally { putLock.unlock(); }}Copy the code

Poll () = poll(); poll() = poll(); poll() = poll()

 
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    } 

Copy the code

Other methods

Peek () queries for the elements of the head node

public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); }}Copy the code

The contrast

  • ArrayBlockingQueue: Array-based first-in, first-out bounded circular queue that uses the same lock, so it cannot read and operate in parallel even on a multi-core CPU

  • LinkedBlockingQueue: a first in, first out (FIFO) queue based on a linked list. Read and insert operations use two different locks that are not interfering with each other, so both operations can be done in parallel. Therefore LinkedBlockingQueue has a higher throughput than ArrayBlockingQueue.

The queue

  • Deque double-ended queue:
  • Not implementing a blocking interface:
    • LinkedList: Implements the Deque interface, restricted queues
    • PriorityQueue: a PriorityQueue, essentially maintaining an ordered list. Either a natural sort or a custom sort by passing the Comparator constructor.
    • ConcurrentLinkedQueue: Queue that is thread safe based on the list. Add delete O(1) find O(n)
  • Implementing the BlockingQueue interface: Five blocking queues that implement the BlockingQueue interface. Threads that block do not add or remove elements directly, but wait until they have space to do so.
    • ArrayBlockingQueue: Array-based bounded queue
    • LinkedBlockingQueue: Unbounded queue based on a linked list
    • PriorityBlockingQueue: An unbounded queue based on priority
    • DelayQueue: Queue based on time priority
    • SynchronousQueue: Queues that have no internal containers are special — they have a unique thread-to-thread pairing mechanism