From the school to A factory all the way sunshine vicissitudes of life

Please go to www.codercc.com

1. Introduction of Condition

Any Java Object naturally inherits from the Object class, and communication between threads tends to apply several methods to Object, Wait (),wait(long timeout),wait(long timeout, int nanos),notify (), and notifyAll() implement the wait/notification mechanism. The same approach is used to implement wait/notification under Java Lock. On the whole, Object wait and notify/notify cooperate with Object monitor to complete the wait/notification mechanism between threads, while Condition and Lock cooperate to complete the wait notification mechanism. The former is Java low-level level, and the latter is language level, which has higher controllability and expansibility. In addition to the differences in the use of the two, there are many differences in the functions and features:

  1. Condition can not respond to interrupts, but not by using Object.
  2. Condition can support multiple wait queues (new multiple Condition objects), while Object can only support one.
  3. Condition supports timeout while Object does not

Referring to Object’s wait and notify/notifyAll methods, Condition provides the same method:

Wait methods for objects

  1. Void await() throws InterruptedException: The current thread enters the wait state if another thread calls the condition signal or signalAll method and the current thread returns the Lock from the await method. An interrupt exception is thrown if interrupted in the wait state.
  2. Long awaitNanos(Long nanosTimeout) : The current thread enters the wait state until notified, interrupted, or timed out;
  3. Boolean await(long time, TimeUnit Unit)throws InterruptedException: The same as the second, supports custom time units
  4. Boolean awaitUntil(Date deadline) throws InterruptedException: The current thread enters the wait state until it is notified, interrupted, or reaches a certain time

Notify /notifyAll for objects

  1. Void signal() : Wakes up a thread waiting on condition, moves it from the wait queue to the synchronous queue, and returns it from the wait method if it can compete for Lock in the synchronous queue.
  2. Void signalAll() : Differs from 1 in that it wakes up all threads waiting on condition

2.Condition realization principle analysis

2.1 Waiting Queue

To understand condition in depth and how it works, let’s take a look at condiiton’s source code. Create a condition object is through the lock. The newCondition (), and this method is, in fact, the new out a ConditionObject objects, the class is AQS (AQS article) of the implementation of the principle of an inner class, are interested can go and see. Condition and Lock are bound together, and the implementation principle of Lock depends on AQS. Naturally ConditionObject is an internal class of AQS. As we know, AQS maintains a synchronous queue internally. If it is an exclusive lock, the tails of all threads that fail to acquire the lock are inserted into the synchronous queue. Similarly, condition maintains a waiting queue internally in the same way. All threads calling the condition.await method are queued and the thread state transitions to wait state. Note also that ConditionObject has two member variables:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
Copy the code

ConditionObject manages the wait queue by holding the first and last Pointers to it. The main note is that the Node class reuse in AQS Node class, its Node state and related attributes can go to see the implementation principle of AQS article, if you carefully read this article on the understanding of condition is easy, the implementation of lock system will also have a qualitative improvement. The Node class has this property:

// Node nextWaiter;Copy the code

Further, the wait queue is a one-way queue, whereas the synchronous queue was known to be a two-way queue when we said AQS earlier. Next, we will use a demo and debug it to see if it conforms to our conjecture:

public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { lock.lock(); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); }}); thread.start(); }}Copy the code

This code doesn’t make any real sense, even stinks, just to illustrate what we were thinking. 10 new threads are created, no thread acquires the lock first, then calls condition. Await method to release the lock and add the current thread to the wait queue. Debug to check the “firstWaiter”, i.e. the head of the wait queue, when it reaches the 10th thread.

From this figure, we can clearly see the following points: 1. After calling the condition. Await method, threads are successively inserted into the wait queue. Thread references in the queue in the figure are thread0, thread-1, and thread-2…. Thread – 8; 2. The wait queue is a unidirectional queue. Through our conjecture and experimental verification, we can get the schematic diagram of waiting queue as shown in the figure below:

It is also important to note that we can call the lock.newCondition() method multiple times to create multiple condition objects, meaning that a lock can hold multiple wait queues. The previous method of using Object actually means that there can only be one synchronization queue and one wait queue on the Object Object monitor, while the Lock in the simultaneous sending of packets has one synchronization queue and multiple wait queues. The schematic diagram is as follows:

ConditionObject is an internal class of AQS, so each Condition has access to the methods provided by AQS, which means that each Condition has a reference to its synchronizer.

