Basic use of Condition

Synchronized can be used to communicate between threads, JUC, and send packets with the same functions.

Condition is a multithreaded coordination communication utility class that provides await/signal methods. It allows some threads to wait together for a Condition (Condition) and the thread will be woken up only if the Condition is met.

1.1 Simple Cases

public class LockConditionDemo {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(() -> {
            System.out.println("threadA start");
            lock.lock();
            System.out.println("threadA getLock Running");
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.unlock();
            System.out.println("threadA end");
        });

        Thread threadB = new Thread(() -> {
            System.out.println("threadB start");
            lock.lock();
            System.out.println("threadB getLock Running");
            condition.signal();
            lock.unlock();
            System.out.println("threadB end");
        });

        threadA.start();
        TimeUnit.SECONDS.sleep(2); threadB.start(); }}Copy the code

The following output is displayed:

threadA start
threadA getLock Running
threadB start
threadB getLock Running
threadB end
threadA end
Copy the code

1.2 Summary

When the await method is called, the current thread releases the lock and waits, while the other thread calls the condition’s signal or SignalAll method to notify the blocked thread, and then performs unlock to release the lock itself. The awakened thread acquires the previous lock, continues to execute and releases the lock.

So, the two most important methods in condition are await and signal method await: block the current thread and suspend signal: wake up the blocking thread

2 Condition basic introduction

2.1 UML for concrete class diagrams

About the Condition of the implementation class for AbstractQueuedSynchronizer. ConditionObject inner classes.

Let’s take a look at the definition methods provided by the condition interface

public interface Condition {

    void await(a) throws InterruptedException;

    void awaitUninterruptibly(a);

    long awaitNanos(long nanosTimeout) throws InterruptedException;

    boolean await(long time, TimeUnit unit) throws InterruptedException;

    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal(a);

    void signalAll(a);
}
Copy the code

ConditionObject’s key data structure

private transient Node fristWaiter;

private transient Node lastWaiter;
Copy the code

Each ConditionObject maintains its own conditional wait queue.

2.2 waiting for

Before analyzing the source code, let’s talk about wait queues.

The wait queue is a FIFO queue. Each node in the queue contains a thread reference, which is the thread waiting on the Condition object. If a thread calls the condition.await () method, The thread then releases the lock, constructs the node to join the wait queue, and enters the wait state. A Condtion contains a wait queue, and a Condtion has a first node (fristWaiter) and a last node (llastWaiter). The current thread calls the condition.await () method, which will construct the node from the current thread and queue the node from the tail. The basic structure of the wait queue is shown below:

A Condition contains a wait queue, and the Condition has a first node (firstWaiter) and a last node (lastWaiter). The current thread calls the condition.await () method, which will construct the node from the current thread and queue the node from the tail.

2.2.1 await

Calling the condition.await() method queues the current thread and releases the lock while the thread changes to the wait state. When returned from the await() method, the lock associated with the condition must be obtained.

When the condition.await() method is called, the head node of the synchronization queue (the node that acquired the lock) is moved to the condition’s wait queue.

The thread that calls this method has successfully acquired the lock thread, that is, the first node in the synchronization queue. This method constructs the current thread as a node and joins the wait queue, then releases the synchronization state and wakes up the successor nodes in the synchronization queue, and then the current thread enters the wait state. When a node in the wait queue is awakened, the thread that awakened the node attempts to obtain synchronization status. If the waiting thread is interrupted instead of being awakened by another thread calling the conditional.signal () method, InterruptedException is thrown. If the current thread joins the Condition from the queue point of view, the process is shown below. The first node of the synchronous queue does not join the Condition directly, but instead constructs the current thread as a new node and adds it to the queue via addConditionWaiter().

