Preface


ConditionObject is one of the inner classes of AQS, but it was not covered in the earlier AQS article. As you read more JDK source code, you will find that Condition is used in many places, so today we read the Condition source code to find out what it does. Before reading this article, you should already have read the articles on AQS, ReentrantLock, and LockSupport.


[liuzhirichard] Notes on technology, development, and source code from work and study, plus occasional observations from everyday life. Feedback is welcome!

Introduction

The Object monitor methods wait, notify, and notifyAll should be familiar. In a multi-threaded scenario, a thread must first acquire the monitor lock with synchronized before it can call Object's wait and notify.

The use of Condition is equivalent to replacing synchronized with Lock, and then replacing Object’s monitor methods with Condition.

Conditions (also known as condition queues or condition variables) provide a way for one thread to suspend execution (to wait) until another thread notifies it that some state condition may now be true.

Because this shared state is accessed from different threads, it must be protected, so a lock of some form is associated with the condition. The key property of waiting on a condition is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.
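
To make the correspondence concrete, here is a minimal sketch of the same one-shot gate written twice: once with synchronized and the Object monitor methods, and once with Lock and Condition. The class names MonitorGate, ConditionGate, and GateDemo are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical gate built on synchronized + wait/notifyAll.
class MonitorGate {
    private boolean isOpen;

    synchronized void await() throws InterruptedException {
        while (!isOpen) {   // loop guards against spurious wakeups
            wait();         // releases the monitor while waiting
        }
    }

    synchronized void open() {
        isOpen = true;
        notifyAll();
    }
}

// The same gate with Lock + Condition.
class ConditionGate {
    private final Lock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean isOpen;

    void await() throws InterruptedException {
        lock.lock();
        try {
            while (!isOpen) {
                opened.await();   // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    void open() {
        lock.lock();
        try {
            isOpen = true;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class GateDemo {
    // Returns true if a waiter passes both gates once they are opened.
    static boolean run() {
        MonitorGate first = new MonitorGate();
        ConditionGate second = new ConditionGate();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                first.await();
                second.await();
                passed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        first.open();
        second.open();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter passed both gates: " + run());
    }
}
```

In both versions the state check sits in a while loop and the waiting call gives up the lock, which is the property described above.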

A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock, call the lock's newCondition() method.

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

Basic usage

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {

    final Lock lock = new ReentrantLock();
    // Each Condition instance is bound to the lock that created it
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];

    int putPtr, takePtr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // If the buffer is full when put is called,
            // the thread waits on the notFull condition
            while (count == items.length) {
                notFull.await();
            }
            items[putPtr] = x;
            if (++putPtr == items.length) {
                putPtr = 0;
            }
            ++count;
            // After a successful put the buffer is not empty,
            // so wake up a thread waiting on the notEmpty condition
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // If the buffer is empty when take is called,
            // the thread waits on the notEmpty condition
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takePtr];
            if (++takePtr == items.length) {
                takePtr = 0;
            }
            --count;
            // After a successful take the buffer cannot be full,
            // so wake up a thread waiting on the notFull condition
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

This example comes from the official documentation and implements a simple blocking queue. The same logic appears in many blocking queue implementations. The necessary explanations are given as comments in the code.

Questions

  1. What does Condition have to do with AQS?
  2. What is the implementation principle of Condition?
  3. What are the differences and connections between the waiting queue of Condition and the synchronous queue of AQS?

Source code analysis

Basic structure

As the UML diagram shows, Condition is only an interface; its main logic is implemented in ConditionObject, an inner class of AQS. The rest of this article therefore walks through ConditionObject step by step.

Create a Condition

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

Condition variables are typically created with lock.newCondition().

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    public Condition newCondition() {
        return sync.newCondition();
    }
    // Sync extends AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {

        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }
}

This is the ReentrantLock source. newCondition() calls sync.newCondition(), and because Sync extends AQS, the call actually creates an instance of ConditionObject, the AQS inner class.

Note that every call to lock.newCondition() creates a new ConditionObject instance, so a single lock can have multiple Condition instances.
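
The following is a small sketch that checks these points through the public ReentrantLock API. The class NewConditionDemo and its method names are hypothetical; hasWaiters is the real ReentrantLock monitoring method, which rejects a Condition that belongs to a different lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class NewConditionDemo {
    // Simple name of the class that newCondition() actually instantiates.
    static String implementationClass() {
        return new ReentrantLock().newCondition().getClass().getSimpleName();
    }

    // Each call to newCondition() yields a separate instance.
    static boolean distinctPerCall() {
        ReentrantLock lock = new ReentrantLock();
        Condition notFull = lock.newCondition();
        Condition notEmpty = lock.newCondition();
        return notFull != notEmpty;
    }

    // A Condition is bound to the lock that created it:
    // asking another lock about it fails with IllegalArgumentException.
    static boolean boundToItsLock() {
        ReentrantLock owner = new ReentrantLock();
        ReentrantLock other = new ReentrantLock();
        Condition cond = owner.newCondition();
        other.lock();
        try {
            other.hasWaiters(cond);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            other.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(implementationClass());
        System.out.println(distinctPerCall());
        System.out.println(boundToItsLock());
    }
}
```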

Condition fields

/** The first node of the conditional queue */
private transient Node firstWaiter;
/** The last node in the conditional queue */
private transient Node lastWaiter;

The await method

The await method causes the current thread to wait until it is signalled or interrupted.

The lock associated with this Condition is released atomically, and the current thread is disabled for thread scheduling purposes and lies dormant until one of the following four things happens:

  1. Some other thread calls the Condition's signal method, and the current thread is chosen as the one to wake up;
  2. Some other thread calls the Condition's signalAll method;
  3. Some other thread interrupts the current thread, and interruption of thread suspension is supported;
  4. A spurious wakeup occurs.

In all cases, the current thread must reacquire the lock associated with this condition before this method can return. This lock is guaranteed to be held when the thread returns.
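
As a hedged illustration of the interrupt case, the sketch below interrupts a waiting thread and records whether it holds the lock at the moment await throws InterruptedException. AwaitInterruptDemo and its helper methods are hypothetical names; the lock and condition calls are the standard ReentrantLock API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitInterruptDemo {
    // Returns true if the waiter held the lock when await() threw.
    static boolean lockHeldAfterInterrupt() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        AtomicBoolean heldOnReturn = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                for (;;) {
                    cond.await();   // nobody signals; loop absorbs spurious wakeups
                }
            } catch (InterruptedException e) {
                // await() reacquired the lock before throwing
                heldOnReturn.set(lock.isHeldByCurrentThread());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waitUntilWaiting(lock, cond);
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return heldOnReturn.get();
    }

    // Polls until some thread is waiting on cond.
    static void waitUntilWaiting(ReentrantLock lock, Condition cond) {
        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    return;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
    }

    public static void main(String[] args) {
        System.out.println("lock held after interrupt: " + lockHeldAfterInterrupt());
    }
}
```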

Now look at the implementation logic within AQS:

public final void await() throws InterruptedException {
    // Respond to interruption
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add to the end of the condition queue (wait queue)
    // A Node of type Node.CONDITION is created internally
    Node node = addConditionWaiter();
    // Release the lock held by the current thread (by manipulating the value of state)
    // The thread is parked afterwards
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // While the node is not on the synchronization queue, park the thread so that it waits in the condition queue
    while (!isOnSyncQueue(node)) {
        // Call park to block and suspend the current thread
        LockSupport.park(this);
        // Reaching this point means signal was called or the thread was interrupted
        // If the wakeup was caused by an interrupt, break out of the loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // After the while loop ends, the thread competes to reacquire the lock
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // Handle interrupts uniformly
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

The await method works in the following steps:

  1. Create a node of type Node.CONDITION and add it to the condition queue (ConditionQueue).
  2. Release the lock held by the current thread (by manipulating the value of state).
  3. Check whether the current node is on the synchronization queue (SyncQueue); if it is not, suspend the thread with park.
  4. When the loop ends, the node is on the synchronization queue, and the thread then waits to acquire the lock.

Keep the condition queue and the synchronization queue clearly apart:

  • Condition queue (wait queue): the queue owned by a Condition.
  • Synchronization queue: the queue owned by AQS.

Read the important methods in await below:

  • addConditionWaiter() method
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // Check the state of the tail node; if it is cancelled, unlink all cancelled nodes
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Create a new Node of type Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // Put the new node at the end of the wait queue
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

The addConditionWaiter method simply creates a Node of type Node.CONDITION and places it at the end of the condition queue. Two further conclusions can be drawn from this code:

  1. Nodes in the condition queue use only the thread, waitStatus, and nextWaiter fields.
  2. The condition queue is a singly linked queue.

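For comparison, here is the condition queue next to the synchronization queue.

The AQS synchronization queue is a doubly linked list: nodes are connected through prev and next, and head is a dummy node.

The condition queue of a Condition is a singly linked list: nodes are connected only through nextWaiter, running from firstWaiter to lastWaiter.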

waitStatus was introduced in the AQS article:

  1. The default value is 0.
  2. waitStatus > 0 (CANCELLED = 1): the node timed out or was interrupted and needs to be removed from the queue.
  3. waitStatus = -1 (SIGNAL): the node's successor is blocked (or soon will be), so this node must unpark its successor when it releases the lock or is cancelled.
  4. waitStatus = -2 (CONDITION): the node is currently in a condition queue.
  5. waitStatus = -3 (PROPAGATE): a releaseShared should be propagated to other nodes; used in shared mode.
  • fullyRelease method (AQS)
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Get the current synchronization state (for ReentrantLock, the hold count)
        int savedState = getState();
        // Release the lock
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

The fullyRelease method, provided by AQS, first reads the current state and then calls the release method to release the lock.

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The release method is described in detail in the AQS article. Its main job is to release the lock. Two points are worth noting:

  1. fullyRelease releases every hold on the lock at once, so no matter how many times the lock was reentered, it is completely released here.
  2. An exception is thrown when the lock cannot be released, typically because the calling thread does not hold it, and the node's status is then set to Node.CANCELLED in the finally block.
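
Both points can be observed from outside with the public ReentrantLock API. The sketch below is an illustration under that assumption: a waiter acquires the lock twice, another thread manages to take the lock while the waiter is in await, and the hold count is back at 2 once await returns. FullyReleaseDemo and its method names are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {
    // Returns the waiter's hold count {before await, after await}.
    static int[] holdCountsAroundAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false};   // guarded by lock
        int[] counts = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();             // reentrant acquire: hold count is now 2
            try {
                counts[0] = lock.getHoldCount();
                while (!ready[0]) {
                    cond.await();    // gives up both holds while waiting
                }
                counts[1] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        // This thread can only get the lock once the waiter has fully released it.
        for (boolean signalled = false; !signalled; ) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    ready[0] = true;
                    cond.signal();
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    // Calling await() without holding the lock fails in the release step.
    static boolean awaitWithoutLockFails() {
        Condition cond = new ReentrantLock().newCondition();
        try {
            cond.await();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        int[] counts = holdCountsAroundAwait();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
        System.out.println("await without lock fails: " + awaitWithoutLockFails());
    }
}
```
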
  • isOnSyncQueue(node)

After the steps above, the node has been placed on the condition queue and the lock has been released. The thread will now park until it is woken by signal. However, it may park only while its node is not on the synchronization queue (SyncQueue).

final boolean isOnSyncQueue(Node node) {
    // The node is still a condition queue node, or it has no predecessor
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;

    return findNodeFromTail(node);
}
// Traverse backwards from the tail
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

isOnSyncQueue returns true if the node (always one that was originally placed on a condition queue) is now on the synchronization queue, waiting to reacquire the lock.

The main purpose of this code is to determine whether the node is on the synchronization queue. If it is not, park is called to block the current thread. This raises a question: the AQS synchronization queue and the condition queue look independent, so why must await confirm that the node is not on the synchronization queue before blocking? The answer is that signal or signalAll moves the node from the condition queue to the synchronization queue, so a node found on the synchronization queue has already been signalled and must compete for the lock instead of parking.
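
This transfer can be observed with the monitoring methods that ReentrantLock exposes. The sketch below is only an illustration: it samples getWaitQueueLength (condition queue) and hasQueuedThread (synchronization queue) immediately before and after signal, while the signalling thread still holds the lock. TransferDemo and its method names are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TransferDemo {
    static String observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] ready = {false};   // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!ready[0]) {
                    cond.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        String result = null;
        while (result == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    String before = describe(lock, cond, waiter);
                    ready[0] = true;
                    cond.signal();   // moves the waiter's node to the sync queue
                    String after = describe(lock, cond, waiter);
                    result = before + " -> " + after;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    // cond = number of waiters on the condition queue,
    // sync = whether the thread is queued on the synchronization queue.
    static String describe(ReentrantLock lock, Condition cond, Thread t) {
        return "cond=" + lock.getWaitQueueLength(cond)
                + ",sync=" + lock.hasQueuedThread(t);
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Because the signalling thread keeps the lock while it samples, the waiter cannot leave the synchronization queue in between, which keeps the observation stable.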

The thread is blocked at this point and wakes up when another thread calls signal or signalAll.

After waking, the thread checks whether it was woken by an interrupt. Assuming no interrupt occurred and the node was signalled, isOnSyncQueue(node) in the while condition now returns true, which means the current node is already on the synchronization queue.

acquireQueued(node, savedState) is then called to acquire the lock.

final boolean acquireQueued(final Node node, int arg) {
    // Whether the acquisition failed (triggers cancellation in finally)
    boolean failed = true;
    try {
        // Whether the thread was interrupted while waiting
        boolean interrupted = false;
        // Infinite loop
        for (;;) {
            // The predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head, this node is the first real data node (head is a dummy node)
            // Then try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // On success, the head pointer points to the current node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p is not the head node, or the acquire failed (with a non-fair lock another thread may have barged in)
            // Decide whether the node should park; it stays in this loop until the lock is acquired
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

This is AQS logic; see the AQS article for a detailed introduction.

  1. Repeatedly check whether this node's predecessor is head. Because head is a dummy node, a node whose predecessor is head is the first data node.
  2. The first data node keeps trying to acquire the resource. If it succeeds, head is pointed at the current node.
  3. If the predecessor is not head, or tryAcquire(arg) fails (with a non-fair lock another thread may have barged in), the state of the predecessor determines whether the current node should block (that is, whether the predecessor's state is SIGNAL).

Note that once a node is on the AQS synchronization queue it competes for the lock like any other waiter, and it acquires with savedState as the argument, which restores the reentrant hold count that was given up when the lock was released.

The overall process is drawn as follows:

signal

public final void signal() {
    // The calling thread must hold the lock exclusively
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // firstWaiter now points to the next node in the condition queue
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Disconnect the original first node from the condition queue
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {

    // If the CAS fails, the node was already cancelled
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // Call enq to append the node to the end of the synchronization queue
    Node p = enq(node);
    int ws = p.waitStatus;
    // Set the predecessor's status to SIGNAL so that it wakes this node later;
    // if the predecessor is cancelled or the CAS fails, wake the thread directly
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

enq is also covered in the AQS source reading:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // If the tail node is null, initialize the head node
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // The tail exists: link the node after it, retrying in the loop if the CAS fails
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

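After enq places the node on the AQS synchronization queue, transferForSignal sets the waitStatus of the node's predecessor to Node.SIGNAL; if the predecessor is cancelled or the CAS fails, the waiting thread is unparked directly. The code for signalAll is similar.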
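
To see the pointer manipulation in isolation, here is a deliberately simplified, single-threaded model of the two queues. It is not the AQS implementation: there is no CAS, no waitStatus, and no thread parking, and ToyNode and ToyQueues are hypothetical names. It only mirrors how signal unlinks the first waiter from the singly linked condition queue and appends it to the doubly linked synchronization queue.

```java
import java.util.ArrayList;
import java.util.List;

// Toy node: prev/next are used on the sync queue, nextWaiter on the condition queue.
class ToyNode {
    final String name;
    ToyNode prev, next;
    ToyNode nextWaiter;

    ToyNode(String name) {
        this.name = name;
    }
}

class ToyQueues {
    // Sync queue: doubly linked, with a dummy head node.
    private final ToyNode head = new ToyNode("dummy");
    private ToyNode tail = head;
    // Condition queue: singly linked through nextWaiter.
    private ToyNode firstWaiter, lastWaiter;

    // Mirrors addConditionWaiter: append to the condition queue.
    void addConditionWaiter(String name) {
        ToyNode node = new ToyNode(name);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    // Mirrors enq without CAS: append to the sync queue.
    private void enq(ToyNode node) {
        node.prev = tail;
        tail.next = node;
        tail = node;
    }

    // Mirrors doSignal: move the first waiter to the sync queue.
    boolean signal() {
        ToyNode first = firstWaiter;
        if (first == null)
            return false;
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        enq(first);
        return true;
    }

    // Moves every waiter, in order, to the sync queue.
    void signalAll() {
        while (signal()) {
            // keep transferring until the condition queue is empty
        }
    }

    List<String> conditionQueue() {
        List<String> names = new ArrayList<>();
        for (ToyNode n = firstWaiter; n != null; n = n.nextWaiter)
            names.add(n.name);
        return names;
    }

    List<String> syncQueue() {
        List<String> names = new ArrayList<>();
        for (ToyNode n = head.next; n != null; n = n.next)
            names.add(n.name);
        return names;
    }
}
```

For example, after adding waiters A, B, and C and calling signal() once, the condition queue holds B and C while the synchronization queue holds A; a following signalAll() moves the rest over in order.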

Summary

Q&A

Q: How does Condition relate to AQS?

A: ConditionObject is an inner class of AQS that implements the Condition interface. A lock built on AQS, such as ReentrantLock, creates its Condition instances as ConditionObject objects.

Q: What is the implementation principle of Condition?

A: Condition maintains a condition queue internally. A thread that holds the lock and calls await is placed on the condition queue, releases the lock, and blocks. When another thread calls signal or signalAll, the node is moved to the AQS synchronization queue, where it competes for the lock again.

Q: What are the differences and connections between the waiting queue of Condition and the synchronous queue of AQS?

A: The wait queue of a Condition is a singly linked list, while the AQS synchronization queue is a doubly linked list. The two queues are otherwise independent; the only link is that a node is moved from the wait queue to the synchronization queue when it is signalled.

Closing remarks

This article focused on reading the Condition code and left out logic such as thread interruption. Interested readers can study the relevant source code in more depth.