1. Significance of conditional queues

Condition factors the Object monitor methods (wait, notify, and notifyAll) out into separate objects, so that a single object can have multiple wait sets when Condition is combined with any Lock implementation. Lock replaces synchronized methods and statements, while Condition replaces the Object monitor methods.

Conditions (also known as conditional queues or condition variables) provide a way for one thread to suspend execution ("wait") until another thread notifies it that some state condition may now be true. Because this shared state is accessed by different threads, it must be protected, so a lock of some form is associated with the condition. The key property of waiting on a condition is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.

A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance, use its newCondition() method.
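To make the binding between a condition and its lock concrete, here is a minimal sketch. The class name ConditionBindingDemo and its method names are made up for illustration. It relies on the fact that a Condition created by ReentrantLock.newCondition() throws IllegalMonitorStateException when signal() is called by a thread that does not hold the owning lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK or of the article's code.
class ConditionBindingDemo {
    /** Returns true if signal() without holding the owning lock is rejected. */
    static boolean signalWithoutLockIsRejected() {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is not held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Returns true if signal() succeeds while the owning lock is held. */
    static boolean signalWithLockIsAccepted() {
        Lock lock = new ReentrantLock();
        // One lock can hand out several independent conditions (wait sets).
        Condition notEmpty = lock.newCondition();
        Condition notFull = lock.newCondition();
        lock.lock();
        try {
            notEmpty.signal(); // no waiters yet, so these calls are no-ops
            notFull.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```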

2. Conditional queue principle

2.1 Conditional queue structure

A conditional queue is a singly linked list whose nodes are chained through the nextWaiter field. Just as nextWaiter is not used to link nodes in the synchronous queue, the prev and next fields are not used in a conditional queue and remain null there.

The queue information consists of the following parts:

  1. private transient Node firstWaiter; // The head node of the queue
  2. private transient Node lastWaiter; // The tail node of the queue

The node information in the queue consists of the following parts:

  1. thread: the thread of the current node
  2. waitStatus: the status of the current node
  3. nextWaiter: the pointer to the next node in the queue

Note:

In a conditional queue, there is only one waitStatus value we care about: CONDITION. It means the thread is waiting normally. As soon as waitStatus is anything other than CONDITION, we assume the thread is no longer waiting, and its node will be removed from the conditional queue.

2.2 How enqueueing works

Each Condition object that is created corresponds to one conditional queue, and each thread that calls the await method of that Condition object is wrapped in a Node and placed into that queue.

3. Conditional and synchronous queues

In general, the synchronous queue (where threads wait for the lock) and the conditional queues are independent of each other. However, when the signal or signalAll method of a condition is called, one or all of the threads waiting in that conditional queue are awakened. An awakened thread, like any ordinary thread, must compete for the lock; if it does not get the lock, it too has to wait in the synchronous queue. At that point the node is transferred from the conditional queue to the synchronous queue.

1. Transfer from the conditional queue to the synchronous queue

Note, however, that nodes are transferred one at a time, even when we call signalAll(); the conditional queue is not appended to the end of the synchronous queue as a whole. Note also that the synchronous queue is linked only through prev and next, not nextWaiter, while a conditional queue is linked only through nextWaiter, not prev and next. In effect, they are two completely separate linked lists that happen to use the same Node data structure. Therefore, when a node moves from a conditional queue to the synchronous queue, the old link (nextWaiter) has to be broken and new links (prev, next) established, which is one of the reasons nodes are moved one by one.
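The two lists and the one-by-one transfer can be sketched with a small single-threaded model. This is only a toy: ToyQueues and its Node are hypothetical names, there is no locking or CAS, and it only mirrors how the link fields are used.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not the AQS implementation): one Node type, two separate lists.
class ToyQueues {
    static final class Node {
        final String name;
        Node nextWaiter; // used only while the node is in the conditional queue
        Node prev, next; // used only while the node is in the synchronous queue
        Node(String name) { this.name = name; }
    }

    // Conditional queue: singly linked through nextWaiter.
    Node firstWaiter, lastWaiter;
    // Synchronous queue: doubly linked through prev/next, starting with a dummy head.
    Node head = new Node("dummy"), tail = head;

    void addConditionWaiter(Node node) {
        if (lastWaiter == null) firstWaiter = node;
        else lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    void enq(Node node) {
        node.prev = tail;
        tail.next = node;
        tail = node;
    }

    /** Moves every waiter to the synchronous queue, one node at a time. */
    void transferAll() {
        Node first = firstWaiter;
        firstWaiter = lastWaiter = null;
        while (first != null) {
            Node next = first.nextWaiter;
            first.nextWaiter = null; // break the old link
            enq(first);              // establish the new links
            first = next;
        }
    }

    List<String> syncQueueNames() {
        List<String> names = new ArrayList<>();
        for (Node n = head.next; n != null; n = n.next) names.add(n.name);
        return names;
    }
}
```

After transferAll(), every moved node has a null nextWaiter and is reachable from head through next, in the same order in which it was waiting.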

2. The difference between conditional queues and synchronous queues

The synchronous queue is the queue of threads waiting for the lock. When a thread is wrapped in a Node and added to this queue, it has necessarily failed to acquire the lock. When a node in the queue acquires the lock, it is removed from the queue (in fact, removal is done by making the node that acquired the lock the new dummy head and setting its thread field to null).

A conditional queue is a queue of threads waiting for a particular condition. A thread must already hold the lock before it enters a conditional queue, because await can only be called while holding the lock. After the thread is wrapped in a Node and placed into the conditional queue, it releases the lock and is then suspended. When a thread in the queue is awakened by signal, it must compete for the lock again because it released the lock when it was suspended, so its node is added to the synchronous queue. Therefore, a thread does not hold the lock when it leaves a conditional queue.

3. Conditional queue and synchronous queue lock relationship

Conditional queue: lock already held when queuing -> release lock in queue -> leave queue without lock -> move to synchronous queue

Synchronous queue: No lock on entering queue -> lock contended in queue -> Lock obtained on leaving queue
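The lock relationship above can be observed from the outside with ReentrantLock's monitoring methods hasWaiters(Condition) and hasQueuedThread(Thread). The following is a sketch rather than a test of the internals; QueueMigrationDemo is a hypothetical name, and the polling loop is just one simple way to wait until the other thread has called await.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for observing the move between the two queues.
class QueueMigrationDemo {
    /** Returns {waitingBeforeSignal, waitingAfterSignal, queuedForLockAfterSignal, holdsLockAfterAwait}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] result = new boolean[4];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.awaitUninterruptibly();         // enters the conditional queue, releases the lock
                result[3] = lock.isHeldByCurrentThread(); // the lock is held again on return
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        lock.lock();
        try {
            // Poll until the waiter sits in the conditional queue, releasing the lock in between.
            while (!lock.hasWaiters(condition)) {
                lock.unlock();
                LockSupport.parkNanos(1_000_000L);
                lock.lock();
            }
            result[0] = true;
            condition.signal();
            result[1] = lock.hasWaiters(condition);   // no longer in the conditional queue
            result[2] = lock.hasQueuedThread(waiter); // now waiting for the lock instead
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```

While the signalling thread still holds the lock, the signalled thread has already left the conditional queue but cannot return from await; it sits in the synchronous queue until the lock is released.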

4. Example

For example, suppose we have a bounded buffer that supports put and take methods. If a take is attempted on an empty buffer, the thread blocks until an item becomes available. If a put is attempted on a full buffer, the thread blocks until space becomes available. We want to keep waiting put threads and waiting take threads in separate wait sets, so that only a thread of the right kind is notified when an item or space becomes available. This can be done with two Condition instances, giving a typical producer-consumer model. Here we create two conditional queues, fullCondition and notFullCondition, on the same lock. When the queue is full and there is no space left, the put method waits on notFullCondition until the queue is no longer full. When the queue is empty and there is nothing to read, the take method waits on fullCondition until the queue is no longer empty. fullCondition.signal() and notFullCondition.signal() wake up a thread waiting on the corresponding condition.

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue {
  /** Container that holds the produced elements */
  private LinkedList<Object> buffer;
  /** Maximum size of the container */
  private int maxSize;
  /** Lock that guards the container */
  private Lock lock;
  /** Signalled after an element is added; take() waits on it while the queue is empty */
  private Condition fullCondition;
  /** Signalled after an element is removed; put() waits on it while the queue is full */
  private Condition notFullCondition;

  BoundedQueue(int maxSize) {
    this.maxSize = maxSize;
    buffer = new LinkedList<Object>();
    lock = new ReentrantLock();
    fullCondition = lock.newCondition();
    notFullCondition = lock.newCondition();
  }

  /**
   * Producer
   *
   * @param obj the element to add
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public void put(Object obj) throws InterruptedException {
    // Acquire the lock
    lock.lock();
    try {
      while (maxSize == buffer.size()) {
        System.out.println(Thread.currentThread().getName() + ": At this point the queue is full and the adding thread enters the wait state.");
        // The queue is full, so the adding thread enters the wait state
        notFullCondition.await();
      }
      buffer.add(obj);
      // Notify a waiting consumer
      fullCondition.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Consumer
   *
   * @return the element that was removed
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public Object take() throws InterruptedException {
    Object obj;
    lock.lock();
    try {
      while (buffer.size() == 0) {
        System.out.println(Thread.currentThread().getName() + ": Now the queue is empty and the thread enters the wait state.");
        // The queue is empty, so the thread enters the wait state
        fullCondition.await();
      }
      obj = buffer.poll();
      // Notify a waiting producer
      notFullCondition.signal();
    } finally {
      lock.unlock();
    }
    return obj;
  }

  public static void main(String[] args) {
    // Initialize a queue that can hold up to 2 elements
    BoundedQueue boundedQueue = new BoundedQueue(2);
    for (int i = 0; i < 3; i++) {
      Thread thread = new Thread(() -> {
        try {
          boundedQueue.put("Elements");
          System.out.println(Thread.currentThread().getName() + ": Produced the element.");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.setName("Thread" + i);
      thread.start();
    }
    for (int i = 0; i < 3; i++) {
      Thread thread = new Thread(() -> {
        try {
          boundedQueue.take();
          System.out.println(Thread.currentThread().getName() + ": Consumed the element.");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.setName("Thread" + i);
      thread.start();
    }
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

5. Source code analysis

Methods in the Condition interface

1. await()

await() implements an interruptible condition wait. We already used it in the producer-consumer example above. Looking at the source, the method is implemented in AbstractQueuedSynchronizer.ConditionObject. Once the node has been added to the synchronous queue, the thread is either woken up immediately or waits for its predecessor node to release the lock and wake it up. In either case, where does the awakened thread resume execution? At the point where it was suspended inside the await method.

The interruptMode variable records interrupt events and has three possible values:

  1. 0: no interrupt occurred during the whole process.
  2. THROW_IE: await() must throw InterruptedException on exit; this corresponds to an interrupt that occurred before signal.
  3. REINTERRUPT: await() only needs to re-interrupt the current thread on exit; this corresponds to an interrupt that occurred after signal.

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add a node to the conditional queue
    Node node = addConditionWaiter();
    // Release the lock held by the current thread and save the lock state
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // If the node is not yet in the synchronous queue, signal has not been called,
    // so suspend the current thread
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // The thread is suspended here
        // When the thread resumes, either signal was called or the thread was interrupted,
        // so check why it woke up. If it was because of an interrupt, break out of the while loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // The thread now competes for the lock in the synchronous queue via acquireQueued.
    // It returns once it gets the lock; otherwise it is suspended again. Therefore, when
    // await() returns, the current thread always holds the lock
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

The addConditionWaiter() method encapsulates a node and places it in the conditional queue

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // If the tail node has been cancelled, traverse the whole list first and remove all cancelled nodes
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Wrap the current thread in a Node to be placed into the conditional queue
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // If the tail node is null, the queue is empty and the new node becomes the first waiter
    if (t == null)
        firstWaiter = node;
    // Otherwise the new node is linked after the current tail node
    else
        t.nextWaiter = node;
    lastWaiter = node; // The tail now points to the new node
    return node; // Return the newly added node
}

Note:

  1. When a node joins a conditional queue, its waitStatus is Node.CONDITION.
  2. If the tail node has cancelled its wait when a new node is enqueued, unlinkCancelledWaiters is called to remove every node that has cancelled its wait (the conditional queue is traversed from the head, whereas the synchronous queue is traversed from the tail).

private void unlinkCancelledWaiters() {
    // Start from the head node of the queue
    Node t = firstWaiter;
    Node trail = null;
    // Traverse while the current node is not null
    while (t != null) {
        // Get the next node
        Node next = t.nextWaiter;
        // If the current node is no longer waiting on the condition
        if (t.waitStatus != Node.CONDITION) {
            // Unlink the current node from the queue
            t.nextWaiter = null;
            if (trail == null)
                // The new head of the queue is the node after the current node
                firstWaiter = next;
            else
                // trail.nextWaiter skips the cancelled node t and points to the node after it
                trail.nextWaiter = next;
            // If there is no node after t, lastWaiter points to trail
            if (next == null)
                lastWaiter = trail;
        }
        else
            // t is still waiting, so it becomes the last valid node seen so far
            trail = t;
        // Move on to the next node
        t = next;
    }
}
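Because the listing above depends on AQS internals, it cannot be run on its own. The following standalone sketch repeats the same traversal on a hypothetical Node class so the effect can be checked in isolation. UnlinkSketch and its helper methods are made up; the constant values -2 and 1 are assumed to match Node.CONDITION and Node.CANCELLED in the JDK 8 source.

```java
// Standalone sketch with a hypothetical Node type; it mirrors the traversal, not the real class.
class UnlinkSketch {
    static final int CONDITION = -2; // assumed to match Node.CONDITION
    static final int CANCELLED = 1;  // assumed to match Node.CANCELLED

    static final class Node {
        final String name;
        int waitStatus;
        Node nextWaiter;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    Node firstWaiter, lastWaiter;

    void add(String name, int waitStatus) {
        Node node = new Node(name, waitStatus);
        if (lastWaiter == null) firstWaiter = node;
        else lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node seen that is still waiting
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    /** Names of the remaining waiters, joined by commas, in queue order. */
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }
}
```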

The fullyRelease(node) method releases the lock held by the current thread

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // If the release succeeds
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // Mark the node as cancelled so that it is cleaned out of the queue later
            node.waitStatus = Node.CANCELLED;
    }
}

public final boolean release(int arg) {
    // Try to release the lock; if successful, wake up the successor thread
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
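The "fully" in fullyRelease matters for reentrant locks: the whole saved state is released in one step and restored when await() returns. A small sketch under that reading follows. FullReleaseDemo is a hypothetical name, and getHoldCount() is ReentrantLock's own monitoring method.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a thread holding the lock twice still releases it completely in await().
class FullReleaseDemo {
    /** Returns {holdCountBeforeAwait, holdCountAfterAwait}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition done = lock.newCondition();
        boolean[] helperRan = {false}; // only read and written while holding the lock
        int[] counts = new int[2];

        lock.lock();
        lock.lock(); // reentrant acquisition: the hold count is now 2
        try {
            counts[0] = lock.getHoldCount();
            Thread helper = new Thread(() -> {
                lock.lock(); // can only succeed once await() has released both holds
                try {
                    helperRan[0] = true;
                    done.signal();
                } finally {
                    lock.unlock();
                }
            });
            helper.start();
            while (!helperRan[0])
                done.await();
            counts[1] = lock.getHoldCount(); // the saved state has been restored
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return counts;
    }
}
```

If await() released only one hold, the helper thread could never acquire the lock and the loop would not terminate.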

After waking up, the thread uses the checkInterruptWhileWaiting method to determine the interrupt mode

  1. Case 1: the interrupt occurs before the thread has been signaled

Assuming an interrupt has occurred, Thread.interrupted() returns true, and transferAfterCancelledWait is then used to determine whether a signal has already happened.

// Check for an interrupt: return THROW_IE if interrupted before being signaled,
// REINTERRUPT if interrupted after being signaled, or 0 if not interrupted.
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

As long as a node's waitStatus is still Node.CONDITION, it has not been signaled. In that case compareAndSetWaitStatus(node, Node.CONDITION, 0) succeeds and sets the node's status to 0. The enq(node) method is then called to add the node to the synchronous queue, and the method returns true.

Note: the node's nextWaiter link is not broken at this point, so it must be disconnected later. Back in the caller: because transferAfterCancelledWait returned true, checkInterruptWhileWaiting returns THROW_IE, which means InterruptedException should be thrown when leaving the await method.

    // ...
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // The thread is suspended here
        // When the thread resumes, either signal was called or the thread was interrupted,
        // so check why it woke up. If it was because of an interrupt, break out of the while loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // The thread now competes for the lock in the synchronous queue via acquireQueued.
    // It returns once it gets the lock; otherwise it is suspended again. Therefore, when
    // await() returns, the current thread always holds the lock

    // We assume here that the lock has been acquired. Since interruptMode is
    // THROW_IE at this point, the body of the if statement is skipped.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // Remove all nodes that are no longer waiting from the conditional queue.
    // unlinkCancelledWaiters was analysed earlier: it simply traverses the list,
    // finds every node whose waitStatus is not CONDITION, and removes it from the queue
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // interruptMode is THROW_IE here, so reportInterruptAfterWait is called
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// When interruptMode == THROW_IE, simply throw an InterruptedException
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

interruptMode is now THROW_IE, so we break out of the while loop and call acquireQueued(node, savedState) to compete for the lock. To summarize this case:

  1. The thread is woken up from where it was suspended because of an interrupt
  2. Through transferAfterCancelledWait we confirm that the node's waitStatus is still Node.CONDITION, meaning no signal had occurred when the interrupt happened
  3. We then change the node's waitStatus to 0 and add it to the synchronous queue with the enq(node) method
  4. The thread then competes for the lock, blocking in the synchronous queue; if it does not acquire the lock, it is suspended again
  5. Once the thread acquires the lock in the synchronous queue, it removes itself from the conditional queue by calling unlinkCancelledWaiters, which also removes any other nodes that are no longer waiting
  6. Finally, InterruptedException is thrown via reportInterruptAfterWait

Therefore:

If a thread is suspended in await, it does not throw InterruptedException immediately when it is interrupted. Instead, its node is added to the synchronous queue to compete for the lock, and if it fails to get the lock, it is suspended again.

Once it acquires the lock, the thread is removed from both the synchronous queue and the conditional queue, and then throws InterruptedException.

So a thread that has called await will, even when interrupted, keep blocking until it has acquired the lock, and only then throw InterruptedException on return. For such a thread, an interrupt means nothing more than being removed from the conditional queue and added to the synchronous queue to compete for the lock. In this sense interrupt and signal are very similar; the difference is that if the thread was woken up by an interrupt, await() must throw InterruptedException on return to indicate an abnormal wake-up (a normal wake-up is one caused by signal).
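This behaviour can be observed with a small experiment: interrupt a waiting thread while another thread keeps holding the lock, and check that the waiter cannot leave await() until the lock is released. The sketch below assumes a ReentrantLock; InterruptBeforeSignalDemo is a hypothetical name, and the 100 ms pause is arbitrary.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt arrives before any signal.
class InterruptBeforeSignalDemo {
    /** Returns {leftAwaitWhileLockHeldElsewhere, threwInterruptedException, heldLockWhenThrown}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        AtomicBoolean leftAwait = new AtomicBoolean(false);
        boolean[] result = new boolean[3];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                leftAwait.set(true);
            } catch (InterruptedException e) {
                leftAwait.set(true);
                result[1] = true;
                result[2] = lock.isHeldByCurrentThread(); // the lock was reacquired before the throw
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        lock.lock();
        try {
            // Poll until the waiter sits in the conditional queue, releasing the lock in between.
            while (!lock.hasWaiters(condition)) {
                lock.unlock();
                LockSupport.parkNanos(1_000_000L);
                lock.lock();
            }
            waiter.interrupt();                  // no signal is ever sent
            LockSupport.parkNanos(100_000_000L); // keep holding the lock for a while
            result[0] = leftAwait.get();         // the waiter cannot have left await() yet
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```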

  2. Case 2: the interrupt occurs after the thread has been signaled

    A. The thread finds it has been interrupted when it wakes up, but it had already been signaled before the interrupt

final boolean transferAfterCancelledWait(Node node) {
    // When the awakened thread reaches this point, the CAS fails because signal has already changed the status
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // The node was signaled before the interrupt occurred, so we only need to wait
    // until it has been successfully added to the synchronous queue
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

Because transferAfterCancelledWait returns false, checkInterruptWhileWaiting returns REINTERRUPT, which means the thread only needs to re-interrupt itself when it exits the method. Since signal has already moved the node from the conditional queue to the synchronous queue, node.nextWaiter is null, so we go straight to reportInterruptAfterWait(interruptMode).

    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // interruptMode is REINTERRUPT here, so reportInterruptAfterWait is called
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // Instead of throwing an InterruptedException, the current thread is interrupted again
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

Summary of the first case in case two:

  1. The thread is woken up from where it was suspended; by this time both an interrupt and a signal have occurred
  2. Through transferAfterCancelledWait we confirm that the node's waitStatus is no longer Node.CONDITION, meaning signal occurred before the interrupt
  3. We then spin until the signal method has finished, which ensures that the current node has been successfully added to the synchronous queue
  4. The thread then competes for the lock, blocking in the synchronous queue; if it does not acquire the lock, it is suspended again
  5. Finally, the current thread is interrupted again via reportInterruptAfterWait, and no InterruptedException is thrown

    B. No interrupt has occurred when the thread is woken up, but an interrupt occurs while it competes for the lock

Since the thread was woken up by signal, the node is already in the synchronous queue, so isOnSyncQueue(node) returns true and the while loop exits. The thread then competes for the lock in acquireQueued. If an interrupt occurs during this competition, acquireQueued returns true, and because interruptMode is 0 at that point, it is changed to REINTERRUPT. Next comes the check node.nextWaiter != null; since the node was already removed from the conditional queue when signal was called, this condition is false. The final interruptMode value of REINTERRUPT indicates that the thread was interrupted after being signaled. The interrupt arrived during the competition for the lock, which is too late, so we simply re-interrupt the current thread.

Summary of the second case in case two:

  1. The thread is woken up by the signal method, and no interrupt has occurred
  2. Because no interrupt has occurred, checkInterruptWhileWaiting returns 0, so interruptMode = 0 at this point
  3. Next we return to the while loop; since the signal method ensures that the node has been added to the synchronous queue, the loop condition is false and the loop exits
  4. The thread then competes for the lock, blocking in the synchronous queue; if it does not acquire the lock, it is suspended again
  5. After the thread returns with the lock, we detect that an interrupt occurred while acquiring it; since interruptMode is still 0, it is changed to REINTERRUPT
  6. Finally, the current thread is interrupted again via reportInterruptAfterWait, and no InterruptedException is thrown
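Both variants of case 2 lead to the same visible result: await() returns normally and the interrupt flag is set. The following sketch sends the signal first and the interrupt second while still holding the lock, so the interrupt cannot arrive before the signal. SignalThenInterruptDemo is a hypothetical name, and the outcome described is what the AQS-based ReentrantLock analysed in this article produces.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: signal first, interrupt afterwards.
class SignalThenInterruptDemo {
    /** Returns {awaitReturnedNormally, interruptFlagSetAfterAwait}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] result = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                result[0] = true; // no InterruptedException was thrown
                result[1] = Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                result[0] = false;
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        lock.lock();
        try {
            // Poll until the waiter sits in the conditional queue, releasing the lock in between.
            while (!lock.hasWaiters(condition)) {
                lock.unlock();
                LockSupport.parkNanos(1_000_000L);
                lock.lock();
            }
            condition.signal(); // the node moves to the synchronous queue
            waiter.interrupt(); // this interrupt is therefore "too late"
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```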

3. Case 3: no interrupt occurs and await() returns normally

Summary of await methods

  1. To enter await(), the thread must already hold the lock
  2. When leaving await(), the thread must also hold the lock
  3. Calling await() wraps the current thread in a Node, places it into a conditional queue, and then releases the lock it holds
  4. After the lock is released, the current thread is suspended in the conditional queue, waiting for a signal or an interrupt
  5. When the thread is awakened, it leaves the conditional queue and enters the synchronous queue to compete for the lock
  6. If an interrupt occurred before the thread acquired the lock, the interrupt mode is recorded according to whether the interrupt occurred before or after the signal
  7. After acquiring the lock, the thread cleans up (leaves the conditional queue, handles the interrupt)
  8. The thread holds the lock and returns from the await() method

Throughout this process, interrupts deserve particular attention. As mentioned earlier, interrupt and signal both move a thread's node from the conditional queue to the synchronous queue to compete for the lock. The difference is that signal is treated as a normal wake-up, while interrupt is treated as an abnormal one. If the interrupt occurred before signal, InterruptedException is thrown when await() finally returns. If the interrupt occurred after signal, the thread is considered to have been woken up normally and the interrupt came too late; it is ignored while waiting, and the thread re-interrupts itself when await() returns, which is equivalent to deferring the interrupt until await() returns.

2. awaitUninterruptibly

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // Only record the interrupt. If the node is still in the conditional queue
        // after the interrupt, the loop suspends the thread again
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

awaitUninterruptibly() ignores interrupts throughout. Even if the current thread is woken up by an interrupt, the method only records the interrupt and suspends the thread again, because an interrupt does not add the node to the synchronous queue. For the current thread to leave the conditional queue and compete for the lock, a signal must occur. The method also does not respond to an interrupt that arrives while the thread is acquiring the lock; it only re-interrupts the current thread after the lock has finally been acquired, just before returning. As you can see, this behaves much like the REINTERRUPT mode of await(), where the interrupt occurs after signal.

Method summary:

  1. An interrupt wakes up the thread but does not make it leave the conditional queue. If the thread was woken up only by the interrupt, it is suspended again
  2. Only the signal method makes the thread leave the conditional queue
  3. If an interrupt occurs before or during the call to this method, the thread only re-interrupts itself when the method ends and does not throw InterruptedException
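The three points above can be checked with a short sketch, assuming a ReentrantLock. AwaitUninterruptiblyDemo is a hypothetical name, and the 50 ms pause only gives the interrupted thread time to wake up and park again.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt alone does not end awaitUninterruptibly().
class AwaitUninterruptiblyDemo {
    /** Returns {stillInConditionQueueAfterInterrupt, interruptFlagSetOnReturn}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] result = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.awaitUninterruptibly();
                result[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        lock.lock();
        try {
            // Poll until the waiter sits in the conditional queue, releasing the lock in between.
            while (!lock.hasWaiters(condition)) {
                lock.unlock();
                LockSupport.parkNanos(1_000_000L);
                lock.lock();
            }
            waiter.interrupt();
            LockSupport.parkNanos(50_000_000L);     // let the waiter wake up and park again
            result[0] = lock.hasWaiters(condition); // the interrupt did not dequeue it
            condition.signal();                     // only the signal lets it leave
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```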

3. awaitNanos

This method is almost the same as the await() method. The main difference in design is the timeout: as long as the timeout has not elapsed, the thread is suspended; once the waiting time is exceeded, the thread's node is moved from the conditional queue to the synchronous queue.

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

4. await(long time, TimeUnit unit)

This method is essentially the same as awaitNanos(long nanosTimeout), except that the timeout carries a time unit; internally the time is still converted to nanoseconds. The other difference is the return value. awaitNanos(long nanosTimeout) returns the remaining time: a value greater than 0 means the timeout had not expired, so the return was caused by signal. The return value of await(long time, TimeUnit unit) is derived from transferAfterCancelledWait(node): that method returns true if the node has not been signaled when it is called and false if the node has already been signaled, and await returns the negation of this value. Therefore, when await(long time, TimeUnit unit) returns true, signal occurred before the timeout, and the return was caused by signal rather than by the timeout.

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}
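The two return-value conventions can be compared in a case where nobody ever signals, so each wait can only end by timing out. This is a sketch assuming a ReentrantLock condition, whose timed waits do not return before a signal, an interrupt, or the timeout. TimedAwaitDemo is a hypothetical name and the 20 ms timeout is arbitrary.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo comparing the return values of the two timed waits on timeout.
class TimedAwaitDemo {
    /** Returns {awaitNanosReportedTimeout, timedAwaitReportedTimeout}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] result = new boolean[2];
        lock.lock();
        try {
            // awaitNanos returns the remaining time; a value <= 0 means the wait timed out.
            long remaining = condition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(20));
            result[0] = remaining <= 0;
            // await(time, unit) returns false when the wait timed out.
            boolean signalled = condition.await(20, TimeUnit.MILLISECONDS);
            result[1] = !signalled;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return result;
    }
}
```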

5. awaitUntil

The awaitUntil(Date deadline) method is similar to the methods above, except that the timeout is an absolute point in time.

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

6. signal

Only one node is awakened

public final void signal() {
    // isHeldExclusively() checks getExclusiveOwnerThread() == Thread.currentThread(),
    // i.e. whether the current thread holds the lock exclusively
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Get the first waiting node
    Node first = firstWaiter;
    // Only signal if the conditional queue is not empty
    if (first != null)
        doSignal(first);
}

// Traverse the conditional queue from the head, find the first node that has not been
// cancelled, and add it to the end of the synchronous queue.
// If no node remains, the conditional queue becomes empty (lastWaiter is set to null)
private void doSignal(Node first) {
    do {
        // Point firstWaiter to the node after the current head of the conditional queue
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Detach the old head from the conditional queue, making it an isolated node
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

Method summary: Calling signal() takes the first node not cancelled from the current condition queue and adds it to the end of the sync queue.
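That signal() moves exactly one waiter can be observed with ReentrantLock.getWaitQueueLength(Condition). The following is a sketch: SignalOneDemo is a hypothetical name, and the final signalAll() is there only so that the remaining threads can finish.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: three waiters, one signal.
class SignalOneDemo {
    /** Returns {waitersBeforeSignal, waitersAfterSignal}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                lock.lock();
                try {
                    condition.awaitUninterruptibly();
                } finally {
                    lock.unlock();
                }
            });
            waiters[i].start();
        }

        int[] lengths = new int[2];
        lock.lock();
        try {
            // Poll until all three threads sit in the conditional queue.
            while (lock.getWaitQueueLength(condition) < waiters.length) {
                lock.unlock();
                LockSupport.parkNanos(1_000_000L);
                lock.lock();
            }
            lengths[0] = lock.getWaitQueueLength(condition);
            condition.signal(); // transfers only the first waiter
            lengths[1] = lock.getWaitQueueLength(condition);
            condition.signalAll(); // release the rest so the demo terminates
        } finally {
            lock.unlock();
        }
        try {
            for (Thread t : waiters) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return lengths;
    }
}
```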

7. signalAll

Wake up all nodes

public final void signalAll() {
    // isHeldExclusively() checks getExclusiveOwnerThread() == Thread.currentThread(),
    // i.e. whether the current thread holds the lock exclusively
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Get the first waiting node
    Node first = firstWaiter;
    // Only signal if the conditional queue is not empty
    if (first != null)
        doSignalAll(first);
}

// Remove and transfer all nodes
private void doSignalAll(Node first) {
    // Empty the conditional queue
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

// Transfer a node from the conditional queue to the synchronous queue;
// callers move the nodes over one by one
final boolean transferForSignal(Node node) {
    // If the node was already cancelled before signal was called, skip it
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // Use the enq method to add the node to the end of the synchronous queue
    Node p = enq(node);
    // enq returns the predecessor node. The current node can only be safely suspended
    // once its predecessor's status has been set to SIGNAL
    int ws = p.waitStatus;
    // If the predecessor is cancelled or cannot be set to SIGNAL, wake the thread up directly
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

In the transferForSignal method, we first use a CAS operation to change the node's waitStatus from CONDITION to 0. If the change fails, the node has been cancelled, so we return false right away and the caller moves on to the next node. If the change succeeds, the node has been "awakened" from the conditional queue, but its thread has not really been woken up yet: it has to compete for the lock like any other thread, so the node is added to the end of the synchronous queue to wait for the lock.

Method summary:

  1. Empty the conditional queue (this only sets lastWaiter = firstWaiter = null; the nodes and the links between them still exist)
  2. Detach the head node from the conditional queue so that it becomes an isolated node (nextWaiter, prev, and next are all null)
  3. If the node has been cancelled, skip it (since it is an isolated node, it will be reclaimed by GC)
  4. If the node is in a normal state, add it to the end of the synchronous queue with the enq method
  5. Decide whether the node's thread needs to be woken up (this includes setting the predecessor node to SIGNAL), and wake it up if necessary
  6. Repeat steps 2-5 until the entire conditional queue has been processed
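The visible effect of these steps can be sketched with ReentrantLock's monitoring methods: after signalAll(), no thread is left in the conditional queue, and as long as the signalling thread still holds the lock, every former waiter is queued for the lock instead. SignalAllDemo is a hypothetical name.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: three waiters, one signalAll.
class SignalAllDemo {
    /** Returns {waitersLeftInConditionQueue, waitersFoundInSynchronousQueue}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                lock.lock();
                try {
                    condition.awaitUninterruptibly();
                } finally {
                    lock.unlock();
                }
            });
            waiters[i].start();
        }

        int[] result = new int[2];
        lock.lock();
        try {
            // Poll until all three threads sit in the conditional queue.
            while (lock.getWaitQueueLength(condition) < waiters.length) {
                lock.unlock();
                LockSupport.parkNanos(1_000_000L);
                lock.lock();
            }
            condition.signalAll();
            result[0] = lock.getWaitQueueLength(condition);
            // The lock is still held here, so none of the transferred threads can proceed yet.
            for (Thread t : waiters)
                if (lock.hasQueuedThread(t)) result[1]++;
        } finally {
            lock.unlock();
        }
        try {
            for (Thread t : waiters) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```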

6. Summary

That concludes the analysis of Condition. The next article will analyze the concurrent container classes. If you find any mistakes, please point them out so they can be corrected promptly. Thank you!