public final void await(a) throws InterruptedException {
    // Detects the interrupt flag of the current thread and throws an exception if the interrupt bit is 1
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add a waiting node. AddConditionWaiter is a simple list maintenance node operation
    Node node = addConditionWaiter();
    // Release the possessed lock and get the state of the current lock. And wakes up a thread in the synchronization queue
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Whether the current node is in the synchronization queue. If the current node is in the synchronization blocking queue, apply for a lock.
    // Synchronous queue situation - concurrency: A thread await executes here to release lock. After thread B singal (waiting for the node to join the synchronization node),
    // Execute the following code. AcquireQueued may fail (park again) or succeed (B release lock was executed before this)
    // If not in sync queue (in wait queue), block, wait condition satisfied
    while(! isOnSyncQueue(node)) {// Park is blocked until the condition is satisfied
        LockSupport.park(this);
        When a thread is awakened from a conditional wait, it is removed from the conditional queue and placed in the synchronous wait queue.
        // The thread is cancelled (interrupted).
        // If it is interrupted, it needs to handle the interrupt according to different modes. There are also two ways to handle interrupts: 1. Continue to set interrupt bits. 2: Throws InterruptedException directly
        // 1, singal, interrupt, interrupt
        // interrupt singal
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
          	// There is no interrupt due to joining from the wait queue to the synchronization queue. The next loop will also jump out of the while
            break;
    }
    // Singal wakes up and tries to apply the lock
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;// Process the unwait node and remove it from the wait queue
    if(node.nextWaiter ! =null)
        unlinkCancelledWaiters();
    // Interrupt processing is performed according to the interrupt identifier
  	// If THROW_IE is used, an interrupt exception is thrown
    // If it is REINTERRUPT, the interrupt is responded again
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
}
Copy the code

The above is the overall method of await, and the general process is explained. Each method is then parsed.

2.2.1.1 addConditionWaiter

Add the conditional wait node

private Node addConditionWaiter(a) {
    Node t = lastWaiter;
    // If the state of the last waiting Node is not Node.CONDITION, the Node in the waiting queue whose Node state is not Node.CONDITION will be deleted first.
    // May be interrupted or await await expiration
    if(t ! =null&& t.waitStatus ! = Node.CONDITION) {// Remove the node from the wait queue
        unlinkCancelledWaiters();
        // Clears nodes that are not node. CONDITION, and the endpoints may change accordingly
        t = lastWaiter;
    }
    // The current thread constructs the node and joins the wait queue
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // Construct the first node
    if (t == null)
        firstWaiter = node;
    // Construct the current waiting thread as a node and add it to the end node, as shown in the previous structure diagram
    else
        t.nextWaiter = node;
    // Change the tail
    lastWaiter = node;
    return node;
}
Copy the code

2.2.1.2 unlinkCancelledWaiters

When adding a node to the LastWaiter in the wait queue, the LastWaiter is not in CONDITION state, and the unwaited node is removed from the wait queue from the beginning.

private void unlinkCancelledWaiters(a) {
    / / the first node
    Node t = firstWaiter;
    // iterate backwards from the start Node except for node. CONDITION
    // If the current node is not removed, the trail temporary variable is assigned to form a linked list with the nodes that will not be cancelled next time
    Node trail = null;
    while(t ! =null) {
        Node next = t.nextWaiter;
        // Remove the Node that is not node. CONDITION from the queue
        if(t.waitStatus ! = Node.CONDITION) { t.nextWaiter =null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        elsetrail = t; t = next; }}Copy the code

2.2.1.3 fullyRelease

If the current lock has multiple reentrants, then only one release is required to return all reentrants to zero.

Releasing the lock wakes the next node of the Head node in the synchronization queue to acquire the lock. The synchronization queue also changes

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw newIllegalMonitorStateException(); }}finally {
        if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code

2.2.1.4 isOnSyncQueue

If not, the current node is not waking up to claim the lock, so the current thread needs to be blocked until another thread calls signal to wake up

If it is in a synchronization queue, it means that it needs to compete for synchronization locks to obtain execution permission for the program.

Nodes in the waiting queue are rejoined to the synchronization queue to compete for the lock. When signal is called, the current node is moved from the wait queue to the synchronous queue

final boolean isOnSyncQueue(Node node) {
    CONDITION or node.prev == null indicates that the Node is in the wait queue and is not added to the synchronization queue.
    // One is determined by waitStatus and the other is determined by the data structure of the queue (synchronous pairs are bidirectional lists, wait queues are unidirectional lists).
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // Queue synchronization has pre and next(bidirectional list)
    // The queue is nextWaiter.
    if(node.next ! =null) 
        return true;
    // Node. prev is not empty, but it is not in the synchronization queue either, because the CAS may fail (the synchronization queue links Pre node first, and CAS next node).
    // To avoid signal loss, select the node from the synchronization queue again, returning true if found, false otherwise
    return findNodeFromTail(node);
}
Copy the code
// Walk forward from the end node to find the node
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false; t = t.prev; }}Copy the code