2.2 Implementation principle of await

Calling condition.await() causes the thread currently obtaining the lock to wait. If the thread can return from await(), it must have obtained the lock associated with condition. Next, we still look from the source point of view, only familiar with the source logic of our understanding is the deepest. Await () method

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 1. Wrap the current thread as Node and insert it into the queue. Int savedState = fullyRelease(node); savedState = fullyRelease(node); int interruptMode = 0; while (! IsOnSyncQueue (node)) {// 3. The current thread enters the wait state locksupport.park (this); if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; If (acquireQueued(node, savedState) && interruptMode! = THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter ! = null) // clean up if cancelled unlinkCancelledWaiters(); If (interruptMode! = 0) reportInterruptAfterWait(interruptMode); }Copy the code

As we know, calling condition.await() causes the current thread to release the lock and join the wait queue until it is signaled /signalAll causes the current thread to move from the wait queue to the synchronization queue. It does not return from the await method until the lock is obtained, or it does interrupt processing if it is interrupted while waiting. There are several questions about this implementation: 1. How is the current thread added to the wait queue? 2. Lock release process? 3. How do I exit from the await method? The logic of this code is to tell us the answers to these three questions. Add the current thread to the queue by calling addConditionWaiter in step 1.

private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t ! = null && t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // wrap the currentThread into Node Node Node = new Node(thread.currentthread (), node.condition); if (t == null) firstWaiter = node; Else // Insert t. netwaiter = node; // update lastWaiter = node; return node; }Copy the code

This code is easy to understand, wrapping the current Node as a Node and pointing the firstWaiter to the current Node if the wait queue firstWaiter is null, otherwise updating the lastWaiter(tail Node) is enough. In other words, the Node encapsulated by the current thread can be inserted into the wait queue by tail insertion. Meanwhile, it can be seen that the wait queue is a chain queue without leading nodes. When we learned AQS before, we knew that the synchronous queue is a chain queue with leading nodes, which is a difference between the two. After inserting the current node into the wait column, the current thread releases the lock. FullyRelease is implemented by the fullyRelease method.

final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); If (release(savedState)) {// Release synchronization state failed = false; return savedState; } else {/ / not successful release synchronous state throw new exception IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }}Copy the code

Call AQS template method Release to release the synchronization state of AQS and wake up any thread referenced by its successor in the synchronization queue. If the release succeeds, it returns normally, and if it fails, it throws an exception. So far, these two pieces of code have solved the answers to the first two questions, which leaves the third question, how do I exit from the await method? Now go back and look at the logic of the await method:

while (! IsOnSyncQueue (node)) {// 3. The current thread enters the wait state locksupport.park (this); if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; }Copy the code

Obviously, when the thread first calls the condition.await() method, it enters the while() loop and then makes the current thread wait with the locksupport.park (this) method, So the first prerequisite for exiting the await method is naturally to exit the while loop, which leaves only two exits: 1. The logic goes to break and exits the while loop; 2. The logic in the while loop says false. The first condition occurs when the waiting thread is interrupted and the code exits at break. The second condition occurs when the current node is moved to the synchronous queue (i.e. the signal or signalAll method of condition called by another thread). The while loop ends when the logic in the while determines false. Conclude, the current thread is interrupted, or call condition. The signal/condition. The method signalAll moved to the current node synchronization queue, which is a prerequisite for the current thread exit await method. AcquireQueued (Node, savedState) is called when exiting the while loop. This method was introduced in this article about the underlying implementation of AQS. It is used to try to obtain the synchronization state during the spin process. Until it succeeds (the thread gets the lock). This also indicates that the exit await method must be a lock that has obtained a condition reference (association). So far, we have completely found the answers to the first three questions by reading the source code and have a deeper understanding of the await method. The diagram of await method is shown below:

As shown, the thread calling condition. Await must have acquired the LOCK, i.e. the current thread is the head of the synchronization queue. Calling this method causes the Node tail wrapped by the current thread to be inserted into the wait queue.

Timeout mechanism support

Condition also supports a timeout mechanism, and the user can invoke methods awaitNanos,awaitUtil. The implementation principle of these two methods is basically the same as tryAcquire method in AQS. You can read section 3.4 of this article about tryAcquire carefully.

Does not respond to interrupt support

To remember the response can be called condition. AwaitUninterruptibly () method, the method of source for:

public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (! isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }Copy the code

This method is basically the same as the await method above, except that it reduces the handling of interrupts and omits the reportInterruptAfterWait method to throw interrupted exceptions.

2.3 Implementation principles of Signal /signalAll

Calling the signal or signalAll methods of condition moves the longest waiting node to the synchronous queue, giving that node a chance to acquire the lock. The condition signal method is called to move the head node to the synchronous queue. The condition signal method is called to move the head node to the synchronous queue. Signal (); signal ();

public final void signal() { //1. Check if the current thread has acquired lock if (! isHeldExclusively()) throw new IllegalMonitorStateException(); // first = firstWaiter; // first = first waiter; if (first ! = null) doSignal(first); }Copy the code

The signal method first checks whether the current thread has acquired the lock, and then raises an exception if it has, and then gets the node referenced by the head pointer to the waiting queue. The doSignal method for subsequent operations is also based on that node. Let’s look at what the doSignal method does.

private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; First. nextWaiter = null; //2. The transferForSignal method does the real processing on the header} while (! transferForSignal(first) && (first = firstWaiter) ! = null); }Copy the code

TransferForSignal (); transferForSignal ();

final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //1. Update status to 0 if (! compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //2. Node p = enq(Node); int ws = p.waitStatus; if (ws > 0 || ! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }Copy the code

This code does two things: 1. Change the state of the header to CONDITION; 2. 2. Call the enQ method to insert the tail of the node into the synchronization queue. For enQ method, see the article on the underlying implementation of AQS. Now we can draw a conclusion: The condition signal is called only if the current thread has acquired the lock. This method moves the head node of the waiting queue, that is, the node with the longest waiting time, to the synchronous queue, and then it has a chance to wake up the waiting thread. Return from the locksupport.park (this) method in the await method so that the thread calling the await method has a chance to exit successfully. Signal execution schematic diagram is shown below:

signalAll

The difference between the sigllAll method and the sigal method is reflected in the doSignalAll method. We already know that the doSignalAll method only operates on the head node of the waiting queue.

private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first ! = null); }Copy the code

This method simply waits for each node in the queue to move into the synchronous queue, that is, “notifies” each thread that is currently calling the condition.await() method.

3. Thinking about the combination of await and signal/signalAll

The wait/notification mechanism, which can be implemented by using the await and signal/signalAll methods provided by condition, can solve the classic “producer/consumer problem”. There will be a separate article on “producer-consumer problems”, which is also a frequent interview topic. The await and signal and signalAll methods are like A switch controlling thread A (waiting party) and thread B (notifying party). The relationship between them can be better illustrated by the following diagram:

As shown in the figure, the awaitThread first obtains the lock through lock.lock() and then calls the condition. Await method to enter the wait queue. The other thread, signalThread, calls condition. Signal or signalAll after acquiring the lock successfully, giving the awaitThread the opportunity to move to the synchronization queue. This gives the awaitThread a chance to obtain the lock after another thread releases the lock, so that the awaitThread can exit from the await method to perform further operations. If the awaitThread fails to acquire the lock, it is directly put into the synchronization queue.

3. An example

Let’s use a very simple example of condition:

public class AwaitSignal { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); private static volatile boolean flag = false; public static void main(String[] args) { Thread waiter = new Thread(new waiter()); waiter.start(); Thread signaler = new Thread(new signaler()); signaler.start(); } static class waiter implements Runnable { @Override public void run() { lock.lock(); try { while (! Flag) {system.out.println (thread.currentThread ().getName() + "current condition not met wait "); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); }} system.out.println (thread.currentThread ().getName()); } finally { lock.unlock(); } } } static class signaler implements Runnable { @Override public void run() { lock.lock(); try { flag = true; condition.signalAll(); } finally { lock.unlock(); }}}}Copy the code

The output is:

Thread0 The current condition does not meet the requirement. The condition is metCopy the code

The “waiter” thread and the “Signaler” thread are enabled. When the “Waiter” thread starts to execute because the condition is not met, we call the condition. Await method to make the thread wait and release the lock, and the “Signaler” thread will change the condition after obtaining the lock and notify all waiting threads to release the lock. At this point, the Waiter thread obtains the lock, and since the Signaler thread has changed the condition, the condition is now satisfied relative to the Waiter, and the execution continues.

reference

The Art of Concurrent Programming in Java