In concurrent programming, you sometimes need to use thread-safe queues. If you want to implement a thread-safe queue there are two ways: one is to use a blocking algorithm and the other is to use a non-blocking algorithm. Queues using the blocking algorithm can be implemented with either one lock (same lock for entry and exit) or two locks (different locks for entry and exit). A non-blocking implementation can be implemented by looping CAS. ConcurrentLinkedQueue is a thread safe queue that is implemented in a non-blocking way. The data structure that implements the queue is a chain, as the class name indicates.

(note: in the process of reading the source code, involves the unsafe methods do not understand, want to understand, can go to the article, very detailed, Ctrl + f to find) directly: www.jianshu.com/p/1cc04a31f…

ConcurrentLinkedList is an unbounded thread-safe queue based on linked nodes, which are sorted on a first-in, first-out basis. When we add an element, we add it to the end of the queue, and when we retrieve the element, we return the element in the header. CAS algorithm is used to achieve. Let’s take a look at the overall class diagram:

ConcurrentLinkedQueue inherits AbstartQueue and implements the Queue interface. The top layer implements Iterable, which iterates over elements and belongs to collections. ConcurrentLinkedQueue consists of a head Node and a tail Node. Each Node consists of a Node element (item) and a reference to the next Node (next). Nodes are associated with each other through this next Node. To form a linked list of queues. By default, the head node stores empty elements and the tail node is equal to the head node.

You can start by looking at the Node elements used to store data