2.2.1.5 checkInterruptWhileWaiting

The thread was blocked earlier while parsing the await method. Wake up with signal and then go back to the logic that was executed last time.

Check if the thread is interrupted when await is awakened.

If the current thread is interrupted, then call transferAfterCancelledWait method judging subsequent processing should be throw InterruptedException or interrupted again.

  • Singal interrupt and then interrupt again
  • Interrupt, then singal, throws an exception directly
private int checkInterruptWhileWaiting(Node node) {
    // Whether the thread is interrupted, the interrupt flag bit is cleared. Determine whether to interrupt again
    return Thread.interrupted() ?
        // Check whether the current node is added to the synchronization queue
        // return false if the queue has been added to the synchronization queue.
        // select * from singal; // select * from singal;
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
Copy the code

2.2.1.6 transferAfterCancelledWait

final boolean transferAfterCancelledWait(Node node) {
    // CAS succeeded. The current node is not added from the wait queue to the synchronization queue
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // Remove the current node from the wait queue and add it to the synchronization queue
        enq(node);
        return true;
    }
    // If cas fails, the node checks whether the current node is already on the synchronization queue. If not, another thread executes
		// When node triggers signal, node is added to the sync queue
  	// Loop to check whether node has been successfully added to the synchronization queue. If not, yield the CPU scheduling for the thread
    while(! isOnSyncQueue(node)) Thread.yield();return false;
}
Copy the code

2.2.1.7 reportInterruptAfterWait

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // Throw an exception directly
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // Re-interrupt the thread itself
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

static void selfInterrupt(a) {
     Thread.currentThread().interrupt();
}
Copy the code

2.3 notice

Calling the signal() method on Condition will wake up the longest waiting node in the wait queue (firstWaiter) and move the node to the synchronization queue before waking it up.

2.3.1 signal

public final void signal(a) {
    // If the current thread is not the lock holder, throw an exception
    if(! isHeldExclusively())throw new IllegalMonitorStateException();
    Node first = firstWaiter;
  	// Notify the first wait in the wait queue
    if(first ! =null)
        doSignal(first);
}
Copy the code

2.3.2 doSignal

The main function of this method is to remove the notified node from the wait queue and add the notified node to the synchronization queue.

private void doSignal(Node first) {
    do {
        // First set the next node to be notified as the firstWaiter of the waiting queue
        // If the next node of the current notified node is empty
        if ( (firstWaiter = first.nextWaiter) == null)
          	// The last node of the wait queue (lastWaiter) is set to empty
            lastWaiter = null;
        // The next node of the currently notified node is set to empty, and the old firstWaiter is removed from the wait queue
        first.nextWaiter = null;
    } 
  	// If the notified node is not in the synchronization queue and there are not empty nodes in the waiting queue, the notification loop continues
    while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
Copy the code

2.3.3 transferForSignal

Add the node in the wait queue to the synchronization queue by changing WaitStatus

final boolean transferForSignal(Node node) {
    // Try to set the Node state to 0. If the setting fails, the Node is no longer in node. CONDITION, and the Node is cancelled without waiting for a notification signal.
    // Simply return false to inform the next wait. Go back to the while loop above
    if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
        return false;
    // Put the node into the synchronization queue. Basically, nodes are moved from the wait queue to the synchronous queue
    // p is the last node currently added to the synchronization queue
    Node p = enq(node);
    int ws = p.waitStatus;
  	// The last node was cancelled, and the node that just joined the AQS queue was awakened
    // Failure to set the front Node state to Node.SIGNAL wakes up the thread represented by the notified Node.
    // You can only unLock the lock by using lock.unLock. Wake up the current node
    if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
}
Copy the code

3 summary

The awaitThread first obtains the lock from lock.lock() and then calls condition. Await to enter the wait queue. The other thread, signalThread, calls condition. Signal or signalAll after acquiring the lock successfully, giving the awaitThread the opportunity to move to the synchronization queue. This gives the awaitThread a chance to obtain the lock after another thread releases the lock, so that the awaitThread can exit from the await method to perform further operations. If the awaitThread fails to acquire the lock, it is directly put into the synchronization queue.