Overview

LinkedBlockingQueue is a bounded blocking queue backed by a linked list. If no capacity is passed in, the capacity defaults to Integer.MAX_VALUE. It is a FIFO queue. Insertion and removal use separate locks, so its concurrent performance is generally better than that of ArrayBlockingQueue. LinkedBlockingQueue implements concurrency control with ReentrantLock plus Condition. Related reading: ReentrantLock source code analysis, Condition source code analysis, and an introduction to blocking queues.
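As a quick check of the two properties described above, the following minimal sketch creates a queue without a capacity argument and reads elements back in insertion order. The class and method names (`DefaultCapacityDemo`, `defaultRemainingCapacity`, `drainOrder`) are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class DefaultCapacityDemo {
    // Remaining capacity of an empty queue built with the no-argument constructor.
    static int defaultRemainingCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Elements leave the queue in the order they were inserted (FIFO).
    static String drainOrder() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        return queue.poll() + queue.poll() + queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(defaultRemainingCapacity() == Integer.MAX_VALUE);
        System.out.println(drainOrder());
    }
}
```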

Class structure

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    static class Node<E> {
        E item;
        // Points to the next node
        Node<E> next;
        Node(E x) { item = x; }
    }

    // The queue is bounded by this capacity
    private final int capacity;
    // Number of elements currently in the queue
    private final AtomicInteger count = new AtomicInteger();
    // Head of the list
    transient Node<E> head;
    // Tail of the list (FIFO: elements join at the tail and leave from the head)
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    // ...
}

The main structure of LinkedBlockingQueue is shown above. Each node holds an element and a reference to the next node. Concurrency control uses ReentrantLock, with Condition for waiting. As noted in the introduction to blocking queues, the only insertion methods that wait are put and the offer overload with a timeout, so, as we will see later, these are the only two methods that call notFull.await. A thread that fails to acquire the lock enters the lock's synchronization queue; a thread that calls Condition.await enters the condition queue.

Constructor

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    // Initialize the capacity
    this.capacity = capacity;
    // head and last both start at an empty dummy node
    last = head = new Node<E>(null);
}
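The constructor's capacity check and the resulting bound can be observed from outside the class. This is a minimal sketch; `ConstructorDemo` and its method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class ConstructorDemo {
    // True if the constructor rejects the given capacity with IllegalArgumentException.
    static boolean rejectsCapacity(int capacity) {
        try {
            new LinkedBlockingQueue<Integer>(capacity);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // Free slots left after inserting one element into a queue of the given capacity.
    static int remainingAfterOneInsert(int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        queue.offer(1);
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(rejectsCapacity(0));
        System.out.println(rejectsCapacity(3));
        System.out.println(remainingAfterOneInsert(3));
    }
}
```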

Put method

public void put(E e) throws InterruptedException {
    // Null elements are not allowed
    if (e == null) throw new NullPointerException();
    // c holds the number of nodes in the queue before the insert. It starts negative;
    // in the offer method, a value that is still negative afterwards means the insert failed.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Lock before enqueueing, so that concurrent inserts are serialized
    putLock.lockInterruptibly();
    try {
        // When the queue is full, the thread waits on notFull.
        // After waking up it checks again whether the queue is full, hence the while loop.
        while (count.get() == capacity) {
            notFull.await();
        }
        // Enqueue the node
        enqueue(node);
        // getAndIncrement returns the number of nodes before the insert, then adds 1
        c = count.getAndIncrement();
        // If the queue is still not full, wake up a thread waiting on notFull
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the queue was empty before the insert, wake up a thread waiting on notEmpty
    if (c == 0)
        signalNotEmpty();
}
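The blocking behavior of put can be seen with a queue of capacity 1. The sketch below is illustrative only: `PutBlocksDemo` and `producerWaitsForSpace` are invented names, and the 200 ms pause is an arbitrary assumption used to give the producer thread time to block.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PutBlocksDemo {
    // True if a second put() stayed blocked while the queue was full
    // and completed once a take() freed a slot.
    static boolean producerWaitsForSpace() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.offer(1); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // waits until the queue is no longer full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            producer.join(200); // arbitrary pause; the producer should still be waiting
            boolean blockedWhileFull = producer.isAlive();

            queue.take();        // removes 1 and lets the producer proceed
            producer.join(2000);
            return blockedWhileFull
                    && !producer.isAlive()
                    && Integer.valueOf(2).equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(producerWaitsForSpace());
    }
}
```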

The enqueue method

private void enqueue(Node<E> node) {
    last = last.next = node;
}
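The chained assignment in enqueue first links the new node behind the old tail and then moves last to the new node. A single-threaded sketch with a hypothetical `Node` class makes the evaluation order visible:

```java
public class ChainedAssignmentDemo {
    // Simplified stand-in for the queue's node type (illustration only).
    static final class Node {
        final String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    static String appendAndDescribe() {
        Node head = new Node(null);
        Node last = head;
        Node node = new Node("x");

        // Parsed as: last = (last.next = node).
        // The inner assignment uses the OLD value of last (here: head).
        last = last.next = node;

        return (head.next == node) + "," + (last == node) + "," + (node.next == null);
    }

    public static void main(String[] args) {
        System.out.println(appendAndDescribe());
    }
}
```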

signalNotEmpty method

// The threads waiting on notEmpty are callers of take or of poll with a timeout
// that found the queue empty.
private void signalNotEmpty() {
    // Why lock here? notEmpty belongs to takeLock, and a Condition can only be
    // signalled by a thread that holds its lock.
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
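The reason the lock is taken before signalling can be checked directly: signalling a ReentrantLock condition without holding the lock throws IllegalMonitorStateException. A minimal sketch, with hypothetical class and method names:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SignalNeedsLockDemo {
    // True if signal() fails when the calling thread does not hold the lock.
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition notEmpty = lock.newCondition();
        try {
            notEmpty.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // True if signal() completes normally while the lock is held.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition notEmpty = lock.newCondition();
        lock.lock();
        try {
            notEmpty.signal(); // no waiters, so this is a no-op, but it is legal
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows());
        System.out.println(signalWithLockSucceeds());
    }
}
```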

That completes the put method. The offer method is very similar, so let's look at it briefly.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // Return immediately when the queue is full
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Check again under the lock; enqueue only if the queue is not full
        if (count.get() < capacity) {
            // Enqueue the node
            enqueue(node);
            // getAndIncrement returns the number of nodes before the increment,
            // so c + 1 is the number of nodes now in the queue
            c = count.getAndIncrement();
            // If the queue is still not full, wake up a thread waiting on notFull
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // If the queue was empty before the insert, wake up a thread waiting on notEmpty
    if (c == 0)
        signalNotEmpty();
    // c is still -1 if nothing was enqueued
    return c >= 0;
}
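The difference between the non-blocking offer shown above and the timed overload can be tried on a full queue. This is a sketch under assumptions: `OfferDemo` and its methods are invented names, and the 100 ms timeout is arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferDemo {
    // offer(e) on a full queue returns false without waiting.
    static boolean offerOnFullQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        return queue.offer("b");
    }

    // offer(e, timeout, unit) waits up to the timeout for space, then gives up.
    static boolean timedOfferOnFullQueue(long timeoutMillis) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        try {
            return queue.offer("b", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerOnFullQueue());
        System.out.println(timedOfferOnFullQueue(100));
    }
}
```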

Take method

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // While count is 0, the current thread waits on notEmpty
        while (count.get() == 0) {
            notEmpty.await();
        }
        // The queue is not empty: remove the head element
        x = dequeue();
        // getAndDecrement returns the number of nodes before the removal
        c = count.getAndDecrement();
        // If the queue is still not empty, wake up another thread waiting on notEmpty.
        // notEmpty belongs to takeLock, which is why signalNotEmpty, called from put,
        // must acquire takeLock first.
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // If the queue was full before the removal, a thread may be waiting to insert,
    // so wake up a thread waiting on notFull
    if (c == capacity)
        signalNotFull();
    return x;
}

The dequeue method

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
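The dequeue step relies on head always being a dummy node whose item is null: the first real node hands over its item and then becomes the new dummy. The single-threaded sketch below imitates that idea with hypothetical names (`SentinelListDemo`, `isEmpty`, `run`). It has no locking, and it adds an empty-list guard that the real method does not need, because take only calls dequeue when count is greater than 0.

```java
public class SentinelListDemo {
    // Simplified node type (illustration only).
    static final class Node {
        String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    private Node head = new Node(null); // dummy node, item is always null
    private Node last = head;

    void enqueue(String item) {
        last = last.next = new Node(item);
    }

    String dequeue() {
        Node h = head;
        Node first = h.next;
        if (first == null) return null; // sketch-only guard for an empty list
        h.next = h;          // self-link the old dummy so it can be collected
        head = first;        // the first real node becomes the new dummy
        String x = first.item;
        first.item = null;
        return x;
    }

    boolean isEmpty() {
        return head == last;
    }

    static String run() {
        SentinelListDemo list = new SentinelListDemo();
        list.enqueue("a");
        list.enqueue("b");
        String taken = list.dequeue() + list.dequeue();
        return taken + "," + list.isEmpty() + "," + list.dequeue();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```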

signalNotFull method

// Wake up a thread waiting on the notFull condition
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

The poll method is similar to the take method, so we skip it here. Let's look at the remove method.

The remove method

public boolean remove(Object o) {
    if (o == null) return false;
    // Lock both putLock and takeLock.
    // remove(Object) may touch the head node as well as the tail node, so both locks
    // are held to make sure operations on either end cannot race with it.
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // If the element in the node equals the object to remove, remove the node
            if (o.equals(p.item)) {
                // Remove node p, whose predecessor is trail
                unlink(p, trail);
                return true;
            }
        }
        // No matching node was found
        return false;
    } finally {
        fullyUnlock();
    }
}
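From the caller's side, the loop above means that remove deletes only the first matching element and reports whether anything was removed. A minimal sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class RemoveDemo {
    static String removeFirstMatch() {
        LinkedBlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "a", "c"));

        boolean removed = queue.remove("a");   // removes only the first "a"
        boolean missing = queue.remove("z");   // not present
        boolean nullArg = queue.remove(null);  // null is never in the queue

        return removed + "," + missing + "," + nullArg + "," + new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(removeFirstMatch());
    }
}
```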

fullyLock method

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
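The matching fullyUnlock method is not shown in this article. The sketch below assumes it releases the two locks in the reverse order of acquisition, and uses a hypothetical `FullyLockDemo` class to show that both locks are held between the two calls. Always acquiring the locks in the same order is what keeps two threads that both need both locks from deadlocking each other.

```java
import java.util.concurrent.locks.ReentrantLock;

public class FullyLockDemo {
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();

    // Same acquisition order as in the article: putLock first, then takeLock.
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    // Assumption: release in reverse order of acquisition.
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    boolean holdsBoth() {
        return putLock.isHeldByCurrentThread() && takeLock.isHeldByCurrentThread();
    }

    boolean holdsAny() {
        return putLock.isHeldByCurrentThread() || takeLock.isHeldByCurrentThread();
    }

    static String lockStates() {
        FullyLockDemo demo = new FullyLockDemo();
        boolean before = demo.holdsAny();
        boolean during;
        demo.fullyLock();
        try {
            during = demo.holdsBoth();
        } finally {
            demo.fullyUnlock();
        }
        boolean after = demo.holdsAny();
        return before + "," + during + "," + after;
    }

    public static void main(String[] args) {
        System.out.println(lockStates());
    }
}
```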

Unlink method

void unlink(Node<E> p, Node<E> trail) {
    // p is the node to be removed; trail is the predecessor of p
    p.item = null;
    trail.next = p.next;
    // If p is the last node, last must now point to trail
    if (last == p)
        last = trail;
    // If the queue was full before the removal, a thread may be waiting to insert
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}
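The effect of unlink on a full queue can be observed through the public API: removing an element frees a slot, and removing the tail element still leaves the list correctly linked for the next insert. A minimal sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class UnlinkDemo {
    static String removeFreesSlot() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        queue.offer("b");

        boolean rejected = !queue.offer("c"); // queue is full
        boolean removed = queue.remove("b");  // removes the tail node
        boolean accepted = queue.offer("c");  // a slot is free again

        return rejected + "," + removed + "," + accepted + "," + new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(removeFreesSlot());
    }
}
```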