Conditon conditional queues (await method, queue and suspend thread, signal method)

Tip: read the source code as far as possible to open their own source code with blog comments together, otherwise easy to get lost ~

1. Conditional queue flow chart

\

2. Condition interface

public interface Condition {
	// Thread wait, can throw interrupt exception (can respond to interrupt)
	void await(a) throws InterruptedException;
	
	// The thread waits, but cannot respond to interrupts
	void awaitUninterruptibly(a);
	
	// The thread timed out
	boolean await(long time, TimeUnit unit) throws InterruptedException;
	
	The thread waits until it is woken up, receives an interrupt signal, or wakes up automatically at a specified expiration date
	boolean awaitUntil(Date deadline) throws InterruptedException;
	
	// Thread wake up
	void signal(a);
	
	// Wake up all threads
	void signalAll(a);
}
Copy the code

\

ConditionObject (Condition) ConditionObject (Condition

// Inner class in AQS:
public class ConditionObject implements Condition.java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    // The first waiting node of the queue: the head waiting node
    private transient Node firstWaiter;
    // The last waiting node of the queue: the tail waiting node
    private transient Node lastWaiter;
    
    /* newCondition() in ReentrantLock instantiates ConditionObject */ with the static inner class Sync
    public ConditionObject(a) {}... }Copy the code

ConditionObject’s internal method:

3.1 Await thread wait method

// Thread wait, can throw interrupt exception (can respond to interrupt)
public final void await(a) throws InterruptedException {
    // Check whether the current thread is interrupted:
    if (Thread.interrupted())
        // If the status is interrupted, an interrupt exception is thrown directly
        throw new InterruptedException();
    
    // Wrap the current thread as a node, place it in the conditional queue, and return the node that wraps the current thread
    Node node = addConditionWaiter();
    // Release the current thread lock completely (set state to 0) :
    // Why is the lock released? When a thread is suspended, no other thread can wake it up
    int savedState = fullyRelease(node);
    // Condition Specifies the queue interrupt status.
    // 0: No interrupt signal was received during Condition queue suspension ~
    // -1: an interrupt signal ~ is received during Condition queue suspension
    // 1: No interrupt signal was received during Condition suspension, but an interrupt signal was received after migration to the blocking queue ~
    int interruptMode = 0;
    
    // While loop condition: check whether the current node is in the blocking queue:
    IsOnSyncQueue Returns true: the node corresponding to the current thread has been migrated to the blocking queue
    IsOnSyncQueue returns false: the node is still in the conditional queue and needs to park to suspend ~
    while(! isOnSyncQueue(node)) {// Suspend the current thread and put it in wait state ~
        LockSupport.park(this);
        
        // When will you wake up? How many are there?
        // 1. General path: after the external thread acquires the lock, it calls signal() to transfer the head node of the conditional queue to the blocking queue. When the node acquires the lock, it wakes up.
        // 2. After being transferred to the blocking queue, the status of the precursor node in the blocking queue is cancelled, and the current node is awakened
        // 3. While the current node is suspended, it is awakened by an external thread using an interrupt..

        / / checkInterruptWhileWaiting: even in the Condition queue during hanging, thread is interrupted, the corresponding node will be migrated to blocking the queue (lock) to wait to get to
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
    }
    
    // The node has been migrated to the blocking queue
    
    AcquireQueued: Contention queue logic, thread nodes contention resources
    // Condition 1: Return true if the blocking queue was woken up by an external thread interrupt..
    // interruptMode! = THROW_IE is valid, indicating that the current node is not interrupted in the conditional queue
    // Set interruptMode = REINTERRUPT
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;// Consider Node.nextwaiter! When does the = null condition hold?
    // Node will join the blocking queue if it is awakened by an external thread interrupt, but nextWaiter = null is not set.
    if(node.nextWaiter ! =null) // clean up if cancelled
        // Clear the cancelled node in the condition queue..
        unlinkCancelledWaiters();

    // Condition true: an interruption occurred during the suspension (1. Suspension in conditional queue 2. Pending outside the conditional queue)
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
}

