1. Overview of LinkedBlockingQueue

As the name suggests, LinkedBlockingQueue is a BlockingQueue backed by a linked list. The first thing a queue brings to mind is FIFO (first in, first out) ordering. LinkedBlockingQueue is the linked-list implementation, while ArrayBlockingQueue is the array-backed (sequential) implementation. Because a queue only operates on its two ends, insertion and removal take constant time in both. The array implementation uses less space per element because it needs no “pointer” field (next), but its storage must be contiguous. The linked implementation does not need contiguous space; each node uses next to point to the following node, as the node structure of LinkedBlockingQueue shows.

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

As for blocking: LinkedBlockingQueue is thread-safe. When the queue is full, blocking insertions such as put wait until space becomes available; when the queue is empty, blocking removals such as take wait until an element arrives. A capacity can be specified when the queue is created. If it is not, the default capacity is Integer.MAX_VALUE. Either way, capacity is declared final, so it cannot change once the queue has been constructed.

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
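
The effect of the capacity bound can be observed from the outside. The following is a minimal sketch, not part of the JDK source; the class name CapacityDemo and its methods are made up for illustration. It uses only the public offer and remainingCapacity methods.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {
    /** Fills a queue bounded at 2 and records what the non-blocking calls return. */
    static String demo() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(2);
        boolean first = q.offer("a");   // accepted
        boolean second = q.offer("b");  // accepted, the queue is now full
        boolean third = q.offer("c");   // rejected, offer(e) never blocks
        return first + "," + second + "," + third + "," + q.remainingCapacity();
    }

    /** The no-argument constructor uses Integer.MAX_VALUE as the bound. */
    static boolean defaultBoundIsMaxInt() {
        return new LinkedBlockingQueue<String>().remainingCapacity() == Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(defaultBoundIsMaxInt());
    }
}
```

With a bound of 2, the third offer is refused and the remaining capacity is 0.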

2. LinkedBlockingQueue implementation details

To understand the implementation of LinkedBlockingQueue, you need to be familiar with the wait/notify pattern and AbstractQueuedSynchronizer (AQS). As an aside, I personally think three things are especially important in concurrent programming: wait/notify, CAS, and AQS.

2.1 head and last

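  1. head and last mark the beginning and end of the queue, so that take and put can each locate the node they work on in constant time. Note the distinction between head and first: head is a dummy node whose item is always null, and the first real element (called first in peek) is head.next.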
transient Node<E> head;
private transient Node<E> last;

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
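
The put and take code later in this article calls enqueue and dequeue without showing them. The following is a simplified, single-threaded sketch of how a dummy head node makes both operations simple. The class DummyHeadQueue is hypothetical and is not the JDK source; it has no locking at all.

```java
class DummyHeadQueue<E> {
    static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    // head is always a dummy node with a null item; the first real element is head.next
    private Node<E> head;
    private Node<E> last;

    DummyHeadQueue() {
        last = head = new Node<>(null);
    }

    /** Links a new node after the current tail and makes it the tail. */
    void enqueue(E e) {
        Node<E> node = new Node<>(e);
        last.next = node;
        last = node;
    }

    /** Removes the first real node, which becomes the new dummy head. Returns null if empty. */
    E dequeue() {
        Node<E> first = head.next;
        if (first == null) {
            return null;
        }
        head.next = null;   // unlink the old dummy node
        head = first;       // the removed node takes over as dummy
        E x = first.item;
        first.item = null;  // a dummy node never holds an item
        return x;
    }

    boolean isEmpty() {
        return head.next == null;
    }
}
```

Because of the dummy node, enqueue only ever touches last and dequeue only ever touches head, which is what allows the two ends to be guarded by different locks in the real class.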

2.2 count

  1. count is the number of elements currently in the queue. It is an AtomicInteger from the java.util.concurrent.atomic package, whose atomic updates are built on CAS. Making count an AtomicInteger is critical to the queue’s thread safety because, as you will see later, take and put are guarded by two separate locks, so a thread on either side can update count without holding the other side’s lock.
  2. If you’re interested, compare ArrayBlockingQueue, which uses a single lock for thread safety, so its count is a plain int rather than an AtomicInteger.
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
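
Two properties of AtomicInteger matter for the rest of this article: getAndIncrement and getAndDecrement return the value from before the update, and concurrent updates are never lost even without a lock. A small sketch follows; the class CountDemo is a made-up name for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class CountDemo {
    /** getAndIncrement and getAndDecrement return the value from before the update. */
    static int[] previousValues() {
        AtomicInteger count = new AtomicInteger();
        int beforePut = count.getAndIncrement();   // returns 0, count is now 1
        int beforeTake = count.getAndDecrement();  // returns 1, count is now 0
        return new int[] { beforePut, beforeTake, count.get() };
    }

    /** Two threads update the same counter without any lock; no increment is lost. */
    static int concurrentIncrements() {
        AtomicInteger count = new AtomicInteger();
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                count.getAndIncrement();
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(previousValues()));
        System.out.println(concurrentIncrements());
    }
}
```

The "previous value" behaviour is exactly what the local variable c relies on in put and take.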

2.3 Locks and conditions

  1. takeLock and putLock are the locks for take operations and put operations, respectively.
  2. The condition for a take operation is notEmpty. A take first checks whether the queue has any element to take; if it has none, the thread calls notEmpty.await() and blocks.
  3. The condition for a put operation is notFull. A put first checks whether the queue has room for another element; if it has none, the thread calls notFull.await() and blocks.
  4. After a successful take, the code checks whether the queue was full before the take. If it was, a put thread may be blocked, so signalNotFull() is called to wake one up.
  5. After a successful put, the code checks whether the queue was empty before the put. If it was, a take thread may be blocked, so signalNotEmpty() is called to wake one up.
  6. Steps 4 and 5 are well designed: checking for the full or empty state before calling the wake-up method avoids acquiring the other lock and signalling when no thread can be waiting. This step is a little hard to understand at first.
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
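
The await/signal pattern from the list above can be tried in isolation. The following is a minimal sketch with one lock and one condition guarding a single slot; the class ConditionDemo and its methods are hypothetical and much simpler than the real queue. It shows the two rules the queue follows: await is called in a while loop that re-checks the state, and both await and signal are called while holding the lock that created the condition.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private String slot; // guarded by lock

    /** Waits until a value has been published, then removes and returns it. */
    String awaitValue() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (slot == null) {   // re-check the state after every wake-up
                notEmpty.await();    // releases the lock while waiting
            }
            String v = slot;
            slot = null;
            return v;
        } finally {
            lock.unlock();
        }
    }

    /** Stores a value and wakes one waiting thread. */
    void publish(String v) {
        lock.lock();
        try {
            slot = v;
            notEmpty.signal();       // must hold the lock that owns the condition
        } finally {
            lock.unlock();
        }
    }

    static String demo() {
        ConditionDemo d = new ConditionDemo();
        String[] result = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                result[0] = d.awaitValue();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        d.publish("hello");
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The second rule is why signalNotEmpty() and signalNotFull() in the queue have to acquire the other side’s lock before signalling.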

2.4 put

  1. put inserts the specified element e at the tail of the queue; if there is no space, the calling thread waits.
  2. e must not be null, because this queue does not store null elements. Passing null throws NullPointerException.
  3. The local variable c starts at -1 and ends up holding the number of elements in the queue just before this put, because c = count.getAndIncrement() and getAndIncrement() returns the previous value. (The value of c matters; without it the wake-up logic is hard to follow.)
  4. putLock.lockInterruptibly() acquires the put lock.
  5. While count.get() == capacity, that is, while the queue has no remaining space, the notFull condition does not hold, so the put thread calls notFull.await() and waits. Otherwise the element is enqueued normally.
  6. After the element is enqueued, count is increased by 1 and c receives the value from before the increment (note this).
  7. c + 1 < capacity means the queue still has room after this put, so notFull.signal() wakes another put thread that may be waiting on notFull (there is not necessarily a waiting thread at this point).
  8. if (c == 0): the queue was empty before this put, so a take thread may be waiting. signalNotEmpty() is therefore called; it acquires the take lock and wakes a waiting take thread.
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
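
From the caller’s point of view, the behaviour of put looks like this. The sketch below uses a queue bounded at 1 so that the second put may have to wait; the class name PutBlocksDemo is made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

class PutBlocksDemo {
    static String demo() {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                q.put(1); // the queue is empty, so this returns at once
                q.put(2); // waits on notFull if 1 has not been taken yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int a = q.take(); // waits on notEmpty if nothing has been put yet
            int b = q.take();
            producer.join();
            return a + "," + b + "," + q.size();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Whichever thread runs first, the elements come out in insertion order and the queue ends up empty.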

2.5 take

  1. take follows the same principle as put. It removes the first node (head.next); if the queue has no elements, the take thread waits.
  2. The local variable c starts at -1 and ends up holding the number of elements in the queue just before this take.
  3. takeLock.lockInterruptibly() acquires the take lock.
  4. While count.get() == 0, that is, while the queue has no elements, the notEmpty condition does not hold, so the take thread waits. Otherwise the element is dequeued normally.
  5. c > 1 means the queue held at least two elements before this take, so at least one remains and notEmpty.signal() wakes another take thread that may be waiting.
  6. c == capacity means the queue was full before this take, so a put thread may be waiting. signalNotFull() is therefore called; it first acquires the put lock and then wakes a put thread that may be waiting.
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
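
Because take uses lockInterruptibly() and await(), a thread stuck in take on an empty queue can be cancelled by interrupting it. A minimal sketch follows; the class name TakeInterruptDemo is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

class TakeInterruptDemo {
    /** Returns true if the consumer left take() through an InterruptedException. */
    static boolean demo() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        boolean[] interrupted = new boolean[1];
        Thread consumer = new Thread(() -> {
            try {
                q.take(); // nothing is ever put, so this can only end by interruption
            } catch (InterruptedException e) {
                interrupted[0] = true;
            }
        });
        consumer.start();
        consumer.interrupt();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

It does not matter whether the interrupt arrives before or during the wait: lockInterruptibly() checks the interrupt status on entry, and await() responds to an interrupt while waiting.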

2.6 offer

  1. The two overloaded offer methods are essentially put operations too, but they behave differently when the queue is full.
  2. offer(e, timeout, unit) waits on the notFull condition for at most the given time and returns false if no space has become available by then.
  3. offer(e) enqueues the element if it can and returns false immediately if it cannot.
  4. Choose between the two offer methods according to actual needs.
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
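
The difference between the two offer methods is easy to see on a queue bounded at 1. This is a small sketch; the class name OfferDemo and the 50 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class OfferDemo {
    static String demo() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        try {
            boolean a = q.offer("a");                              // room available
            boolean b = q.offer("b");                              // full, returns false at once
            boolean c = q.offer("c", 50, TimeUnit.MILLISECONDS);   // full, gives up after the timeout
            q.take();                                              // frees the single slot
            boolean d = q.offer("d", 50, TimeUnit.MILLISECONDS);   // room again, no waiting needed
            return a + "," + b + "," + c + "," + d;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Neither method ever blocks indefinitely, which makes them a natural choice when the producer must stay responsive.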

2.7 poll and peek

  1. The two poll methods are variants of take: one waits up to a timeout and the other does not wait at all. Both return null when no element is available. Choose whichever fits the application.
  2. peek differs from take in that it does not remove anything: it only “looks” at the first element, first.item.
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
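
A short sketch of how poll and peek behave from the caller’s side; the class name PollPeekDemo is made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

class PollPeekDemo {
    static String demo() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        String emptyPoll = q.poll();     // empty queue: null, no waiting
        q.offer("a");
        q.offer("b");
        String peeked = q.peek();        // looks at the first element only
        int sizeAfterPeek = q.size();    // peek removed nothing
        String polled = q.poll();        // removes and returns the first element
        int sizeAfterPoll = q.size();
        return emptyPoll + "," + peeked + "," + sizeAfterPeek + "," + polled + "," + sizeAfterPoll;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since null is the "nothing available" signal for poll and peek, the queue could not store null elements without making these results ambiguous.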

3. Summary

  1. LinkedBlockingQueue embodies the producer/consumer model: it uses a wait/notify mechanism (await and signal on a Condition) to block and wake take and put threads.
  2. The combination of the AtomicInteger count (the current number of elements) and the two-lock design (a take lock and a put lock) makes LinkedBlockingQueue thread-safe while letting producers and consumers proceed concurrently. The implementation is worth studying.
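
To close, here is a minimal producer/consumer sketch built on the queue. The class name ProducerConsumerDemo, the capacity of 4, and the POISON end-of-stream marker are assumptions made for this example, not anything defined by LinkedBlockingQueue itself.

```java
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumerDemo {
    private static final int POISON = -1; // hypothetical end-of-stream marker

    /** One producer puts 1..100, one consumer sums them; returns the sum. */
    static int demo() {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(4);
        int[] sum = new int[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) {
                    q.put(i);          // waits whenever the consumer falls behind
                }
                q.put(POISON);         // tells the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int v = q.take(); v != POISON; v = q.take()) {
                    sum[0] += v;       // only this thread writes sum
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The small capacity forces the producer to wait for the consumer from time to time, so both the notFull and the notEmpty paths described above are likely to be exercised.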

My ability and time are limited (mostly my ability, of course), so please point out any mistakes or omissions in the comments.