Information about the SynchronousQueue can be obtained from the class comment:

  1. Queues do not store data, so they have no size and cannot be iterated. How do you understand that there’s no size? That is, each time you put a value, you must wait for the corresponding consumer to take the data before you can put it again.
  2. Queue corresponds to peek, contains, clear, isEmpty… And so on are actually ineffective.
  3. A queue consists of two data structures, a last-in, first-out stack and a first-in, first-out queue. The stack is unfair and the queue is fair.

Structure of 1.

SynchronousQueue inheritance, core member variables, and main constructors:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
	
	Transferer defines the transfer method. Put and take use the same transfer method
	abstract static class Transferer<E>{
	    If e is empty, the special value will be returned directly. If e is not empty, the special value will be passed to the consumer
        abstract E transfer(E e, boolean timed, long nanos);
	}
	
	// Stack implementation, last in first out (unfair)
	static final class TransferStack<E> extends Transferer<E>{... }// Queue implementation, first in, first out (fair)
	static final class TransferQueue<E> extends Transferer<E>{... }/ / tranfer variables
	private transient volatile Transferer<E> transferer;
	
	/ / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the constructor -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
	// Default is not fair
	public SynchronousQueue(a) {
        this(false);
    }
    
    // Fair use TransferQueue, non fair use TransferStack
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : newTransferStack<E>(); }}Copy the code

It is important to note that SynchronousQueue does not use locks (synchronized and reentrantLock) because the underlying queue cannot match. So SynchronousQueue itself must be thread-safe and schedule threads:

  • Thread-safe through CAS and spin
  • Park and wake up the thread directly. There are two strategies, unfair matching of stack implementation and fair matching of queue

1.1 TransferStack (Unfair =>FILO)

static final class TransferStack<E> extends Transferer<E>{
   
        // The element in the stack, which is a chain stack
        static final class SNode{... }// Stack header pointer
        volatile SNode head;
		
		// SNode has three states:
    	// 1.REQUEST: executes the take method, equivalent to the consumer
        static final int REQUEST    = 0;
        // 2.DATA: performs the put method, equivalent to the producer
        static final int DATA       = 1;
        // 3.FULFILLING: The stack header is blocking waiting for another thread to put or take
        static final int FULFILLING = 2;
    
    	/ /...
}
Copy the code

SNode

SynchronousQueue is a node that does not peek, contains, etc., but has a linked list that holds competing threads and data.

That is, the SynchronousQueue operates by maintaining a linked list. When there is concurrency, the switch is determined by determining the type (mode) of the end node inserted in the linked list.

static final class SNode {
            
            // The current thread
            // Note: This is not set when SNode is created, but when there is no match in the awaitFulfill method that requires sleep
            volatile Thread waiter;
            // Current thread data
            // Note: only put thread item has a value, take thread item=null. If the thread of take wants to fetch data, it can only be mediated by the match pointer
            Object item;
    	    // Node types: REQUEST(0)- Consumer (take), DATA(1)- Producer (PUT), FULFILLING(2)- Switching
            int mode;
			
