This is the 7th day of my participation in the November Gwen Challenge. Check out the details: The last Gwen Challenge 2021

preface

The SynchronousQueue uses two internal classes to implement a cache free blocking queue for both a fair policy and an unfair policy. Each operation requires a complementary operation. For example, an enqueue must be enqueued. In cases where timeouts and interrupts are not involved, it must wait for another thread to queue and match before it can execute, or it blocks.

TransferStack

Unlike the operation under the fair policy, there is only one state to note:

Cancel operation: match == this;

SNode

The variable of SNode is different from that of QNode. Match can be used to find the matched node after the two operations are matched. The node type mode is also different in use

Static final class SNode {// next points to the next element on the stack; // Volatile SNode match; // Volatile Thread waiter; // Object item; // Node type int mode; SNode(Object item) { this.item = item; } / / CAS update next field Boolean casNext (SNode CMP, SNode val) {return CMP = = next && UNSAFE.com pareAndSwapObject (this, nextOffset, cmp, val); } // Try to match the s node with the current node, Boolean tryMatch(SNode s) {// match == null Indicates that the current node is not matched by other nodes. // CAS updates the match field to s if (match == null &&) UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; // The current node is waiting for the thread to be operated by another thread. Waiters = null; // Waiters need to work at most one unpark // Waiters = null; LockSupport.unpark(w); } return true; } // Check whether the current node matches s. Return match == s; } / / try to cancel the operation will match to this void tryCancel () {UNSAFE.com pareAndSwapObject (this, matchOffset, null, this); } Boolean isCancelled() {return match == this; } // Get the offset of match and next in the object private static final sun.misc. private static final long matchOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); }}}Copy the code

Variable section

Static final int REQUEST = 0; static final int REQUEST = 0; Static final int DATA = 1; static final int DATA = 1; Static final int FULFILLING = 2; static final int FULFILLING = 2; // Volatile SNode head; private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<? > k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); }}Copy the code

The CAS operation

CAS updates the pointer to the top of the stack, which is relatively simple

boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }
Copy the code

Determine if the streets match

Check whether the node corresponding to M has been matched, and carry out the bit-and-operation with Fulfillment. check that the stack node corresponding to M is in the fulfillmentstate, that is, it has been matched. By FULFILLING | update mode operation mode, so here lowest to distinguish save data from the request data, to distinguish whether the node is high nodes have a match is found.

/** Returns true if m has fulfilling bit set. */ static boolean isFulfilling(int m) { return (m & FULFILLING) ! = 0; }Copy the code

Snode node

Creates or resets the SNode. If it is empty, a new SNode is created. If it is not empty, the mode and next properties of the node are reset.

    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }
Copy the code

transfer

Similar to transferQueue. transfer in and out of queues in fair mode, a unified method is used to achieve the transfer method in the interface. The difference lies in three conditional branches:

If the stack is empty or at the top of the stack, the operation type is the same as the current operation type.

The top of the stack does not match the complementary node (matching the complementary node: it has been matched with other nodes, mode value is 1), and the matching operation is performed.

Help the already matched top of the stack node operation;

@SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; Int mode = (e == null); // constructed/ resused as needed REQUEST : DATA; for (;;) SNode h = head; / / the stack is empty, and/or the top node of the current node for the same operation if (h = = null | | h.m ode = = mode) {/ / set the timeout and overtime time less than or equal to 0 if (timed && nanos < = 0) {if (h! = null && H. scancound ()) // The top of the stack is not empty and the top node of the stack is in the cancelled state. else return null; // Create a node, Else if (casHead(h, s = snode(s, e, h, mode)) {casHead(h, s = snode(s, e, h, mode)) {snode m = awaitFulfill(s, timed, nanos); If (m == s) {// Clean the node, cancel the operation clean(s); return null; } // the top of the stack is updated to the next element of s. // the top of the stack is updated to the third node element. = null && h.next == s) casHead(h, s.next); // Return (E) ((mode == REQUEST)? // Return (E) ((mode == REQUEST)? m.item : s.item); } // Top of stack element does not match complementary node} else if (! IsFulfilling (H.mode)) {// casHead(h, h.ext) will be updated if (H. sCancelled()); // push the newly created node, At the same time FULFILLING | mode with / / s operation mode for 10 or 11 else if (casHead (h, s = snode (s, e, h, FULFILLING | mode))) {/ / enter here show that s already for the top node, And s.ext is its matching node // loop until for (;;) is matched { SNode m = s.next; If (m == null) {casHead(s, null); if (m == null) {casHead(s, null); S = null; s = null; s = null; break; } // SNode mn = m.next; If (m. threadmatch (s)) {if (m. threadmatch (s)) {if (m. threadmatch (s)) {casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else // do not match s.casNext(m, mn); } else {SNode m = h next;} else {SNode m = h next;} else {SNode m = h next; if (m == null) casHead(h, null); else { SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); }}}}Copy the code

Waiting for awakening (awaitFulfill)

With TransferQueue awaitFulfill similar, in the current operation with before operation and at the same time, not set operation time was not at the same time external interrupt thread is blocked waiting for matching nodes wake current blocked thread.

SNode awaitFulfill(SNode S, Boolean timed, long nanos) {final long deadline = timed? System.nanoTime() + nanos : 0L; Thread w = thread.currentThread (); Spins = (shouldSpin(s)); (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) {if (w.isinterrupted ())) // Try setting the match of s to s's own s.cancel (); SNode m = s.match; // if match is not empty, the node is already matched. = null) return m; If (timed) {nanos = deadline-system.nanotime (); if (nanos <= 0L) { s.tryCancel(); continue; Spins if (spins > 0) spins = shouldSpin(s)? (spins-1) : 0; Else if (s.waiter == null) s.waiter = w; Else if (! timed) LockSupport.park(this); Else if (nanos > spinForTimeoutThreshold) locksupport. parkNanos(this, nanos); }}Copy the code

ShouldSpin is needed or not

Determine whether spin operation is required if one of the following conditions is required:

Top node equals s node;

The top node of the stack is empty;

The top node of the stack is matched with other nodes.

    boolean shouldSpin(SNode s) {
        SNode h = head;
        return (h == s || h == null || isFulfilling(h.mode));
    }
Copy the code

Clean operation (CLEAN)

The clean operation cleans the association relation of the stack node S, and also cleans the cancel operation node of the whole stack node. The cleanMe node is much simpler than the TransferQueue. Clean operation

Void clean(SNode s) {// item, waiter empty. s.waiter = null; SNode past = s.ext; // SNode past = s.ext; if (past ! = null && past.isCancelled()) past = past.next; // Update next node to head node SNode p; while ((p = head) ! = null && p ! = past && p.isCancelled()) casHead(p, p.next); // While (p! = null && p ! = past) { SNode n = p.next; if (n ! = null && n.isCancelled()) p.casNext(n, n.next); else p = n; }}Copy the code

conclusion

This is the case with SynchronousQueue’s internal implementation of the unfair policy, noting that the mode part of the state is handled by the high and low levels to distinguish whether a match has been made and what type of operation (producer or consumer) it is. In fact, it is necessary to remember that its operations must be in pairs. In the case of no timeout and no interruption, one thread to perform the queuing operation must need another thread to perform the queuing operation. At this point, the two operations match each other and complete the operation at the same time.