The problem

(1) What is a LinkedTransferQueue?

(2) How does LinkedTransferQueue implement blocking queue?

(3) How does LinkedTransferQueue control concurrency security?

(4) What are the similarities and differences between LinkedTransferQueue and SynchronousQueue?

Introduction to the

The LinkedTransferQueue is a combination of LinkedBlockingQueue, SynchronousQueue (fair mode), and ConcurrentLinkedQueue. It combines the methods of all three and provides a more efficient implementation.

Inheritance system

LinkedTransferQueue implements the TransferQueue interface, which is inherited from BlockingQueue, so LinkedTransferQueue is also a BlockingQueue.

The TransferQueue interface defines the following methods:

// Try handing over elements
boolean tryTransfer(E e);
// Hand over elements
void transfer(E e) throws InterruptedException;
// Try to hand over elements (with timeout)
boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;
// Determine whether there are consumers
boolean hasWaitingConsumer(a);
// Check the number of consumers
int getWaitingConsumerCount(a);
Copy the code

It defines three methods for handing over elements, including blocking, non-blocking, and timeout.

Storage structure

The LinkedTransferQueue uses a data structure called dual Data Structure, or dual queue.

What does double queuing mean?

The same queue is used for placing and fetching elements. The nodes in the queue have two modes, one is data node and the other is non-data node.

If the head node is a non-data node, match them. If the head node is a data node, generate a data node and put it at the end of the queue (enqueue).

If the head node is a data node, they are matched. If the head node is a non-data node, a non-data node is generated and placed at the end of the queue (enqueue).

Graphically, it looks like this:

Whether you place or take elements, compare them with the head node first. If they have different patterns, match them. If they have the same patterns, join the team.

Source code analysis

The main properties

/ / head node
transient volatile Node head;
/ / end nodes
private transient volatile Node tail;
// There are several ways to put elements:
Return immediately, used in non-timeout poll() and tryTransfer() methods
private static final int NOW   = 0; // for untimed poll, tryTransfer
// Async, does not block, is used to place elements, because the internal use of unbounded single linked list to store elements, does not block the process of place elements
private static final int ASYNC = 1; // for offer, put, add
If no match is found, the call will block until a match is found
private static final int SYNC  = 2; // for transfer, take
// timeout, used in poll() and tryTransfer() methods with timeouts
private static final int TIMED = 3; // for timed poll, tryTransfer
Copy the code

Main inner class

static final class Node {
    // Whether the data node is a producer or a consumer
    final boolean isData;   // false if this is a request node
    // The element value
    volatile Object item;   // initially non-null if isData; CASed to match
    // Next node
    volatile Node next;
    // The thread that holds the element
    volatile Thread waiter; // null until waiting
}
Copy the code

A typical single-linked list structure contains, in addition to storing the value of an element and a pointer to the next node, whether it is a data node and the thread that holds the element.

Internally, isData distinguishes between producers and consumers.

Main construction methods

public LinkedTransferQueue(a) {}public LinkedTransferQueue(Collection<? extends E> c) {
    this(a); addAll(c); }Copy the code

There are only two constructors, and there is no initial capacity, so it is an unbounded blocking queue.

The team

All four methods are the same, calling the xfer() method asynchronously and passing the exact same parameters.

public void put(E e) {
    // Asynchronous mode, no blocking, no timeout
    // Because it is an element, singly linked list storage, will continue to add later
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}
Copy the code

Xfer (E E, Boolean haveData, int how, long nanos)

(1) E stands for element;

(2) haveData indicates whether it is a data node,

(3) How refers to the method of placing and retrieving elements, including NOW, ASYNC, SYNC and TIMED.

(4) Nanos stands for timeout time;

Out of the team

The four methods are also direct or indirect calls to the xfer() method, which is slightly different from the timeout rule.

public E remove(a) {
    E x = poll();
    if(x ! =null)
        return x;
    else
        throw new NoSuchElementException();
}
public E take(a) throws InterruptedException {
    // Synchronous mode, blocks until the element is fetched
    E e = xfer(null.false, SYNC, 0);
    if(e ! =null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // There is a timeout
    E e = xfer(null.false, TIMED, unit.toNanos(timeout));
    if(e ! =null| |! Thread.interrupted())return e;
    throw new InterruptedException();
}

public E poll(a) {
    // Return immediately, return null if no element is retrieved
    return xfer(null.false, NOW, 0);
}
Copy the code

The fetch element has its own play, some synchronous, some timeout, and some immediate return.

Methods for handing over elements

public boolean tryTransfer(E e) {
    // Return immediately
    return xfer(e, true, NOW, 0) = =null;
}

public void transfer(E e) throws InterruptedException {
    // Synchronous mode
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw newInterruptedException(); }}public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // There is a timeout
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if(! Thread.interrupted())return false;
    throw new InterruptedException();
}
Copy the code

