
Those who never waste time have no time to complain about their lack of time. – Thomas Jefferson

0 Foreword

SynchronousQueue is a blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue has no internal capacity, not even a capacity of one. You cannot peek at a SynchronousQueue, because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; and you cannot iterate, because there is nothing to iterate over. The head of the queue is the element that the first queued inserting thread is trying to add; if there is no such queued thread, no element is available for removal and poll() returns null. For the purposes of other Collection methods (for example, contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.

Synchronous queues are similar to the rendezvous channels used in CSP and Ada. They are well suited to hand-off designs, in which an object running in one thread must synchronize with an object running in another thread in order to hand it some information, event, or task.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.

This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.

This class is a member of the Java Collections Framework.
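
To make the hand-off semantics concrete, here is a minimal, hypothetical demo (class and variable names are illustrative, not from the source): put blocks until another thread takes, the queue always reports size 0, and the fair constructor is shown for comparison.

import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();        // unfair (stack) mode by default
        // SynchronousQueue<String> fair = new SynchronousQueue<>(true);  // fair (queue) mode

        Thread producer = new Thread(() -> {
            try {
                System.out.println("producer: putting...");
                queue.put("hello");                  // blocks until a consumer takes
                System.out.println("producer: done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(500);                            // the put is still blocked here
        System.out.println("size = " + queue.size()); // always 0: no internal capacity
        System.out.println("taken = " + queue.take()); // releases the blocked producer
    }
}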

1 Inheritance System


  • It extends the AbstractQueue abstract class, which defines the basic queue operations
  • It implements the BlockingQueue interface, whose queue operations may block or throw exceptions
  • It implements the Serializable interface, so it can be serialized

2 Data Structure

Because SynchronousQueue supports both fair and unfair policies, it has two underlying data structures:

  • A queue (implementing the fair policy), with a head and a tail pointer
  • A stack (implementing the unfair policy), with a single head pointer

Both the queue and the stack are implemented as linked lists. The specific data structures are shown below.


Inner class UML diagrams

  • Transferer is the common abstract superclass of TransferStack and TransferQueue. It defines the single transfer operation for handing off data, which TransferStack and TransferQueue each implement (see the sketch after this list)


  • WaitQueue, LifoWaitQueue, and FifoWaitQueue are left over from the serialization strategy of SynchronousQueue in JDK 1.5 and are not covered here

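As a rough sketch, paraphrasing the JDK 8 source (access modifiers and javadoc trimmed), the shared abstraction looks like this:

abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e     non-null for a put (the item to hand off); null for a take
     * @param timed if this operation should time out
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt
     */
    abstract E transfer(E e, boolean timed, long nanos);
}
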

3 Unfair Stack (default policy)

3.1 Stack elements (SNode)

A put pushes data onto the stack, and a take pops data off the stack; both operations work on the top of the stack.

  • volatile SNode next — the next node down the stack, i.e. the node beneath this one

  • volatile SNode match — used to decide when a blocked stack node can be woken up. For example, suppose a take arrives first; there is no data, so the take blocks, and its stack node is SNode1. When a put arrives, it assigns its own node to SNode1's match field and wakes the take up. When the take wakes and sees that SNode1's match field has a value, it retrieves the put's data from it

  • volatile Thread waiter — the blocked thread waiting on this node

  • Object item — the undelivered/unconsumed message (the data for a put; null for a take)
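
The fields above correspond roughly to the following simplified sketch of SNode (field names follow the JDK 8 source; the CAS helper methods such as tryMatch and casNext are omitted):

static final class SNode {
    volatile SNode next;     // next node in the stack (the node beneath this one)
    volatile SNode match;    // the node matched to this one; set by the fulfiller
    volatile Thread waiter;  // the thread parked on this node, to control park/unpark
    Object item;             // data for a put; null for a take
    int mode;                // REQUEST, DATA, or FULFILLING | mode

    SNode(Object item) {
        this.item = item;
    }
}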

3.2 Pushing and popping (put and take)

  • put-type methods push data onto the stack
  • take-type methods pop data off the stack

Both operate on the top of the stack, and both are implemented by the same underlying method, transfer:

@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed

    // if e is null this is a take; if e is non-null this is a put
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {      // spin
        // The head node falls into one of three cases:
        // 1: null, meaning the stack is empty
        // 2: non-null and of take type, meaning the head thread is waiting for data
        // 3: non-null and of put type, meaning the head thread is waiting to hand off data
        SNode h = head;

        // The head is null (the stack is empty), or the head is non-null and its
        // mode is the same as this operation's (e.g. both are puts), so this
        // operation is pushed on top of the current head
        if (h == null || h.mode == mode) {  // empty or same-mode
            // A timeout was set and has already expired:
            // the operation is abandoned and null is returned.
            // If the head has been cancelled, pop it and let the next node continue
            if (timed && nanos <= 0) {      // can't wait
                // the head operation was cancelled
                if (h != null && h.isCancelled())
                    // discard the head; the node after it becomes the new head
                    casHead(h, h.next);     // pop cancelled node
                // otherwise return null
                else
                    return null;
            // No timeout (or not yet expired): make e the new head
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // s waits to be fulfilled: either a take on an empty stack,
                // or a put waiting for a taker
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // if s is still just below the head, help pop s off the stack
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // The head is waiting for another thread to put or take, and this
        // operation is of the complementary type (e.g. the head is a blocked
        // put and this operation happens to be a take)
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // the head was cancelled: the next element becomes the head
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // the third argument of snode, h, is assigned to s's next field
            else if (casHead(h, s = snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    // m is the old head, just linked as s.next by snode above
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // tryMatch does two important things:
                    // 1. it wakes up the blocked head m
                    // 2. it assigns the current node s to m's match field
                    // When m wakes up it reads this operation from m.match;
                    // s.item holds this operation's data
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

Execution process:

  1. Determine whether the operation is a put or a take
  2. Check whether the head of the stack is empty. If it is, go to 3; otherwise go to 5
  3. Check whether a timeout was set for this operation. If a timeout was set and has already expired, return null; otherwise go to 4
  4. If the head is empty, push the current operation as the new head; if the head is not empty but its operation type is the same as this one's, also push the current operation as the new head. Then wait to see whether another thread can fulfill it, and block itself if not. For example, if the current operation is a take and there is no data on the stack, it blocks itself
  5. The head is already blocked and needs to be woken up, and the current operation can wake it up: go to 6; otherwise go to 4
  6. Build a node for the current operation, assign it to the head node's match field, and wake up the head node's thread
  7. When the head thread wakes up, it reads its match field, which carries the information about the node that woke it, and returns
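
The timeout branch above (timed && nanos <= 0) is exactly the path taken by the non-blocking offer/poll methods. A small, hypothetical demo of that behaviour:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class NonBlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // No consumer is waiting, so a non-blocking offer fails immediately
        System.out.println(queue.offer(1));                          // false

        // No producer is waiting, so a non-blocking poll returns null immediately
        System.out.println(queue.poll());                            // null

        // A timed poll waits up to 100 ms for a producer, then gives up
        System.out.println(queue.poll(100, TimeUnit.MILLISECONDS));  // null
    }
}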

awaitFulfill

The method that spins or blocks a node until it is fulfilled:

/**
 * Spins/blocks until node s is matched by a fulfilling operation.
 *
 * @param s     the waiting node
 * @param timed true if timed wait
 * @param nanos the timeout value
 * @return the matched node, or s if cancelled
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {

    // Deadline: if a timeout was set, deadline = current time + timeout, otherwise 0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // Number of spins: 32 if a timeout was set, 512 otherwise.
    // For example, if this operation is a take and no other thread has put data
    // by the time the spins are used up, the thread blocks: for a fixed period
    // if a timeout was set, otherwise indefinitely.
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // Cancel the node if the current thread has been interrupted
        if (w.isInterrupted())
            s.tryCancel();

        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            // Timed out: cancel the wait of the current thread
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // Spin count minus 1
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        // Record the current thread as the waiter, so it can be parked and unparked
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this);         // park (block) indefinitely
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

When a node/thread is about to block, it sets its waiter field and then rechecks its state at least one more time before actually parking. This covers the race with a fulfiller that notices the waiter field is non-null and therefore knows the thread should be woken up.

When the calling node appears to be at the top of the stack at the point of the call, the call to park is preceded by spins, to avoid blocking when producers and consumers arrive very close together in time. This matters enough to bother with only on multiprocessors.

The order of the checks that return from the main loop reflects the priority: interrupt > normal return > timeout. (So, on a timeout, one last check for a match is done before giving up.) The exception is calls from the untimed SynchronousQueue.poll/offer, which do not check for interrupts and do not wait at all; they are handled inside the transfer method instead of calling awaitFulfill.

Note the blocking strategy: a thread does not block immediately. It spins a certain number of times first, and only if no other thread fulfills it during the spins does it actually block.

3.3 Diagram of unfair model

  • put1 performs put(1). Since there is no paired consuming thread, put1 is pushed onto the stack, spins for a short while, then sleeps and waits

  • put2 then performs put(2); put2 is also pushed onto the stack, spins for a while, then sleeps and waits

  • Next a thread take1 performs a take. It finds that the top of the stack is the put2 node, so the match succeeds. The implementation first pushes the take1 node onto the stack, then take1 loops through the matching logic with put2; once it sees there is no concurrency conflict, it points the top-of-stack pointer directly at the put1 node

  • Finally another thread, take2, performs a take, with essentially the same logic as the previous step: the take2 node is pushed onto the stack and matched with put1 in the loop. All matches are now complete and the stack is empty

As the process above shows, although put1 was pushed first, it is matched later; this is the unfair (LIFO) policy.
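
A small, timing-dependent sketch of this behaviour (the sleeps are only there to make the push order deterministic; names are illustrative):

import java.util.concurrent.SynchronousQueue;

public class UnfairOrderDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(); // default: unfair stack

        new Thread(() -> { try { queue.put(1); } catch (InterruptedException ignored) {} }).start();
        Thread.sleep(100);                 // make sure put(1) is pushed first
        new Thread(() -> { try { queue.put(2); } catch (InterruptedException ignored) {} }).start();
        Thread.sleep(100);                 // put(2) is now on top of the stack

        // The later producer is matched first: this usually prints 2, then 1
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}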

4 Fair Queue

4.1 Queue Elements

  • volatile QNode next — the next node after this one in the queue

  • volatile Object item — CAS'ed to or from null; the value of the current element. If the current node is blocked, the fulfilling thread writes its own data into item before waking it up

  • volatile Thread waiter — the thread to park/unpark; used to block and wake the thread

  • final boolean isData — true for a put, false for a take
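
These fields correspond roughly to this simplified sketch of QNode (field names follow the JDK 8 source; the CAS helper methods are omitted):

static final class QNode {
    volatile QNode next;     // next node in the queue
    volatile Object item;    // CAS'ed to or from null
    volatile Thread waiter;  // to control park/unpark
    final boolean isData;    // true for a put node, false for a take node

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
}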

4.2 transfer

The transfer method of the inner class TransferQueue:

E transfer(E e, boolean timed, long nanos) {
    /*
     * This basic method splits into two main cases:
     *
     * 1. The queue is empty, or the last node in the queue has the same mode
     *    as this operation. In that case the node is added to the queue and
     *    the thread waits. On timeout/interrupt the method returns the node
     *    itself; on a successful match it returns either null (for a producer)
     *    or the transferred value (for a consumer).
     *
     * 2. The queue is not empty and the queue's head.next node is the node the
     *    current operation matches. In that case the data is transferred to it,
     *    and advanceHead helps the previously blocked node leave the queue.
     */

    QNode s = null; // constructed/reused as needed
    // true: put, false: take
    boolean isData = (e != null);

    for (;;) {
        // Snapshot the head and tail; when the queue is empty, t == h
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw an uninitialized value
            continue;                       // spin

        // The head and tail are the same node (the queue is empty), or
        // the last node's mode is the same as the current operation's
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // tail was modified concurrently; try again
            if (t != tail)
                continue;
            // t is no longer the real tail, so advance the tail to tn and retry
            if (tn != null) {
                advanceTail(t, tn);
                continue;                   // spin
            }
            // Timed out: return null
            if (timed && nanos <= 0)        // can't wait
                return null;
            // Create the node
            if (s == null)
                s = new QNode(e, isData);
            // Failed to link s after the tail; retry
            if (!t.casNext(null, s))
                continue;

            advanceTail(t, s);              // swing the tail and wait
            // Block until another thread matches this node, or it is cancelled
            // by timeout/interrupt
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                // clean(t, s): if s is not the last node in the list, unlink it
                // directly with CAS; if s is the last node, either delete the
                // previous cleanMe node (if cleanMe != null) and record s's
                // predecessor as the new cleanMe, or just set cleanMe directly,
                // so the node is deleted on a later pass
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                // advance the head so that next time s.next is the node matched
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        // The queue is not empty and the current operation is complementary to
        // the mode of the queued nodes: for example, if the queue holds blocked
        // takes, the current operation must be a put
        } else {                            // complementary-mode
            // m is the node to fulfill; matching always starts from the head of
            // the queue, which is what makes this mode fair: nodes are served
            // in the order they arrived
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                // write the current operation's value into the blocked node m's
                // item field, so m can read it when it wakes up
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            // m is dequeued and becomes the new head
            advanceHead(h, m);              // successfully fulfilled
            // wake up the blocked node at the head of the queue
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

How does a thread pass its data to a thread that is already blocked? Suppose thread 1 tries to take from the queue, finds nothing, and blocks as node A; then thread 2 starts putting data into the queue. Roughly:

  • Thread 1 takes from the queue, finds no data, blocks, and becomes node A
  • Thread 2 puts its data, enqueuing at the tail, and looks for the first blocked node starting from the head of the queue; assume node A is the one it finds. Thread 2 then writes its data into node A's item field and wakes up thread 1
  • Thread 1 wakes up, reads thread 2's data from A.item, and returns successfully

In this process, fairness shows up in two places: data is always enqueued at the tail, and matching always starts from the head of the queue rather than from whichever node arrived last, so blocked threads are released in the order they arrived.

advanceTail

  • Tries to CAS nt in as the new tail
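In the JDK 8 source this is roughly the following one-liner (tailOffset is the Unsafe field offset of the tail field):

void advanceTail(QNode t, QNode nt) {
    // Only advance if t is still the tail; otherwise another thread got there first
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}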

4.3 Diagram of fair queue model

In fair mode, the underlying implementation is the TransferQueue, whose head and tail pointers reference the thread nodes currently waiting for a match.

  • The TransferQueue at initialization

  • The put1 thread performs put(1). Since there is no paired consuming thread, put1 is enqueued, spins for a while, then sleeps and waits

  • Next, put2 performs put(2); the put2 thread is enqueued, spins for a while, then sleeps and waits

  • Then take1 performs a take. Because the tail node (put2) is of the complementary mode, take1 does not need to enqueue. But the thread it wakes up is not put2; it is put1, because under the fair policy the first to enqueue is the first to be woken. The fair strategy can be summarized as: the complementary newcomer at the tail matches the node at the head of the queue

  • After this step, the put1 thread has been woken up and take1's take() returns 1 (put1's data), which is how the one-to-one hand-off between threads is implemented

  • Finally another thread, take2, performs a take. At this point only the put2 thread is waiting; the two threads match, put2 is woken up, take2's take() returns 2 (put2's data), and the queue is back to its initial state
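
A timing-dependent sketch of the fair mode (the sleeps only make the enqueue order deterministic; names are illustrative):

import java.util.concurrent.SynchronousQueue;

public class FairOrderDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true); // fair (FIFO) mode

        new Thread(() -> { try { queue.put(1); } catch (InterruptedException ignored) {} }).start();
        Thread.sleep(100);                 // put1 enqueues first
        new Thread(() -> { try { queue.put(2); } catch (InterruptedException ignored) {} }).start();
        Thread.sleep(100);                 // put2 enqueues second

        // The earlier producer is matched first: this usually prints 1, then 2
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}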

5 Conclusion

Why can a SynchronousQueue pass an element without a container? Having no internal container means there is no array-like storage for multiple elements, but each hand-off still uses a single node as a one-slot exchange point. With its one-to-one thread hand-off mechanism, SynchronousQueue is rarely used directly in everyday development, but it is used by thread pools. Because it is built on CAS rather than AQS, the code can be hard to follow, yet that does not prevent us from first understanding the underlying model; with the model in mind, reading the source becomes much easier and gives you a sense of direction.
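
One concrete place where SynchronousQueue appears is the cached thread pool: Executors.newCachedThreadPool() uses a SynchronousQueue as its work queue, so every submitted task is handed straight to a worker thread (an existing idle one, or a newly created one). A minimal sketch of an equivalent pool:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolSketch {
    public static void main(String[] args) {
        // Equivalent to Executors.newCachedThreadPool():
        // no core threads, unbounded maximum, 60 s keep-alive,
        // and a SynchronousQueue that hands each task straight to a worker
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

        pool.execute(() -> System.out.println("task running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}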