Questions
(1) What is LinkedTransferQueue?
(2) How does LinkedTransferQueue implement a blocking queue?
(3) How does LinkedTransferQueue ensure thread safety?
(4) What are the similarities and differences between LinkedTransferQueue and SynchronousQueue?
Introduction
LinkedTransferQueue is a combination of LinkedBlockingQueue, SynchronousQueue (fair mode), and ConcurrentLinkedQueue. It combines the capabilities of all three and provides a more efficient implementation.
Inheritance hierarchy
LinkedTransferQueue implements the TransferQueue interface, which extends BlockingQueue, so LinkedTransferQueue is also a BlockingQueue.
The TransferQueue interface defines the following methods:
// Try to hand over an element immediately (non-blocking)
boolean tryTransfer(E e);
// Hand over an element, blocking until a consumer receives it
void transfer(E e) throws InterruptedException;
// Try to hand over an element, waiting up to the given timeout
boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;
// Whether any consumer is waiting
boolean hasWaitingConsumer();
// Estimated number of waiting consumers
int getWaitingConsumerCount();
It defines three methods for handing over elements: blocking, non-blocking, and timed.
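As a rough usage sketch of these interface methods (the class name TransferApiDemo and the method handOff are made up for illustration; only the public java.util.concurrent API is used), the following waits until a consumer is registered and then hands an element over without blocking:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

final class TransferApiDemo {
    /** Hands "hello" to a consumer that is already waiting; returns what the consumer received. */
    static String handOff() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();       // waits until an element arrives
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Wait until the consumer has registered itself in the queue.
            while (!queue.hasWaitingConsumer())
                Thread.sleep(1);
            // A consumer is waiting, so the non-blocking hand-off succeeds.
            boolean handedOver = queue.tryTransfer("hello");
            consumer.join();                      // makes received[0] visible here
            return handedOver ? received[0] : null;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff());
    }
}
```

With a single consumer there is nobody else to race for the waiting request, which is why tryTransfer can be expected to succeed once hasWaitingConsumer() reports true.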
Storage structure
LinkedTransferQueue uses a structure known as a dual data structure, in this case a dual queue.
What does dual queue mean?
The same queue is used for both putting and taking elements. Each node in the queue is in one of two modes: a data node (created by a put) or a non-data node (a request created by a take).
When putting an element: if the head node is a non-data node, match it; if the head node is a data node, create a data node and append it to the tail of the queue (enqueue).
When taking an element: if the head node is a data node, match it; if the head node is a non-data node, create a non-data node and append it to the tail of the queue (enqueue).
In short, whether you put or take, compare with the head node first: if the modes differ, match; if the modes are the same, enqueue.
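The matching rule can be sketched with a deliberately simplified, single-threaded toy model. This is not how LinkedTransferQueue is implemented (there is no CAS and no blocking here), and the names ToyDualQueue, put and take are hypothetical; it only shows the "opposite mode matches, same mode enqueues" idea:

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ToyDualQueue {
    private static final class Node {
        final boolean isData;   // true: holds an element; false: a waiting request
        Object item;
        Node(boolean isData, Object item) {
            this.isData = isData;
            this.item = item;
        }
    }

    private final Deque<Node> nodes = new ArrayDeque<>();

    /** Returns true if the element was handed to a waiting request, false if it was enqueued. */
    boolean put(Object e) {
        Node head = nodes.peekFirst();
        if (head != null && !head.isData) {   // opposite mode: match the request
            nodes.pollFirst();
            head.item = e;
            return true;
        }
        nodes.addLast(new Node(true, e));     // same mode or empty queue: enqueue a data node
        return false;
    }

    /** Returns the matched element, or null if a request node was enqueued instead. */
    Object take() {
        Node head = nodes.peekFirst();
        if (head != null && head.isData) {    // opposite mode: match the data node
            nodes.pollFirst();
            return head.item;
        }
        nodes.addLast(new Node(false, null)); // same mode or empty queue: enqueue a request node
        return null;
    }
}
```

Note that at any moment the toy queue holds nodes of only one mode, which is the same invariant the real queue maintains.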
Source code analysis
Main properties
// Head node
transient volatile Node head;
// Tail node
private transient volatile Node tail;
// The possible values of the "how" argument of xfer():
// NOW: return immediately; used by the untimed poll() and tryTransfer() methods
private static final int NOW = 0; // for untimed poll, tryTransfer
// ASYNC: do not block; used for putting elements, which never blocks because elements are stored in an unbounded singly linked list
private static final int ASYNC = 1; // for offer, put, add
// SYNC: if no match is found, block until one is found
private static final int SYNC = 2; // for transfer, take
// TIMED: wait up to a timeout; used by the timed poll() and tryTransfer() methods
private static final int TIMED = 3; // for timed poll, tryTransfer
Main inner class
static final class Node {
    // true for a data node (producer), false for a request node (consumer)
    final boolean isData; // false if this is a request node
    // The element value
    volatile Object item; // initially non-null if isData; CASed to match
    // Next node
    volatile Node next;
    // The thread waiting on this node
    volatile Thread waiter; // null until waiting
}
This is a typical singly linked list node: besides the element value and the pointer to the next node, it records whether it is a data node and which thread is waiting on it.
Internally, isData is what distinguishes producers from consumers.
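How the item field encodes the match state can be sketched as follows. This is a simplified stand-in that assumes an AtomicReference in place of the JDK's internal CAS machinery; the class name MatchStateSketch is hypothetical. A data node starts with a non-null item and is matched by CASing it to null, a request node starts with null and is matched by CASing it to the element, and a cancelled node points its item at itself:

```java
import java.util.concurrent.atomic.AtomicReference;

final class MatchStateSketch {
    static final class Node {
        final boolean isData;
        final AtomicReference<Object> item;

        Node(Object item, boolean isData) {
            this.isData = isData;
            this.item = new AtomicReference<>(item);
        }

        /** Atomically replaces the item if it still holds the expected reference. */
        boolean casItem(Object expect, Object update) {
            return item.compareAndSet(expect, update);
        }

        /** Matched (or cancelled) when item points to the node itself or no longer fits the mode. */
        boolean isMatched() {
            Object x = item.get();
            return x == this || (x == null) == isData;
        }
    }

    public static void main(String[] args) {
        Node data = new Node("x", true);
        System.out.println(data.isMatched());          // unmatched data node
        System.out.println(data.casItem("x", null));   // a taker claims the element
        System.out.println(data.isMatched());          // now matched
    }
}
```

Because only one CAS on a given item can succeed, two competing threads can never both match the same node.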
Main constructors
public LinkedTransferQueue() {
}

public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
}
There are only two constructors, and neither takes an initial capacity, so this is an unbounded blocking queue.
Enqueue
The four enqueue methods are identical: each calls xfer() in ASYNC mode with exactly the same arguments.
public void put(E e) {
    // ASYNC mode: never blocks and never times out,
    // because elements are stored in a singly linked list and are simply appended
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}
The parameters of xfer(E e, boolean haveData, int how, long nanos) are:
(1) e is the element;
(2) haveData indicates whether this is a data node (true when putting, false when taking);
(3) how is the mode of the operation: NOW, ASYNC, SYNC, or TIMED;
(4) nanos is the timeout in nanoseconds.
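A small sketch of what ASYNC mode means for callers (EnqueueDemo and enqueueFour are hypothetical names): none of the four enqueue methods waits for a consumer, and the timeout passed to the timed offer is effectively ignored.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class EnqueueDemo {
    /** Enqueues four elements with no consumer present; returns the queue size, or -1 on failure. */
    static int enqueueFour() {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
        queue.put(1);                                         // returns at once
        boolean offered = queue.offer(2);                     // always true
        boolean added = queue.add(3);                         // always true
        boolean timedOffer = queue.offer(4, 1, TimeUnit.DAYS); // returns at once despite the timeout
        return (offered && added && timedOffer) ? queue.size() : -1;
    }

    public static void main(String[] args) {
        System.out.println(enqueueFour());
    }
}
```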
Dequeue
The four dequeue methods also call xfer(), directly or indirectly; they differ only in mode and timeout.
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E take() throws InterruptedException {
    // SYNC mode: block until an element is obtained
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // TIMED mode: wait up to the timeout
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    // NOW mode: return immediately, or null if no element is available
    return xfer(null, false, NOW, 0);
}
Each dequeue method has its own behavior: some block, some time out, and some return immediately.
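The differences between the dequeue methods can be observed from the outside with a short sketch (DequeueDemo and outcomes are hypothetical names; the 10 ms timeout is an arbitrary choice):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class DequeueDemo {
    /** Records what each dequeue method does on an empty or one-element queue. */
    static List<String> outcomes() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        List<String> out = new ArrayList<>();
        out.add(String.valueOf(queue.poll()));                              // NOW: null at once
        try {
            out.add(String.valueOf(queue.poll(10, TimeUnit.MILLISECONDS))); // TIMED: null after the timeout
            queue.put("x");
            out.add(queue.take());                                          // SYNC: element is already there
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        try {
            queue.remove();                                                 // empty again: throws
        } catch (NoSuchElementException ex) {
            out.add("NoSuchElementException");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(outcomes());
    }
}
```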
Methods for handing over elements
public boolean tryTransfer(E e) {
    // NOW mode: return immediately
    return xfer(e, true, NOW, 0) == null;
}

public void transfer(E e) throws InterruptedException {
    // SYNC mode: block until the element is received
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // TIMED mode: wait up to the timeout
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}
Note that the second argument is true in all three calls, which means these three methods also put elements.
What exactly do the different modes of xfer() do? See the analysis below.
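Before diving into xfer(), the externally visible effect of the NOW and TIMED modes on the transfer side can be sketched like this (TryTransferDemo and withoutConsumer are hypothetical names; the 20 ms timeout is arbitrary). With no consumer present, neither call succeeds and neither leaves its element in the queue:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class TryTransferDemo {
    /** Returns {untimed result, timed result, queue empty afterwards}. */
    static boolean[] withoutConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean immediate = queue.tryTransfer("a");   // NOW: fails at once, nothing enqueued
        boolean timed = false;
        try {
            // TIMED: the node is enqueued, waits, and is cancelled on timeout
            timed = queue.tryTransfer("b", 20, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { immediate, timed, queue.isEmpty() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(withoutConsumer()));
    }
}
```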
The magic xfer() method
private E xfer(E e, boolean haveData, int how, long nanos) {
    // Null elements are not allowed
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null; // the node to append, if needed
    // Outer loop: spin and retry on failure
    retry:
    for (;;) { // restart on append race
        // The inner for loop controls the matching process.
        // Only one mode of node is stored in the queue at any time.
        // Matching starts from the head node; if the head was already matched by another thread,
        // try the next node, until a match succeeds or the queue has no more nodes.
        for (Node h = head, p = h; p != null;) { // find & match first node
            // The mode of node p
            boolean isData = p.isData;
            // The value of node p
            Object item = p.item;
            // p has not been matched yet
            if (item != p && (item != null) == isData) { // unmatched
                // Same mode: cannot match, so leave the loop and try to enqueue
                if (isData == haveData) // can't match
                    break;
                // Different modes: try to match
                // by CASing p's item to e (null when taking, the element when putting)
                if (p.casItem(item, e)) { // match
                    // The match succeeded.
                    // This loop handles contention when several threads match at the same time;
                    // skip it if it is hard to follow.
                    // It is entered when p != h, i.e. the head had already been matched and p is a later node.
                    for (Node q = p; q != h;) {
                        Node n = q.next; // update by 2 unless singleton
                        // If the head has not changed, advance it to the new node
                        // and unlink the old head (forgetNext() points its next at itself, removing it from the list).
                        // Why set head to n? Because at this point the old head must already have been matched,
                        // and p.casItem() above succeeded, so p has been matched by the current element too.
                        // Both are dequeued so that other threads start from the real head without rechecking them.
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        } // advance and retry
                        // Stop if the new head is null, its next is null, or its next is not matched
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break; // unless slack < 2
                    }
                    // Wake up the thread waiting in p
                    LockSupport.unpark(p.waiter);
                    // Return the matched element
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // p was already matched, or the attempt to match it failed
            // because another thread matched p first.
            // Two cases: p's next is unchanged, or p's next points to p itself.
            // If p's next points to itself, reread head and retry; otherwise move on to p's next.
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // Reaching here means the nodes in the queue have the same mode as this operation,
        // or the queue has no nodes.
        // Enqueue (both putting and taking may enqueue a node).
        // There are four modes:
        // NOW: return immediately if no match was found
        // ASYNC: the element is enqueued but the current thread does not block (like an unbounded LinkedBlockingQueue)
        // SYNC: after the element is enqueued, the current thread blocks until it is matched
        // TIMED: after the element is enqueued, wait up to a timeout to be matched; on timeout the element itself is returned
        // If the mode is not "return immediately"
        if (how != NOW) { // No matches available
            // Create node s
            if (s == null)
                s = new Node(e, haveData);
            // Try to enqueue
            Node pred = tryAppend(s, haveData);
            // Enqueue failed: retry from the top
            if (pred == null)
                continue retry; // lost race vs opposite mode
            // If not ASYNC (that is, SYNC or TIMED),
            // wait to be matched
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

private Node tryAppend(Node s, boolean haveData) {
    // Start from tail and append s to the end of the list
    for (Node t = tail, p = t;;) { // move p to last node and append
        Node n, u; // temps for reads of next & tail
        // If both tail and head are null, the list has no nodes
        if (p == null && (p = head) == null) {
            // Make head point to s.
            // Note that tail is not updated when the first node is inserted.
            if (casHead(null, s))
                return s; // initialize
        }
        else if (p.cannotPrecede(haveData))
            // p cannot precede s: p and s have different modes, so s must not be enqueued.
            // For example, another thread enqueued a data node first, so a non-data node cannot follow it.
            // All unmatched nodes in the queue must have the same mode.
            // After null is returned, the caller retries matching and, if needed, enqueuing.
            return null; // lost race vs opposite mode
        else if ((n = p.next) != null) // not last; keep traversing
            // p's next is not null, so p is not the last node;
            // move p toward the last node
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null; // restart if off list
        else if (!p.casNext(null, s))
            // The CAS that sets p's next to s failed,
            // so another thread updated p's next first.
            // Move p to its next and try again to enqueue s.
            p = p.next; // re-read on CAS failure
        else {
            // s has been enqueued successfully.
            // Update the tail pointer if p is not equal to t.
            // Recall that tail was not updated when the first node was inserted above;
            // it is updated here.
            if (p != t) { // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail) != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            // Return p, the predecessor of s
            return p;
        }
    }
}

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // If there is a timeout, compute the deadline
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The current thread
    Thread w = Thread.currentThread();
    // Number of spins
    int spins = -1; // initialized after first item and cancel checks
    // Random generator, used to make spinning threads occasionally yield the CPU
    ThreadLocalRandom randomYields = null; // bound if needed
    for (;;) {
        Object item = s.item;
        // If s's item is no longer e, s has been matched
        if (item != e) { // matched
            // assert item != s;
            // Point s's item at s itself
            // and clear s's waiter
            s.forgetContents(); // avoid garbage
            // Return the matched element
            return LinkedTransferQueue.<E>cast(item);
        }
        // If the current thread is interrupted, or the timeout has expired,
        // CAS s's item to s itself to cancel the node
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
            s.casItem(e, s)) { // cancel
            // Try to unlink s from its predecessor,
            // removing s from the list
            unsplice(pred, s);
            // Return the element itself, meaning no match
            return e;
        }
        // If spins is negative, the spin count has not been computed yet
        if (spins < 0) { // establish spins at/near front
            // spinsFor() computes the spin count:
            // a positive count when s is at or near the front (its predecessor is matched or still spinning),
            // otherwise 0
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // Initialize the random generator
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) { // spin
            // One spin fewer
            --spins;
            // Occasionally give up the CPU at random
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield(); // occasionally yield
        }
        else if (s.waiter == null) {
            // Set s's waiter to the current thread
            s.waiter = w; // request unpark then recheck
        }
        else if (timed) {
            // Timed: compute the remaining time and block for that long
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // Untimed: block until woken up.
            // After waking, the first if in the loop returns the matched element.
            LockSupport.park(this);
        }
    }
}
These three methods are quite complex. A large part of the code deals with thread safety through various CAS operations, so here is a brief description of the general logic:
(1) When an element arrives, first check whether the node at the head of the queue has the same mode as this element;
(2) If the modes differ, try to match. If the head node has already been matched by another thread, try the next node, and so on until a match succeeds or the end of the list is reached;
(3) If the modes are the same, or the end of the list is reached, try to enqueue;
(4) When enqueuing, the tail of the list may have been modified by another thread, so move the tail pointer forward and try to enqueue again, and so on;
(5) After enqueuing successfully, the thread spins or blocks. If it blocks, it waits for another thread to match it and wake it up;
(6) After waking up, on the next loop iteration it finds that its node has been matched and returns the matched element;
(7) Whether to enqueue and whether to block depend on one of four modes:
a) NOW: return immediately; if no match is found, return at once without enqueuing. Corresponding methods: poll(), tryTransfer(e).
b) ASYNC: the element is enqueued but the current thread does not block. Corresponding methods: add(e), offer(e), put(e), offer(e, timeout, unit).
c) SYNC: after the element is enqueued, the current thread blocks until it is matched. Corresponding methods: take(), transfer(e).
d) TIMED: after the element is enqueued, wait up to the timeout to be matched; if no match arrives in time, the element itself is returned. Corresponding methods: poll(timeout, unit), tryTransfer(e, timeout, unit).
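The SYNC case can be observed from the outside with a small sketch (SyncModeDemo and transferBlocksUntilTaken are hypothetical names). A producer calls transfer(); once its data node is visible in the queue, the producer cannot return until some thread takes the element:

```java
import java.util.concurrent.LinkedTransferQueue;

final class SyncModeDemo {
    /** Returns true if the producer was still inside transfer() before the element was taken. */
    static boolean transferBlocksUntilTaken() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.transfer("job");            // SYNC: enqueue, then wait to be matched
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            while (queue.isEmpty())               // wait until the data node has been enqueued
                Thread.sleep(1);
            boolean stillWaiting = producer.isAlive(); // nobody has taken it yet
            String taken = queue.take();          // matches the node and wakes the producer
            producer.join();
            return stillWaiting && "job".equals(taken);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(transferBlocksUntilTaken());
    }
}
```

Replacing transfer() with put() in the producer would use ASYNC mode instead: the producer would finish immediately and the element would simply stay in the queue until taken.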
Summary
(1) LinkedTransferQueue can be viewed as a combination of LinkedBlockingQueue, SynchronousQueue (fair mode), and ConcurrentLinkedQueue;
(2) LinkedTransferQueue is implemented with a data structure called a dual queue;
(3) Both putting and taking elements may enqueue a node;
(4) An operation is first compared with the head node; if the two modes differ, they are matched as a pair and each side gets the other's value;
(5) If the two modes are the same, the node is enqueued and the thread spins or blocks until it is woken up;
(6) Enqueuing and blocking are governed by four modes: NOW, ASYNC, SYNC, and TIMED;
(7) LinkedTransferQueue does not use heavyweight locks such as synchronized or ReentrantLock anywhere; it is implemented basically with spinning plus CAS;
(8) After enqueuing, a thread first spins a certain number of times and only then blocks by calling LockSupport.park() or LockSupport.parkNanos().
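The spin-then-park idea from point (8) can be sketched in isolation. This is a stripped-down illustration, not the JDK code: it has no CAS, no cancellation, and no timeout, and the names SpinThenPark, Slot, await and fulfill are hypothetical. The waiter spins a bounded number of times, then publishes itself in a volatile field, rechecks, and only then parks; the fulfilling thread writes the item first and unparks whoever is registered:

```java
import java.util.concurrent.locks.LockSupport;

final class SpinThenPark {
    static final class Slot {
        volatile Object item;     // null until fulfilled
        volatile Thread waiter;   // null until the waiting thread registers itself
    }

    /** Spins up to the given number of times, then parks until the slot is fulfilled. */
    static Object await(Slot slot, int spins) {
        Thread current = Thread.currentThread();
        for (;;) {
            Object x = slot.item;
            if (x != null)
                return x;                     // fulfilled
            if (spins > 0)
                --spins;                      // keep spinning for a while
            else if (slot.waiter == null)
                slot.waiter = current;        // request unpark, then recheck item
            else
                LockSupport.park(slot);       // block until unparked (or a spurious wakeup)
        }
    }

    /** Publishes the value, then wakes the waiter if one has registered. */
    static void fulfill(Slot slot, Object value) {
        slot.item = value;
        LockSupport.unpark(slot.waiter);      // no effect if waiter is still null
    }

    static Object demo() {
        Slot slot = new Slot();
        Object[] result = new Object[1];
        Thread waiterThread = new Thread(() -> result[0] = await(slot, 64));
        waiterThread.start();
        fulfill(slot, "done");
        try {
            waiterThread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Registering the waiter and then rechecking the item before parking is what avoids a lost wakeup: either the fulfilling thread sees the registered waiter and unparks it, or the waiter sees the item on its recheck.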
Bonus
What are the similarities and differences between LinkedTransferQueue and SynchronousQueue?
(1) In Java 8 the two are implemented in basically the same way; both use a dual queue (for SynchronousQueue, this is its fair mode);
(2) LinkedTransferQueue covers everything SynchronousQueue does, but is more flexible;
(3) With SynchronousQueue, whether putting or taking, the calling thread blocks if there is nothing to match;
(4) With LinkedTransferQueue, the caller controls whether putting an element blocks the thread. The four enqueue methods never block and only enqueue the element, while transfer() blocks the thread;
(5) Taking elements is basically the same in both: the thread blocks and waits for a new element to arrive and be matched.
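The difference in point (4) shows up directly in the non-blocking offer() call. In this sketch (QueueComparison and offerWithoutConsumer are hypothetical names), no consumer is waiting on either queue:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueComparison {
    /** Returns {SynchronousQueue.offer, LinkedTransferQueue.offer, LinkedTransferQueue.tryTransfer}. */
    static boolean[] offerWithoutConsumer() {
        SynchronousQueue<Integer> sync = new SynchronousQueue<>(true); // fair mode
        LinkedTransferQueue<Integer> linked = new LinkedTransferQueue<>();
        boolean syncOffer = sync.offer(1);          // no consumer, nowhere to store: rejected
        boolean linkedOffer = linked.offer(1);      // stored in the unbounded list: accepted
        boolean linkedTry = linked.tryTransfer(2);  // needs a waiting consumer: rejected
        return new boolean[] { syncOffer, linkedOffer, linkedTry };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(offerWithoutConsumer()));
    }
}
```

In other words, tryTransfer() on LinkedTransferQueue behaves like offer() on SynchronousQueue, while offer() on LinkedTransferQueue behaves like an unbounded queue.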
Follow my WeChat official account "Tong Elder brother read source code" for more articles in this source code series, and explore the ocean of source code together with Tong.