// Wrap the current thread as a node, place it in the conditional queue, and return the node that wraps the current thread
// Note that the thread calling the await method is the thread holding the lock state, that is, the addConditionWaiter method called in the await method has no concurrency problem ~
private Node addConditionWaiter(a) {
    // Get a reference to the last node of the current condition queue, save it in the local variable t
    Node t = lastWaiter;
    // condition 1: t! = null: Node elements already exist in the conditional queue
    // CONDITION 2 :(when node is in a queue, its state is CONDITION(-2))
    // t.waitStatus ! CONDITION: the Node is interrupted...
    if(t ! =null&& t.waitStatus ! = Node.CONDITION) {// Clear the CONDITION queue from all nodes whose CONDITION(-2) is cancelled
        unlinkCancelledWaiters();
        // Update the local variable t as a reference to the latest endpoint ~
        // The unlinkCancelledWaiters method might change the lastWaiter reference
        t = lastWaiter;
    }
    // Create a node for the current thread and set the state to CONDITION(-2).
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    
    // If t == null is true, there are no elements in the conditional queue and the current thread is the first element to enter the queue
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    
    // Update the queue end node reference to node
    lastWaiter = node;
    
    // Returns the node that wraps the current thread
    return node;
}

// Even if the thread is interrupted during the Condition queue suspension, the corresponding node is still migrated to the blocking queue (waiting to acquire the lock) ~
private int checkInterruptWhileWaiting(Node node) {
    // Thread.interrupted() returns the current Thread interrupt bit and resets the current bit to false
    return Thread.interrupted() ?
            / / transferAfterCancelledWait this method only when thread is interrupted by wake up call!
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // Conditional: The node must be in the conditional queue because signal changes the node status to 0 when it migrates the node to the blocking queue
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // Interrupt awakened nodes are also added to the blocking queue!!
        enq(node);
        // true: indicates that the conditional queue is interrupted.
        return true;
    }

    // How many times does this happen?
    // 1. The node has been migrated to a blocking queue by an external thread calling signal.
    // 2. The node is currently being migrated to the blocking queue by an external thread calling signal.

    while(! isOnSyncQueue(node)) Thread.yield();// false: indicates that the current node is not in the conditional queue when interrupted to wake up..
    return false;
}

