Preface

Condition is the companion to Lock. We have written several articles on how to use it, such as implementing a blocking queue with ReentrantLock and Condition, and Java's three locks in concurrent programming. Both articles describe its use in detail. Today we take a closer look at the source code implementation.

Class structure

The Condition interface has two implementation classes: AbstractQueuedSynchronizer.ConditionObject and AbstractQueuedLongSynchronizer.ConditionObject. Both are inner classes of their AQS class. The class structure is as follows:

ConditionObject has several public methods:

  1. await()
  2. await(long time, TimeUnit unit)
  3. awaitNanos(long nanosTimeout)
  4. awaitUninterruptibly()
  5. awaitUntil(Date deadline)
  6. signal()
  7. signalAll()
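
The timed variants in this list differ mainly in how they report a timeout. The sketch below waits on a condition that nobody signals and records what each variant returns. The class name `TimedAwaitDemo` and the 20 ms timeouts are arbitrary choices made for illustration.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedAwaitDemo {
    /** Each element is true if the corresponding timed wait reported a timeout. */
    static boolean[] timeoutsReported() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] result = new boolean[3];
        lock.lock();
        try {
            // await(time, unit) returns false when the waiting time elapsed.
            result[0] = !cond.await(20, TimeUnit.MILLISECONDS);
            // awaitNanos returns an estimate of the remaining time; <= 0 means timed out.
            result[1] = cond.awaitNanos(TimeUnit.MILLISECONDS.toNanos(20)) <= 0;
            // awaitUntil returns false when the deadline has passed.
            result[2] = !cond.awaitUntil(new Date(System.currentTimeMillis() + 20));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("unexpected interrupt", e);
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = timeoutsReported();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```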

Today we will focus on the two most commonly used methods: await and signal.

Await method

First, the code with comments:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Create a new node and append it to the tail of the Condition queue.
    Node node = addConditionWaiter();
    // Fully release the lock and wake up a thread in the AQS queue.
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Check whether this node is on the AQS queue. The first check usually returns false,
    // so the thread parks itself and starts waiting.
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // After waking, check whether the thread was interrupted while waiting.
        // If not, loop again and ask isOnSyncQueue whether the node is on the AQS queue.
        // isOnSyncQueue returns false (keep waiting) while the node is in CONDITION state or not yet
        // on the AQS queue (the JDK notes that the CAS that enqueues the node can fail).
        // It returns true (leave the loop) once the node is on the AQS queue.
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            // If the thread was interrupted, leave the loop
            break;
    }
    // Once awake, the thread tries to reacquire the lock. acquireQueued returns true
    // if the thread was interrupted while acquiring.
    // interruptMode != THROW_IE means this thread did not enqueue the node itself;
    // signal had already run enq to enqueue it. In that case record REINTERRUPT.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // If the node still has a nextWaiter, clean up the Condition queue.
    // If it is null, there is nothing left to clean up.
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // If the thread was interrupted, throw or re-assert the interrupt. Otherwise do nothing.
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

To summarize the logic of this method:

  • Condition maintains its own queue. Each call to await creates a node for the current thread and appends it to the tail.
  • The thread then releases the lock and wakes up a thread blocked in the lock's AQS queue.
  • Next, it blocks itself.
  • When another thread calls signal, the node is moved to the AQS queue and the thread is woken up. From then on it is an ordinary AQS node that has to compete for the lock.
  • Finally, the thread tries to grab the lock. The logic is the same as for a normal lock acquisition: block if the lock is not available, continue if it is.
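
From the caller's side, the steps above look like the following minimal sketch. It is not taken from the JDK or from the earlier articles; the class `AwaitSignalDemo` and its methods `awaitReady`, `markReady` and `runOnce` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitSignalDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag; // guarded by lock

    /** Blocks until another thread has called markReady(). */
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop, as the Condition contract allows spurious wakeups.
            while (!flag)
                ready.await(); // releases the lock, parks, reacquires the lock before returning
        } finally {
            lock.unlock();
        }
    }

    /** Sets the flag and signals one waiter, if there is one. */
    void markReady() {
        lock.lock();
        try {
            flag = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Runs one waiter and one signaller; returns true if the waiter finished. */
    static boolean runOnce() {
        AwaitSignalDemo demo = new AwaitSignalDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        // Succeeds even if the waiter is already parked, because await released the lock.
        demo.markReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }
}
```

If `markReady` happens to run before the waiter takes the lock, the waiter sees the flag and never calls await, which is why the flag is checked in a loop rather than relying on the signal alone.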

Now for the detailed source code. How does Condition add nodes? With addConditionWaiter:

// Create a node for the current thread and append it to the tail of the Condition queue.
private Node addConditionWaiter() {
    // Copy the tail into a local variable, which is faster to access
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // That is, unlink every cancelled node in the list and re-read the tail into t
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Create a node for the current thread
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // If there is no tail node, the queue is empty
    if (t == null)
        // Make the new node the first node
        firstWaiter = node;
    else
        // Otherwise link the new node after the current tail
        t.nextWaiter = node;
    // The new node is now the tail
    lastWaiter = node;
    return node;
}

This creates a node for the current thread and appends it to the tail. It may also call unlinkCancelledWaiters, which removes the cancelled nodes from the Condition queue when the tail node is cancelled:

// Unlink every cancelled node from the Condition queue.
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // trail is the last valid node seen so far. It is kept so that the list can be relinked past a cancelled node.
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // If this node is cancelled
        if (t.waitStatus != Node.CONDITION) {
            // Detach it from its successor
            t.nextWaiter = null;
            // If no valid node has been seen yet
            if (trail == null)
                // next becomes the first node, which drops the previous first node
                firstWaiter = next;
            else
                // Otherwise link the last valid node directly to next, skipping t
                trail.nextWaiter = next;
            // If next is null, there are no more nodes, so trail becomes the tail
            if (next == null)
                lastWaiter = trail;
        }
        // If the node is valid, remember it for the next relink
        else
            trail = t;
        // Move on to the next node and continue the loop
        t = next;
    }
}
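
The list handling in addConditionWaiter and unlinkCancelledWaiters can be tried out in isolation. The sketch below is a single-threaded toy model, not the JDK classes: `ToyConditionQueue`, its `Node` type and the boolean `cancelled` field (standing in for `waitStatus != Node.CONDITION`) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ToyConditionQueue {
    static final class Node {
        final String name;
        boolean cancelled; // stands in for waitStatus != Node.CONDITION
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter, lastWaiter;

    /** Mirrors addConditionWaiter: clean up if the tail is cancelled, then append. */
    Node add(String name) {
        Node t = lastWaiter;
        if (t != null && t.cancelled) {
            unlinkCancelled();
            t = lastWaiter;
        }
        Node node = new Node(name);
        if (t == null) firstWaiter = node; else t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    /** Mirrors unlinkCancelledWaiters: one pass that relinks around cancelled nodes. */
    void unlinkCancelled() {
        Node t = firstWaiter, trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.cancelled) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next; else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) out.add(n.name);
        return out;
    }

    /** Adds A, B, C, cancels A and C, then adds D, which triggers the cleanup. */
    static List<String> demo() {
        ToyConditionQueue q = new ToyConditionQueue();
        Node a = q.add("A");
        q.add("B");
        Node c = q.add("C");
        a.cancelled = true;
        c.cancelled = true; // the tail is now cancelled
        q.add("D");         // cleanup runs before D is appended
        return q.names();   // B, D
    }
}
```

Note that the cleanup only runs when the tail is cancelled; a cancelled node in the middle stays linked until a later pass.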

So how does await release the lock and wake up a thread waiting in the AQS queue? With fullyRelease:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Read the state variable
        int savedState = getState();
        // If the release succeeds, return the saved state. For ReentrantLock this is the
        // reentrant hold count of the current thread, not a number of threads.
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // If the release fails, throw an exception
            throw new IllegalMonitorStateException();
        }
    } finally {
        // Release failed
        if (failed)
            // Mark this node as cancelled. It is removed from the queue later.
            node.waitStatus = Node.CANCELLED;
    }
}

How does release work? The code is as follows:

// Releases the lock and wakes up a thread blocked on it.
public final boolean release(int arg) {
    // tryRelease returns true if the lock is now free. It throws IllegalMonitorStateException
    // if the current thread is not the thread holding the lock.
    // It can also return false, but when called from fullyRelease it normally succeeds.
    if (tryRelease(arg)) {
        // Wake up the thread on the node after head.
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    // Release failed
    return false;
}

The release method mainly delegates to tryRelease, which does the actual release. The tryRelease code (from ReentrantLock.Sync) looks like this:

// Subtracts releases from the state variable. If state becomes 0, the owner thread is set to null.
protected final boolean tryRelease(int releases) {
    // Compute the new state
    int c = getState() - releases;
    // If the current thread is not the thread holding the lock, an exception is thrown
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // If the result is 0, the lock is fully released.
    if (c == 0) {
        free = true;
        // Clear the thread that holds the lock.
        setExclusiveOwnerThread(null);
    }
    // Write the new state
    setState(c);
    return free;
}
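
One visible consequence of fullyRelease is that a thread holding a ReentrantLock several times gives up all of its holds in await and gets them back afterwards, since the saved state is passed to acquireQueued. The following sketch checks this with `getHoldCount`. The class name `FullReleaseDemo` and the polling loop with `hasWaiters` are illustration choices, not part of the code analyzed here.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullReleaseDemo {
    /** Returns {hold count before await, 1 if main locked while the waiter was parked, hold count after await}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] observed = new int[3];
        boolean[] signalled = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquire: the waiter now holds the lock twice
            try {
                observed[0] = lock.getHoldCount();
                while (!signalled[0])
                    cond.awaitUninterruptibly(); // gives up both holds while parked
                observed[2] = lock.getHoldCount();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter sits in the condition queue, then signal it.
        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    // We hold the lock while the waiter is parked, so it was fully released.
                    observed[1] = 1;
                    signalled[0] = true;
                    cond.signal();
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }
}
```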

Now we have a sense of how Condition releases the lock. But how does it block itself? Before blocking, await calls isOnSyncQueue, which looks like this:

final boolean isOnSyncQueue(Node node) {
    // If the node is still in CONDITION state, or it has no predecessor, it is not on the AQS queue.
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If it has a successor, it must be on the AQS queue.
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // Otherwise search backwards from tail. If the node is found, it is on the queue.
    return findNodeFromTail(node);
}

In fact, false is usually returned the first time, so await blocks the thread by calling park inside the while loop. At this point the thread has released its lock and blocked itself.
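
The park-in-a-loop pattern can be shown without AQS. In this sketch an `AtomicBoolean` named `transferred` plays the role of isOnSyncQueue(node); that variable, the class `ParkLoopDemo` and the method `run` are hypothetical names. The loop matters because park may return for no reason, so the condition has to be re-checked after every wakeup.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkLoopDemo {
    /** Returns true if the parked thread resumed after the state change. */
    static boolean run() {
        AtomicBoolean transferred = new AtomicBoolean(false); // stands in for isOnSyncQueue(node)
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // Re-check the condition after every return from park.
            while (!transferred.get())
                LockSupport.park();
            resumed.set(true);
        });
        waiter.start();

        transferred.set(true);      // change the state first
        LockSupport.unpark(waiter); // then make the permit available
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }
}
```

If unpark runs before the waiter reaches park, the permit is kept and the next park returns immediately, so the wakeup is not lost.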

Once blocked, the thread stays parked until someone calls signal to wake it, or until it is interrupted. After waking, it continues with the if statement below the park call, that is, checkInterruptWhileWaiting. As the name says, it checks whether the thread was interrupted while waiting. The code is as follows:

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        // transferAfterCancelledWait returns true if this thread enqueued the node on the AQS queue itself: THROW_IE.
        // It returns false if signal had already started the transfer: REINTERRUPT.
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

When the thread has been interrupted, transferAfterCancelledWait is called, and a different constant is returned depending on its result. What does that method do internally?

final boolean transferAfterCancelledWait(Node node) {
    // If the CAS from CONDITION to 0 succeeds, place the node on the AQS queue.
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // If the CAS fails, signal is already transferring the node, so return false,
    // but first spin until the node is on the AQS queue, that is, until enq completes.
    // The JDK considers it rare and transient for the node to be off the AQS queue at this point, hence the spin.
    // Without the spin, the caller could continue before signal has finished enqueuing the node.
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

In effect, the thread tries to put itself on the AQS queue. If that fails, it spins until the signal method has enqueued it.

Back in await, execution continues with three if blocks. The first one tries to acquire the lock. Why? The thread has been woken up and its node is on the AQS queue, so it has to reacquire the lock before await can return. acquireQueued contains the lock acquisition logic:


// Returns whether the thread was interrupted while acquiring the lock.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // If the predecessor is head and the lock was successfully acquired
            if (p == head && tryAcquire(arg)) {
                // Make the current node the new head
                setHead(node);
                // Unlink the old head
                p.next = null; // help GC
                failed = false;
                // Returns false if there was no interrupt
                return interrupted;
            }
            // shouldParkAfterFailedAcquire: after a failed attempt, returns true if the thread
            // should block, that is, if the predecessor's status is SIGNAL (-1).
            // parkAndCheckInterrupt: parks the thread and returns whether it was interrupted.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Note that if the lock cannot be acquired here, the thread blocks in parkAndCheckInterrupt. This is exactly the same as for ordinary nodes in the AQS queue, nothing special.

One method to note is tryAcquire, which holds the actual logic for grabbing the lock. The code shown here is the fair version, from ReentrantLock.FairSync:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // If the lock is free.
    if (c == 0) {
        // !hasQueuedPredecessors(): no thread has been waiting longer than the current one
        // compareAndSetState: the CAS on the state variable succeeded
        // setExclusiveOwnerThread: record the current thread as the thread holding the lock
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            // Both conditions held and the owner is set, so the lock was acquired.
            return true;
        }
    }
    // If the state is not 0 and the current thread is the thread holding the lock, this is a reentrant acquire.
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

The main logic is to CAS the state variable and then record the current thread as the lock owner. In this fair version, that only happens when no thread has been waiting longer than the current one, which means the longest-waiting thread gets the lock first. There is also some reentrancy logic.
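
The state changes described here can be observed through the public API. A small sketch, assuming a fair lock created with `new ReentrantLock(true)`; the class name `ReentrantStateDemo` is made up for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantStateDemo {
    /** Returns the hold counts observed after lock, lock, unlock, unlock. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair policy
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // 1: the lock was free and is now owned
        lock.lock();
        counts[1] = lock.getHoldCount(); // 2: reentrant acquire by the owner
        lock.unlock();
        counts[2] = lock.getHoldCount(); // 1: still owned
        lock.unlock();
        counts[3] = lock.getHoldCount(); // 0: the lock is free again
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(holdCounts()));
    }
}
```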

The remaining two if blocks are simple. If the node still has a nextWaiter in the Condition queue, cancelled nodes are cleaned out with unlinkCancelledWaiters, which was analyzed above and is not repeated here.

Finally, if the thread was interrupted, reportInterruptAfterWait is called, which either throws an exception or sets the interrupt flag on the current thread again.

The code is as follows:

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
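
The THROW_IE path is easy to observe from outside: a thread that is interrupted while parked in await sees an InterruptedException, and only after it has reacquired the lock. A minimal sketch follows; the class name `InterruptedAwaitDemo` and the `hasWaiters` polling loop are illustration choices.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class InterruptedAwaitDemo {
    /** Returns {InterruptedException was thrown, lock was held when it was caught}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] seen = new boolean[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await(); // nobody signals; only the interrupt ends this wait
            } catch (InterruptedException e) {
                seen[0] = true;
                seen[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter is in the condition queue, then interrupt it.
        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) break;
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        waiter.interrupt();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Because the lock is held again when the exception surfaces, the usual `finally { lock.unlock(); }` pattern stays correct on the interrupt path.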


Ok, that completes the analysis of the await method. As you can see, I have posted a lot of comments. They are there to make later review easier and to help interested readers follow the source code.

The summary of this method was given at the beginning, so I will not repeat it. We will summarize again later together with the signal method.

Now look at the signal method implementation.

Signal method

The code is as follows:

public final void signal() {
    // If the current thread is not the thread holding the lock, an exception is thrown
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Get the first node on the Condition queue
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

Clearly, wake-up starts from the head of the Condition queue.
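
That head-first order can be checked with three waiters that enter the Condition queue one after another. In the sketch below, the class `SignalOrderDemo`, the helper `waitForWaiters` and the `go` flag are hypothetical names; the helper uses `getWaitQueueLength` to make sure each waiter is parked before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOrderDemo {
    /** Returns the ids of the waiters in the order in which they resumed. */
    static List<Integer> wakeOrder() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<Integer> order = new ArrayList<>(); // guarded by lock
        boolean[] go = {false};                  // guarded by lock
        List<Thread> threads = new ArrayList<>();

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            Thread t = new Thread(() -> {
                lock.lock();
                try {
                    while (!go[0])
                        cond.awaitUninterruptibly();
                    order.add(id);
                } finally {
                    lock.unlock();
                }
            });
            threads.add(t);
            t.start();
            waitForWaiters(lock, cond, i); // waiter i is queued before waiter i + 1 starts
        }

        lock.lock();
        try {
            go[0] = true;
            cond.signal(); // each call takes the current first node of the Condition queue
            cond.signal();
            cond.signal();
        } finally {
            lock.unlock();
        }
        for (Thread t : threads) {
            try {
                t.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    /** Spins until at least n threads are waiting on cond. */
    static void waitForWaiters(ReentrantLock lock, Condition cond, int n) {
        for (;;) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(cond) >= n) return;
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
    }
}
```

The waiters should resume in the order 1, 2, 3, since each signal moves the oldest waiter to the tail of the AQS queue and queued threads acquire the lock in queue order.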

Look at the doSignal(first) method implementation:

private void doSignal(Node first) {
    do {
        // Advance firstWaiter to the next node. If there is none, the queue is empty, so lastWaiter is set to null too.
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Detach the node being signalled from the Condition queue.
        first.nextWaiter = null;
        // If changing the node state to 0 fails and firstWaiter is not null, loop again.
        // That is, walk forward from first until one node is transferred or there are no nodes left.
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

The key is the transferForSignal method, which must be where the wake-up happens.

final boolean transferForSignal(Node node) {
    // If the CAS fails, the node has been cancelled.
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // Put this node on the AQS queue. enq returns its predecessor.
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is cancelled, or setting its status to SIGNAL fails
    // (SIGNAL means the successor must be unparked when the predecessor releases),
    // wake up the thread of the given node right away.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // The node state was successfully changed to 0, so true is returned.
    return true;
}

Sure enough, there is the unpark operation. The CAS first changes the node state. If it succeeds, the node is placed on the AQS queue, and if the predecessor is cancelled or cannot be set to SIGNAL, the thread on the node is woken up right away. Otherwise the thread is woken later, when its predecessor releases the lock. Either way, the thread parked in await eventually wakes up, runs checkInterruptWhileWaiting, and then tries to acquire the lock.
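
The transfer between the two queues can be watched through ReentrantLock's monitoring methods. The sketch below assumes that signal enqueues the node before it returns, as in the code above; the class name `TransferDemo` and the layout of the result array are illustration choices.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TransferDemo {
    /**
     * Returns {condition waiters before signal, waiter queued on the lock before signal (0/1),
     *          condition waiters after signal, waiter queued on the lock after signal (0/1)}.
     */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock
        int[] seen = new int[4];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0])
                    cond.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    seen[0] = lock.getWaitQueueLength(cond);        // waiter is in the Condition queue
                    seen[1] = lock.hasQueuedThread(waiter) ? 1 : 0; // and not in the AQS queue
                    go[0] = true;
                    cond.signal();
                    // We still hold the lock, so the waiter cannot have acquired it yet.
                    seen[2] = lock.getWaitQueueLength(cond);        // Condition queue is empty
                    seen[3] = lock.hasQueuedThread(waiter) ? 1 : 0; // waiter now queues for the lock
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Under these assumptions the result should be 1, 0, 0, 1: signal does not hand over the lock, it only moves the waiter into the queue of threads competing for it.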

Conclusion

To summarize how Condition executes await and signal:

  1. First, a thread must hold the lock before it can call await. In AQS, the lock is normally acquired by the node right after head; that node then becomes the new head, and the old head is removed from the queue.

  2. After the current thread (now the head of the AQS queue, if it had to queue) has acquired the lock, it calls await. This creates a node, appends it to the tail of the Condition queue, releases the lock, wakes up the successor of the AQS head, and then blocks, waiting to be woken by signal.

  3. When another thread calls signal, the first node in the Condition queue is moved to the tail of the AQS queue and its thread is woken up, either right away or when its predecessor releases the lock.

  4. When the thread blocked in await wakes up, its node has already been moved from the Condition queue to the AQS queue. It is now an ordinary AQS node and tries to grab the lock. It also clears cancelled nodes from the Condition queue.

The diagram below illustrates these steps.

Well, that is all about Condition. In essence, Condition adds a second queue for sleeping threads. Calling await puts the thread to sleep on the Condition queue. Calling signal takes a node from the Condition queue, inserts it into the AQS queue, and wakes it up, leaving the thread to compete for the lock on its own.