The class comment of SynchronousQueue tells us the following:
- The queue stores no data, so it has no size and cannot be iterated. "No size" means that every put must wait for a consumer to take the element before the next put can proceed.
- Queue methods such as peek, contains, clear and isEmpty are effectively no-ops: they return fixed values.
- Internally there are two data structures, a last-in, first-out stack and a first-in, first-out queue. The stack is the unfair mode and the queue is the fair mode.
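The points above can be checked against the public API. The following is a minimal sketch (the class name NoCapacityDemo is made up for illustration) that inspects an idle queue with no consumer waiting:

```java
import java.util.concurrent.SynchronousQueue;

class NoCapacityDemo {
    /** Summarises what the inspection methods report on an idle queue. */
    static String inspect() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // offer() never blocks: with no consumer waiting, the hand-off fails at once
        boolean offered = queue.offer("a");
        return "offered=" + offered
                + " size=" + queue.size()
                + " isEmpty=" + queue.isEmpty()
                + " peek=" + queue.peek()
                + " contains=" + queue.contains("a")
                + " hasNext=" + queue.iterator().hasNext()
                + " remainingCapacity=" + queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(inspect());
    }
}
```

Because nothing is ever stored, the rejected element leaves no trace: every inspection method still reports an empty queue.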
1. Structure
SynchronousQueue inheritance, core member variables, and main constructors:
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    // Transferer defines the transfer method; put and take both go through it
    abstract static class Transferer<E> {
        // e != null: the caller is a producer and e is handed to a consumer
        // e == null: the caller is a consumer and receives an item from a producer
        // A null return value means the operation failed (timeout or interrupt)
        abstract E transfer(E e, boolean timed, long nanos);
    }

    // Stack implementation, last in first out (unfair)
    static final class TransferStack<E> extends Transferer<E> {... }

    // Queue implementation, first in first out (fair)
    static final class TransferQueue<E> extends Transferer<E> {... }

    // The transferer in use
    private transient volatile Transferer<E> transferer;

    // ---------- constructors ----------

    // Unfair by default
    public SynchronousQueue() {
        this(false);
    }

    // Fair mode uses TransferQueue, unfair mode uses TransferStack
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
}
Note that SynchronousQueue does not use locks (neither synchronized nor ReentrantLock), because it has no underlying container that stores elements. SynchronousQueue therefore has to provide thread safety and thread scheduling on its own:
- Thread safety comes from CAS and spinning
- Threads are parked and woken up directly. There are two matching strategies: unfair matching (the stack implementation) and fair matching (the queue implementation)
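To make "CAS and spin" concrete, here is a minimal sketch of a lock-free linked stack. It is not the JDK code: it uses AtomicReference instead of Unsafe, and the names CasStackSketch, push, pop and demo are made up for illustration. The retry loop around compareAndSet follows the same pattern as the casHead calls shown later.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasStackSketch<T> {
    private static final class Node<T> {
        final T value;
        final Node<T> next;
        Node(T value, Node<T> next) { this.value = value; this.next = next; }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    void push(T value) {
        for (;;) {                                   // spin until the CAS succeeds
            Node<T> h = head.get();
            if (head.compareAndSet(h, new Node<>(value, h)))
                return;                              // nobody changed head in between
        }
    }

    T pop() {
        for (;;) {
            Node<T> h = head.get();
            if (h == null)
                return null;                         // empty stack
            if (head.compareAndSet(h, h.next))
                return h.value;
        }
    }

    /** Two threads push 1000 values each without a lock; returns how many can be popped. */
    static int demo() {
        CasStackSketch<Integer> stack = new CasStackSketch<>();
        Runnable pusher = () -> { for (int i = 0; i < 1000; i++) stack.push(i); };
        Thread a = new Thread(pusher), b = new Thread(pusher);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        int count = 0;
        while (stack.pop() != null) count++;
        return count;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2000
    }
}
```

A failed CAS means another thread changed head first, so the loop simply re-reads head and tries again; no push is lost and no thread ever holds a lock.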
1.1 TransferStack (Unfair =>FILO)
static final class TransferStack<E> extends Transferer<E> {
    // A node of the stack, which is a linked stack
    static final class SNode {... }
    // Top of the stack
    volatile SNode head;
    // An SNode has three modes:
    // 1. REQUEST: created by take, i.e. a consumer
    static final int REQUEST = 0;
    // 2. DATA: created by put, i.e. a producer
    static final int DATA = 1;
    // 3. FULFILLING: the node on top of the stack is in the middle of matching a waiting put or take node
    static final int FULFILLING = 2;
    // ...
}
SNode
Although SynchronousQueue does not support peek, contains and similar methods, it does keep a linked list of nodes that hold the competing threads and their data.
In other words, SynchronousQueue works by maintaining a linked list. Under concurrency, whether a hand-off can take place is decided by the type (mode) of the node most recently inserted into the list.
static final class SNode {
    // The thread waiting on this node
    // Note: not set when the SNode is created; awaitFulfill sets it just before parking, when no match has been found yet
    volatile Thread waiter;
    // The data carried by this node
    // Note: only a put node has an item; for a take node item == null. A take thread can only obtain data through the match pointer
    Object item;
    // Node type: REQUEST (0) consumer (take), DATA (1) producer (put), FULFILLING (2) match in progress
    int mode;
    volatile SNode next;
    // A very important field: the node matched with this node. It has two functions:
    // 1. It decides when a blocked node in the stack can be woken up
    //    If thread A blocks in take because the stack is empty and thread B then performs a put, A's match is set to B's node, which means A can be woken up
    // 2. It mediates the data exchange for the take thread
    //    Once thread A wakes up and needs to return data, it reaches B's node through match and reads the put data. See the transfer method below
    volatile SNode match;

    // ----------
    // The constructor only takes the item; next and mode are filled in by
    // TransferStack's helper snode(SNode s, Object e, SNode next, int mode)
    SNode(Object item) {
        this.item = item;
    }

    // ----------
    // CAS the next pointer from cmp to val
    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
    // tryMatch is a very important method with two functions:
    // 1. Try to set the parameter node s as the match of this node
    // 2. Wake up the thread blocked on this node, which then reads s from match
    //    s.item holds the data of the matching operation
    boolean tryMatch(SNode s) {
        if (match == null &&
            // If this node's match is still null, CAS it to s
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {
                // Clear waiter
                waiter = null;
                // Wake up the thread waiting on this node
                LockSupport.unpark(w);
            }
            return true;
        }
        // Return whether this node is matched with s
        return match == s;
    }
    // Try to cancel by CASing match to this node itself
    // The CAS succeeds only while match is null; a node that has already been matched cannot be cancelled
    // Typically called after a timeout or an interrupt
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }
    // A node is cancelled when its match points to itself
    boolean isCancelled() {
        return match == this;
    }
    // Unsafe-related code...
}
transfer(): pushing onto and popping off the stack
transfer serves both take and put, so the first question is how to tell them apart. The answer is the parameter e: if e is not null the call is a put, and if e is null it is a take. The second question is how threads are managed, that is, how put and take threads are paired. There are three cases:
- Case 1: The stack is empty, or the current node has the same mode as the top node (both put or both take)
  - Case 1.1: The operation is timed and its timeout has already run out (timed && nanos <= 0), so it cannot wait
    - Case 1.1.1: The head is not null and has been cancelled: pop it, make the second node the new head, and retry
    - Case 1.1.2: Otherwise, return null
  - Case 1.2: The operation is not timed, or it is timed but has not yet timed out
    - Construct a new node s from e, set s.next = head, and CAS s in as the new head
    - Block waiting for the node m that matches s
    - If s was cancelled while waiting (awaitFulfill returns s itself), call clean to remove s and return null
    - Otherwise pop s and m off the stack, set the new head, and return the item
- Case 2: The top of the stack is complementary to the current node (for example, the top is a blocked put and the current node is a take) and is not itself a FULFILLING node
  - Case 2.1: The head has been cancelled: pop it and make the next node the head
  - Case 2.2: Otherwise push the current node s marked as FULFILLING and make it the head
    - Take the node m after s and call m.tryMatch(s)
    - If the match succeeds, pop s and m and return the item
    - If the match fails (m has been cancelled), unlink m by CASing s.next to m.next, then try again with the new s.next
    - If the end of the stack is reached without a match (s.next == null), pop s and restart the main loop
- Case 3: The top of the stack is already a FULFILLING node pushed by another thread: help that node match and pop, then continue the main loop. This code is basically the same as in case 2, except that no data is returned.
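The choice between the three cases rests on a small bit trick: FULFILLING is a flag bit that is OR'ed onto REQUEST or DATA. The sketch below isolates that decision. The constants and the isFulfilling check mirror the JDK 8 source, while the class ModeBitsSketch and the helper branch are hypothetical, and a null headMode stands in for an empty stack.

```java
class ModeBitsSketch {
    static final int REQUEST = 0;
    static final int DATA = 1;
    static final int FULFILLING = 2;

    // e == null means take (REQUEST), otherwise put (DATA)
    static int modeOf(Object e) {
        return (e == null) ? REQUEST : DATA;
    }

    // True when the FULFILLING bit is set, whatever the low bit says
    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }

    /** Names the branch transfer() would enter; headMode == null stands for an empty stack. */
    static String branch(Integer headMode, Object e) {
        int mode = modeOf(e);
        if (headMode == null || headMode == mode)
            return "case 1: push and wait";
        if (!isFulfilling(headMode))
            return "case 2: push FULFILLING node, mode=" + (FULFILLING | mode);
        return "case 3: help the fulfiller";
    }

    public static void main(String[] args) {
        System.out.println(branch(null, "x"));              // empty stack
        System.out.println(branch(REQUEST, "x"));           // put meets a waiting take
        System.out.println(branch(FULFILLING | DATA, null)); // head is already fulfilling
    }
}
```

Since the low bit still records whether the fulfilling node is a put or a take, the final return statement of transfer can keep testing mode == REQUEST on the caller's original mode.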
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null;
    // Decide whether this is a put or a take: e == null means take (REQUEST), e != null means put (DATA)
    int mode = (e == null) ? REQUEST : DATA;
    // Spin until the operation succeeds
    for (;;) {
        // Read the head node; there are several cases
        SNode h = head;
        // ----------
        // Case 1: the stack is empty || the current node has the same mode as the top node (both put or both take)
        if (h == null || h.mode == mode) {
            // Case 1.1: the operation is timed and its timeout has already run out, so it cannot wait
            if (timed && nanos <= 0) {
                // Case 1.1.1: the head is not null && the head has been cancelled
                if (h != null && h.isCancelled())
                    casHead(h, h.next); // Drop the head and make the node after it the new head
                // Case 1.1.2: otherwise (the head is null or still valid)
                else
                    return null;
            // Case 1.2: the operation is not timed, or it is timed but has not yet timed out
            } else if (casHead(h, s = snode(s, e, h, mode))) { // Build node s from e, set s.next = head, and make s the new head
                // Block waiting for the SNode that matches s
                SNode m = awaitFulfill(s, timed, nanos);
                // m == s means the current node s was cancelled (timeout or interrupt)
                if (m == s) {
                    clean(s); // s cannot be popped directly, so call clean
                    return null;
                }
                // Only reached after a real match
                // The stack is not empty && its second node is s (the fulfilling node m now sits on top of s)
                if ((h = head) != null && h.next == s)
                    // Set head to s.next, popping s and its partner m
                    casHead(h, s.next);
                // Return the item
                // Note: the returned data always belongs to the put node. A take node obtains it through its partner m
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // ----------
        // Case 2: the top of the stack is complementary to the current node (e.g. the top is a blocked put and the current node is a take)
        } else if (!isFulfilling(h.mode)) {
            // Case 2.1: the head has been cancelled
            if (h.isCancelled())
                casHead(h, h.next); // Make the next node the head
            // Case 2.2: mark the current node s as FULFILLING and make it the head
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // Spin until matched
                for (;;) {
                    // m and s are the two nodes being matched
                    // Note: m is not necessarily the previous top of the stack, because that node may have been cancelled and unlinked in the meantime
                    SNode m = s.next;       // m is s's match
                    // Nothing left to match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // Read the node after m: if s and m match, mn becomes the new head
                    // Note: in every iteration mn is the node after m, i.e. the third node counting from s
                    SNode mn = m.next;
                    // Call tryMatch to pair m with s
                    // Note: a successful tryMatch wakes up the thread of m that is blocked in awaitFulfill; it fails if m has already been cancelled
                    if (m.tryMatch(s)) {
                        // Set head to mn
                        casHead(s, mn);
                        // Return the data of the put node. A take node obtains it through its partner m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else
                        // The match failed: unlink m so that s.next becomes mn, then try the next node
                        s.casNext(m, mn);
                }
            }
        // ----------
        // Case 3: the head is already a FULFILLING node pushed by another thread; help it finish
        } else {
            SNode m = h.next;
            // No waiter is left in the stack: m has already been taken away
            if (m == null)
                casHead(h, null);           // pop fulfilling node
            else {
                // If m and h match, mn becomes the new head
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
awaitFulfill(): waiting for a matching node
awaitFulfill blocks the node until it is matched, but it does not block immediately. It first spins a number of times; only then does it park and wait for another thread's transfer to call tryMatch.
- Compute the deadline and the spin count spins
- Spin, and in every iteration check whether s has been matched
- Once the spins are used up, park the current thread (with a timeout if the wait is timed)
- When the thread is woken up:
  - Woken by tryMatch: another thread matched s while walking the stack in transfer. If s.match is set, return the corresponding m, otherwise park again
  - Woken by the timeout: cancel s and return s (itself)
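The spin-then-park pattern in the steps above can be reduced to a small standalone sketch. This is not the JDK method: timeouts and interrupt handling are left out, and the class SpinThenParkSketch with its methods await, fulfill and demo is made up for illustration. Only LockSupport.park and LockSupport.unpark are real library calls.

```java
import java.util.concurrent.locks.LockSupport;

class SpinThenParkSketch {
    private volatile Object match;   // set by the fulfilling thread
    private volatile Thread waiter;  // published only when the waiter is about to park

    /** Spins up to `spins` times, then parks until fulfill() has stored a value. */
    Object await(int spins) {
        Thread w = Thread.currentThread();
        for (;;) {
            Object m = match;
            if (m != null)
                return m;                 // the only exit: a value was handed over
            if (spins > 0)
                spins--;                  // cheap busy-wait first
            else if (waiter == null)
                waiter = w;               // publish, then re-check match once more
            else
                LockSupport.park(this);   // spurious wake-ups just loop again
        }
    }

    /** Stores the value first, then wakes the waiter if it has published itself. */
    void fulfill(Object value) {
        match = value;
        Thread w = waiter;
        if (w != null)
            LockSupport.unpark(w);
    }

    static String demo() {
        SpinThenParkSketch node = new SpinThenParkSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);         // let the waiter run out of spins and park
            } catch (InterruptedException ignored) {
                // fall through and fulfill anyway
            }
            node.fulfill("hello");
        });
        producer.start();
        return (String) node.await(32);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints hello
    }
}
```

Setting waiter in one iteration and parking only in the next is what closes the race: if the fulfiller stored its value before seeing the waiter, the extra pass through the loop finds match already set and returns without parking.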
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // The deadline. If the wait is timed, it is the current time plus the timeout, otherwise 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The current thread
    Thread w = Thread.currentThread();
    // Spin count when s should spin: 32 for a timed wait, 512 otherwise (on a multi-core machine)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // If the current thread has been interrupted, cancel the node
        if (w.isInterrupted())
            s.tryCancel();
        // Read the match of the current node
        SNode m = s.match;
        // The only exit of this method: match has been set
        // m == s means s was cancelled; m != s means a real match
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // Timed out: stop waiting
            if (nanos <= 0L) {
                // Cancel, so that m == s on the next iteration
                s.tryCancel();
                continue;
            }
        }
        // Spins left: decrease the count by 1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // If s has no waiter yet, set it to the current thread
        else if (s.waiter == null)
            s.waiter = w;
        // Not timed: park the current thread
        else if (!timed)
            LockSupport.park(this); // unpark resumes execution here and the loop continues
        // Timed: park with the remaining nanos as timeout
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
clean(): removing cancelled nodes
To remove the cancelled node s from the stack, the general process is as follows:
- Find past, the node after s (skipping it once if it is cancelled too)
- Pop cancelled nodes off the head
- Walk from head to past and unlink the cancelled nodes in between
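The three steps can be traced on a plain, single-threaded linked list. The sketch below is a simplified illustration, assuming a boolean cancelled flag in place of match == this and ordinary assignments in place of casHead/casNext. The class UnlinkCancelledSketch and its demo are hypothetical.

```java
class UnlinkCancelledSketch {
    static final class Node {
        final String name;
        Node next;
        boolean cancelled;
        Node(String name, Node next) { this.name = name; this.next = next; }
    }

    Node head;

    void clean(Node s) {
        // Step 1: past marks where the sweep stops
        Node past = s.next;
        if (past != null && past.cancelled)
            past = past.next;
        // Step 2: pop cancelled nodes off the head
        Node p;
        while ((p = head) != null && p != past && p.cancelled)
            head = p.next;
        // Step 3: unlink cancelled nodes between head and past
        while (p != null && p != past) {
            Node n = p.next;
            if (n != null && n.cancelled)
                p.next = n.next;
            else
                p = n;
        }
    }

    /** Builds a -> b -> c -> d, cancels b, cleans it, and returns the remaining names. */
    static String demo() {
        UnlinkCancelledSketch stack = new UnlinkCancelledSketch();
        Node d = new Node("d", null);
        Node c = new Node("c", d);
        Node b = new Node("b", c);
        Node a = new Node("a", b);
        stack.head = a;
        b.cancelled = true;
        stack.clean(b);
        StringBuilder sb = new StringBuilder();
        for (Node p = stack.head; p != null; p = p.next)
            sb.append(p.name);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints acd
    }
}
```

The sweep starts from head rather than from s because a singly linked stack has no back pointer: the predecessor of s can only be found by walking down from the top.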
void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread
    // Get the node after s
    SNode past = s.next;
    // If past is cancelled as well, move one node further
    if (past != null && past.isCancelled())
        past = past.next;
    // Clean up starting from the head
    SNode p;
    // Pop cancelled nodes off the head until the head is a live node
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);
    // Unsplice embedded nodes
    while (p != null && p != past) {
        // Unlink cancelled nodes between the head and past
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}
1.2 TransferQueue(Fair =>FIFO)
static final class TransferQueue<E> extends Transferer<E> {
    // Head of the queue
    transient volatile QNode head;
    // Tail of the queue
    transient volatile QNode tail;
    // A node of the queue
    static final class QNode {... }
    // ...
}
QNode
static final class QNode {
    // The item of this node. While the node's thread is blocked, the thread that fulfills it CASes its own value into item before waking it up
    volatile Object item;   // CAS'ed to or from null
    // The thread blocked on this node
    volatile Thread waiter; // to control park/unpark
    // true for put, false for take
    final boolean isData;
    // The next node
    volatile QNode next;
    // The data and the node type are passed in at construction
    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
    // CAS the next pointer from cmp to val
    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
    // CAS item from cmp to val
    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
    /** Tries to cancel by CAS'ing ref to this as item. */
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }
    // A node is cancelled when item == this
    boolean isCancelled() {
        return item == this;
    }
    // Unsafe-related code...
}
transfer(): handing data over
How does the current thread pass its data to a blocked thread? For illustration, assume that thread 1 calls take on an empty queue and blocks as node A, and that thread 2 then calls put with some data. The process looks like this:
- Thread 1 tries to take from the queue, finds no data, appends node A to the tail of the queue, and blocks;
- Thread 2 calls put, sees that the queue holds waiting take nodes, and picks the first waiting node after head (head.next), which here is node A, the only one. Thread 2 then CASes its data into the item field of node A and wakes up thread 1;
- Thread 1 wakes up, reads thread 2's data from A.item, and returns successfully.
From this process we can see where the fairness comes from: a thread that has to wait is always appended to the tail of the queue, while a complementary operation always fulfills the first waiting node after head, so blocked threads are released in the order in which they arrived.
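The release order described above can be observed through the public API. The following is a minimal sketch (the class FairHandoffDemo and its helpers are made up for illustration) that starts two takers on a fair queue one after the other. It polls Thread.getState() to make sure each taker is parked before continuing, which is an assumption about how blocked takers show up and is meant for demonstration only.

```java
import java.util.concurrent.SynchronousQueue;

class FairHandoffDemo {
    /** Returns the values received by the first and the second taker, concatenated. */
    static String demo() {
        SynchronousQueue<String> queue = new SynchronousQueue<>(true); // fair mode
        String[] received = new String[2];
        Thread first = taker(queue, received, 0);
        Thread second = taker(queue, received, 1);
        try {
            first.start();
            awaitParked(first);    // the first taker is now waiting in the queue
            second.start();
            awaitParked(second);   // the second taker waits behind it
            queue.put("A");        // handed to the longest-waiting taker
            queue.put("B");
            first.join();
            second.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received[0] + received[1];
    }

    private static Thread taker(SynchronousQueue<String> q, String[] out, int slot) {
        return new Thread(() -> {
            try {
                out[slot] = q.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Demonstration-only helper: waits until the thread is parked
    private static void awaitParked(Thread t) throws InterruptedException {
        while (t.getState() != Thread.State.WAITING)
            Thread.sleep(1);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // the first taker receives A, the second B
    }
}
```

With new SynchronousQueue<>(false) the same program may hand "A" to the second taker instead, since the stack serves the most recent waiter first.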
E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    // true is put, false is take
    boolean isData = (e != null);
    for (;;) {
        // Local copies of the tail and head of the queue; t == h when the queue is empty
        QNode t = tail;
        QNode h = head;
        // tail and head are not initialized yet
        // Although this continue burns CPU, it is normally never hit, because tail and head are assigned a dummy node when the TransferQueue is constructed
        if (t == null || h == null)
            continue;
        // ----------
        // Head and tail are the same node (empty queue) || the tail node has the same mode as the current operation (e.g. the tail is a blocked take and the current thread also calls take)
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // t is no longer the tail, i.e. tail has been modified in the meantime
            if (t != tail)
                continue;
            // t is not really the last node (tail lags behind): advance tail to tn and retry
            if (tn != null) {
                // CAS tail to tn
                advanceTail(t, tn);
                continue;
            }
            // The timeout has already run out: return null
            if (timed && nanos <= 0)        // can't wait
                return null;
            // Construct the node
            if (s == null)
                s = new QNode(e, isData);
            // If linking s behind the tail fails, retry
            if (!t.casNext(null, s))        // failed to link in
                continue;
            advanceTail(t, s);              // swing tail and wait
            // As in TransferStack, awaitFulfill blocks the current thread until it is matched; x is the item handed over
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }
            if (!s.isOffList()) {           // not already unlinked
                // CAS head to s
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;
        // ----------
        // The queue is not empty and the current operation is complementary to the tail (e.g. the tail is a blocked take, so the current operation must be a put)
        } else {                            // complementary-mode
            // m is the first waiting node after head; with a single waiter it is also the tail
            // This line is what makes the queue fair: waiters are served in order from the front
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read
            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                // m is the first waiting node in the queue
                // The CAS stores the current value in the item field of the blocked node m, so the thread of m finds it when it is released
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }
            // m becomes the new head (dummy) node
            advanceHead(h, m);              // successfully fulfilled
            // Wake up the thread blocked on m
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}
2. Method parsing & API
The methods of SynchronousQueue themselves are simple, because the real work is encapsulated in the two Transferer implementations, TransferStack and TransferQueue.
2.1 Putting: put
put hands the new element over and blocks until another thread takes it. It returns normally on success and throws InterruptedException if the thread is interrupted while waiting.
public void put(E e) throws InterruptedException {
    // If e is null, throw an exception
    if (e == null) throw new NullPointerException();
    // Call transfer and pass in e
    // Untimed, so it waits until matched; null is only returned when the wait was interrupted
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
2.2 Taking: take
take receives an element handed over by a producer, blocking until one is available. It returns the element on success and throws InterruptedException if the thread is interrupted while waiting.
public E take() throws InterruptedException {
    // Call transfer and pass in null
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
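The blocking behaviour of put and take can be seen in a short experiment. This is a minimal sketch using only the public API; the class PutTakeDemo and the flag putReturned are made up for illustration. The producer's put cannot return until the main thread calls take.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class PutTakeDemo {
    static String demo() {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        AtomicBoolean putReturned = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                queue.put(42);            // blocks until a consumer takes the value
                putReturned.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            Thread.sleep(100);            // give the producer time to block in put
            boolean before = putReturned.get();
            int value = queue.take();     // completes the hand-off
            producer.join();
            return "before=" + before + " value=" + value + " after=" + putReturned.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints before=false value=42 after=true
    }
}
```

The flag is still false before take is called however long the main thread sleeps, because put only returns after a consumer has received the element.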
2.3 Capacity related methods
The capacity-related methods have fixed implementations, that is, their return values are hard-coded.
peek()
public E peek() {
    return null;
}
remove()
public boolean remove(Object o) {
    return false;
}
contains()
public boolean contains(Object o) {
    return false;
}