// Clear the cancelled node in the condition queue.. (Pure linked list operation method)
private void unlinkCancelledWaiters(a) {
    // represents the current node of the loop, iterating backwards from the first node in the list.
    Node t = firstWaiter;
    // A normal node on the current list
    Node trail = null;

    while(t ! =null) {
        // The next node of the current node.
        Node next = t.nextWaiter;
        // If the condition is true, the node is in the cancelled state
        if(t.waitStatus ! = Node.CONDITION) {// Update nextWaiter to null
            t.nextWaiter = null;
            // The condition is true: the node traversed has not encountered a normal node..
            if (trail == null)
                // Update the firstWaiter pointer to the next node
                firstWaiter = next;
            else
                // Make the last normal node point to the next node of the cancelled node.. The problematic node in the middle is skipped..
                trail.nextWaiter = next;

            // The current node is the last node in the queue. Update lastWaiter to the last normal node
            if (next == null)
                lastWaiter = trail;
        }
        else// If the condition is not true, the current node is normaltrail = t; t = next; }}private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // Condition true: an interrupt has occurred in the conditional queue and the await method throws an interrupt exception
    if (interruptMode == THROW_IE)
        throw new InterruptedException();

    // Conditional true: indicates that the interrupt occurs outside the conditional queue, in which case the current thread interrupt flag bit is set to true
    // Interrupt the processing of the business assigned to you. If you don't deal with it, nothing will ever happen...
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
Copy the code

3.2 fullyRelease method to fullyRelease the current thread lock

// In AQS:
// Release the current thread lock completely (set state to 0) :
final int fullyRelease(Node node) {
    // Complete lock release is successful. If failed, the current thread is not holding the lock and calls the await method.
    // In the finally code block, the state of the node node corresponding to the current thread that has just been added to the conditional queue is changed to cancel if it fails
    // Subsequent threads will clean up the cancelled node
    boolean failed = true;
    try {
        // Get the state value held by the current thread.
        int savedState = getState();
        
        // In most cases: release returns true
        if (release(savedState)) {
            // Set the failure flag to false
            failed = false;
            // Returns the state value released by the current thread
            return savedState;
        } else {
            throw newIllegalMonitorStateException(); }}finally {
        if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code

3.3 isOnSyncQueue Checks whether the current node is in the blocking queue

// In AQS:
// Check whether the current node is in the blocking queue
final boolean isOnSyncQueue(Node node) {
    Node.waitstatus == node. CONDITION: The node must be in the CONDITION queue.
    Signal sets the node state to 0 before migrating the node to the blocking queue
    Node.waitstatus! = node. CONDITION ===>
    // 1.node.waitStatus == 0 (indicates that the current node is singal)
    // 2.node.waitStatus == 1
    // node.waitStatus == 0
    // Because the signal method modifies the state first, then migrates
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    
    // At this point, what happens?
    // node.waitStatus ! = CONDITION and node.prev! = null ===> Can exclude node.waitStatus == 1
    // Why can cancel state be excluded? Signal does not migrate a node in the cancelled state
    // Set the logic referenced by prev to migrate the blocking queue logic set (enq())
    1. Set node.prev = tail; 2. The CAS node is the tail of the blocking queue. 3.pred.next = node;
    // It can be inferred that even if prev is not null, it does not mean that the current node has successfully joined the blocking queue.
    
    // If the condition is true, the current node has been queued successfully, and there are other nodes behind the current node...
    if(node.next ! =null)
        return true;
    
    /** * The current node status is node.prev! FindNodeFromTail = null and Node. waitStatus == 0 * findNodeFromTail Searches for node from the end of the blocking queue, returns true if found, false if not. On the move... Not finished yet... * /
    return findNodeFromTail(node);
}
Copy the code

3.4 Signal thread wake-up method

// in AQS: thread wake up method
public final void signal(a) {
    // Determine if the thread calling signal is an exclusive lock holder. If not, throw an exception.
    if(! isHeldExclusively())throw new IllegalMonitorStateException();

    // Get a node for the conditional queue
    Node first = firstWaiter;
    // If the first node is not null, migrate the first node to the blocking queue logic..
    if(first ! =null)
        doSignal(first);
}

// In AQS: migrate the first node to the blocking queue
private void doSignal(Node first) {
    do {
        // firstWaiter = first.nextwaiter
        // So update firstWaiter for the next node of the current node..
        // If the next node of the current node is null, the conditional queue has only one current node...
        // The queue is empty. So you need to update lastWaiter = null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;

        // The current first node is out of the conditional queue. Disconnect the node from the next node
        first.nextWaiter = null;

        // transferForSignal(first) Returns Boolean type
        // Return true if the current first node is successfully migrated to the blocking queue. Return false if the migration failed...
        // while loop :(first = first waiter)! = null
        // The current first migration failed, then update first to first.next to continue trying to migrate..
        // Until a node is successfully migrated or the conditional queue is null.
    } while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
Copy the code

3.4 transferForSignal Migrates the current node to a blocking queue

// at AQS: current node migrated to blocking queue:
// Return true if the current first node is successfully migrated to the blocking queue. Return false if the migration failed...
final boolean transferForSignal(Node node) {
    // cas modifies the state of the current node to 0, because the current node will soon be migrated to the blocking queue
    // Success: The status of the current node in the conditional queue is normal.
    // failed: 1. Cancel state (the node corresponding to the thread is set to cancel state when the thread is awaiting the lock)
    // 2. Node was woken up by another thread using interrupt signals during suspension...
    // (the main queue enters the blocking queue, which also changes its status to 0)
    if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
        return false;

    // Enq will finally enqueue the current node to the blocking queue, p is the current node in the blocking queue.
    Node p = enq(node);

    // The status of the ws precursor node..
    int ws = p.waitStatus;
    // If ws > 0 is true, the status of the precursor node in the blocking queue is cancelled, and the current node is woken up.
    // if (ws <= 0)
    // compareAndSetWaitStatus(p, ws, Node.SIGNAL) Returns true if the status of the precursor Node is set to SIGNAL successfully
    // compareAndSetWaitStatus(p, ws, node.signal) returns false ===> When will it return false?
    // The precursor node will respond to interrupt when the thread corresponding to the lockInterrupt is enqueued. The precursor node will interrupt when the external thread gives the precursor thread an interrupt signal
    // State is changed to cancel state and queue logic is executed..
    // Wake up the current thread as long as the precursor node state is not 0 or -1.
    if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL))// Wake up the current node thread... Talk to you later.
        LockSupport.unpark(node.thread);

    return true;
}
Copy the code