Introduction

SynchronousQueue is a synchronous blocking queue in which every insert waits for a corresponding remove by another thread, and every remove waits for a corresponding insert. It has two internal policies, fair and unfair. This section examines the default policy, which is unfair and is implemented by TransferStack.
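The hand-off behaviour described above can be seen with a minimal sketch. The class name `HandoffDemo` and the method `handOff` are hypothetical names chosen for illustration; only `SynchronousQueue`, `put`, and `take` come from the JDK.

```java
import java.util.concurrent.SynchronousQueue;

final class HandoffDemo {
    /** Hands one value from a producer thread to the calling (consumer) thread. */
    static String handOff(String value) {
        // The no-argument constructor selects the default, unfair policy.
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(value); // blocks until a consumer takes the value
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.take(); // blocks until a producer arrives
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));
    }
}
```

Neither side holds the element in a buffer: the queue has no capacity, so `put` and `take` complete only as a pair.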

A node can be in one of three modes, REQUEST, DATA, and FULFILLING, which mark it as a consumer, a producer, and a node that is in the process of matching, respectively.

When an operation arrives with the same mode as the nodes already on the stack (all REQUEST or all DATA), its node is simply pushed onto the top of the stack, as shown in the figure below:

For example, take is a REQUEST operation, so successive take calls stack up REQUEST nodes:

Similarly, put is a DATA operation, so successive put calls leave the stack in the following state:

What happens when the modes differ? When an operation of the complementary mode arrives, its own mode is not pushed as is. Instead, the mode is combined with FULFILLING by a bitwise OR (FULFILLING | mode) and the resulting node is pushed to the top of the stack. In other words, a fulfilling node is pushed, and it requests a pairing with the node below it, as shown below:

In the figure above, the stack originally holds a DATA node waiting for a consumer. A take operation in REQUEST mode then arrives to consume the data. The REQUEST node is not pushed directly. Its mode is converted to FULFILLING while the original type is retained in the low bit, which makes it a fulfilling request. This node is matched with the node directly below the top of the stack. When the match succeeds, the top node and the matched node are popped from the stack together.
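The mode arithmetic described above can be sketched as follows. The constants match the ones listed under TransferStack; `fulfillingMode` and `originalType` are hypothetical helpers added for illustration, while `isFulfilling` mirrors the bit test the stack performs.

```java
final class ModeBits {
    static final int REQUEST = 0;
    static final int DATA = 1;
    static final int FULFILLING = 2;

    /** True if the FULFILLING bit is set in the mode. */
    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }

    /** Hypothetical helper: mode of the node pushed when the head is complementary. */
    static int fulfillingMode(int mode) {
        return FULFILLING | mode;
    }

    /** Hypothetical helper: recovers REQUEST or DATA from a fulfilling mode. */
    static int originalType(int m) {
        return m & ~FULFILLING;
    }

    public static void main(String[] args) {
        System.out.println(fulfillingMode(REQUEST)); // 2
        System.out.println(fulfillingMode(DATA));    // 3
        System.out.println(isFulfilling(DATA));      // false
    }
}
```

Because REQUEST and DATA occupy the low bit and FULFILLING occupies the next bit, a single int carries both the original type and the fulfilling state.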

TransferStack

The main properties

/** Indicates that the node is a consumer (an unfulfilled request) */
static final int REQUEST    = 0;

/** Indicates that the node is a producer (unfulfilled data) */
static final int DATA       = 1;

/** Indicates that the node is fulfilling (matching) another node */
static final int FULFILLING = 2;

/** Top of stack */
volatile SNode head;

The main method

| Method | Description |
| --- | --- |
| isFulfilling | Checks whether a mode has the FULFILLING bit set |
| casHead | Atomically updates the head node |
| snode | Creates (or reuses) a node and sets its next node |
| transfer | Called by both put and take |
| awaitFulfill | Spins or blocks until the node is matched |

Main inner class

static final class SNode {
    volatile SNode next;  // Next node in stack
    volatile SNode match; // The node matched to this
    volatile Thread waiter; // To control park/unpark
    Object item;          // data; or null for REQUESTs
    int mode; // Node type
    // Note: item and mode fields don't need to be volatile
    // since they are always written before, and read after,
    // other volatile/atomic operations.

    SNode(Object item) {
        this.item = item;
    }

    boolean casNext(SNode cmp, SNode val) {
        return cmp == next && // Atomically update the next node
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * Tries to match node s to this node, if so, waking up thread.
     * Fulfillers call tryMatch to identify their waiters.
     * Waiters block until they have been matched.
     *
     * @param s the node to match
     * @return true if successfully matched to s
     */
    boolean tryMatch(SNode s) {
        if (match == null && // Try to match
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w); // The match is successful and the thread is woken up
            }
            return true;
        }
        return match == s;
    }

    /**
     * Tries to cancel a wait by matching node to itself.
     */
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this); // Atomically point match at this node
    }

    boolean isCancelled() { // A node whose match points at itself has been cancelled
        return match == this;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE; // Unsafe instance used for the CAS operations
    private static final long matchOffset; // Match offset
    private static final long nextOffset; // next offset

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
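The match field carries two meanings: a reference to another node means "matched", and a reference to the node itself means "cancelled". A minimal sketch of that protocol follows, assuming an `AtomicReference` in place of the Unsafe-based CAS and leaving out the waiter wake-up. `MatchSlot` is a hypothetical name and is not part of the JDK source.

```java
import java.util.concurrent.atomic.AtomicReference;

final class MatchSlot {
    // Stand-in for SNode.match: null = waiting, this = cancelled, other = matched.
    private final AtomicReference<MatchSlot> match = new AtomicReference<>();

    /** Tries to match this node to s; also true if it was already matched to s. */
    boolean tryMatch(MatchSlot s) {
        if (match.get() == null && match.compareAndSet(null, s)) {
            return true;
        }
        return match.get() == s;
    }

    /** Cancels the wait by matching the node to itself, unless already matched. */
    void tryCancel() {
        match.compareAndSet(null, this);
    }

    boolean isCancelled() {
        return match.get() == this;
    }

    public static void main(String[] args) {
        MatchSlot waiting = new MatchSlot();
        MatchSlot fulfiller = new MatchSlot();
        waiting.tryCancel();
        System.out.println(waiting.isCancelled());       // true
        System.out.println(waiting.tryMatch(fulfiller)); // false
    }
}
```

Since both transitions start from null and use the same CAS, a node can be matched or cancelled, but never both.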

Both put and take operations call the transfer method internally. Let’s focus on analyzing this method.

transfer

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA; // A null element means a consumer; otherwise a producer

    for (;;) { // Spin
        SNode h = head; // Snapshot of the current head
        if (h == null || h.mode == mode) {  // Stack is empty or the head has the same mode
            if (timed && nanos <= 0) {      // Timed operation that cannot wait
                if (h != null && h.isCancelled()) // The head node has been cancelled
                    casHead(h, h.next);     // Pop the cancelled head and retry
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) { // Create s if needed and push it as the new head
                SNode m = awaitFulfill(s, timed, nanos); // Spin or park until matched or cancelled
                if (m == s) {               // Matched to itself: the wait was cancelled
                    clean(s);               // Unlink the cancelled node
                    return null;
                }
                if ((h = head) != null && h.next == s) // A fulfiller still sits on top of s
                    casHead(h, s.next);     // Help pop both the fulfiller and s
                /*
                 * A waiting consumer returns the item of the producer that matched it (m.item).
                 * A waiting producer returns its own item (s.item).
                 */
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // Head is complementary and not yet being matched
            if (h.isCancelled())            // The head node has been cancelled
                casHead(h, h.next);         // Pop it and retry
            // Create s if needed, with mode FULFILLING|mode, and push it as the new head
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // Loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // All waiters are gone
                        casHead(s, null);   // Pop the fulfilling node; the stack is now empty
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // Match m to s and wake m's waiter
                        casHead(s, mn);     // Pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // Lost the match: m was cancelled
                        s.casNext(m, mn);   // Unlink m and try the next node
                }
            }
        } else {                            // Head is a fulfilling node: help it
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // Help match m to h
                    casHead(h, mn);         // Pop both h and m
                else                        // Lost the match: m was cancelled
                    h.casNext(m, mn);       // Unlink m
            }
        }
    }
}
  1. If head is null or the head node has the same mode as the current node, the current node is pushed onto the stack and waits for a match.
  2. If the modes differ and the head node is not fulfilling, a fulfilling node is pushed and matched with the node below it.
  3. If the head node is already fulfilling, the current thread helps it complete its match and pop, then retries.
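Case 1 has one observable consequence worth trying out: an operation that is timed with zero wait returns null from transfer when there is nothing to match, instead of pushing a node. The untimed `offer(e)` and `poll()` methods of `SynchronousQueue` behave this way. The sketch below uses the hypothetical class name `NoWaitDemo`.

```java
import java.util.concurrent.SynchronousQueue;

final class NoWaitDemo {
    /** offer(e) does not wait, so it fails when no consumer is already waiting. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        return queue.offer(1);
    }

    /** poll() does not wait, so it returns null when no producer is already waiting. */
    static Integer pollWithoutProducer() {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        return queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(pollWithoutProducer());  // null
    }
}
```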

awaitFulfill

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    /*
     * When a node/thread is about to block, it sets its waiter field and then rechecks state
     * at least one more time before actually parking, thus covering race vs fulfiller
     * noticing that waiter is non-null so should be woken.
     *
     * When invoked by nodes that appear at the point of call to be at the head of the stack,
     * calls to park are preceded by spins to avoid blocking when producers and consumers are
     * arriving very close in time. This can happen enough to bother only on multiprocessors.
     *
     * The order of checks for returning out of main loop reflects fact that interrupts have
     * precedence over normal returns, which have precedence over timeouts. (So, on timeout,
     * one last check for match is done before giving up.) Except that calls from untimed
     * SynchronousQueue.{poll/offer} don't check interrupts and don't wait at all, so are
     * trapped in transfer method rather than calling awaitFulfill.
     */
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // Absolute expiry time if timed, otherwise 0
    Thread w = Thread.currentThread(); // The current thread
    // Spin only if s is the head, the stack is empty, or the head is fulfilling, because s is
    // then likely to be matched soon. With fewer than 2 CPUs both limits are 0; otherwise
    // maxTimedSpins is 32 and maxUntimedSpins is maxTimedSpins * 16.
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted()) // The thread has been interrupted
            s.tryCancel();     // Cancel s by pointing its match at itself
        SNode m = s.match;
        if (m != null)         // Matched by a fulfiller, or cancelled (m == s) by timeout or interrupt
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { // Timed out
                s.tryCancel(); // Try to cancel, then recheck match
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0; // Keep spinning only while it is still worthwhile
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this); // Park until unparked by a fulfiller or interrupted
        else if (nanos > spinForTimeoutThreshold) // More than 1000 ns left, so parking is worthwhile
            LockSupport.parkNanos(this, nanos);   // Timed park; with less time left, keep spinning
    }
}
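The two cancellation paths in awaitFulfill (timeout and interrupt) are visible through the public API. The following sketch uses the hypothetical class name `AwaitDemo`. It relies only on `poll(long, TimeUnit)` and `take()`, and the 50 ms timeout is an arbitrary value chosen for the example.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class AwaitDemo {
    /** A timed poll with no producer is cancelled by the timeout and yields null. */
    static boolean timedPollTimesOut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** A take on an interrupted thread is cancelled and reported as InterruptedException. */
    static boolean takeIsInterruptible() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread.currentThread().interrupt(); // set the interrupt flag before waiting
        try {
            queue.take();
            return false;
        } catch (InterruptedException e) {
            return true; // the exception also clears the interrupt flag
        }
    }

    public static void main(String[] args) {
        System.out.println(timedPollTimesOut());
        System.out.println(takeIsInterruptible());
    }
}
```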

shouldSpin

 boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

shouldSpin is simple: it returns true when s is the head node, the stack is empty, or the head node is fulfilling. If that holds and the machine has at least two CPUs, the thread spins for a while before parking. On a multiprocessor, a short spin costs far less than suspending the thread, because parking and waking a thread requires switching between user mode and kernel mode.
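The spin-then-park idea can be sketched in isolation. This is a simplified, one-shot illustration, not the TransferStack code: the class `SpinThenPark`, its fields, and the `demo` method are hypothetical, and interrupts and timeouts are deliberately ignored. The spin limit of 32 on machines with at least two CPUs follows the values quoted above.

```java
import java.util.concurrent.locks.LockSupport;

final class SpinThenPark {
    private volatile boolean ready;
    private volatile Thread waiter;

    /** Marks the gate as ready and wakes the waiter if one has registered. */
    void signal() {
        ready = true;
        Thread w = waiter;
        if (w != null) {
            LockSupport.unpark(w);
        }
    }

    /** Spins up to maxSpins times, then registers as waiter and parks until ready. */
    void await(int maxSpins) {
        int spins = maxSpins;
        for (;;) {
            if (ready) {
                return;
            }
            if (spins > 0) {
                spins--;                          // cheap retry before blocking
            } else if (waiter == null) {
                waiter = Thread.currentThread();  // register, then recheck ready next iteration
            } else {
                LockSupport.park(this);           // may wake spuriously; the loop rechecks
            }
        }
    }

    /** Runs one hand-off between a signalling thread and the calling thread. */
    static boolean demo() {
        SpinThenPark gate = new SpinThenPark();
        int maxSpins = Runtime.getRuntime().availableProcessors() < 2 ? 0 : 32;
        new Thread(gate::signal).start();
        gate.await(maxSpins);
        return gate.ready;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Registering the waiter and then rechecking the flag before parking closes the window in which a signal could arrive between the check and the park, which is the same ordering awaitFulfill uses with s.waiter and s.match.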

clean

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    /*
     * At worst we may need to traverse entire stack to unlink
     * s. If there are multiple concurrent calls to clean, we
     * might not see s if another thread has already removed
     * it. But we can stop when we see any node known to
     * follow s. We use s.next unless it too is cancelled, in
     * which case we try the node one past. We don't check any
     * further because we don't want to doubly traverse just to
     * find sentinel.
     */

    SNode past = s.next; // The node after s marks where traversal can stop
    if (past != null && past.isCancelled()) // If that node is also cancelled
        past = past.next; // Use the node one past it as the end marker

    // Absorb cancelled nodes at head
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled()) // Pop cancelled heads until a live node is on top
        casHead(p, p.next);

    // Unsplice embedded nodes
    while (p != null && p != past) { // Walk from head to past
        SNode n = p.next; // Next node
        if (n != null && n.isCancelled()) // The next node is cancelled
            p.casNext(n, n.next); // Unlink it; stay on p because the new next may be cancelled too
        else
            p = n; // The next node is live, so advance to it
    }
}
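The traversal in clean is easier to follow without the CAS operations. The sketch below is a single-threaded simplification under stated assumptions: plain field writes replace casHead and casNext, a boolean flag replaces the self-match test, and the method returns the new head instead of updating a shared field. `UnspliceSketch`, `Node`, `render`, and `demo` are hypothetical names.

```java
final class UnspliceSketch {
    static final class Node {
        final String name;
        Node next;
        boolean cancelled;

        Node(String name, Node next) {
            this.name = name;
            this.next = next;
        }
    }

    /** Removes cancelled nodes between head and the node after s; returns the new head. */
    static Node clean(Node head, Node s) {
        Node past = s.next;                       // traversal stops here
        if (past != null && past.cancelled) {
            past = past.next;                     // skip one cancelled successor
        }
        Node p = head;
        while (p != null && p != past && p.cancelled) {
            p = p.next;                           // absorb cancelled nodes at head
        }
        Node newHead = p;
        while (p != null && p != past) {          // unsplice embedded nodes
            Node n = p.next;
            if (n != null && n.cancelled) {
                p.next = n.next;                  // unlink n, stay on p
            } else {
                p = n;
            }
        }
        return newHead;
    }

    static String render(Node head) {
        StringBuilder sb = new StringBuilder();
        for (Node p = head; p != null; p = p.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(p.name);
        }
        return sb.toString();
    }

    /** Builds A -> B -> C -> D, cancels B, and cleans it out. */
    static String demo() {
        Node d = new Node("D", null);
        Node c = new Node("C", d);
        Node b = new Node("B", c);
        Node a = new Node("A", b);
        b.cancelled = true;
        return render(clean(a, b));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // A -> C -> D
    }
}
```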