private static class Node<E> { volatile E item; // Volatile node <E> next; Node(E item) {// putObject, an Unsafe object, is an efficient way to store data. UNSAFE.putObject(this, itemOffset, item); } Boolean casItem(E CMP, E val) {** * compareAndSwapObject(Object o long offset,Object expected,Object x); * Update the variable value to x if the current value is expected * O: object offset: offset Expected: expected x: new value */returnUNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } //cas operation modifies variable values. boolean casNext(Node<E> cmp, Node<E> val) {returnUNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); }}}Copy the code

Then look at two global variables:

private transient volatile Node<E> head; Private TRANSIENT volatile Node<E> tail; / / end nodesCopy the code

When initializing the constructor, the default constructor looks like this:

public ConcurrentLinkedQueue() {// The first node is the same as the last node when initialized, because there is no value. head = tail = new Node<E>(null); } // At the same time, statically initialize some of the same parameters as Node,Copy the code

A few more CAS operations for the design need to be known

// Change Node data field item Boolean casItem(E CMP, E val) {returnUNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); Next void lazySetNext(Node<E> val) {unsafe. putOrderedObject(this, nextOffset, val); Next Boolean casNext(Node<E> CMP, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
Copy the code

You can see that these methods are actually made by calling the methods of the UNSAFE instance, the sun.misc. UNSAFE class. To learn more about the Unsafe can see me here to share this article: www.jianshu.com/p/1cc04a31f…

As you can see, ConcurrentLinkedQueue is Concurrent, so if you look at the source code, you need to consider not only the single-thread case, but also how the multi-thread case is running and how the thread is running. At the same time, there will be an incoming queue and an outgoing queue, and you need to think about how it works when you have both incoming and outgoing queues. The values are as follows: Offer indicates an incoming queue; poll indicates an outgoing queue. 1

  1. The main operation of enqueueing is to operate on the next node of the Tail node. In single-threaded cases, the operation has no effect on the enqueueing.
  2. The main operation of dequeuing and enqueuing is the operation on the head node, which has no effect on the dequeuing in the single-thread case. 2. Multithreading
  3. If the next node is not in the queue, a new node is inserted. At this time, the obtained tail node is dirty data, and the tail node needs to be obtained again.
  4. Multithreading out queue

3. Enter and exit the queue at the same time

  1. If the incoming queue is much larger than the outgoing queue, there’s usually no overlap between the two. This can be discussed in terms of multithreading.

  2. The outbound queue is larger than the inbound queue

    If the outgoing queue is larger than the incoming queue, that is, the speed of queue head deletion is faster than the speed of adding nodes at the end of queue. As a result, queue length will be shorter and shorter, and offer thread and poll thread will appear “intersection”, that is, at that moment, it can be called the critical point of nodes operated by offer thread and poll thread at the same time. And the offer thread and the poll thread must interact on this node. Then the occurrence of two operations can be divided into the following two types according to the critical point:

  3. The order of execution is offer >poll >offer, which indicates that when the offer thread inserts Node2 after Node1, the poll thread has deleted Node1.

2. Poll –>offer–>poll: when the poll thread is ready to delete a null node (the queue is empty), the offer thread inserts a node to make the queue become non-empty.

Queue source code (offer)

 public boolean add(E e) {
        returnoffer(e); } public Boolean offer(E E)Single # 1checkNotNull(e); // Wrap the queue into a node class to better manage and manipulate the nodes in the queue. final Node<E> newNode = new Node<E>(e);Single # 2
        
        for (Node<E> t = tail, p = t;;) {        // where p = t = tail//ConcurrentLinkedQueue <E> q = p; //ConcurrentLinkedQueue <E> q = p; //ConcurrentLinkedQueue <E> q = p;#单4
            if (q == null) {                Single # 5

                if (p.casNext(null, newNode)) {       Single # 6

                    if(p ! = t) // hop two nodes at a time casTail(t, newNode); // Failure is OK.Single # 9
                    return true;          #单7
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)    # 1 morep = (t ! = (t = tail)) ? t : head;else       Single 8 #
                // Check fortail updates after two hops. p = (p ! = t && t ! = (t = tail)) ? t : q; }}Copy the code

In the single-thread case (see the code order of # single), queue 1 is entered for the first time. Check whether the incoming data is null. 2. Package the data as node. 3. Note that the head and tail stored in the node are both null and equal during initialization. If the next node on the tail node is null, the CAS operation is performed directly to queue the tail node. If the CAS operation succeeds, the cas operation succeeds.

As shown in the figure, the tail node of the queue should be Node1, and the node that tail points to is still Node0. Therefore, tail is delayed updating. The next next node on the tail node will not be null, and the code under the condition # sing8 will be executed:

p = (p ! = t && t ! = (t = tail)) ? t : q;Copy the code

If this code were executed in a single-threaded environment, it would be obvious that since p==t, p would be assigned q, and q would be Node q = p.ext, and tail would be found. The current node is then set to the end of the queue in the order # singleton 5, # singleton 6, and # singleton 9. The reason for casTail’s failure is that the offer code determines the logical direction of p’s next Node q(Node q = p.ext). If casTail fails to set tail to Node0, the tail will still point to Node0. It’s just a couple of loops through the # single 8 code to locate the last node in the queue.

Multi-threaded execution Angle analysis

1. Multiple threads offer to see p = (p! = t && t ! = (t = tail)) ? t : q; This line of code is in a single thread, this code will never assign p to t, so it won’t do anything. This is actually an interesting line of code in a multithreaded environment. t ! = (t = tail) This operation is not an atomic operation.

2. Offer ->poll->offer

Now let’s analyze the # extra 1 line of code. p = (t ! = (t = tail)) ? t : head; It can be assumed that this is a case of answering part thread offer and part poll. When if (p == q) is true, it means that the next of the node pointed to by P also points to itself. Such node is called sentinel node, which has little value in the queue and is generally represented as the node to be deleted or empty node.

The source poll method of the queue

public E poll() {
    restartFromHead:
    1. for(;;) {2.for (Node<E> h = head, p = h, q;;) {
    3.        E item = p.item;

    4.        if(item ! = null && p.casItem(item, null)) { // Successful CAS is the linearization point //for item to be removed from this queue.
    5.            if(p ! = h) // hop two nodes at a time 6. updateHead(h, ((q = p.next) ! = null) ? q : p); 7.return item;
            }
    8.        else if ((q = p.next) == null) {
    9.            updateHead(h, p);
    10.            returnnull; 11}.else if (p == q)
    12.            continue restartFromHead;
            else
    13.            p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
        if(h ! = p && casHead(h, p)) h.lazySetNext(h); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }Copy the code

Single thread Angle

Locate the deleted header node

On line 3, item is null, on line 4, if is false, and on line 8 (q = p.ext), if is also false, since q refers to Node2, and on line 11, if is also false, so on line 13, At this point, p and Q both point to Node2 and find the actual header node to delete. It can be concluded that the process of locating the head node to be deleted is as follows: If the data field of the current node is null, it is obvious that the node is not the node to be deleted, and the next node of the current node is used to test it. After the first cycle, the state is shown below

This method mainly points the head of the queue to Node3 via casHead and points the next field of Node1 to itself via H.lazysetNext. Finally, line 7 returns the value of Node2. The status of the queue is shown in the figure below

Single-threaded summary execution

If the item of the node pointed to by head,h, and P is null, then the node is not the node to be deleted. Probe by making q point to the next node of P (q = p.ext). If found, update the node pointed to by head with updateHead method and construct sentinel node (h.lazysetNext (h) of updateHead method).

Multithreading execution analysis:

else if (p == q)
    continue restartFromHead;
Copy the code

This section is the case of multiple thread polls. Q = P. next means that Q always points to the next node of P, and only the node pointed to p becomes a sentinel node when polling (via H. lazysetNext in updateHead). When thread A decides that p==q, thread B has completed the poll method to convert the node pointing to P to the sentinel node and the node pointing to head has been changed, so it needs to execute from restartFromHead to ensure that the latest head is used.

poll->offer->poll

If the current queue is empty, thread A poll, thread B poll, and thread A poll, does thread A return null or the latest node that thread B just inserted?

public static void main(String[] args) {
    Thread thread1 = new Thread(() -> {
        Integer value = queue.poll();
        System.out.println(Thread.currentThread().getName() + "The value of poll is:" + value);
        System.out.println("Queue is currently empty:" + queue.isEmpty());
    });
    thread1.start();
    Thread thread2 = new Thread(() -> {
        queue.offer(1);
    });
    thread2.start();
}

Copy the code

The output is:

The value of thread-0 poll is null queue.false

Copy the code

If (q = p.ext) == null); if (q = p.ext) == null); if (q = p.ext) == null) Thread2 then inserts an offer into a node with a value of 1, and thread2 finishes. Thread1 does not retry. Instead, the code continues down the queue, returning NULL, even though thread2 has inserted a new node of value 1. So the output thread0 poll is null, and the queue is not empty. Therefore, whether a queue isEmpty or not cannot be determined by returning null in poll time. IsEmpty can be used to determine whether a queue isEmpty.

And then we go back to the offer method, and there’s one

11.        else if (p == q)

Copy the code

Poll ->poll->offer: poll->offer: poll->offer: poll->offer

p = (t ! = (t = tail)) ? t : head; .Copy the code

Since the tail pointer has not changed, p is assigned head, and the insert is completed from head again.

  1. The design of the HOPS

Tail update trigger time: When the next node of the node pointed to by tail is not null, the real tail node of the queue is located. After the tail node is found and inserted, the tail update is performed through casTail. When the next node of the node to which tail points is null, only the node is inserted and tail is not updated. Trigger time of head update: When the item field of the node pointed to by head is null, the operation of locating the real head node of the queue will be performed. After the head node is found and deleted, the head update will be performed through updateHead. If the item field of the node to which the head points is not null, delete the node and do not update the head. In addition, during the update operation, the source code will have a comment: Hop two nodes at a time. So this strategy of delaying updates is called HOPS

If tail is always the last node of the queue, the implementation is less code and the logic is easier to understand. However, there is a disadvantage to doing this. If a large number of enqueueing operations are performed, each time the CAS is executed for tail updates, it can add up to a significant performance loss. If the operation of CAS update can be reduced, the operation efficiency of joining the team can be greatly improved. Therefore, Doug Lea uses CAS to update tail every time (the distance between tail and the node at the end of the team is 1). The same is true for head updates. Although this design increases the number of tail nodes in the loop, overall the read operation is much more efficient than the write performance, so the performance loss of the extra tail nodes in the loop is relatively small.

Some caveats

Although ConcurrentLinkedQueue performs well, the size() method traverses the collection once, which is very slow to execute. Therefore, the use of this method should be minimized. IsEmpty () is the best method to determine whether it isEmpty or not.

ConcurrentLinkedQueue does not allow the insertion of null elements and throws a null-pointer exception.

ConcurrentLinkedQueue is unbounded, so be aware of memory overflow when using it. That is, it is used when the concurrency is not very large and medium. Otherwise, it takes up too much memory or overflow, which has a great impact on the performance of the program, or even fatal.

Application scenarios

ConcurrentLinkedQueue is mostly used for message queues

To be continued

From: www.jianshu.com/p/001c45716… “The Art of Java Concurrent Programming” “Java High Concurrent Programming” www.cnblogs.com/sunshine-20…