preface


Read on for the JUC source code, which shows a non-blocking, unbounded thread-safe queue called ConcurrentLinkedQueue.


[liuzhirichard] Record technology, development and source code notes in work and study. From time to time, share what you’ve seen and heard in your life. Welcome to guide!

introduce

Sort elements FIFO (first in, first out) based on an unbounded thread-safe queue of linked nodes. The head of the queue is the longest element in the queue, and the tail of the queue is the shortest element in the queue. A new element is inserted at the end of the queue, and the queue retrieval operation retrieves the element at the head of the queue.

ConcurrentLinkedQueue is an appropriate choice when many threads share access to a common collection. As with most other concurrent collection implementations, null elements are not allowed in this class.

The basic use

public class ConcurrentLinkedQueueTest {

    public static void main(String[] args) {

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

        // Inserts the specified element at the end of the queue.
        queue.add("liuzhihang");
        // Inserts the specified element at the end of the queue.
        queue.offer("liuzhihang");

        Returns null if the queue is empty.
        queue.peek();
        // Get and remove the head of this queue, return null if this queue is empty.queue.poll(); }}Copy the code

Source code analysis

The basic structure

Parameter is introduced

private static class Node<E> {
    
    // The element in the node
    volatile E item;
    // Next node
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }
    // CAS sets the node element
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    // Set the next node
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    // Set the next node in CAS mode
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    / / omit...
}
Copy the code

ConcurrentLinkedQueue contains an inner class Node, as shown above. This inner class is used to identify a Node in the list. The list in ConcurrentLinkedQueue is one-way.

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
        
    // Other omissions

    / / head node
    private transient volatile Node<E> head;      

    / / end nodes
    private transient volatile Node<E> tail;
}
Copy the code

The header and tail nodes are volatile to ensure memory visibility.

The constructor

public ConcurrentLinkedQueue(a) {
    head = tail = new Node<E>(null);
}
Copy the code

When an object is created, both the head and tail nodes point to an empty node.

Add elements

public boolean add(E e) {
    return offer(e);
}
public boolean offer(E e) {
    
    // Verify that it is null
    checkNotNull(e);

    // Create a node
    final Node<E> newNode = new Node<E>(e);

    // loop to queue
    // t is the current tail node, p starts with t
    for (Node<E> t = tail, p = t;;) {
        // q is the next node of the tail node
        Node<E> q = p.next;
        if (q == null) {
            // If the value is empty, there is no node behind it. In this case, CAS sets the last node
            if (p.casNext(null, newNode)) {
                // in this case, p.ext is newNode
                // If p! = t indicates concurrency
                if(p ! = t)// Other threads have updated tail
                    // q = p.ext so q == null is incorrect
                    // q fetched t.ext
                    // At this point, update tail to the new node
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // In multithreaded cases, the poll operation removes elements, which may result in p == q
        // Now you need to look again
        else if (p == q)
            // p = (t ! = (t = tail)) ? t : head;else
            // Check tail and updatep = (p ! = t && t ! = (t = tail)) ? t : q; }}Copy the code

Drawing instructions:

  • Single thread:
  1. When performing theNode<E> q = p.next;, the current situation is as shown in the figure:

  1. judgeq == nullIf the condition is met, the command is executedp.casNext(null, newNode)Use CAS to set p.ext.
  2. Once the setup is successful,p == tThere is no change, so the program exits.
  • Multithreading:
  1. When performing theNode<E> q = p.next;, the current situation is as shown in the figure:

  1. Multiple thread executionp.casNext(null, newNode)Use CAS to set p.ext.
  2. The CAS configuration for thread A succeeds:

  1. The CAS fails to be executed in thread B. The CAS fails to be executed in thread Bp = (p ! = t && t ! = (t = tail)) ? t : q.

  1. Repeat the loop and you can set it up successfully.

Access to elements

public E poll(a) {
    restartFromHead:
    // Infinite loop
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            // An iterm for the header
            E item = p.item;
            // If the current node is not null, CAS is set to null
            if(item ! =null && p.casItem(item, null)) {
                // If CAS is successful, the flag is removed
                if(p ! = h)// hop two nodes at a timeupdateHead(h, ((q = p.next) ! =null)? q : p);return item;
            }
            // Return null if the current queue is not empty
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // repeat the loop
            else if (p == q)
                continue restartFromHead;
            elsep = q; }}}Copy the code

The drawing process is as follows:

  1. If the queue is empty while executing the inner loop:E item = p.item;In this case, iterm is nullupdateHead(h, p)And returns NULL.
  2. Suppose there is a concurrent insert operation and an element is added, as shown in the figure below:

And then the last else will do p = q

  1. Continue the loop to get item and executep.casItem(item, null)And then judgep ! = hUpdate head and return item.

The situation here is more complicated, here is just one, if you need to list a few more.

The code for viewing an element is similar to the code for retrieving an element.

The size operation

public int size(a) {
    int count = 0;
    for(Node<E> p = first(); p ! =null; p = succ(p))
        if(p.item ! =null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
Copy the code

CAS is not locked, so the size is not accurate. And size will iterate over the list, which costs performance.

conclusion

ConcurrentLinkedQueue is used relatively infrequently in the workplace, so I only skim the source code to get a sense of the common apis and underlying principles.

A simple summary is to use a one-way linked list to hold queue elements, internally using a non-blocking CAS algorithm without locking. Therefore, the calculation of size may not be accurate, and size will also traverse the linked list, so it is not recommended to use it.