A queue is a common data structure whose defining feature is FIFO (first in, first out). Java defines an interface for it, Queue, and provides a rich set of implementations: bounded queues backed by arrays, unbounded queues built from linked nodes, blocking and non-blocking queues, and concurrency-safe queues.
There are two common ways to implement a queue: with an array or with linked nodes.
Java's basic queue implementations live in the java.util package, while the concurrency-safe implementations are mostly in the JUC package (java.util.concurrent). Java's thread pool, ThreadPoolExecutor, uses a blocking queue to buffer tasks: a blocking queue provides wake-up notification, so it can notify waiting threads when a task is added or consumed, while remaining thread-safe. JUC also contains a queue that is concurrency-safe but non-blocking: ConcurrentLinkedQueue.
Applicable scenario
ConcurrentLinkedQueue is an unbounded, non-blocking, thread-safe FIFO queue provided by Java. It suits cases where multiple threads share access to the same collection and need to actively fetch elements rather than block while waiting for a notification. Its size is unlimited, as is typical for structures built from linked nodes; the flip side is that an unbounded queue can also consume excessive memory.
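A minimal usage sketch of that scenario, assuming Java 9+ for Thread.onSpinWait: several producer threads offer elements while consumer threads actively poll and spin instead of blocking. The class name ClqUsageDemo and the thread counts are illustrative only; a consumer that genuinely needs to wait would normally use a BlockingQueue instead.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ClqUsageDemo {
    // Producers offer items; consumers actively poll (never block) until all items are taken.
    static int run(int producers, int consumers, int perProducer) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        int total = producers * perProducer;
        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[producers + consumers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    queue.offer(j); // unbounded: never blocks, always returns true
                }
            });
        }
        for (int i = 0; i < consumers; i++) {
            threads[producers + i] = new Thread(() -> {
                while (consumed.get() < total) {
                    if (queue.poll() != null) {
                        consumed.incrementAndGet();
                    } else {
                        Thread.onSpinWait(); // momentarily empty: retry instead of waiting
                    }
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 2, 10_000)); // prints 40000
    }
}
```

Every element offered is polled exactly once, even though no thread ever blocks; the cost of this design is that an idle consumer burns CPU while spinning.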
Underlying implementation (this article is based on the JDK 15 source code)
As the name suggests, ConcurrentLinkedQueue is built from linked nodes. A node is defined as follows:
```java
static final class Node<E> {
    volatile E item;
    volatile Node<E> next;

    // ITEM and NEXT are VarHandles declared in ConcurrentLinkedQueue
    Node(E item) {
        ITEM.set(this, item);
    }

    /** Constructs a dead dummy node. */
    Node() {}

    void appendRelaxed(Node<E> next) {
        NEXT.set(this, next);
    }

    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }
}
```
A node has only two fields, item and next: the usual node structure of a linked queue. The ConcurrentLinkedQueue algorithm is based on the paper "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" by Maged M. Michael and Michael L. Scott; it is a modified implementation of the non-blocking algorithm described there.
No locks such as synchronized or Lock are used; concurrency is controlled purely with CAS. The main modifications are: it relies on the JVM's garbage-collected environment, so removed nodes are reclaimed automatically and there is no ABA problem caused by recycled nodes; and it adds a remove operation that supports deleting interior elements.
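The Node code relies on the VarHandles ITEM and NEXT, which are declared in the enclosing ConcurrentLinkedQueue class. The sketch below shows, under the assumption of Java 9+, one way such handles can be created with MethodHandles.lookup().findVarHandle, and how CAS on item and next behaves. The class name VarHandleNodeDemo and the casNext helper are hypothetical and are not part of the JDK class.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VarHandleNodeDemo {
    // VarHandles for Node.item and Node.next (erased field types: Object and Node)
    static final VarHandle ITEM;
    static final VarHandle NEXT;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            ITEM = l.findVarHandle(Node.class, "item", Object.class);
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;
        Node(E item) { ITEM.set(this, item); } // plain write, published later by a CAS on next
        Node() {}                               // dummy node: item stays null
        boolean casItem(E cmp, E val) { return ITEM.compareAndSet(this, cmp, val); }
        boolean casNext(Node<E> cmp, Node<E> val) { return NEXT.compareAndSet(this, cmp, val); }
    }

    static boolean demo() {
        Node<String> a = new Node<>("a");
        Node<String> b = new Node<>("b");
        boolean linked = a.casNext(null, b);       // succeeds: a.next was null
        boolean relinked = a.casNext(null, b);     // fails: a.next is now b
        boolean taken = a.casItem("a", null);      // succeeds: logical removal of "a"
        boolean takenTwice = a.casItem("a", null); // fails: item is already null
        return linked && !relinked && taken && !takenTwice && a.item == null && a.next == b;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

Only one of two competing CASes with the same expected value can succeed, which is the property every branch in offer, poll and remove builds on.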
Add element add(E)
In this linked structure, each node points to its successor through next, and the next of the newest node is always null. A naive insert could walk from head to the last node every time and then CAS that node's next from null to the new node, which already gives a degree of concurrency safety (deletion is ignored here):
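```java
// Naive approach: walk from head to the last node, then CAS its next
boolean add(E e) {
    Node<E> newNode = new Node<>(e);
    Node<E> t = head;
    while (true) {
        if (t.next != null) {
            t = t.next;                               // not the last node yet
        } else if (NEXT.compareAndSet(t, null, newNode)) {
            return true;                              // appended after the last node
        } else {
            t = head;                                 // lost the race: start over from head
        }
    }
}
```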
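A runnable version of this naive approach, using AtomicReference for next instead of the JDK's VarHandles (an assumption made to keep the sketch self-contained; the class name NaiveAppendList and its helpers are hypothetical). It is lock-free and loses no elements under contention, but every add walks the whole list.

```java
import java.util.concurrent.atomic.AtomicReference;

class NaiveAppendList<E> {
    static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    private final Node<E> head = new Node<>(null); // dummy head; there is no tail pointer

    // Walk from head to the last node on every call, then CAS last.next from null.
    boolean add(E e) {
        Node<E> newNode = new Node<>(e);
        Node<E> t = head;
        while (true) {
            Node<E> next = t.next.get();
            if (next != null) {
                t = next;                              // not the last node yet
            } else if (t.next.compareAndSet(null, newNode)) {
                return true;                           // appended
            } else {
                t = head;                              // lost the race: start over
            }
        }
    }

    int size() {
        int n = 0;
        for (Node<E> p = head.next.get(); p != null; p = p.next.get()) n++;
        return n;
    }

    // Appends from several threads at once and returns the final size.
    static int concurrentFill(int threads, int perThread) {
        NaiveAppendList<Integer> list = new NaiveAppendList<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) list.add(j);
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
        return list.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentFill(4, 1_000)); // prints 4000
    }
}
```

The O(n) walk on every insert is exactly the cost that a separate tail pointer is meant to remove.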
Because the queue is unbounded, this naive add retries until the insert succeeds and always returns true. Although it is lock-free, it is clearly inefficient. That is why linked implementations usually maintain head and tail pointers to move forward without traversing, and ConcurrentLinkedQueue uses the same optimization. On construction, head and tail both point to the same dummy node:
```java
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>();
}
```
However, a node's next reference and the tail pointer are two separate variables, and a CAS can update only one variable at a time. Updating next and tail together atomically would bring us back to locks. How does ConcurrentLinkedQueue solve this?
The answer is to delay the update. In the internal documentation of ConcurrentLinkedQueue, Doug Lea notes that, as in LinkedTransferQueue, a "slack threshold" of two is used: after an insert, tail is not updated atomically together with next. Below we simulate four threads inserting concurrently, starting from the offer code:
```java
/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return false.
 */
public boolean offer(E e) {
    final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node (entry point of the concurrent CAS)
            if (NEXT.compareAndSet(p, null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                // tail hops two or more nodes at a time, and the CAS may fail.
                // If the thread stalls here, tail lags behind (the thread that linked
                // next is not necessarily the one whose tail CAS succeeds); even so,
                // later threads can still advance through next.
                if (p != t) // hop two nodes at a time; failure is OK
                    TAIL.weakCompareAndSet(this, t, newNode);
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else ... // the remaining two branches are discussed below
    }
}
```
① When thread 1 adds an element, head = tail = new Node() (the dummy node). It assigns tail to t and to p (p stands for pointer), and assigns p.next to q, i.e. q = tail.next = null. Its CAS on p.next succeeds, but because p == t == tail, the condition p != t is false, so the first thread to insert does not update the tail pointer.
② The other three threads fail their CAS and go around the loop again. Now q = tail.next != null, so they move on to the second branch, which checks p == q. This branch handles removals that happen concurrently with the insert. Without any removal, p never equals q, because p = tail and q = p.next = tail.next.
```java
else if (p == q)
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;
```
With purely concurrent inserts, q is always p's successor (p.next), which is either a new node or null. If q is null, p is the last node and an insert can be attempted; otherwise another thread has already appended a node, and we need to adjust our position and retry. So execution goes straight to the third branch.
③ When the other three threads find that the last node has changed, they execute:
```java
else
    // Check for tail updates after two hops.
    // Reached when q = p.next != null, i.e. p is not the last node.
    // If p != t, p was already advanced in an earlier iteration (p = q) while t
    // was not refreshed, so tail is re-read: if tail has changed, continue from
    // the new tail (p = t = tail); otherwise step to the next node (p = q).
    p = (p != t && t != (t = tail)) ? t : q;
```
There are two puzzling points here. Puzzle 1: p != t seems impossible, since p = t was assigned at the start. But note that this runs inside a for (;;) loop: on the first iteration p == t, so the condition is false and the code executes:
```java
p = q
```
That is, the first time a thread discovers that the last node has changed (tail.next != null), it advances p to the next node (this is the "advance") and goes around the loop again. The three threads advance concurrently and loop again, and only one of them can succeed with CAS(p.next, null, newNode). Suppose thread 2 wins; it then evaluates:
```java
// The thread may stall or lose its time slice here
if (p != t) // hop two nodes at a time; failure is OK
    TAIL.weakCompareAndSet(this, t, newNode);
```
Now p is the node it advanced to, while t is unchanged, so p != t is true and the tail pointer is updated. This is what the ConcurrentLinkedQueue comments describe: the tail pointer is updated by skipping, always by two or more nodes. Note also that the thread may lose its time slice right after the check, so the CAS on tail may not run immediately.
Meanwhile, threads 3 and 4 lose the CAS race, re-read q = p.next, and, since q is not null, take the third branch again:
```java
p = (p != t && t != (t = tail)) ? t : q;
```
This brings us to puzzle 2: the pointer was advanced in the previous iteration (p = q), so p != t, and the second condition is evaluated:
```java
t != (t = tail)
```
At first glance, t != (t = tail) looks as if it should always return false: t is an object reference, the parenthesized expression seems to run first, so t is assigned tail and then compared with itself. How could it ever differ? Some blog posts claim that t is modified concurrently after the assignment, which is wrong: t is a local variable and cannot be modified by other threads. What actually happens is that whenever tail has been updated, i.e. the old t != tail, the expression returns true.
We tested this in IntelliJ IDEA:
For primitive types, IDEA can infer the result directly and reports that the condition is always true; for object types, look at the bytecode:
The reason is that the JVM is stack-based: expressions are evaluated in postfix order, also known as reverse Polish notation, on the operand stack. An expression can be written in three notations:
- Prefix (Polish) notation: used by computers, parsed from right to left, needs no parentheses.
- Infix notation: used by humans.
- Postfix (reverse Polish) notation: used by computers, in particular by stack-based languages, parsed from left to right, needs no parentheses.
Applied to the expression above:
```
Prefix:  != t = t tail
Infix:   t != (t = tail)
Postfix: t t tail = !=
```
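Evaluating the postfix form from left to right on a stack:
```
push t      the old value of t goes onto the operand stack
push tail   the current value of tail goes onto the stack
=           tail is stored into the local variable t; the assigned value stays on the stack
!=          the old t is compared with that value
```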
In other words, evaluating it in postfix order is equivalent to:
```java
t != tail;  // the comparison uses the old t
t = tail;   // the assignment only affects the local variable
```
The effect is equivalent. Remember: although the assignment inside the parentheses is performed first, it only writes the new value into the local variable table; the operand stack already holds the old value of t, so whenever tail has moved, the comparison sees two different values. For objects, what is compared is the reference (the object's heap address).
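This behavior can be checked directly. Java evaluates the left-hand operand of a binary operator before the right-hand one (JLS §15.7.1), so the old t is read before the assignment inside the parentheses runs. In the sketch below, fields named tail and t stand in for the shared tail and the local t in offer (single-threaded: "another thread" is simulated by assigning a new tail). It also replays the comparison by hand on an explicit operand stack; the bytecode sequence named in the comments is an approximation of what javac emits, not verified compiler output.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class EvalOrderDemo {
    Object tail = new Object(); // stands in for the shared tail field
    Object t = tail;            // stands in for the local variable t in offer()

    // The JDK idiom: the left operand (old t) is read before the assignment runs.
    boolean tailChanged() {
        Object t = this.t;
        boolean changed = t != (t = tail);
        this.t = t; // t now equals tail whether or not it changed
        return changed;
    }

    // The same comparison replayed on an explicit operand stack, roughly like
    // aload t; aload this; getfield tail; dup; astore t; if_acmpeq
    boolean tailChangedOnStack() {
        Object[] locals = { this.t };              // local variable table, slot 0 = t
        Deque<Object> stack = new ArrayDeque<>();
        stack.push(locals[0]);                     // aload t: old t is on the stack
        stack.push(tail);                          // aload this; getfield tail
        stack.push(stack.peek());                  // dup
        locals[0] = stack.pop();                   // astore t: t = tail, a copy stays on the stack
        Object right = stack.pop();
        Object left = stack.pop();                 // still the OLD value of t
        this.t = locals[0];
        return left != right;                      // reference comparison
    }

    public static void main(String[] args) {
        EvalOrderDemo d = new EvalOrderDemo();
        System.out.println(d.tailChanged());        // false: tail has not moved
        d.tail = new Object();                      // "another thread" advances tail
        System.out.println(d.tailChanged());        // true: old t differs from the new tail
        System.out.println(d.t == d.tail);          // true: t was refreshed as a side effect
        d.tail = new Object();
        System.out.println(d.tailChangedOnStack()); // true: same result as the real expression
    }
}
```

Either way, the expression answers "has tail moved since I last read it?" and refreshes t in the same step, which is why offer can use it both as a test and as an update.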
Puzzle solved; back to the branch expression:
```java
p = (p != t && t != (t = tail)) ? t : q;
```
Threads 3 and 4 lost the CAS race and now evaluate t != (t = tail). If thread 2's CAS succeeded and it has already updated the tail pointer, this returns true for threads 3 and 4.
The whole condition is then true && true, so p = t.
As explained under puzzle 2, t has already been assigned (t = tail) at this point. In other words, when a thread finds that t is no longer the latest tail (another thread inserted a node and moved tail), it updates both local variables t and p to the latest tail pointer, and starts the same loop from that new tail, trying to CAS its next.
When does t != (t = tail) return false?
If thread 2's CAS on next has succeeded but it has not yet executed CAS(tail), threads 3 and 4 cannot see a new tail and simply advance to the next node.
Suppose threads 3 and 4 advance to the next node, thread 4 then wins its CAS and updates the tail pointer, and thread 2, when it finally gets a time slice, fails its CAS(tail). That is fine: a failed CAS on the tail pointer is allowed.
Finally, the remaining thread 3 goes back to branch three, finds that the tail pointer has been updated, and jumps to the new tail, skipping two nodes in one step. (With n threads, up to n nodes can be skipped in one step in the extreme case.)
The difference is that advancing to the next node moves one node at a time, whereas jumping to the tail pointer can skip several nodes at once.
These are presumably the tricks Doug Lea refers to.
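The insertion logic walked through above can be condensed into a small, self-contained sketch of the slack-tail idea. It uses AtomicReference fields instead of the JDK's VarHandles (an assumption for brevity), implements only offer, and therefore leaves out the p == q branch, which can only occur once poll or remove self-link nodes. The class name and the tailLag() helper, which reports how far tail trails the real last node, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class SlackTailQueue<E> {
    static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    final Node<E> head = new Node<>(null);                        // dummy node, fixed (no poll here)
    final AtomicReference<Node<E>> tail = new AtomicReference<>(head);

    boolean offer(E e) {
        Node<E> newNode = new Node<>(e);
        for (Node<E> t = tail.get(), p = t;;) {
            Node<E> q = p.next.get();
            if (q == null) {
                if (p.next.compareAndSet(null, newNode)) {        // linearization point
                    if (p != t)                                   // hop two nodes at a time
                        tail.weakCompareAndSetVolatile(t, newNode); // failure is OK
                    return true;
                }
                // lost the race: loop and re-read p.next
            } else {
                // jump to the new tail if it moved, otherwise step forward one node
                p = (p != t && t != (t = tail.get())) ? t : q;
            }
        }
    }

    List<E> toList() {
        List<E> out = new ArrayList<>();
        for (Node<E> p = head.next.get(); p != null; p = p.next.get()) out.add(p.item);
        return out;
    }

    // Number of nodes between tail and the real last node.
    int tailLag() {
        int lag = 0;
        for (Node<E> p = tail.get(); p.next.get() != null; p = p.next.get()) lag++;
        return lag;
    }

    static int concurrentFill(int threads, int perThread) {
        SlackTailQueue<Integer> q = new SlackTailQueue<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> { for (int j = 0; j < perThread; j++) q.offer(j); });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException ex) { throw new IllegalStateException(ex); }
        }
        return q.toList().size();
    }

    public static void main(String[] args) {
        SlackTailQueue<Integer> q = new SlackTailQueue<>();
        for (int i = 1; i <= 5; i++) q.offer(i);
        System.out.println(q.toList());  // [1, 2, 3, 4, 5]
        System.out.println(q.tailLag()); // how far tail currently trails the last node
        System.out.println(concurrentFill(4, 10_000));
    }
}
```

Run single-threaded, tail typically moves only on every second insert, which is the "hop two nodes at a time" behavior; under contention the lag can grow, but no element is lost because correctness depends only on the CAS of next.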
Get the element poll()
ConcurrentLinkedQueue is a FIFO queue, so poll removes nodes from the head. The implementation again relies on CAS, and the method returns either the element E or null when the queue is empty.
As the previous section showed, add works entirely from the tail pointer. poll takes elements from the head node and only ever moves forward, never touching the tail pointer. The code is as follows:
```java
public E poll() {
    restartFromHead: for (;;) {
        for (Node<E> h = head, p = h, q;; p = q) {
            final E item;
            if ((item = p.item) != null && p.casItem(item, null)) {
                // Entry point of the concurrent CAS.
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    // If p.next == null, move head to p itself;
                    // otherwise move head to p.next (a non-empty node).
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                // item is null and there is no successor: the end has been reached.
                // Update head to p (a helping step) and return null.
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                // p was self-linked by another thread's updateHead, so it is
                // off the list and the next node cannot be reached: restart from head.
                continue restartFromHead;
            // Otherwise advance to the next node (p = q in the for update).
        }
    }
}
```
Again assume four threads; p is the pointer and q is p's next node.
① Initially head is the dummy node, whose item is null, so branch 1 is false. Branch 2 reads the next node; it is not null, so branch 3 checks p == q, which is false as well. The threads therefore advance to the next node and re-enter branch 1, where only thread 1 succeeds in its CAS.
② Thread 1 then calls updateHead, which moves the head pointer to the next node (when the next node is not null) or to the current node p (when the next node is null):
```java
updateHead(h, ((q = p.next) != null) ? q : p);

final void updateHead(Node<E> h, Node<E> p) {
    // Move the head pointer forward, then link the old head's next to itself
    if (h != p && HEAD.compareAndSet(this, h, p))
        NEXT.setRelease(h, h);
}
```
Once head is updated, other threads can see the latest position from which to take elements. At the same time, the old head node's next is linked to itself. This is one of the improvements over the paper's algorithm: an optimization for the garbage-collected environment. Later iteration methods also use this self-link to detect stale nodes and avoid iterating from an outdated head.
Consider, for example, succ, which returns a node's successor, or jumps to head if the node's next points to itself:
```java
final Node<E> succ(Node<E> p) {
    // Advance p = p.next; if p.next == p (self-linked), p is off the list,
    // so continue from head instead
    if (p == (p = p.next))
        p = head;
    return p;
}
```
(When head is a dummy node, its item is null; its next is null only right after initialization or when there is no next node.) ③ Having removed the item, thread 1 returns it directly.
Meanwhile, the other threads, whose CAS failed, keep advancing. Any of them still positioned at the old head node now reads a next that thread 1 has linked back to the node itself, so they enter branch 3:
```java
else if (p == q)
    continue restartFromHead;
```
These threads then restart from the new head node. This time the head's item is not null, so they can compete for its CAS directly.
④ If thread 1 loses its time slice while updating the head pointer, the other threads race ahead to the next node, perform their CAS, and try to update the head pointer themselves. If they succeed, head is again updated by skipping. There are two cases:
- Thread 1 does not get a time slice until the end: the head pointer is updated by thread 2, 3 or 4 with a skip update, the new head's item is not null, and thread 1's later attempt to update head fails (which is allowed).
- Thread 1 performs its head update after the other threads' CASes have advanced: head then points to a node whose item has already been taken (item = null, effectively a dummy node); it is also fine if later threads fail to update head.
⑤ As the threads keep advancing after failed CASes, they eventually reach the last node; that is, threads 3 and 4 run concurrent CASes at the position of the tail pointer. Suppose threads 1, 2 and 3 have succeeded in their CAS and are about to call updateHead, while thread 4 loses its time slice. The situation is as follows:
Again there are two cases: head is updated either to a node that still holds an item, or to a dummy head node (item = null).
Under concurrency, poll can affect add. For example:
- Threads 1 and 2 add elements concurrently while thread 3 polls; thread 1's add succeeds, then thread 3's poll succeeds and returns. The state is as follows:
This corresponds to the following branch in offer:
```java
else if (p == q)
    // p has been unlinked by poll. If tail has not changed, tail is off the
    // list too, so restart from head; if tail has changed, use the new tail.
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;
```
When add and poll run concurrently, the node an adding thread is positioned at may have been self-linked (its next pointed to itself) by poll's updateHead. The optimization in this case works as follows:
- If t still equals tail, no other thread has updated tail, but that does not make tail trustworthy: it may be off the list as well. So the thread restarts from head, which poll has updated.
- If t no longer equals tail, another thread has updated tail; the new tail is more likely to be on the list, so continuing from it is an optimization.
Why is tail unreliable when it is unchanged, and trusted only once it has changed?
Because tail is updated lazily, it can even fall behind head, as in the following case:
Here head has moved past tail.
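The pieces of this section fit together in one self-contained sketch: poll with hop-two head updates, updateHead's self-link, succ, and offer's p == q branch for when tail has fallen behind head onto a self-linked node. It uses AtomicReference fields instead of the JDK's VarHandles and omits remove, iterators and everything else; the class name SelfLinkPollQueue is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class SelfLinkPollQueue<E> {
    static final class Node<E> {
        final AtomicReference<E> item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = new AtomicReference<>(item); }
    }

    final AtomicReference<Node<E>> head;
    final AtomicReference<Node<E>> tail;

    SelfLinkPollQueue() {
        Node<E> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    boolean offer(E e) {
        Node<E> newNode = new Node<>(e);
        for (Node<E> t = tail.get(), p = t;;) {
            Node<E> q = p.next.get();
            if (q == null) {
                if (p.next.compareAndSet(null, newNode)) {
                    if (p != t) tail.weakCompareAndSetVolatile(t, newNode);
                    return true;
                }
            } else if (p == q) {
                // p was self-linked by poll: use the new tail if it moved, else restart at head
                p = (t != (t = tail.get())) ? t : head.get();
            } else {
                p = (p != t && t != (t = tail.get())) ? t : q;
            }
        }
    }

    // CAS head from h to p, then self-link the old head as a marker for succ()
    void updateHead(Node<E> h, Node<E> p) {
        if (h != p && head.compareAndSet(h, p)) h.next.setRelease(h);
    }

    // Successor of p, or the current head if p has been self-linked (p is off the list)
    Node<E> succ(Node<E> p) {
        Node<E> next = p.next.get();
        return (next == p) ? head.get() : next;
    }

    E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head.get(), p = h, q;; p = q) {
                E item = p.item.get();
                if (item != null && p.item.compareAndSet(item, null)) {
                    if (p != h) updateHead(h, ((q = p.next.get()) != null) ? q : p); // hop two nodes
                    return item;
                } else if ((q = p.next.get()) == null) {
                    updateHead(h, p);         // reached the end: queue is empty
                    return null;
                } else if (p == q) {
                    continue restartFromHead; // p is off the list
                }
            }
        }
    }

    public static void main(String[] args) {
        SelfLinkPollQueue<Integer> q = new SelfLinkPollQueue<>();
        Node<Integer> oldHead = q.head.get();
        q.offer(1); q.offer(2); q.offer(3);
        System.out.println(q.poll());                                   // 1
        System.out.println(oldHead.next.get() == oldHead);              // true: old head is self-linked
        System.out.println(q.succ(oldHead) == q.head.get());            // true: succ() jumps to head
        System.out.println(q.poll() + " " + q.poll() + " " + q.poll()); // 2 3 null
        q.offer(4); // tail now sits on a self-linked node, so offer takes the p == q branch
        System.out.println(q.poll());                                   // 4
    }
}
```

After the three polls, head has moved past the node tail still points at; the final offer only succeeds because the p == q branch falls back to head instead of trusting the stale tail.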
Delete element remove(e)
Doug Lea's implementation also supports removing the node that holds a given element from the linked list. remove can run concurrently with add and poll, mainly because all of them decide based on the item field, and because head is allowed to be updated lazily. Here is the remove code:
```java
public boolean remove(Object o) {
    if (o == null) return false;
    restartFromHead: for (;;) {
        for (Node<E> p = head, pred = null; p != null; ) {
            Node<E> q = p.next;
            final E item;
            if ((item = p.item) != null) {
                // Live node: if it matches, CAS its item to null and unlink it
                if (o.equals(item) && p.casItem(item, null)) {
                    skipDeadNodes(pred, p, p, q);
                    return true;
                }
                // Advance: the current node becomes pred, p moves to q
                pred = p; p = q; continue;
            }
            // p is dead (item == null); c remembers the first dead node
            for (Node<E> c = p;; q = p.next) {
                // If q is null (end of list) or q is live, unlink the dead run c..p
                // by connecting pred -> q, then continue from q
                if (q == null || q.item != null) {
                    pred = skipDeadNodes(pred, c, p, q); p = q; break;
                }
                // If p.next == p, p has been self-linked (polled): restart from head;
                // otherwise advance p to q and keep scanning the dead nodes
                if (p == (p = q)) continue restartFromHead;
            }
        }
        return false;
    }
}
```
A typical deletion looks like this:
Because of ConcurrentLinkedQueue's particular structure and characteristics, and because of concurrency, remove has to handle many cases, including the following:
When there are no other concurrent operations, i.e. the traversal advances one node at a time:
- p is the pointer and q = p.next; when p's item matches, CAS sets the item to null, and skipDeadNodes then relinks around the node, removing it from the chain.
```java
private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
    // assert pred != c;
    // assert p != q;
    // assert c.item == null;
    // assert p.item == null;
    // On entry, q is either null (so p is the last node) or a live node
    // (q != null && q.item != null). c == p means a single dead node;
    // c != p means a run of dead nodes left by concurrent poll/remove.
    if (q == null) {
        // Never unlink trailing node.
        if (c == p) return pred;
        q = p;
    }
    // Link pred -> q (or move head to q if pred is null). Return pred only if
    // the CAS succeeded and pred is still live (or null); otherwise return p.
    return (tryCasSuccessor(pred, c, q)
            && (pred == null || ITEM.get(pred) != null))
        ? pred : p;
}
```
This method links across the dead nodes and returns the predecessor to use afterwards: the old pred if it is still live; otherwise (pred has died and is no longer on the list, or the CAS failed) it returns p, which may be a successor or the deleted node itself.
Parameters: pred is the predecessor of the deleted node(s); c is the first dead node; p is the last dead node; q is p's successor.
Case ①: single-node deletion where the deleted node's successor is null, i.e. the deleted node is the tail node, so no relinking is needed (the trailing node is never unlinked). c is always p, as in skipDeadNodes(pred, p, p, q), so pred is returned directly.
Case ②: the deleted node is not the tail node, i.e. q != null, so the list is relinked around it. Here pred is the predecessor, c = p is the deleted node, and q is the successor.
skipDeadNodes then calls tryCasSuccessor(pred, c, q), implemented as follows:
```java
private boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) {
    // assert p != null;
    // assert c.item == null;
    // assert c != p;
    if (pred != null)
        return NEXT.compareAndSet(pred, c, p); // pred -> p, skipping the dead nodes
    // No predecessor: c is at the head (possibly the dummy), so move head to p
    if (HEAD.compareAndSet(this, c, p)) {
        NEXT.setRelease(c, c);                 // self-link the old head
        return true;
    }
    return false;
}
```
Updating the link proceeds as follows:
I: If pred is null, the deleted node is the first node holding an element, so head is CASed from c to p (the successor passed in, never null), and the old first node's next is linked to itself;
II: If pred is not null, the deleted node is not the first element (or head is a dummy node), so pred's next pointer is CASed from c to p;
III: After the link is updated successfully, skipDeadNodes decides what to return. If pred is null, the first node was deleted and there is no predecessor, so null is returned. If pred is not null, its item is checked: a non-null item means pred is a normal node, and it is returned; a null item means pred has died in the meantime, so p is returned to act as the predecessor instead.
Concurrent poll and remove by multiple threads
In single-threaded or non-concurrent cases, elements are always deleted and stepped over one at a time, so the parameter c of skipDeadNodes always equals p: only one node is deleted.
Under multithreaded concurrency, the parameter c of skipDeadNodes is the first dead node and p is the last one, because several remove or poll operations can leave multiple dead nodes in a row. This is handled by remove's second (inner) for loop:
The first loop simply advances node by node and relinks around a single node;
The second loop handles concurrent operations. There are only two ways out of it: break, after successfully advancing and relinking across the dead nodes, and return to the first loop for single-node processing; or, when the condition requires it, jump back to the method entry and start over.
- 1. While advancing one node at a time, the removing thread finds a node with item == null, meaning the element has already been taken by another thread. It saves this first dead node as c and tries to advance;
- 2. It then checks the next node. If it is null, the end of the list has been reached, and the thread leaves the loop after linking past the dead nodes. If its item is null, another concurrent poll or remove has been encountered, and the thread keeps advancing; if its item is not null, the run of dead nodes is over, and the chain is relinked across it.
In the following scenario, three threads run concurrently: thread 1 remove(3), thread 2 poll, thread 3 remove(2):
- Initially head is the dummy node (head = dummy). Thread 1 (remove(3)) starts from head, finds that the dummy's item is null, records c = dummy and enters the second loop;
- Thread 2 polls: its cas(2, null) succeeds, but it has not yet updated the head pointer;
- Thread 3 (remove(2)) also starts from the dummy head, steps over the now-dead node 2, reaches q with item = 3, and CASes head from the dummy to q;
- Thread 1 continues, finds q.item != null, but head has already been updated, so its cas(head, c, q) fails. In this case skipDeadNodes returns the pointer p rather than pred, which does not stop thread 1 from progressing: after leaving skipDeadNodes, p points to the next node q (q.item = 3), thread 1 goes back to the first loop, and deletes item 3 successfully.
This is what the return value of skipDeadNodes is for. With concurrent cross-node relinking, a failed CAS means another thread has already linked across these nodes, so this thread does not repeat the work; it updates pred instead, and the subsequent advance (p = q) re-forms a predecessor/current pair. This is a fresh start, and the earlier dead nodes no longer need to be considered.
```java
private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
    if (q == null) {
        // Never unlink trailing node.
        if (c == p) return pred;
        q = p;
    }
    // If tryCasSuccessor succeeds and pred is null (the first node was removed),
    // null is returned, so the next step also starts with pred == null.
    // If it succeeds and pred is still live, pred is returned.
    // Otherwise (the CAS failed because another thread already linked across
    // these nodes, or pred has died), p, the last dead node, is returned; the
    // caller then sets p = q, so pred and p are adjacent again.
    return (tryCasSuccessor(pred, c, q)
            && (pred == null || ITEM.get(pred) != null))
        ? pred : p;
}
```
- After threads 1 and 3 have finished, thread 2's CAS on head in poll fails, because head has already been updated by thread 3.
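A single-threaded sketch of remove, skipDeadNodes and tryCasSuccessor on a simplified list may help to see the c != p case. It uses AtomicReference fields instead of VarHandles, and the append and toList helpers and the class name RemoveSketch are hypothetical, for setup and inspection only. Nulling the items of nodes 2 and 3 by hand simulates the run of dead nodes that concurrent poll or remove operations leave behind, so a single CAS on pred.next unlinks both, while the trailing node is left in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class RemoveSketch<E> {
    static final class Node<E> {
        final AtomicReference<E> item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = new AtomicReference<>(item); }
    }

    final AtomicReference<Node<E>> head = new AtomicReference<>(new Node<>(null)); // dummy

    // Single-threaded setup helper (not part of the algorithm): append at the end.
    Node<E> append(E e) {
        Node<E> p = head.get();
        while (p.next.get() != null) p = p.next.get();
        Node<E> n = new Node<>(e);
        p.next.set(n);
        return n;
    }

    boolean remove(Object o) {
        if (o == null) return false;
        restartFromHead:
        for (;;) {
            for (Node<E> p = head.get(), pred = null; p != null; ) {
                Node<E> q = p.next.get();
                E item = p.item.get();
                if (item != null) {
                    if (o.equals(item) && p.item.compareAndSet(item, null)) {
                        skipDeadNodes(pred, p, p, q);
                        return true;
                    }
                    pred = p; p = q; continue;
                }
                // p is dead: c stays on the first dead node while p walks to the last one
                for (Node<E> c = p;; q = p.next.get()) {
                    if (q == null || q.item.get() != null) {
                        pred = skipDeadNodes(pred, c, p, q); p = q; break;
                    }
                    if (p == (p = q)) continue restartFromHead; // p was self-linked
                }
            }
            return false;
        }
    }

    Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
        if (q == null) {
            if (c == p) return pred; // never unlink the trailing node
            q = p;
        }
        return (tryCasSuccessor(pred, c, q) && (pred == null || pred.item.get() != null)) ? pred : p;
    }

    boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) {
        if (pred != null) return pred.next.compareAndSet(c, p);
        if (head.compareAndSet(c, p)) { c.next.setRelease(c); return true; }
        return false;
    }

    List<E> toList() {
        List<E> out = new ArrayList<>();
        for (Node<E> p = head.get(); p != null; p = p.next.get()) {
            E item = p.item.get();
            if (item != null) out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        RemoveSketch<Integer> list = new RemoveSketch<>();
        Node<Integer> n1 = list.append(1), n2 = list.append(2), n3 = list.append(3), n4 = list.append(4);
        n2.item.set(null); // simulate two concurrent polls/removes that
        n3.item.set(null); // nulled their items but have not unlinked yet
        System.out.println(list.remove(4));        // true
        System.out.println(n1.next.get() == n4);   // true: dead run 2..3 skipped by one CAS
        System.out.println(list.head.get() == n1); // true: the dummy head was replaced
        System.out.println(list.toList());         // [1]
        System.out.println(list.remove(99));       // false
    }
}
```

Node 4 stays linked as a dead trailing node after remove(4), matching the "never unlink trailing node" rule.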
The remove operation has many concurrency cases to consider. Doug Lea's implementation departs from the paper's pseudocode, which cost me a lot of hair and left me ready to kneel in admiration. Besides the examples above, remove must also handle practical cases such as:
- whether the predecessor node is a dummy node;
- whether the deleted node is the tail node;
- a poll overtaking a remove, and so on.
It looks something like this:
The ConcurrentLinkedQueue removal code in JDK 8 is easier to read than in JDK 15, because Doug Lea refactored the algorithm, which introduced extra abstraction and branch complexity.
Besides add, poll and remove, ConcurrentLinkedQueue also provides concurrent Iterator implementations (also Doug Lea's own design), which are worth studying as well, but I won't cover them in this article.
Conclusion:
- ConcurrentLinkedQueue is an improved version of an existing algorithm, with support for garbage-collected reclamation and for removing interior elements.
- Reclamation is aided by linking the old head node's next to itself.
- ConcurrentLinkedQueue is built from linked nodes, but neither the tail pointer nor the head pointer is required to be up to date; both support skip updates, which is an optimization.
- Relinking can span multiple dead nodes at once.
- Insertion in ConcurrentLinkedQueue depends only on the CAS update of a node's next, which ensures both concurrency safety and efficiency.
- Likewise, the CAS on item must succeed, whereas the CAS updates of tail and head are allowed to fail.
- The code contains some tricks, such as t != (t = tail) and p == (p = q).
Note: when debugging ConcurrentLinkedQueue in IDEA, the debugger automatically calls toString() and similar methods to display the queue. These calls traverse the queue and can change its internal state (for example, by moving head), so the program no longer behaves the way the code suggests. Change IDEA's debugger configuration to avoid this.