In concurrent programming we sometimes need thread-safe queues. There are two ways to implement one: with a blocking algorithm or with a non-blocking algorithm. A queue based on a blocking algorithm can be implemented with one lock (the same lock for enqueue and dequeue) or with two locks (a different lock for enqueue and dequeue), while a non-blocking queue can be implemented with a CAS loop. In this article, let's take a look at how Doug Lea implemented the thread-safe ConcurrentLinkedQueue in a non-blocking way; we can learn some concurrent programming techniques from him.

ConcurrentLinkedQueue is an unbounded thread-safe queue based on linked nodes. It orders elements on a first-in, first-out basis: when we add an element, it is appended to the tail of the queue, and when we take an element, we get the element at the head of the queue. It is implemented with a "wait-free" (CAS-based) algorithm, which is a modification of the Michael & Scott algorithm.

## The structure of ConcurrentLinkedQueue

ConcurrentLinkedQueue consists of a head node and a tail node. Each node (Node) consists of a node element (item) and a reference to the next node (next). It is through this next reference that the nodes are linked together to form a queue with a linked-list structure. By default, the head node stores an empty element and the tail node is equal to the head node.
```java
// By default the tail node points to the same (empty) node as the head node.
private transient volatile Node<E> tail = head;
```
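The Node structure itself is not shown in the excerpt above. As a rough sketch of what such a node could look like (a simplified sketch in the JDK 6 style, assuming AtomicReferenceFieldUpdater-based CAS helpers; the method names getItem, getNext, casItem and casNext are taken from the code excerpts later in this article, the rest is an assumption for illustration):

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// A minimal sketch of the linked node described above, not the exact JDK source:
// each node holds an element (item) and a reference to the next node (next).
class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");

    Node(E item) {
        this.item = item;
    }

    E getItem() {
        return item;
    }

    Node<E> getNext() {
        return next;
    }

    // Atomically set item from cmp to val; used by poll to claim an element.
    boolean casItem(E cmp, E val) {
        return itemUpdater.compareAndSet(this, cmp, val);
    }

    // Atomically set next from cmp to val; used by offer to link a new node.
    boolean casNext(Node<E> cmp, Node<E> val) {
        return nextUpdater.compareAndSet(this, cmp, val);
    }

    void setNext(Node<E> n) {
        next = n;
    }
}
```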
## Entering the queue

Enqueueing means adding the enqueued node to the tail of the queue. To make it easier to understand how the queue changes, and in particular how the head and tail nodes change, I made a snapshot of the queue for each node that is added.
- Step 1, add element 1: the queue updates the next node of the head node to the element 1 node. Since the tail node equals the head node by default, their next references both point to the element 1 node.
- Step 2, add element 2: the queue first sets the next node of element 1 to element 2, and then updates the tail node to point to element 2.
- Step 3, add element 3: the queue sets the next node of the tail node to element 3.
- Step 4, add element 4: the queue sets the next node of element 3 to element 4, and then points the tail node to element 4.

(These four steps correspond to the plain API calls shown in the snippet below.)
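As a concrete illustration, the four snapshots simply correspond to four ordinary offer calls; nothing here is implementation-specific (the comments restate the snapshot descriptions above, and exactly when the tail reference moves depends on the HOPS detail discussed later):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class OfferSnapshotsDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        queue.offer(1); // step 1: element 1 is linked behind the (empty) head node
        queue.offer(2); // step 2: element 2 is linked behind element 1, tail moves to element 2
        queue.offer(3); // step 3: element 3 is linked behind the current tail node
        queue.offer(4); // step 4: element 4 is linked behind element 3, tail moves to element 4
        System.out.println(queue); // [1, 2, 3, 4]
    }
}
```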
By debugging the enqueue process and observing how the head node and tail node change, you can find that enqueueing mainly does two things. The first is to set the enqueued node as the next node of the current last node of the queue. The second is to update the tail reference: if the next node of the tail node is not null, the enqueued node is set as the new tail node; if the next node of the tail node is null, the enqueued node is only set as the next node of the tail node, so the tail node is not always the last node. Understanding this will be very helpful when we look at the source code.
The above analysis describes the enqueue process from the perspective of a single thread, but with multiple threads enqueueing at the same time things become more complicated, because other threads may cut in. While a thread is enqueueing, it must first fetch the tail node and then set the tail node's next node to the enqueued node; but another thread may have enqueued in the meantime, so the tail node of the queue has changed, and the current thread has to pause its enqueue and fetch the tail node again. Let's go through the source code and analyze in detail how it uses the CAS algorithm to enqueue.
```java
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // Create the node to be enqueued.
    Node<E> n = new Node<E>(e);
    retry:
    // Endless loop: if enqueueing fails, try again.
    for (;;) {
        // Create a reference to the tail node.
        Node<E> t = tail;
        // p is used to represent the last node of the queue; by default it equals the tail node.
        Node<E> p = t;
        for (int hops = 0; ; hops++) {
            // Get the next node of p.
            Node<E> next = succ(p);
            // If next is not null, p is not the last node, so advance p to next.
            if (next != null) {
                // Looped twice or more and the tail has changed: start over from the new tail.
                if (hops > HOPS && t != tail)
                    continue retry;
                p = next;
            }
            // If p is the last node, set the next node of p to the enqueued node.
            else if (p.casNext(null, n)) {
                // If we hopped at least HOPS nodes past the tail, try to move the tail
                // reference to the enqueued node; this update is allowed to fail.
                if (hops >= HOPS)
                    casTail(t, n);
                return true;
            }
            // CAS failed: p already has a next node, so re-read the last node.
            else {
                p = succ(p);
            }
        }
    }
}
```
From a source-code point of view, the whole enqueue process does two things. The first is to locate the last node of the queue. The second is to use CAS to set the enqueued node as the next node of that last node, retrying if it does not succeed.
The first step is to locate the last node. The tail node is not always the last node of the queue, so on every enqueue we must first find the last node through the tail node. The last node may be either the tail node itself or the next node of the tail node. The first if in the loop body of the code checks whether p has a next node; if it does, that next node may be the last node, so p advances to it. If p's next node points back to p itself, the node has already been removed from the queue, so succ returns the head node and the search starts over from the head. The code that gets the next node of p is as follows:
```java
final Node<E> succ(Node<E> p) {
    Node<E> next = p.getNext();
    // A node whose next points to itself has been unlinked from the queue,
    // so restart from the head; otherwise simply return the next node.
    return (p == next) ? head : next;
}
```
The second step sets the enqueued node as the next node of the last node. The p.casNext(null, n) call sets the enqueued node as the next node of the current last node of the queue: if p's next node is null, p really is the last node and the CAS succeeds; if it is not null, another thread has already appended a node, so the current thread must locate the last node of the queue again.
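To see this race through the public API only (a plain usage example, nothing implementation-specific): two threads offer at the same time; internally both may read the same last node, only one CAS can succeed, and the loser retries, so no element is ever lost.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        Thread t1 = new Thread(new Runnable() {
            public void run() { queue.offer("A"); }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() { queue.offer("B"); }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Both elements are always present; only their order may differ between runs.
        System.out.println(queue.size()); // 2
    }
}
```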
The design intent of HOPS. The code and logic Doug Lea wrote look a bit more complicated than necessary, so could enqueueing be done in the following way instead?
```java
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    for (;;) {
        Node<E> t = tail;
        // Append behind the tail node and, if that works, immediately move the tail reference.
        if (t.casNext(null, n) && casTail(t, n)) {
            return true;
        }
    }
}
```
Making the tail node always be the last node of the queue keeps the implementation code minimal and the logic very clear and easy to understand. The downside, however, is that every enqueue has to update the tail node with a CAS loop. If the number of CAS updates of the tail node can be reduced, enqueue efficiency improves. So Doug Lea uses the hops variable to control and reduce how often the tail node is updated: the tail reference is not moved to the last node on every enqueue, but only when the distance between the tail node and the last node is greater than or equal to the constant HOPS (whose default value is 1). The greater the allowed distance, the fewer times the tail node is updated with CAS; the negative effect is that it then takes longer to locate the last node when enqueueing, because the loop body needs more iterations to find it. Even so, enqueueing still gets faster overall, because it essentially trades extra reads of a volatile variable for fewer writes to it, and volatile writes are much more expensive than volatile reads.
```java
private static final int HOPS = 1;
```
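To make the effect of HOPS concrete, here is a short trace of the offer loop shown earlier, starting from an empty queue where tail == head (this only walks through the code above; no new behavior is assumed):

```java
// offer(1): hops == 0, tail.next == null, so casNext(null, n1) succeeds;
//           hops < HOPS, so the tail reference is NOT advanced (tail still == head).
// offer(2): tail.next == n1, so p hops to n1 and hops becomes 1;
//           casNext(null, n2) succeeds; hops >= HOPS, so casTail moves tail to n2.
//
// The tail reference therefore lags at most one node behind the real last node in
// the common case, roughly halving the volatile writes to tail at the cost of an
// extra volatile read per enqueue.
```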
One other thing to note is that the offer method always returns true, so don't use the return value to determine whether the enqueue succeeded.

## Exiting the queue

Dequeuing means returning a node's element from the queue and clearing that node's reference to the element. Let's take a look at how the head node changes by taking a snapshot of the queue each time a node exits it.
As you can see from the snapshots, the head node is not updated on every dequeue. When the head node contains an element, that element is popped directly and the head node is not updated; the head node is updated only when it contains no element. Like enqueueing, this uses the hops idea to reduce the cost of updating the head node with CAS, and so improves dequeue efficiency. Let's go through the source code for an in-depth analysis of the dequeue process.
```java
public E poll() {
    Node<E> h = head;
    // p represents the head node, i.e. the node to be dequeued.
    Node<E> p = h;
    for (int hops = 0; ; hops++) {
        // Get the element of node p.
        E item = p.getItem();
        // If p's element is not null, use CAS to set p's element to null;
        // if the CAS succeeds, return p's element.
        if (item != null && p.casItem(item, null)) {
            if (hops >= HOPS) {
                // Set the next node of p as the new head node.
                Node<E> q = p.getNext();
                updateHead(h, (q != null) ? q : p);
            }
            return item;
        }
        // The head node's element is null or the head node has changed, which means
        // another thread has already modified it, so get the next node of p.
        Node<E> next = succ(p);
        // If p's next node is also null, the queue is empty.
        if (next == null) {
            // Update the head node.
            updateHead(h, p);
            break;
        }
        // If the next node is not null, make it the new candidate head node.
        p = next;
    }
    return null;
}
```
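The updateHead helper is not shown in the excerpt above. A minimal sketch of what it could look like, assuming a casHead helper analogous to casTail and a plain setter on Node (a guess modeled on the JDK 6 style, not the exact source):

```java
// Swing head from h to p and, on success, let the old head point to itself so that
// succ() can recognize it as removed and route traversals back to the new head.
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p)) {
        h.setNext(h);
    }
}
```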
poll first gets the element of the head node and checks whether it is null. If it is null, another thread has already dequeued and taken the element. If it is not null, poll uses CAS to set the head node's element reference to null: if the CAS succeeds, the element is returned directly; if it fails, another thread has already dequeued and updated the head node, so the element has changed and the head node must be fetched again.
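Finally, a small usage example through the public API that ties the two halves together (plain usage, nothing implementation-specific):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        System.out.println(queue.offer("a")); // always true: the queue is unbounded
        System.out.println(queue.poll());     // "a" - removes and returns the head element
        System.out.println(queue.poll());     // null - the queue is now empty
    }
}
```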