			volatile SNode next;
            // Very important node. Represents the node that matches with this node. It has two functions:
            // 1. Determine when blocking stack elements can be awakened
            // If thread A blocks while taking because the queue is empty and thread B performs A put operation, thread A's match is set to B, indicating that thread A is ready to wake up
            // 2. Mediate the exchange of data for the take thread
            // If thread A wakes up and wants to return data, thread A can match thread B and get put data. This logic can be seen in the Transfer method below
            volatile SNode match;
------------------------------------------------------------------------------------------------------------------			
			// The constructor passes in item
			Snode snode(snode s, Object e, snode next, int mode)
            SNode(Object item) {
                this.item = item; } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -// Connect val to CMP via CAS
            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                       UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }
			
    		// tryMatch is a very important method with two functions:
            // 1 attempts to assign the parameter node s to the match node of the current thread
            // 2. Wake up the thread of the blocked stack header, and then get the operation s from the match
            // s.tem records the operation node, that is, the data of the operation
            boolean tryMatch(SNode s) {
                if (match == null &&
                    // CAS changes the match to match the node if the current node's match attribute is null
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if(w ! =null) {    
                        // Set waiter to null
                        waiter = null;
                        // Wake up the current node thread
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                // Returns whether the pairing was successful
                return match == s; 
            }

           // Try to cancel by changing match to yourself.
		   // The method matches itself only if the match is null, that is, if it has already been matched, the method cannot be cancelled
    	   // Generally, after setting timeout and expiration, it is set to cancel
            void tryCancel(a) {
                UNSAFE.compareAndSwapObject(this, matchOffset, null.this);
            }
    
		   // Determine the timeout
    	   // Check if match is itself
            boolean isCancelled(a) {
                return match == this;
            }

           // unsafe related code...
}           
Copy the code

Transfer () : transfer into and out of the stack

Transfer mixes the take and PUT methods together, so the first question is how do you distinguish put from take? Yes Is determined by whether the parameter E is null. If e is not null, it is put; if e is null, it is take. The second question is how do you do thread management, or how do you pair put and take threads? It can be divided into the following three situations:

  • Case 1: There is no data in the queue yet or the current node is of the same type as the top node (as put or as take)
    • Case 1.1: E to be added has a timeout set, and e is about to time out on or off the stack
      • Case 1.1.1: The stack header is not null and the stack header has expired due to timeout, the top of the stack is set to the second node
      • Case 1.1.2: Stack header is empty, null is returned
    • Case 1.2: The timeout for the new element E is not set, or it is set but not timed out,
      1. Construct a new node s with e, make s.ext =head, and then set s to the new stack head
      2. Blocks waiting for node M that matches S
      3. If s has not expired, clean is called to remove s
      4. If so, send S and M off the stack, set a new header, and return
  • Case 2: The current stack contains nodes that are complementary to the given node schema (such as blocking when the top of the stack is PUT and the current node is take)
    • Case 2.1: The stack header has been removed and the next node is set to the stack header
    • Case 2.2: You can mark the current node S as “matching” and set it to head
      1. Take the next node m of S, s and M keep tryMatch
      2. If the match is successful, delete s and M and return item
      3. M and M ext are swapped when the match fails
      4. If there is no match at the end of the stack, exit the spin and enter 3
  • Case 3: When action 2 fails to match (possibly while thread 3 finished matching first), help the node to match and remove (unstack). Then continue (the main loop). This part of the code is basically the same as in action 2, except that the node data is not returned.
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; 
    // Check whether it is put or take: if e is empty, it is take (REQUEST); if e is not empty, it is put (DATA).
    int mode = (e == null)? REQUEST : DATA;// Spin to ensure success
    for (;;) {
        // Take the head node, there are several cases
        SNode h = head;
-----------------------------------------------------------------------------------------------------------------        
        / / case 1: the queue is not data | | the current node with the top node types (same put or take)
        if (h == null || h.mode == mode) {
            // Case 1.1: E to be added has a timeout set, and e to be pushed or removed from the stack will time out
            if (timed && nanos <= 0) {     
                // Case 1.1.1: The stack header is not null && The stack header has timed out
                if(h ! =null && h.isCancelled())
                    casHead(h, h.next); // Discard the stack header and use the element after the stack header as the stack header
                // Case 1.1.2: Stack header is empty
                else
                    return null; // Return null
            // Case 1.2: The timeout for the new element e is not set, or it is set but not timed out
            } else if (casHead(h, s = snode(s, e, h, mode))) { // construct a new node s with e, make s.ext =head, and set s to the new stack head
                // Block waiting for the SNode matching s
                SNode m = awaitFulfill(s, timed, nanos);
                // return m==s to indicate that point S of the current section has timed out
                if (m == s) {   
                    clean(s); // There is no way to remove s directly from the stack, so call clean
                    return null;
                }
                // This can only be done if the value is actually matched
                // stack is not empty &&stack 2 is s (because m is now the top of the new stack)
                if((h = head) ! =null && h.next == s)
                    // set s.ext to head to push s and its pair m off the stack
                    casHead(h, s.next);  
                / / return the item
                // Note: this returns the data of the put node. If the node is taken, the data needs to be obtained through the mediation m
                return(E) ((mode == REQUEST) ? m.item : s.item); } -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --// Case 2: the current stack contains nodes that are complementary to the given node schema (e.g. block when top of stack is PUT and current node is take)
        } else if(! isFulfilling(h.mode)) {// Case 2.1: The stack header has been canceled
            if (h.isCancelled())            
                casHead(h, h.next); // use the next element as the stack header
            // Case 2.2: You can mark the current node S as "matching" and set it to head
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // Spin until paired
                for (;;) { 
                    // m and s are matching two nodes
                    // Note: m may not be the previous top of the stack, because another node may have entered or the previous top of the stack may have been matched
                    SNode m = s.next;       // m is s's match
                    // the stack is not found
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // Get the next node of m, because if s and m match successfully, mn will have to fill the position of head
                    // Note: Mn must be the third node in the stack in each cycle, although m may swap with mn later
                    SNode mn = m.next;
                    // Call tryMatch to pair m and s
                    // Note: when tryMatch is called to match S, thread M blocked at awaitFulfill will be awakened. If the match fails, m will return to the blocked state
                    if (m.tryMatch(s)) {
                        // Set head to mn
                        casHead(s, mn);     
                        // Return the data of the put node. If the take node needs to fetch the data through the mediation M
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else  
                        // If the match fails, switch m and M. ext to start the next matchs.casNext(m, mn); }} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --// Thread 3 failed to match
        } else {                             
            SNode m = h.next;  
            // There are no more waiters in the stack, other nodes match m away
            if (m == null)                 
                casHead(h, null);           // pop fulfilling node
            else {
                // If m and h match, mn becomes the new head.
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink}}}}Copy the code

AwaitFulfill () : Waits for a matching node

AwaitFulfill blocks the node waiting for matching, but it does not block immediately. After a certain number of spins, the node will block and wait for the tryMatch after the transfer of other threads

  1. Calculate the time of death deadline and spin count spains
  2. Spin, and each loop determines whether S gets a match
  3. Reach the spin count, park current thread (timing)
  4. When a thread is awakened:
    • TryMatch wake up: Other threads call tryMatch wake up when they traverse the stack in Transfer. If the match is successful, return corresponding m, otherwise it will return to blocking
    • Wake up after timeout: return s (self)
Node awaitFulfill(SNode s, boolean timed, long nanos) {
    // Deadline Time of death. If a timeout period is set, the time of death is equal to the current time + the timeout period, otherwise 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The current thread
    Thread w = Thread.currentThread();
    // Number of spins, 32 if timeout is set, 512 otherwise
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
                 
    for (;;) {
        // Whether the current thread is interrupted. If the timeout period expires, the current thread will be interrupted
        if (w.isInterrupted())
            s.tryCancel();
		
        // Try to get the match of the current node
        SNode m = s.match;
        // The only exit of this function must match the value
        // return m! == m! The =s table did match
        if(m ! =null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // Cancel the wait for the current thread
            if (nanos <= 0L) {
                // Call cancel so that m=s
                s.tryCancel();
                continue; }}// If the number of spins is not reached, then the number of spins is decreased by 1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0; // 
        // If s does not set waiter, set the current thread to waiter
        else if (s.waiter == null)
            s.waiter = w; 
        // If no timeout is set, park the current thread directly
        else if(! timed) LockSupport.park(this); // This is also where unpark wakes up, continuing the loop
        // If timeout is set, call park with nanos timeout
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos); }}Copy the code

Clean () : clears expired nodes

To clear the outdated node S in the stack, the general process is as follows:

  1. Find the next node past of S without cancel
  2. Check whether head is cancelled
  3. Traversal from head joins past
void clean(SNode s) {
            s.item = null;   // Forget item leave item and waiter empty
            s.waiter = null; // forget thread
			
            // Get the next SNode
            SNode past = s.next;
    	   // If past is cancell, then past again
            if(past ! =null && past.isCancelled())
                past = past.next;

            // Clean up from the beginning node
            SNode p;
    	    // Link the head node to the next node, the node cannot be cancelled
            while((p = head) ! =null&& p ! = past && p.isCancelled()) casHead(p, p.next);// Unsplice embedded nodes
            while(p ! =null&& p ! = past) {// The node after the link header is removed cannot also be null.
                SNode n = p.next;
                if(n ! =null && n.isCancelled())
                    p.casNext(n, n.next);
                elsep = n; }}Copy the code

1.2 TransferQueue(Fair =>FIFO)

static final class TransferQueue<E> extends Transferer<E>{
    / / queue head
    transient volatile QNode head;
    / / queue tail
    transient volatile QNode tail;

    // The element of the queue
    static final class QNode {... }/ /...
}
Copy the code

QNode

static final class QNode {

	// The value of the current element. If the current element is blocked, the other thread will set itself to item when it wakes up
    volatile Object item;         // CAS'ed to or from null
    // The current thread that can block
    volatile Thread waiter;       // to control park/unpark
     // true is put, false is take
    final boolean isData;
    
    // The next element of the current element
    volatile QNode next;         
	
	// Pass in data and node types when constructing
	QNode(Object item, boolean isData) {
	    this.item = item;
	    this.isData = isData;
	}
	
	// connect val to CMP via CAS
	boolean casNext(QNode cmp, QNode val) {
	    return next == cmp &&
	        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
	}
	
	// Change the value of item through cas to val
	boolean casItem(Object cmp, Object val) {
	    return item == cmp &&
	        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
	}
	
	/** * Tries to cancel by CAS'ing ref to this as item. */
	void tryCancel(Object cmp) {
	    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
	}
	
	// The cancelled object is item=this
	boolean isCancelled(a) {
	    return item == this;
	}
	
	// unsafe related code...
} 
Copy the code

Transfer () : transfer(

How does the current thread pass its data to the blocking thread? For the sake of illustration, let’s assume that thread 1 takes data to the queue, gets blocked, becomes blocking thread A, and then thread 2 starts putting data to the queue B. The process looks like this:

  1. Thread 1 gets data from the queue, finds no data in the queue, and blocks as thread A;
  2. Thread 2 puts the data to the end of the queue and finds the first blocked node from the end of the queue, assuming node A is the only node it can find. Thread B then puts the data to the item property of node A and wakes up thread 1.
  3. Thread 1 wakes up and retrieves thread 2’s put data from A. tem. Thread 1 returns successfully.

From this process, we can see that fairness is mainly reflected in that every time data is put, it is put to the end of the queue. Every time data is taken, it is not directly taken from the head of the queue, but from the end of the queue to find the first blocked thread, so that the blocked thread will be released in order.

E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    // True is put, false is get
    booleanisData = (e ! =null);

    for (;;) {
        // Temporary variables at the head and tail of the queue, t=h when the queue is empty
        QNode t = tail;
        QNode h = head;
        // tail and head are not initialized
        // While this continue is CPU intensive, it is not normally encountered because tail and head are already assigned empty nodes when the TransferQueue is initialized
        if (t == null || h == null)
            continue;
-----------------------------------------------------------------------------------------------------------------                        
        / / a: the same end node (empty queue) | | end node and the current node consistent operation (such as a party take block, when the current thread is also take)
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // when t is not tail. That is, tail has been modified, because t and tail must be equal if tail has not been modified
            if(t ! = tail)continue;
            T is not yet the end of the queue. Tn is assigned to t directly, which is a step of verification.
            if(tn ! =null) {
            	// CAS change tail to TN
                advanceTail(t, tn);
                continue;
            }
            // Timeout returns null
            if (timed && nanos <= 0)        // can't wait
                return null;
            // Construct the node node
            if (s == null)
                s = new QNode(e, isData);
            // If you fail to place e at the end of the queue, continue recursively
            if(! t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);              // swing tail and wait
            AwaitFulfill blocks itself with TransferStack, waiting for the pairing node X
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if(! s.isOffList()) {// not already unlinked
            	// CAS changes head to s
                advanceHead(t, s);  // unlink if head
                if(x ! =null)      // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return(x ! =null)? (E)x : e; -----------------------------------------------------------------------------------------------------------------// The queue is not empty and the current operation is inconsistent with the end of the queue (e.g. the end of the queue is blocked because of take, then the current operation must be put)
        } else {                            // complementary-mode
            // If this is the first execution, m represents tail
            // This line of code is fair to the queue, each operation from the beginning in order
            QNode m = h.next;               // node to fulfill
            if(t ! = tail || m ==null|| h ! = head)continue;                   // inconsistent read

            Object x = m.item;
            if(isData == (x ! =null) | |// m already fulfilled
                x == m ||                   // m cancelled
                // m stands for stack header
                // This assigns the current operation value to the blocked item property of m, so when M is released, the operation value is available! m.casItem(x, e)) {// lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // The current operation is placed at the head of the queue
            advanceHead(h, m);              // successfully fulfilled
            // Release the queue head blocking node
            LockSupport.unpark(m.waiter);
            return(x ! =null)? (E)x : e; }}}Copy the code

2. Method parsing & API

SynchronousQueue is simple because it encapsulates two Transfer implementations, TransferStack and TransferQueue.

2.1 Putting: PUT

New elements are placed in the queue until another thread removes them from the queue, terminating on success or interrupting the thread on failure

public void put(E e) throws InterruptedException {
    // if e is empty, throw an exception
    if (e == null) throw new NullPointerException();
    // Call transfer, pass e
    // Wait
    if (transferer.transfer(e, false.0) = =null) {
        Thread.interrupted();
        throw newInterruptedException(); }}Copy the code

2.2 Take out: take

Retrieves data from the queue header and deletes data, returns on success, interrupts the thread on failure

public E take(a) throws InterruptedException {
    // Call transfer and pass in null
    E e = transferer.transfer(null.false.0);
    if(e ! =null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
Copy the code

2.3 Capacity related methods

The volume-related methods are implemented by default, that is, write-dead.

peek()

public E peek(a) {
        return null;
}
Copy the code

remove()

public boolean remove(Object o) {
        return false;
}
Copy the code

contains()

public boolean contains(Object o) {
        return false;
}
Copy the code