Notice that the second argument, all true, means that these three methods are also methods for putting elements.

What exactly is the difference between the modes of the xfer() method here? Look at the analysis below.

The magic xfer() method

private E xfer(E e, boolean haveData, int how, long nanos) {
    // Empty elements are not allowed
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed
    // Outer loop, spin, retry on failure
    retry:
    for (;;) {                            // restart on append race

        // The following for loop is used to control the matching process
        // Only one type of node is stored in the queue at a time
        // Start the match from the head node, if the head node is matched first by another line
        // Try the next one until a match is found, or until there are no more elements in the queue
        
        for(Node h = head, p = h; p ! =null;) { // find & match first node
            // The mode of the p node
            boolean isData = p.isData;
            // The value of the p node
            Object item = p.item;
            // p is not matched
            if(item ! = p && (item ! =null) == isData) { // unmatched
                // If the two modes are the same, they do not match, so jump out of the loop and try to join the team
                if (isData == haveData)   // can't match
                    break;
                // If the two patterns are different, try to match
                // Set p to e (null for fetching elements, e for dropping elements)
                if (p.casItem(item, e)) { // match
                    // The match is successful
                    // For is used to control the contention when multiple threads are simultaneously fetching elements
                    // If you don't understand, you can skip it
                    for(Node q = p; q ! = h;) {// The entry may be that the head node has been matched, and then p becomes the next node of h
                        Node n = q.next;  // update by 2 unless singleton
                        // If the head has not changed, update it to the new node
                        // And delete it (forgetNext() will set its next to itself, i.e. remove it from the list)
                        // Why set head to n? Because at this point, the head itself must have been matched
                        P.casitem () is matched by the current element
                        // So we need to de-queue both of them so that other threads can start with the real head without double-checking
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // If the new head node is empty, its next is empty, or its next does not match, retry
                        if ((h = head)   == null ||
                            (q = h.next) == null| |! q.isMatched())break;        // unless slack < 2
                    }
                    // Wake up the waiting thread in p
                    LockSupport.unpark(p.waiter);
                    // Return the matched element
                    returnLinkedTransferQueue.<E>cast(item); }}// p is already matched or an attempt to match failed
            // The other lines match p first
            // There are two cases: p's next has not been modified, p's next points to itself
            // If p's next already points to itself, take head again and try again; otherwise, take its next and try againNode n = p.next; p = (p ! = n) ? n : (h = head);// Use head if p offlist
        }

        // The node type stored in the queue must be the same as its own
        // Or there are no elements in the queue
        // Join the team (you have to join the team whether you put or take elements)
        // There are four types of team entry:
        // NOW, return immediately. If no match is found, return immediately
        // ASYNC, ASYNC, elements are queued but the current thread does not block (equivalent to unbounded LinkedBlockingQueue)
        After the element is queued, the current thread blocks, waiting to be matched
        // TIMED, an element is allowed to wait for a certain period of time to be matched, and the TIMED element is returned

        // If not immediately returned
        if(how ! = NOW) {// No matches available
            // Create an S node
            if (s == null)
                s = new Node(e, haveData);
            // Try to make the team
            Node pred = tryAppend(s, haveData);
            // Failed to join the queue, try again
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // If not asynchronous (synchronous or timeout)
            // Wait to be matched
            if(how ! = ASYNC)return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting}}private Node tryAppend(Node s, boolean haveData) {
    // Start with tail and place s at the end of the list
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // If both start and end are null, there are no elements in the list
        if (p == null && (p = head) == null) {
            // make the first node point to s
            // Notice that the tail pointer does not point to s when the first element is inserted
            if (casHead(null, s))
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            // Return null if p cannot be processed
            P and S are of different types, so s is not allowed to join the queue
            // For example, if another program joins a data node first, it is not allowed to join a non-data node.
            // All elements in the queue must be of the same type of node
            // After returning null, the outside method will try again to match re-join etc
            return null;                  // lost race vs opposite mode
        else if((n = p.next) ! =null)    // not last; keep traversing
            // If next of p is not empty, it is not the last node
            // redirect p to the last nodep = p ! = t && t ! = (u = tail) ? (t = u) :// stale tail(p ! = n) ? n :null;      // restart if off list
        else if(! p.casNext(null, s))
            // If CAS fails to update next where s is p
            // There are other lines that are updated to p's next first
            // Let p point to P's next and try again to get S to join the team
            p = p.next;                   // re-read on CAS failure
        else {
            // This means that s has successfully joined the team
            // Update the tail pointer if p is not equal to t
            // Remember that the tail pointer did not point to the new element when the first element was inserted above?
            // Update the tail pointer
            if(p ! = t) {// update if slack now >= 2
                while((tail ! = t || ! casTail(t, s)) && (t = tail) ! =null&& (s = t.next) ! =null && // advance and retry(s = s.next) ! =null&& s ! = t); }// return p, the previous element of s
            returnp; }}}private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // If there is a timeout, calculate its timeout
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // The current thread
    Thread w = Thread.currentThread();
    // Number of spins
    int spins = -1; // initialized after first item and cancel checks
    // Random number, random let some spin thread give CPU
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // If the value of the s element is not equal to e, it is matched
        if(item ! = e) {// matched
            // assert item ! = s;
            // Update item of s to s itself
            // set waiter to empty in s
            s.forgetContents();           // avoid garbage
            // Returns the matched element
            return LinkedTransferQueue.<E>cast(item);
        }
        // If the current thread breaks, or a timeout expires
        // Update the element value of s to s itself
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            // Try to disassociate s from its previous node
            // Delete s node
            unsplice(pred, s);
            // Return the value of the element itself, indicating no match
            return e;
        }
        
        If the number of spins is less than 0, count the number of spins
        if (spins < 0) {                  // establish spins at/near front
            // spinsFor() counts the spin count
            // Return 0 if any previous nodes are not matched
            // Return a certain number of times if there is a node in front and it is matching, wait
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // Initializes a random number
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            // The number of spins is reduced by 1
            --spins;
            // Randomly allocate the CPU
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            // Update s waiter to the current thread
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            // If there is a timeout, count the timeout and block for a certain time
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // Not timeout, block, wait to wake up
            // The first if returns the matching element
            LockSupport.park(this); }}}Copy the code

