Structure of 1.

LinkedBlockingQueue inheritance, core member variables, and main constructors:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    // List of nodes
    static class Node<E> {
        E item;

        // Next to the current element, null means the current node is the lastNode<E> next; Node(E x) { item = x; }}// The maximum size of the linked list. The default is integer.max_value
    private final int capacity;

    // How many elements does the list already have, using AtomicInteger, so it is thread safe
    private final AtomicInteger count = new AtomicInteger();

    / / head
    / / note; Head usually points to a sentinel node (data null).
    // 1. Purpose: to assist queue removal, i.e. when take/poll/remove needs to dequeue headers.
    // 2. Note that the sentinel node changes with each exit. See the dequeue method comment below for details
    transient Node<E> head;

    / / list the tail
    private transient Node<E> last;
	
	/ / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- a lock -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
    // Lock when taking
    private final ReentrantLock takeLock = new ReentrantLock();
    
    // The queue cannot be empty when the queue exits
    private final Condition notEmpty = takeLock.newCondition();

    // Lock when put
    // Note: Both locks are designed so that take and put can be performed simultaneously. ArrayBlockingQueue has only one lock.
    private final ReentrantLock putLock = new ReentrantLock();
    
    // The queue cannot be full when entering a queue
    private final Condition notFull = putLock.newCondition();
 
    // iterator. LinkedBlockingQueue implements its own iterator
    private class Itr implements Iterator<E> {}
    
    / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the constructor -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
    // constructor 1: empty parameter constructor. If no capacity is specified, the default value is Integer
    public LinkedBlockingQueue(a) {
        this(Integer.MAX_VALUE);
    }
    
    Constructor 2: Specifies the size of the list.
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // Make the list start and end equal, indicating that the current list is empty, and create the first sentinel node
        last = head = new Node<E>(null);
    }

    Constructor 3: initializes the existing collection data
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                // Set elements cannot be empty
                if (e == null)
                    throw new NullPointerException();
                // Capacity represents the size of the linked list, in this case the maximum value of Integer
                // If the size of the collection class is greater than the maximum value of Integer, an error is reported
                // This can be done outside the for loop to reduce the Integer's maximum number of loops (worst case).
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally{ putLock.unlock(); }}}Copy the code
  • The function of a linked list is to save the current node. The data in the node can be anything, which is a generic type. For example, when a queue is applied to a thread pool, the node is a thread; for example, when a queue is applied to a message queue, the node is a message
  • The take lock and PUT lock are designed to ensure thread safety during queue operations. The take lock and PUT lock are designed to allow the two operations to be performed at the same time without affecting each other. If there is only one lock put, only one lock take can be performed at a time
  • During the initialization, the capacity does not affect the performance, but only the later usage. Because the initialization queue is too small, it is easy to report the error that the queue is full when the capacity is not enough
  • When initializing a given set of data, we have no objection to checking whether the current list exceeds its capacity each time the for loop starts, but we want to do this before the for loop starts. For example, given a set size of 1W and a linked list size of 9K, according to the current code implementation, the size of the given set can only be found when the for loop is 9K times, resulting in 9K times is a waste of resources, it is better to check once before the for loop. If 1W is greater than 9K, an error message is displayed

2. Method parsing & API

The main method of queue is nothing more than three: join the queue, out of the queue, queue leader. So let’s start with these three methods and look at the implementation of LinkedBlockingQueue. ArrayBlockingQueue (JUC source code) ArrayBlockingQueue (JUC source code) ArrayBlockingQueue (JUC Source code)

2.1 the team

Put () : blocks when full

  1. First, there are some preparatory operations, such as creating a new node and getting the PUT lock and counter count
  2. Lock putLock, so subsequent new data is thread safe
  3. The new data is divided into two steps:
    • If the queue is full, the current thread is added to the conditional queue and blocked
    • Simply append the new node to the end of the list
  4. Once the new data is successfully added, the blocked PUT and Take threads are woken up at the appropriate time, ensuring that the time is not wasted
    • If the queue is not satisfied, call up the wait thread of put
    • If the queue is empty before put, wake up the wait thread for Take
