This article introduces concurrent queues, laying the groundwork for a later introduction to thread pools. Like ordinary queues, concurrent queues support enqueue and dequeue operations; the important extra question is how they guarantee thread safety. Some concurrent queues do so with locks, others with CAS. Let's start with ConcurrentLinkedQueue.

1. Introduction

ConcurrentLinkedQueue, a member of the collections framework, is an unbounded, thread-safe, non-blocking queue based on a singly linked list. Elements are ordered FIFO. This class is a good choice when many threads share access to a common collection. Null elements are not allowed.

Its iterator is weakly consistent: it never throws java.util.ConcurrentModificationException, and other operations may proceed while iteration is in progress. The **size()** method is not guaranteed to be accurate, because other threads can modify the queue while it is being traversed.
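A minimal sketch of these basic properties (the class name `Demo` is my own, not from the article): nulls are rejected, ordering is FIFO, and `size()` is only reliable when no other thread is writing.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class Demo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
        q.offer("A");
        q.offer("B");

        // Null elements are rejected with a NullPointerException
        boolean npe = false;
        try {
            q.offer(null);
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println(npe);      // true

        // size() traverses the list; accurate here only because no
        // other thread is modifying the queue concurrently
        System.out.println(q.size()); // 2

        System.out.println(q.poll()); // A  (FIFO order)
    }
}
```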

1.1 Class diagram

(The methods shown are all public)

```java
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>
```

ConcurrentLinkedQueue inherits from AbstractQueue, which provides a skeletal implementation of queue operations: the basic methods add, remove, element, etc. are built on top of offer, poll, and peek (the main ones).
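To see that AbstractQueue split in action, compare the throwing methods (remove/element) with the null-returning ones (poll/peek) on an empty queue. A small sketch (the class name `AbstractQueueDemo` is my own):

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AbstractQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();

        // poll/peek return null when the queue is empty...
        System.out.println(q.poll()); // null
        System.out.println(q.peek()); // null

        // ...while the AbstractQueue-style remove/element throw instead,
        // because AbstractQueue implements them on top of poll/peek
        try {
            q.remove();
        } catch (NoSuchElementException e) {
            System.out.println("remove() threw"); // prints
        }
        try {
            q.element();
        } catch (NoSuchElementException e) {
            System.out.println("element() threw"); // prints
        }

        // add delegates to offer; since the queue is unbounded it
        // always succeeds and returns true
        System.out.println(q.add(1)); // true
    }
}
```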

2. Source code analysis

2.1 Overall structure of the class

Node, the element type of the queue:

```java
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        // putOrderedObject is a "delayed" version of putObjectVolatile: the
        // written value is not guaranteed to be immediately visible to other
        // threads. The field it writes should be volatile for the method to
        // be useful. It is used here mainly to improve efficiency.
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics (see the sun.misc.Unsafe class)
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
```

Constructor 1:

```java
// private transient volatile Node<E> head;
// private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
```

Constructor 2:

```java
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}
```

Let's start with the main methods: offer, poll, and peek.

2.2 offer

Adds an element to the tail of the queue. Because the queue is unbounded, this method never returns false.

We analyze it in three cases (be sure to follow along in a debugger, step by step).

  1. Single-threaded offer

```java
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("A");
queue.offer("B");
```

Let's analyze the code above step by step. After the constructor executes:

The head and tail of the list point to the sentinel node

Insert "A"; at this point tail is not updated.

Insert "B":

The single-threaded case is simpler

  2. Multithreaded offer

```java
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is the last node
            if (p.casNext(null, newNode)) {
                // tail is set lazily: it is not updated when the first node
                // after it is linked, only when the second one is
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off the list. If tail is unchanged, it will
            // also be off-list, in which case we need to jump to head, from
            // which all live nodes are always reachable. Else the new tail
            // is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
```

With the code above, look at the diagram

  • In step 1, thread A and thread B both execute `p.casNext(null, newNode)`

  • Step 2: only one thread's CAS succeeds. Suppose thread A succeeds and thread B fails. Because `p(a) == t(a)`, thread A does not execute `casTail`, so `tail` is unchanged. Thread B re-reads `q = p.next`, so now `q(b) = Node2` and `p(b) != q(b)`; thread B is about to execute `p = (p != t && t != (t = tail)) ? t : q;`
  • Step 3: now thread C enters. At this point `p(c) != q(c)`, so thread C executes `p = (p != t && t != (t = tail)) ? t : q;`

After execution, `q(c)` is assigned to `p(c)`. On the next loop iteration `q(c) == null`, so thread C CASes `p(c).next` and enqueues its value.

  • Step 4: `p(c) != t(c)`, so thread C executes `casTail(t, newNode)`, setting `tail`
  • Thread B then executes `p = (p != t && t != (t = tail)) ? t : q;`

Since `p(b) == t(b)`, `q(b)` is assigned to `p(b)`. Continuing the loop, thread B eventually enqueues its value, and we end up with:

  3. The other multithreaded case: go back to step 3, where thread C has enqueued its value but has not yet set tail
  • Building on step 3, after thread B successfully enqueues, the situation is as follows:

At this point, thread C executes `casTail(t, newNode)`, but now `tail != t(c)`, so the CAS fails and the method simply returns.

2.2.1 summary

To summarize: offer keeps looking for the real last node (the one whose next is null), CASes the new node after it, and then updates tail only if the two-hop condition (`p != t`) is met.

The first multithreaded offer case shows that setting tail can lag behind. I call this the **"two-hop mechanism"** and will explain later why it is used. Also note that in everything above we never entered the **else if (p == q)** branch; that branch is reached only when another thread polls concurrently. So let's look at poll first, and then at when the else if branch is entered.
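Before moving on, a quick sanity check that concurrent offers never lose elements despite the lock-free CAS race described above (a minimal stress sketch; the class name `OfferStress` and the thread/element counts are my own, not from the JDK or the article):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class OfferStress {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
        final int THREADS = 4, PER_THREAD = 10_000;

        Thread[] workers = new Thread[THREADS];
        for (int i = 0; i < THREADS; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < PER_THREAD; j++)
                    q.offer(j); // lock-free: CAS on the last node's next
            });
            workers[i].start();
        }
        for (Thread w : workers)
            w.join();

        // Every casNext that succeeded linked exactly one node, so no
        // offers are lost; size() is accurate here because all writers
        // have finished
        System.out.println(q.size()); // 40000
    }
}
```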

2.3 poll

Removes and returns the value at the head of the queue.

We'll touch briefly on single-threaded and multithreaded poll, and focus on the case where poll and offer run concurrently.

  1. The single-threaded case is relatively simple and needs no diagrams; debug step by step with the queue built above
  2. Multithreaded, with only poll running:
```java
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
```
```java
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        // point the old head at itself so it can be GC'd
        h.lazySetNext(h);
}
```

As the code above shows, item and head are modified with CAS, and these fields are volatile, which guarantees their visibility across threads. Whether single-threaded or multithreaded, poll looks for a valid head node, removes it, and returns its value; if a node is not valid it keeps looking, and if the queue is empty it returns null.
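Because poll returns null rather than blocking when the queue is empty, the usual consumption pattern is to loop until it does. A minimal sketch (the class name `PollDemo` is my own):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
        q.offer("A");
        q.offer("B");
        q.offer("C");

        // The idiomatic non-blocking drain: poll until null
        List<String> drained = new ArrayList<>();
        String item;
        while ((item = q.poll()) != null)
            drained.add(item);

        System.out.println(drained);     // [A, B, C] -- FIFO order preserved
        System.out.println(q.isEmpty()); // true
    }
}
```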

Finally, let’s analyze the coexistence of offer and poll

  • Thread A performs the offer operation and thread B performs the poll operation. The initial state is as follows:
  • Thread A enters the offer method.
  • Thread A is about to execute `Node<E> q = p.next;`

At this point, thread B runs one inner-loop iteration and assigns `q(b)` to `p(b)`;

  • Thread B executes the inner loop again; now `p(b).item` is null, so `p(b)` is assigned to head, `h(b).next` is pointed at itself, and thread B exits
  • Thread A now executes `Node<E> q = p.next;`

Because thread B's poll made the old head point to itself, `q(a) == p(a)`, so thread A enters the **else if (p == q)** branch and restarts from the current tail (or head).

Entering the else if (p == q) branch happens only when poll and offer run concurrently.
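A small producer/consumer sketch of offer and poll coexisting (the class name and counts are my own; the consumer spins because poll never blocks). The internal p == q recovery described above happens transparently inside the JDK code:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class OfferPollDemo {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
        final int N = 10_000;
        final AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < N; i++)
                q.offer(i);
        });
        Thread consumer = new Thread(() -> {
            // spin until every item has been taken; poll returns null
            // whenever the queue is momentarily empty (non-blocking)
            while (consumed.get() < N)
                if (q.poll() != null)
                    consumed.incrementAndGet();
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();

        System.out.println(consumed.get()); // 10000
        System.out.println(q.isEmpty());    // true
    }
}
```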

2.4 peek

Gets the first valid node's value and returns it, without removing it.

```java
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
```

peek works in much the same way as poll, so its code is simply posted here for reference.
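A quick illustration of the difference between peek and poll (the class name `PeekDemo` is my own):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class PeekDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
        q.offer("A");

        // peek returns the head value without removing it
        System.out.println(q.peek()); // A
        System.out.println(q.peek()); // A -- still there
        System.out.println(q.size()); // 1

        // poll, by contrast, removes it
        System.out.println(q.poll()); // A
        System.out.println(q.peek()); // null
    }
}
```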

3. Summary

ConcurrentLinkedQueue achieves thread safety in a non-blocking way: every field that affects the overall queue structure is volatile and is updated with CAS.

  • The size() method is not thread-safe and may return an inaccurate result

Regarding the "two-hop mechanism" (a name I coined myself), the class's internal documentation says:

Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.

Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? — ConcurrentLinkedQueue

Roughly speaking: head and tail are allowed to lag, and not updating them at every opportunity is a significant optimization, because it means fewer CAS operations (which adds up to real efficiency gains when many threads are involved). The slack threshold is 2: head/tail is updated only when the current pointer is two or more steps away from the first/last node. That is the two-hop mechanism.

Anything we can't quite figure out is likely an optimization inside the class or method. Respect to the authors.

4. Reference

"Java multithreading 39 — ConcurrentLinkedQueue in detail", which explains this very well; the analysis above follows its approach.