The content of these three methods is particularly complex, a large part of the code is controlling thread safety, various CAS, we briefly describe the general logic here:

(1) An element is coming. We first check whether the node in the queue header has the same mode as this element.

(2) If the patterns are different, try to match them. If the head node is matched first by another line, try to match the next node of the head node, and so on until the match reaches or reaches the end of the list;

(3) If the pattern is the same, or at the end of the list, try to join the team;

(4) When joining the team, it is possible that the tail of the list has been modified, so move the tail pointer back and try to join the team again, and so on;

(5) If the queue is successfully joined, it spins or blocks. If the queue is blocked, it waits for other threads to match and wake up.

(6) The element will be matched in the next loop after awakening, and the matched element will be returned;

(7) There are four cases of joining the team and blocking:

Poll (), tryTransfer(e), ASYNC, ASYNC, poll(), tryTransfer(e), poll(), tryTransfer(e) Add (e), offer(e), PUT (e), offer(e, timeout, unit) c) SYNC Take () and transfer(e) d) TIMED, TIMED, after the element is queued, it waits for a period of time to be matched, and the element itself is returned when the time is not matched. Corresponding methods include poll(timeout, unit) and tryTransfer(e, timeout, unit).Copy the code

conclusion

(1) The LinkedTransferQueue can be viewed as a combination of LinkedBlockingQueue, SynchronousQueue, and ConcurrentLinkedQueue;

(2) The LinkedTransferQueue is implemented using a data structure called a dual queue;

(3) Both the element and the element will join the team;

(4) First try to compare with the head node, if the two modes are different, match them to form CP, and then return the value of each other;

(5) If the two modes are the same, join the queue and spin or block waiting to be awakened;

(6) There are four modes for queue joining and blocking: NOW, ASYNC, SYNC and TIMED;

(7) The LinkedTransferQueue does not use heavy locks such as synchronized and reentrant lock in the whole process, and is basically realized through spin +CAS;

(8) After joining the team, first spin a certain number of times before calling locksupport.park () or locksupport.parknanos block;

eggs

What are the similarities and differences between the LinkedTransferQueue and SynchronousQueue?

(1) In Java8, the two implementations are basically the same, both use dual queues;

(2) The former fully realizes the latter, but is more flexible than the latter;

(3) If there is no matching element, the thread will block;

(4) The former can control whether the thread needs to be blocked by releasing elements. For example, the thread will not be blocked by using four methods of adding elements, but only joining elements, and the thread will be blocked by using Transfer ();

(5) Both elements are basically the same. They will block and wait for new elements to enter and be matched.


Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.