Introduction

Every Java object has a set of monitor methods, defined on java.lang.Object, which include:

public final void wait() throws InterruptedException
// Waits with a timeout limit
public final native void wait(long timeout) throws InterruptedException
public final void wait(long timeout, int nanos) throws InterruptedException
// Notifies one waiting thread
public final native void notify()
// Notifies all waiting threads
public final native void notifyAll()

These methods work with the synchronized keyword to implement the wait/notification pattern. Like Object’s monitor methods, the Condition interface works with Lock to implement the same pattern, but the two differ in usage and in features.

The following table compares Object’s monitor methods with the Condition interface:

| Comparison item | Object monitor methods | Condition |
| Precondition | Acquire the lock of the object | Call Lock.lock() to acquire the lock, then call Lock.newCondition() to get the Condition object |
| How it is called | Called directly, such as object.wait() | Called directly, such as condition.await() |
| Number of wait queues | One | Multiple |
| The current thread releases the lock and enters the wait state | Supported | Supported |
| The current thread releases the lock and enters the wait state, and does not respond to interrupts while waiting | Not supported | Supported |
| The current thread releases the lock and enters the timed wait state | Supported | Supported |
| The current thread releases the lock and waits until some future point in time | Not supported | Supported |
| Wake up one thread in the wait queue | Supported | Supported |
| Wake up all threads in the wait queue | Supported | Supported |
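To make the comparison concrete, here is a minimal sketch of the same one-shot "ready" flag written twice, once with synchronized plus wait()/notifyAll() and once with Lock plus Condition. The class and method names (WaitNotifyComparison, MonitorFlag, ConditionFlag, runBoth) are invented for this illustration and do not come from the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class WaitNotifyComparison {
    // Variant 1: monitor methods of java.lang.Object, used with synchronized.
    static class MonitorFlag {
        private boolean ready;

        synchronized void awaitReady() throws InterruptedException {
            while (!ready) {   // re-check the predicate after every wake-up
                wait();        // releases the monitor while waiting
            }
        }

        synchronized void setReady() {
            ready = true;
            notifyAll();
        }
    }

    // Variant 2: the same flag built on Lock and Condition.
    static class ConditionFlag {
        private final Lock lock = new ReentrantLock();
        private final Condition readyCondition = lock.newCondition();
        private boolean ready;

        void awaitReady() throws InterruptedException {
            lock.lock();
            try {
                while (!ready) {
                    readyCondition.await(); // releases the lock while waiting
                }
            } finally {
                lock.unlock();
            }
        }

        void setReady() {
            lock.lock();
            try {
                ready = true;
                readyCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    // Runs both variants once and returns how many waiters were released.
    static int runBoth() throws InterruptedException {
        MonitorFlag monitorFlag = new MonitorFlag();
        ConditionFlag conditionFlag = new ConditionFlag();
        AtomicInteger released = new AtomicInteger();
        Thread a = new Thread(() -> {
            try {
                monitorFlag.awaitReady();
                released.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread b = new Thread(() -> {
            try {
                conditionFlag.awaitReady();
                released.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        a.start();
        b.start();
        monitorFlag.setReady();
        conditionFlag.setReady();
        a.join();
        b.join();
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("released waiters: " + runBoth());
    }
}
```

Both variants guard the wait with a boolean, so the result does not depend on whether the waiter or the notifier runs first.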

How does the Condition interface work with a reentrant lock? The Lock interface declares the method Condition newCondition(); the reentrant lock implements it and returns a Condition instance bound to that lock. With the Condition object, a thread can wait at the appropriate moment, or be notified to continue execution. A Condition depends on its Lock object: the lock associated with the Condition must be acquired before any of its methods are called.
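The requirement that the associated lock be held can be observed directly. The sketch below, with a hypothetical class name ConditionRequiresLock, calls signal() on a Condition of a ReentrantLock once without the lock and once with it. For ReentrantLock the first call is expected to fail with IllegalMonitorStateException.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionRequiresLock {
    // Returns true if signal() is rejected when the lock is not held.
    static boolean signalWithoutLockIsRejected() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition(); // bound to this lock
        try {
            condition.signal(); // the current thread does not hold the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() completes normally while the lock is held.
    static boolean signalWithLockIsAccepted() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters, so nothing is woken up
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected without lock: " + signalWithoutLockIsRejected());
        System.out.println("accepted with lock: " + signalWithLockIsAccepted());
    }
}
```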

Basic methods defined by Condition

// Makes the current thread wait and releases the current lock.
// The thread leaves the wait state when:
// 1. Another thread calls this Condition's signal() or signalAll() method; the thread then reacquires the lock and continues executing
// 2. The thread is interrupted
void await() throws InterruptedException
// Similar to await(), but does not respond to interrupts while waiting
void awaitUninterruptibly()
// The current thread waits until notified, interrupted, or timed out.
// The return value (nanosTimeout - time actually waited) is the remaining time; a value <= 0 means the wait timed out.
long awaitNanos(long nanosTimeout) throws InterruptedException
// The current thread waits until notified, interrupted, or timed out. Returns false if the wait timed out.
boolean await(long time, TimeUnit unit) throws InterruptedException
// The current thread waits until notified, interrupted, or the deadline is reached.
// Returns true if notified before the deadline; returns false once the deadline has been reached.
boolean awaitUntil(Date deadline) throws InterruptedException
// Wakes up one thread waiting on this Condition
void signal()
// Wakes up all threads waiting on this Condition
void signalAll()
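The return values of the timed variants are easiest to see when nobody signals. The sketch below assumes a ReentrantLock-backed Condition and uses an invented class name, TimedAwaitDemo. Each method waits 50 ms on a condition that is never signalled and returns what the timed call reported.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedAwaitDemo {
    // Result of await(time, unit) when the wait simply times out.
    static boolean timedAwaitResult() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            return condition.await(50, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    // Result of awaitNanos(nanosTimeout) when the wait simply times out.
    static long remainingNanos() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            return condition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(50));
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("await(50 ms) returned: " + timedAwaitResult());
        System.out.println("awaitNanos(50 ms) returned: " + remainingNanos());
    }
}
```

In real code the timed calls normally sit inside a loop that re-checks the guarded condition and passes the remaining time back into awaitNanos.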

Within the JDK, reentrant locks and Condition objects are widely used. If you are interested, see for example the put and take methods of ArrayBlockingQueue.
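As a rough illustration of how a blocking queue can combine one lock with two Conditions, here is a simplified bounded buffer. It is a sketch in the spirit of ArrayBlockingQueue, not a copy of the JDK source. The class name SimpleBoundedBuffer and the helper transferSum are made up for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleBoundedBuffer<T> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here

    SimpleBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await(); // buffer full: wait until a consumer frees a slot
            }
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();   // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // buffer empty: wait until a producer adds an item
            }
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();     // wake one waiting producer, if any
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..n through a buffer of capacity 2 and returns the sum of the values taken.
    static int transferSum(int n) throws InterruptedException {
        SimpleBoundedBuffer<Integer> buffer = new SimpleBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += buffer.take();
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum of transferred values: " + transferSum(10));
    }
}
```

Using two Conditions means a producer only ever wakes consumers and a consumer only ever wakes producers, which a single Object monitor with one wait set cannot express.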

Implementation analysis of Condition

Let’s take a look at the implementation of newCondition:

final ConditionObject newCondition() {
    return new ConditionObject();
}

newCondition returns a ConditionObject. ConditionObject is an inner class of the synchronizer AQS (AbstractQueuedSynchronizer). Because operations on a Condition require holding the associated lock, implementing it as an inner class of the synchronizer makes sense. Each Condition contains a wait queue, which is the key to its wait/notification behavior.

1. The wait queue

The wait queue is a FIFO queue in which each node holds a reference to a thread waiting on the Condition object. The synchronization queue maintained by the reentrant lock and the wait queue of a Condition use the same node type: AbstractQueuedSynchronizer.Node, a static inner class of the synchronizer.

ConditionObject has the following two properties:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

As you can see, the Condition has a first node (firstWaiter) and a last node (lastWaiter) to manage the wait queue. The basic structure of the wait queue is as follows:

As shown in the figure above, a Condition holds references to the first and last nodes of a singly linked queue. To enqueue a new node, the nextWaiter field of the current tail node is pointed at it and the tail reference is updated.
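The linking described above can be sketched with a toy, single-threaded model. ToyWaitQueue and its methods are hypothetical names. The node stores a plain string where the real Node stores a thread, and none of the cancellation handling of the real ConditionObject is modelled.

```java
import java.util.ArrayList;
import java.util.List;

class ToyWaitQueue {
    static final class Node {
        final String name; // stands in for the waiting thread
        Node nextWaiter;   // single forward link, as in the condition queue

        Node(String name) {
            this.name = name;
        }
    }

    private Node firstWaiter;
    private Node lastWaiter;

    // Appends a node at the tail: link the old tail to it, then move the tail reference.
    Node addWaiter(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;            // queue was empty
        } else {
            lastWaiter.nextWaiter = node;  // old tail now points at the new node
        }
        lastWaiter = node;
        return node;
    }

    // Unlinks and returns the head node, or null if the queue is empty.
    Node removeFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;             // queue became empty
        }
        first.nextWaiter = null;
        return first;
    }

    // Names in queue order, head first.
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        ToyWaitQueue queue = new ToyWaitQueue();
        queue.addWaiter("A");
        queue.addWaiter("B");
        queue.addWaiter("C");
        System.out.println(queue.names());
        System.out.println("removed: " + queue.removeFirst().name);
        System.out.println(queue.names());
    }
}
```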

In the monitor model of Object, an object has one synchronization queue and one wait queue, whereas a Lock (more precisely, a synchronizer) in the java.util.concurrent package has one synchronization queue and multiple wait queues (a Lock can create multiple Condition objects), as shown below:

As shown in the figure, the synchronization queue maintained by the synchronizer is a bidirectional queue. The implementation of Condition is an internal class of the synchronizer, so each Condition instance has access to the methods provided by the synchronizer, which means that each Condition has a reference to its own synchronizer.
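That each Condition keeps its own wait queue can be checked from the outside with ReentrantLock.getWaitQueueLength(Condition), which must be called while holding the lock. In the following sketch two threads wait on one Condition and one thread waits on another. The class name MultipleWaitQueues and the helper awaitUntilDone are assumptions of this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MultipleWaitQueues {
    // Returns {waiters on conditionA, waiters on conditionB} once all three threads are waiting.
    static int[] waitQueueLengths() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition conditionA = lock.newCondition();
        Condition conditionB = lock.newCondition();
        boolean[] done = {false}; // guarded by lock

        Runnable waitOnA = () -> awaitUntilDone(lock, conditionA, done);
        Runnable waitOnB = () -> awaitUntilDone(lock, conditionB, done);
        Thread[] threads = {new Thread(waitOnA), new Thread(waitOnA), new Thread(waitOnB)};
        for (Thread t : threads) {
            t.start();
        }

        int[] lengths = new int[2];
        boolean released = false;
        while (!released) {
            lock.lock();
            try {
                lengths[0] = lock.getWaitQueueLength(conditionA);
                lengths[1] = lock.getWaitQueueLength(conditionB);
                if (lengths[0] + lengths[1] == threads.length) {
                    // All threads are parked in their wait queues: let them go.
                    done[0] = true;
                    conditionA.signalAll();
                    conditionB.signalAll();
                    released = true;
                }
            } finally {
                lock.unlock();
            }
            if (!released) {
                Thread.sleep(10); // give the waiters time to reach await()
            }
        }
        for (Thread t : threads) {
            t.join();
        }
        return lengths;
    }

    private static void awaitUntilDone(ReentrantLock lock, Condition condition, boolean[] done) {
        lock.lock();
        try {
            while (!done[0]) {
                condition.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] lengths = waitQueueLengths();
        System.out.println("conditionA waiters: " + lengths[0] + ", conditionB waiters: " + lengths[1]);
    }
}
```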

2. Waiting

Here is the source code of Condition’s await() method:

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    // Throw InterruptedException if the thread has already been interrupted
    if (Thread.interrupted())
        throw new InterruptedException();
    // Wrap the current thread in a new node and add it to the wait queue
    Node node = addConditionWaiter();
    // Release the lock completely and remember the saved state
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop until the node has been moved to the synchronization queue
    // (after releasing the lock, the node is only in the wait queue)
    while (!isOnSyncQueue(node)) {
        // The current thread enters the waiting state
        LockSupport.park(this);
        // Check whether the thread was interrupted while waiting
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Reacquire the lock with the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

A thread may call await() only if it already holds the lock. To join the wait queue, await() calls addConditionWaiter(), which wraps the current thread in a new node and appends it to the tail of the wait queue.

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Wrap the current thread in a Node whose waitStatus is CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        // The queue is empty: the new node becomes the first node
        firstWaiter = node;
    else
        // The queue already has waiting nodes: link the old last node to the new node
        t.nextWaiter = node;
    // Update the last-node reference to the new node (insert at the tail)
    lastWaiter = node;
    return node;
}

The process of the current thread joining the Condition wait queue is as follows:

After joining the wait queue, fullyRelease(node) is called to release the synchronization state (release the lock) and wake up the subsequent nodes in the synchronization queue.

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Read the current synchronization state
        int savedState = getState();
        // Release the whole synchronization state in one step
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // Releasing the synchronization state failed: throw an exception
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // Set the node's waitStatus to Node.CANCELLED (the wait has been cancelled)
            node.waitStatus = Node.CANCELLED;
    }
}
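The effect of releasing the whole saved state and later reacquiring it can be seen from user code with a reentrant hold count. In this sketch the waiter locks twice before calling await(). The signalling thread can still acquire the lock while the waiter is parked, and the waiter’s hold count is 2 again after await() returns. FullReleaseDemo and holdCounts are hypothetical names.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullReleaseDemo {
    // Returns {hold count before await(), hold count after await() returns}.
    static int[] holdCounts() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] result = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquisition: hold count is now 2
            try {
                result[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    condition.await(); // gives up both holds while waiting
                }
                result[1] = lock.getHoldCount(); // both holds are back
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        boolean sent = false;
        while (!sent) {
            // Acquiring the lock here only succeeds because the waiter released it fully.
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    signalled[0] = true;
                    condition.signal();
                    sent = true;
                }
            } finally {
                lock.unlock();
            }
            if (!sent) {
                Thread.sleep(10); // the waiter has not reached await() yet
            }
        }
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = holdCounts();
        System.out.println("hold count before await: " + counts[0] + ", after await: " + counts[1]);
    }
}
```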

After these steps, the current thread enters the wait state in the following loop:

while (!isOnSyncQueue(node)) {
    // The current thread enters the waiting state
    LockSupport.park(this);
    // Check whether the thread was interrupted while waiting
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

isOnSyncQueue(node) determines whether the current node is already in the synchronization queue. If it is not, LockSupport.park(this) blocks the thread. LockSupport.park(this) does not throw an exception when the thread is interrupted; it simply returns, and the condition that follows checks whether the thread was interrupted. If it was, the loop is exited.

So there are two ways out of the loop: either another thread calls signal() or signalAll(), which moves the node to the synchronization queue, after which the waiting thread is unparked, or the waiting thread is interrupted.

After leaving the loop, the thread calls the AQS acquireQueued method to reacquire the lock, spinning and parking as needed. It then handles any interrupt recorded during the wait: depending on interruptMode, it either throws InterruptedException or restores the thread’s interrupt status.
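The interrupt path can be exercised with a small experiment. In the sketch below, a thread parked in await() is interrupted; the InterruptedException reaches its catch block only after the lock has been reacquired, which isHeldByCurrentThread() confirms. AwaitInterruptDemo is a hypothetical class name, and the condition is deliberately never signalled.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitInterruptDemo {
    // Returns true if the interrupted waiter held the lock when it caught InterruptedException.
    static boolean lockHeldWhenInterrupted() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] lockHeldInCatch = {false};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (true) {
                    condition.await(); // never signalled in this sketch
                }
            } catch (InterruptedException e) {
                // await() reacquires the lock before the exception propagates
                lockHeldInCatch[0] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Wait until the waiter is parked in the condition's wait queue.
        boolean parked = false;
        while (!parked) {
            lock.lock();
            try {
                parked = lock.hasWaiters(condition);
            } finally {
                lock.unlock();
            }
            if (!parked) {
                Thread.sleep(10);
            }
        }

        waiter.interrupt();
        waiter.join();
        return lockHeldInCatch[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("lock held in catch block: " + lockHeldWhenInterrupted());
    }
}
```

Because the lock is held again at that point, the unlock() in the finally block is safe on the interrupt path as well.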

3. Notification

Calling Condition’s signal method moves the node that has waited longest (the first node) to the synchronization queue; LockSupport is then used to wake up the thread in that node.

The signal method looks like this:

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    // Check whether the current thread is the thread that holds the lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Reference to the first node of the wait queue
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

The doSignal method does this:

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        // Advance firstWaiter; if it becomes null, no node is left in the wait queue
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Unlink the old first node from the wait queue
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

doSignal hands the first node to transferForSignal(first), which does the actual transfer:

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 *         cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // CAS waitStatus from CONDITION to 0. If this fails, the wait was already
    // cancelled (for example by an interrupt or a timeout), so return false.
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // Add the node to the synchronization queue; enq returns its predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // Make the waiting thread return from LockSupport.park(this)
        LockSupport.unpark(node.thread);
    return true;
}

This method first sets the waitStatus of the thread’s node to 0. It then uses enq to append the node to the tail of the synchronization queue. If the predecessor node has been cancelled, or its waitStatus cannot be set to SIGNAL, the waiting thread is unparked immediately.

The process of enq moving the node to the synchronization queue is shown in the figure below:

There is also a signalAll method for notification:

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

The signal() method operates on the node that has waited longest in the wait queue (the first node). The signalAll() method effectively performs signal once for every node in the queue: it moves all nodes from the wait queue to the synchronization queue, where their threads then compete for the lock.
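The difference between the two calls can be measured by counting how many nodes leave the wait queue. The sketch below parks several threads on one Condition, issues a single signal() or signalAll(), and compares ReentrantLock.getWaitQueueLength(Condition) before and after. SignalVsSignalAll and movedBy are invented names. The waiters call awaitUninterruptibly() without a guard loop only to keep the counting simple.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalVsSignalAll {
    // Parks `waiters` threads on one condition, notifies once, and returns
    // how many nodes that single call moved out of the wait queue.
    static int movedBy(boolean useSignalAll, int waiters) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    condition.awaitUninterruptibly();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        int moved = -1;
        while (moved < 0) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(condition) == waiters) {
                    if (useSignalAll) {
                        condition.signalAll();
                    } else {
                        condition.signal();
                    }
                    moved = waiters - lock.getWaitQueueLength(condition);
                    condition.signalAll(); // release the remaining waiters so the sketch terminates
                }
            } finally {
                lock.unlock();
            }
            if (moved < 0) {
                Thread.sleep(10); // not all threads have reached the wait queue yet
            }
        }
        for (Thread t : threads) {
            t.join();
        }
        return moved;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("moved by signal(): " + movedBy(false, 3));
        System.out.println("moved by signalAll(): " + movedBy(true, 3));
    }
}
```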

A simple usage example

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockCondition implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    @Override
    public void run() {
        // Acquire the lock before the try block so that finally only unlocks a held lock
        lock.lock();
        try {
            condition.await();
            System.out.println("Thread is going on");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockCondition tl = new ReentrantLockCondition();
        Thread t1 = new Thread(tl);
        t1.start();
        Thread.sleep(2000);
        // Tell thread t1 to continue executing
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

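Thread t1 calls condition.await(), which puts it into the wait state and releases the lock. The main thread then calls condition.signal() to tell the waiting thread to continue; after the call, the main thread must release the associated lock so that t1 can reacquire it. Note that the example relies on the 2-second sleep to ensure that t1 is already waiting when signal() is called.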

Reference books: Java High Concurrency Programming (2nd Edition), Practical Java Concurrent Programming, The Art of Java Concurrent Programming