This is day 6 of my participation in the November Writing Challenge. Event details: The Last Writing Challenge of 2021
Preface
This article continues the series by looking at how SynchronousQueue implements its fair policy internally. Let's go straight to the source code.
Internal implementation class
Under the fair policy, the implementation class is TransferQueue.
TransferQueue
TransferQueue, the implementation class for the fair policy, extends Transferer. A fair policy requires first-in, first-out ordering, and the name Queue reflects that structure.
There are two node states to be aware of:
Cancelled (the waiting thread was interrupted or timed out): item == this;
Off the list (the node has been dequeued, for example after a complementary operation matched it): next == this;
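The first-in, first-out behaviour can be observed through the public API alone. The following is a minimal sketch, not part of the JDK source: the class name FairOrderDemo and the method takeOrder are made up for illustration, and the loop that polls Thread.getState() is only an assumption-based way to make sure each producer is parked before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; only SynchronousQueue itself comes from the JDK.
class FairOrderDemo {
    /** Parks producers 1..n one after another, then takes n values and returns them in take order. */
    static List<Integer> takeOrder(int n) {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true); // true selects the fair policy
        List<Integer> taken = new ArrayList<>();
        try {
            for (int i = 1; i <= n; i++) {
                final int value = i;
                Thread p = new Thread(() -> {
                    try {
                        q.put(value); // blocks until a consumer takes the value
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
                p.setDaemon(true);
                p.start();
                // Wait until this producer is parked, so arrival order is well defined.
                while (p.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            for (int i = 0; i < n; i++) {
                taken.add(q.take()); // fair mode hands out the longest-waiting producer first
            }
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(takeOrder(3));
    }
}
```

With the fair policy the values come back in the order in which the producers started waiting. With new SynchronousQueue<>(false) the order is not guaranteed.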
Constructor
The constructor points both head and tail at the same dummy node.
    TransferQueue() {
        // Create a QNode whose item is null to act as the dummy node.
        QNode h = new QNode(null, false); // initialize to dummy node.
        head = h;
        tail = h;
    }
QNode
QNode is the linked-list node of the queue. Its isData field shows that a node records an operation, not just an enqueued value: a put is stored as a QNode, and so is a take. The fields are updated through CAS operations.
    static final class QNode {
        // Next node in the queue
        volatile QNode next;
        // The element carried by this node; CAS'ed to or from null
        volatile Object item;
        // The thread waiting on this node, used for park/unpark
        volatile Thread waiter;
        // Operation type: true for a put (carries data), false for a take
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        // CAS update of next
        boolean casNext(QNode cmp, QNode val) {
            return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // CAS update of item
        boolean casItem(Object cmp, Object val) {
            return item == cmp &&
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        // Cancel by CAS'ing item to this node itself
        void tryCancel(Object cmp) {
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
        }

        // Cancelled: item points at the node itself
        boolean isCancelled() {
            return item == this;
        }

        // Off the list: next points at the node itself (set by advanceHead)
        boolean isOffList() {
            return next == this;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = QNode.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
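The two self-referencing states (item == this for cancelled, next == this for off the list) can be tried out without sun.misc.Unsafe. The sketch below is an illustration only: SentinelNode and markOffList are hypothetical names, and AtomicReference stands in for the Unsafe-based CAS used by the real QNode.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for QNode; AtomicReference replaces the Unsafe CAS calls.
class SentinelNode {
    final AtomicReference<Object> item;
    final AtomicReference<SentinelNode> next = new AtomicReference<>();
    final boolean isData; // true for a put, false for a take

    SentinelNode(Object item, boolean isData) {
        this.item = new AtomicReference<>(item);
        this.isData = isData;
    }

    boolean casItem(Object cmp, Object val) {
        return item.compareAndSet(cmp, val);
    }

    /** Cancels only if item still holds cmp, i.e. nobody has matched this node yet. */
    void tryCancel(Object cmp) {
        item.compareAndSet(cmp, this);
    }

    boolean isCancelled() {
        return item.get() == this;
    }

    /** Illustrative helper: what advanceHead does to the old head node. */
    void markOffList() {
        next.set(this);
    }

    boolean isOffList() {
        return next.get() == this;
    }

    public static void main(String[] args) {
        Object e = new Object();
        SentinelNode n = new SentinelNode(e, true);
        n.tryCancel(e);
        System.out.println("cancelled: " + n.isCancelled());
    }
}
```

Because cancellation and matching both CAS the same item field from the same expected value, exactly one of them can win: a node that has already been matched cannot be cancelled afterwards, and a cancelled node cannot be matched.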
Fields
These fields hold the references to the head and tail of the queue. Note the cleanMe node; its role is explained together with the clean method.
    /** Head of queue */
    transient volatile QNode head;
    /** Tail of queue */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long cleanMeOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
            cleanMeOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("cleanMe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
CAS operations
These helper methods update head, tail and cleanMe through CAS.
    /**
     * Tries to cas nh as new head; if successful, unlink
     * old head's next node to avoid garbage retention.
     */
    void advanceHead(QNode h, QNode nh) {
        if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
            // Point the old head's next at itself so that h is off the list
            // and isOffList() returns true for it.
            h.next = h; // forget old next
    }

    /**
     * Tries to cas nt as new tail.
     */
    void advanceTail(QNode t, QNode nt) {
        if (tail == t)
            UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    }

    /**
     * Tries to CAS cleanMe slot.
     */
    boolean casCleanMe(QNode cmp, QNode val) {
        return cleanMe == cmp &&
            UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
    }
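The effect of advanceHead, swinging head forward and self-linking the old head, can be shown in isolation. This is a simplified sketch under assumptions: HeadAdvanceSketch and its nested Node are hypothetical names, and AtomicReference.compareAndSet replaces the Unsafe call.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical mini queue that only models the head pointer.
class HeadAdvanceSketch {
    static final class Node {
        volatile Node next;
        final String label;

        Node(String label) {
            this.label = label;
        }
    }

    final AtomicReference<Node> head;

    HeadAdvanceSketch(Node dummy) {
        head = new AtomicReference<>(dummy);
    }

    /** Swings head from h to nh; on success the old head is self-linked (off the list). */
    void advanceHead(Node h, Node nh) {
        if (head.compareAndSet(h, nh)) {
            h.next = h; // forget old next, which also lets the old chain be garbage collected
        }
    }

    public static void main(String[] args) {
        Node dummy = new Node("dummy");
        Node first = new Node("first");
        dummy.next = first;
        HeadAdvanceSketch q = new HeadAdvanceSketch(dummy);
        q.advanceHead(dummy, first);
        System.out.println("head=" + q.head.get().label + ", old head off list=" + (dummy.next == dummy));
    }
}
```

A second call with the stale head fails its CAS and changes nothing, which is why callers in the real code can call advanceHead without first checking who won the race.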
transfer
Enqueue and dequeue both go through a single method: the transfer method declared by Transferer. Keep in mind that every operation, whether a put or a take, may be stored in the queue as a node.
    /**
     * Puts or takes an item.
     */
    @SuppressWarnings("unchecked")
    E transfer(E e, boolean timed, long nanos) {
        QNode s = null; // constructed/reused as needed
        // e != null means a put (isData is true); e == null means a take (isData is false)
        boolean isData = (e != null);

        for (;;) {
            QNode t = tail;
            QNode h = head;
            // head or tail is null: not initialized yet, so spin
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin

            // h == t means the queue is empty. Otherwise, if the tail has the same
            // type as this operation, it cannot be matched and must be enqueued.
            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                // The tail has been changed by another thread; retry
                if (t != tail)                  // inconsistent read
                    continue;
                // Another thread has appended a node; help advance tail and retry
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                // Timed call with no time left: give up immediately
                if (timed && nanos <= 0)        // can't wait
                    return null;
                if (s == null)
                    s = new QNode(e, isData);
                // Try to link s as the tail's next; on failure, retry
                if (!t.casNext(null, s))        // failed to link in
                    continue;

                // Try to make s the new tail
                advanceTail(t, s);              // swing tail and wait
                // Spin or block until a complementary operation arrives
                Object x = awaitFulfill(s, e, timed, nanos);
                // item == s means the wait was cancelled; clean up and return null
                if (x == s) {                   // wait was cancelled
                    clean(t, s);
                    return null;
                }

                // A matched node should leave the queue; unlink it if it has not yet
                if (!s.isOffList()) {           // not already unlinked
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    // Clear the waiting thread
                    s.waiter = null;
                }
                return (x != null) ? (E)x : e;

            } else {                            // complementary-mode
                // The first real node is the one to fulfill
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read

                Object x = m.item;
                // isData == (x != null): m has already been matched by another thread
                // x == m: m was cancelled
                // casItem fails: lost the race to match m
                if (isData == (x != null) ||    // m already fulfilled
                    x == m ||                   // m cancelled
                    !m.casItem(x, e)) {         // lost CAS
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }

                // Matched: advance head and wake m's waiting thread
                advanceHead(h, m);              // successfully fulfilled
                LockSupport.unpark(m.waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }
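Both branches of transfer are visible from the public API. The sketch below is illustrative only; HandoffDemo and run are made-up names. The untimed-zero calls offer(e) and poll() reach the "can't wait" return when no partner is waiting, while an offer made after a taker has parked goes down the complementary branch and succeeds.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class exercising SynchronousQueue in fair mode.
class HandoffDemo {
    /** Returns a comma-separated trace of what each call observed. */
    static String run() {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(true);
        StringBuilder log = new StringBuilder();
        log.append(q.offer(1)).append(','); // empty queue, zero wait: nothing is enqueued
        log.append(q.poll()).append(',');   // same for a take with zero wait

        int[] received = new int[1];
        Thread taker = new Thread(() -> {
            try {
                received[0] = q.take(); // enqueues a take node and waits
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        taker.start();
        try {
            // Wait until the taker is parked, so its node is in the queue.
            while (taker.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            log.append(q.offer(42)).append(','); // complementary mode: matches the waiting take
            taker.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        log.append(received[0]);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first offer fails because a timed call with no time left is never linked into the queue; the second succeeds because the tail then holds a node of the opposite type.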
awaitFulfill
awaitFulfill is called from transfer when the queue is empty or the tail holds an operation of the same type. Normally (setting aside timeouts and interrupts) it blocks the thread until a matching operation arrives. For example, if the current operation is a put and the tail node is also a put, the thread spins briefly and then, if no timeout is set, blocks until a take arrives and wakes it.
    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
        /* Same idea as TransferStack.awaitFulfill */
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        // Spin only when s is the first real node (head.next == s); the spin count
        // depends on whether the call is timed. Other nodes do not spin at all.
        int spins = ((head.next == s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // If interrupted, cancel by setting s.item to s itself
            if (w.isInterrupted())
                s.tryCancel(e);
            Object x = s.item;
            // item changes when the node is matched or cancelled; otherwise it is still e
            if (x != e)
                return x;
            if (timed) {
                nanos = deadline - System.nanoTime();
                // Timed out: cancel, then loop once more to return the cancelled item
                if (nanos <= 0L) {
                    s.tryCancel(e);
                    continue;
                }
            }
            if (spins > 0)
                --spins;
            else if (s.waiter == null)
                s.waiter = w;
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }
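The two cancellation paths in awaitFulfill, timeout and interrupt, can also be observed from outside. This is a small sketch using only the public API; CancelDemo and its method names are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the timeout and interrupt paths.
class CancelDemo {
    /** Returns true if a timed poll with no producer gives up and returns null. */
    static boolean timedPollTimesOut() {
        SynchronousQueue<String> q = new SynchronousQueue<>(true);
        try {
            return q.poll(20, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Returns true if interrupting a blocked put cancels it and its value is never delivered. */
    static boolean interruptCancelsPut() {
        SynchronousQueue<String> q = new SynchronousQueue<>(true);
        boolean[] interrupted = new boolean[1];
        Thread producer = new Thread(() -> {
            try {
                q.put("x");
            } catch (InterruptedException ex) {
                interrupted[0] = true; // put reports the cancelled wait as an exception
            }
        });
        producer.start();
        try {
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            producer.interrupt();
            producer.join();
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        // A later consumer must not receive the cancelled element.
        return interrupted[0] && q.poll() == null;
    }

    public static void main(String[] args) {
        System.out.println(timedPollTimesOut() + " " + interruptCancelsPut());
    }
}
```

In both cases the waiting node ends up with item == this, so transfer returns null and the caller turns that into a null result or an InterruptedException.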
The clean method
The overall process is as follows:
If the node to delete is not the tail node, unlink it directly with pred.casNext(s, s.next).
If the node to delete is the tail node and cleanMe is null, store its predecessor pred in cleanMe so that the node can be removed during a later clean call.
If cleanMe is not null, first try to remove the node that was deferred last time (the successor of cleanMe), then clear cleanMe. If cleanMe was not already pred, loop again so that cleanMe can be set to pred for the next deletion.
    void clean(QNode pred, QNode s) {
        s.waiter = null; // forget thread
        // If pred.next is no longer s, s has already been unlinked
        while (pred.next == s) { // Return early if already unlinked
            QNode h = head;
            QNode hn = h.next;   // Absorb cancelled first node as head
            // If the first real node is cancelled, advance head past it
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }
            QNode t = tail;      // Ensure consistent read for tail
            // Empty queue: nothing to clean
            if (t == h)
                return;
            QNode tn = t.next;
            // The tail has been changed by another thread; retry
            if (t != tail)
                continue;
            // The tail is lagging; help advance it and retry
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }
            // s is not the tail, so it can be unlinked directly
            if (s != t) {        // If not tail, try to unsplice
                QNode sn = s.next;
                // Point pred.next at s's successor, which removes s from the list
                if (sn == s || pred.casNext(s, sn))
                    return;
            }
            QNode dp = cleanMe;
            if (dp != null) {    // Try unlinking previous cancelled node
                // d is the node whose removal was deferred last time
                QNode d = dp.next;
                QNode dn;
                if (d == null ||               // d is gone or
                    d == dp ||                 // d is off list or
                    !d.isCancelled() ||        // d not cancelled or
                    (d != t &&                 // d not tail and
                     (dn = d.next) != null &&  //   has successor
                     dn != d &&                //   that is on list
                     dp.casNext(d, dn)))       // d unspliced
                    casCleanMe(dp, null);
                if (dp == pred)
                    return;      // s is already saved node
            } else if (casCleanMe(null, pred))
                return;          // Postpone cleaning s
        }
    }
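The core distinction in clean, unsplicing an interior node now versus deferring a tail node, can be reduced to a few lines. The following is a deliberately simplified, single-threaded sketch: UnspliceSketch, Node and tryUnsplice are hypothetical names, and the cleanMe bookkeeping of the real method is left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "unsplice unless tail" decision; not the JDK implementation.
class UnspliceSketch {
    static final class Node {
        final AtomicReference<Node> next = new AtomicReference<>();
        final String label;

        Node(String label) {
            this.label = label;
        }
    }

    /**
     * Tries to remove s, whose predecessor is pred. Returns false when s is the
     * tail, in which case removal has to be deferred (the real code remembers
     * pred in cleanMe for that case).
     */
    static boolean tryUnsplice(Node pred, Node s, Node tail) {
        if (s == tail) {
            return false; // a concurrent append may be linking onto tail.next
        }
        Node sn = s.next.get();
        // sn == s means s is already off the list; otherwise bypass s with a CAS.
        return sn == s || pred.next.compareAndSet(s, sn);
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        Node c = new Node("c");
        a.next.set(b);
        b.next.set(c);
        System.out.println(tryUnsplice(a, b, c) + " " + tryUnsplice(a, c, c));
    }
}
```

The tail is treated specially because another thread may be appending to it at that moment; bypassing it could lose the newly linked node, so its removal is postponed until it is no longer last.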