// add e to the end of the queue. If there is space to add, add success, otherwise the current thread is stuck in wait
public void put(E e) throws InterruptedException {
    // The element e to be added is empty, throw an exception
    if (e == null) throw new NullPointerException();
    // set c to -1
    int c = -1;
    
    Node<E> node = new Node<E>(e); // Create a node
	
    final ReentrantLock putLock = this.putLock; // Get the lock of put
    final AtomicInteger count = this.count; // Get the number of elements in the queue
     
    putLock.lockInterruptibly(); // Set interruptible lock
    try {
        / /!!!!!! If the queue is full, the current thread is added to the notFull conditional queue and then blocked
        / / wait for a thread [new success | | take queue element | | remove elements], have the opportunity to be awakened into synchronous queue
        // A while loop is a double-check in case another thread succeeds in putting/offering/adding while the thread is already scheduled for execution
        while (count.get() == capacity) {
            // await the CPU, hibernate, release the lock
            notFull.await();
        }

        // If the queue is not full, it is added directly to the end of the queue
        enqueue(node);

        / / conut + 1. GetAndIncrement returns the old value, so c is 1 less than the true count
        c = count.getAndIncrement();

        // If the size of the chain is smaller than the capacity of the list, the queue is not full. You can try to wake up a put wait thread
        if (c + 1 < capacity)
            notFull.signal();

    } finally {
        putLock.unlock(); / / releases the lock
    }
    // c==0, which means that the queue was empty and now a new queue has been added. So an attempt is made to wake up a take waiting thread
    if (c == 0)
        signalNotEmpty();
}
Copy the code

Offer () : Returns false when full

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
    	/ /!!!!!! Check if it is full. If it is, return false
        if (count.get() == capacity)
            return false;
		// Use the same logic as put
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1< capacity) notFull.signal(); }}finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
Copy the code

Add () : Team full throw exception

Add is not implemented directly in LinkedBlockingQueue, it is in its parent Class AbstractQueue, and the logic is simply to call the Offer method

enqueue()

  • Join the team and put the new element at the end of the team
  • Add, offer, and PUT all call this method
private void enqueue(Node<E> node) {
	// 1. Connection: last.next = node
	// 2. Last = node
    last = last.next = node;
}
Copy the code

2.2 the team

Take () : empty block

An empty queue blocks. The principle of this method is similar to that of PUT

  1. Lock first, so subsequent queue operations are thread safe
  2. Data fetching is divided into two steps:
    1. If the queue is empty, the current thread is added to the notEmpty conditional queue to block
    2. Call dequeue, delete the head
  3. Once the queue is finished, see if the blocked thread can be woken up
    • The queue is not empty. Wake up the wait thread of take
    • When the queue is full before the take, wake up the put waiting thread
public E take(a) throws InterruptedException {
    E x;
    // The default value is negative, indicating failure
    int c = -1;
    
    final AtomicInteger count = this.count; // count represents the true size of the current list data
    final ReentrantLock takeLock = this.takeLock; // Get the lock of take
    
    takeLock.lockInterruptibly(); // Set the lock to interruptible
    try {
        / /!!!!!! If the queue is empty, the current thread is added to the notEmpty conditional queue and then blocked
        // Waiting for a thread to put an element into the queue, it has a chance to wake up and enter the synchronization queue
        // The while loop is double-checked to prevent the thread from being scheduled to execute, but another thread took /poll/remove and blocked in the conditional queue notFull.
        while (count.get() == 0) {
            notEmpty.await();
        }
       
        // The queue is not empty
        x = dequeue();
        
        // Count -1, which is an atomic operation, getAndDecrement returns an old value (c is 1 greater than the real count)
        c = count.getAndDecrement();
        
        // If there is a value in the queue, wake up one from the waiting thread of take.
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock(); / / releases the lock
    }
    // If one queue is idle, try to wake up one from the put waiting thread
    if (c == capacity)
        signalNotFull();
    return x;
}

Copy the code

Poll () : The poll returns null

public E poll(a) {
        final AtomicInteger count = this.count;
        / /!!!!!! Returns NULL if the queue is empty
        if (count.get() == 0)
            return null;
        E x = null;
        // Use the same logic as put
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1) notEmpty.signal(); }}finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
Copy the code

Remove () : empty throw exception

The add and remove methods are also not implemented in LinkedBlockingQueue, but also in a parent class, AbstractQueue, which calls poll directly

dequeue()

  • Get data in the queue header, delete the link header. This is where the sentinel node comes in. See the comments below.
  • Remove, poll, and take all call this method
private E dequeue(a) {
    Node<E> h = head; // get the header h (sentry node)
    Node<E> first = h.next; // Get the first element node first
    h.next = h; // help GC
    head = first; // set the header to first. This step is equivalent to deleting the previous Sentinel node
    E x = first.item; // save the data x of the first element node
    first.item = null;// Delete data from first to become a new sentinel node
    return x;
}
Copy the code

2.3 Get team leader: Peek

  • View does not delete elements and returns NULL if the queue is empty
  • Note that the take lock must also be taken on the read to prevent the header from being deleted
public E peek(a) {
    // count represents the actual size of the queue. If the queue is empty, null is returned
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
   	// Attach the take lock to avoid being deleted when reading
    takeLock.lock();
    try {
        // Get the queue header
        Node<E> first = head.next;
        // Check whether the queue head is empty and return
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock(); / / releases the lock}}Copy the code