Preface

Every Java object has a set of monitor methods (defined on java.lang.Object), which include wait(), wait(long timeout), notify(), and notifyAll(). Used together with the synchronized keyword, these methods provide a wait/notification mechanism for communication between threads.
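As a quick refresher on the monitor methods, here is a minimal sketch of wait/notification with synchronized. The class name WaitNotifyDemo and the ready flag are made up for this illustration; only wait() and notifyAll() come from java.lang.Object.

```java
class WaitNotifyDemo {
    private final Object monitor = new Object();
    private boolean ready = false;              // the state the waiter is waiting for

    /** Blocks until another thread calls markReady(). */
    void awaitReady() throws InterruptedException {
        synchronized (monitor) {
            while (!ready) {                    // re-check in a loop: wait() may wake up spuriously
                monitor.wait();                 // releases the monitor while waiting
            }
        }
    }

    /** Changes the state and notifies all waiting threads. */
    void markReady() {
        synchronized (monitor) {
            ready = true;
            monitor.notifyAll();
        }
    }

    /** Starts a waiter, notifies it, and reports whether it resumed. */
    static boolean run() {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        boolean[] resumed = {false};
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
                resumed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.markReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + run());
    }
}
```

Both wait() and notifyAll() are called inside a synchronized block on the same object; Condition has the equivalent requirement with respect to its Lock.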

In the previous article, we introduced the implementation of ReentrantLock and the AQS queue. Lock also has its own wait/notification mechanism, the Condition queue: the Condition interface provides monitor methods similar to those of Object, and the wait/notification pattern can be implemented in conjunction with Lock, mainly through the methods await() and signal().

Before reading this article, it is recommended that you learn about the implementation principles of ReentrantLock and the AQS queue introduced in the previous article, because this article also builds on AQS and Node objects.

A First Look at Condition

Condition, like Lock, is an interface within JUC. The Condition interface defines two types of methods, waiting and notification, and the current thread must acquire the lock associated with the Condition object before calling them. A Condition object is created by a Lock object (by calling the Lock object's newCondition() method); in other words, a Condition depends on a Lock object.
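The requirement to hold the lock can be observed directly. The sketch below (the class and method names are made up for this illustration) calls signal() on a Condition created by a ReentrantLock, once without the lock and once with it. ReentrantLock documents that its Condition methods throw IllegalMonitorStateException when the lock is not held.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionRequiresLockDemo {

    /** Calls signal() without holding the lock; returns true if it was rejected. */
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal();                 // the lock is NOT held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    /** Calls signal() while holding the lock; returns true if it was accepted. */
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal();                 // allowed: nobody is waiting, so nothing happens
            return true;
        } catch (IllegalMonitorStateException unexpected) {
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock rejected: " + signalWithoutLockFails());
        System.out.println("with lock accepted:    " + signalWithLockSucceeds());
    }
}
```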

ConditionObject, the implementation of Condition, is an inner class of the AQS class and also depends on the Node object.

Condition Usage Example

Condition is also very simple to use. Here is a simple example:

    package com.zwx.concurrent.lock;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class LockConditionDemo {
        public static void main(String[] args) throws InterruptedException {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(new ConditionAwait(lock, condition)).start();
            Thread.sleep(1000); // give the waiting thread time to call await()
            new Thread(new ConditionSignal(lock, condition)).start();
        }
    }

    class ConditionAwait implements Runnable {
        private Lock lock;
        private Condition condition;

        public ConditionAwait(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }

        @Override
        public void run() {
            System.out.println("await begin");
            lock.lock(); // acquire outside try so unlock() only runs if lock() succeeded
            try {
                condition.await(); // releases the lock and waits for a signal
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println("await end");
        }
    }

    class ConditionSignal implements Runnable {
        private Lock lock;
        private Condition condition;

        public ConditionSignal(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }

        @Override
        public void run() {
            System.out.println("signal begin");
            lock.lock();
            try {
                condition.signal(); // wakes up one thread waiting on this condition
            } finally {
                lock.unlock();
            }
            System.out.println("signal end");
        }
    }

After running, the output is:



This effect is the same as that of wait() and notify(). So how does Condition implement the wait/notification mechanism?

Condition principle analysis

The analysis below is based on ConditionObject, the Condition implementation inside AQS.

Like the AQS synchronization queue introduced in the previous article, the Condition queue is a FIFO queue that relies on Node objects.

The Condition queue, also called the wait queue, does not maintain prev and next. It is a singly linked list that tracks its first and last nodes through firstWaiter and lastWaiter, and every node except lastWaiter has a nextWaiter pointing to the next node. The Condition queue is roughly as follows:
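As a rough illustration of that structure, the sketch below models a singly linked wait queue with firstWaiter, lastWaiter and nextWaiter. It is a simplified, single-threaded toy model, not the JDK code: the class ToyConditionQueue, its Waiter node and the describe() helper are hypothetical names used only here.

```java
class ToyConditionQueue {

    /** Simplified node: only a name and the link to the next waiter. */
    static final class Waiter {
        final String name;
        Waiter nextWaiter;                      // null for the last node

        Waiter(String name) {
            this.name = name;
        }
    }

    Waiter firstWaiter;                         // head of the singly linked list
    Waiter lastWaiter;                          // tail of the singly linked list

    /** Appends a new waiter at the tail, initializing the queue if it is empty. */
    Waiter add(String name) {
        Waiter node = new Waiter(name);
        if (lastWaiter == null) {
            firstWaiter = node;                 // empty queue: the node is also the first one
        } else {
            lastWaiter.nextWaiter = node;       // link behind the current tail
        }
        lastWaiter = node;
        return node;
    }

    /** Renders the queue from first to last for inspection. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Waiter w = firstWaiter; w != null; w = w.nextWaiter) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(w.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyConditionQueue queue = new ToyConditionQueue();
        queue.add("ThreadA");
        queue.add("ThreadB");
        System.out.println(queue.describe());   // prints "ThreadA -> ThreadB"
    }
}
```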

Condition.await() source code interpretation

Let's dive into the source code and examine the implementation of Condition. In the example above, when we call condition.await(), we enter the await() method in the AbstractQueuedSynchronizer class.

AQS#await()



The first step is to check whether the current thread has been interrupted. Next, let's look at the addConditionWaiter() method:

AQS#addConditionWaiter()



To make it easier to understand, let’s post the Node object:

    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        static final int CANCELLED = 1;
        static final int SIGNAL = -1;
        static final int CONDITION = -2;   // used by the Condition queue
        static final int PROPAGATE = -3;   // used by CountDownLatch

        volatile int waitStatus;           // status of the thread in the Node; defaults to 0 in the AQS queue
        volatile Node prev;                // the node before the current node
        volatile Node next;                // the node after the current node
        volatile Thread thread;            // the thread wrapped by the current Node
        Node nextWaiter;                   // the next node in the Condition queue

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // Returns the node before the current node
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {
        }

        // Used when building the AQS queue: waitStatus keeps its default of 0
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        // Used by Condition: waitStatus is passed in directly
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

It should be noted that a Node in the AQS queue is initialized without a state being passed in, so waitStatus defaults to 0. As we know from the previous analysis, it is changed to -1 (SIGNAL) along the way and becomes 1 (CANCELLED) when the thread fails abnormally. Therefore, a Node in the AQS queue effectively has only three states here: -1, 0 and 1. A Node in the Condition queue is initialized through the other constructor, which passes in -2 (CONDITION) directly, so there is no default 0 state, and the Condition queue only has the states -2 and 1.

Back in addConditionWaiter(): if the Condition queue has not been initialized when thread A enters, thread A executes lines 1879 and 1882, which produces the following Condition queue:



There is only one node, so firstWaiter and lastWaiter refer to the same node, and ThreadA's nextWaiter is null because there is no next node.

When ThreadB then enters, it executes lines 1881 and 1882, resulting in the following Condition queue:



The Condition queue is now constructed. Let's assume that the state of ThreadB's node is 1 and walk through the unlinkCancelledWaiters() method to see how a node whose state is 1 (an invalid node) gets cleared. The Condition queue then looks like this (the only difference is that the state of ThreadB's node has changed to 1):

AQS#unlinkCancelledWaiters()

The logic of this method is not too hard. Just keep two variables in mind:

The first is t, which starts at firstWaiter. At the end of each iteration, the node's nextWaiter is assigned to t to continue the loop (lines 1933 and 1945).

The other is trail, which records the last node that has already been visited and is still valid. If the current node is not cancelled, t is assigned to trail at the end of the iteration, and the loop continues.



The first iteration will definitely execute lines 1944 and 1945, because firstWaiter is not null and its state equals Node.CONDITION. The iteration ends with the following result: t = ThreadB, trail = firstWaiter.

In the second iteration the state of t is 1, so the if condition holds and execution enters line 1935 to clear the invalid node with t.nextWaiter = null. Since ThreadB is currently the tail node, this statement has no effect in this case; it only matters for non-tail nodes.

Next, trail.nextWaiter is set to the node after t, which is null here. Since trail = firstWaiter, this is equivalent to firstWaiter.nextWaiter = null, and we get the following new Condition queue:



Then lastWaiter = trail is executed, which is equivalent to lastWaiter = firstWaiter. The following Condition queue is obtained:



You can see that the invalid ThreadB node has been cleared.
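The walk-through above can be replayed with a small toy model. The sketch below follows the same t/trail traversal on a simplified, single-threaded queue. The class ToyUnlinkCancelled and its Waiter node are hypothetical names for this illustration, and the code is a paraphrase of the idea rather than a copy of the JDK source.

```java
class ToyUnlinkCancelled {
    static final int CONDITION = -2;            // node is waiting on the condition
    static final int CANCELLED = 1;             // node is no longer valid

    static final class Waiter {
        final String name;
        int waitStatus = CONDITION;
        Waiter nextWaiter;

        Waiter(String name) {
            this.name = name;
        }
    }

    Waiter firstWaiter;
    Waiter lastWaiter;

    Waiter add(String name) {
        Waiter node = new Waiter(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    /** Removes every node whose state is not CONDITION. */
    void unlinkCancelledWaiters() {
        Waiter t = firstWaiter;                 // node currently being inspected
        Waiter trail = null;                    // last valid node seen so far
        while (t != null) {
            Waiter next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;            // detach the invalid node
                if (trail == null) {
                    firstWaiter = next;         // no valid node before it: move the head
                } else {
                    trail.nextWaiter = next;    // bridge over the invalid node
                }
                if (next == null) {
                    lastWaiter = trail;         // the invalid node was the tail
                }
            } else {
                trail = t;                      // t is valid, remember it
            }
            t = next;
        }
    }

    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Waiter w = firstWaiter; w != null; w = w.nextWaiter) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(w.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyUnlinkCancelled queue = new ToyUnlinkCancelled();
        queue.add("ThreadA");
        Waiter b = queue.add("ThreadB");
        b.waitStatus = CANCELLED;               // the scenario from the text
        queue.unlinkCancelledWaiters();
        System.out.println(queue.describe());   // prints "ThreadA"
    }
}
```

After the call, firstWaiter and lastWaiter both point at the ThreadA node again, which matches the final state described above.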

Setting aside the logic for clearing invalid nodes, let's go back to the normal flow. After the queue is built, the await() method continues:



Next comes the logic that releases the lock, fullyRelease(Node). Since the purpose of a thread calling await() is to hand the lock over to other threads, it must release the lock; otherwise no other thread could acquire it.

AQS#fullyRelease(Node)



We get the current state and pass it to release(). When we introduced ReentrantLock, lock.unlock() also called this release(arg), but unlock() always passes a fixed 1, so with reentrant calls each unlock() only decrements state by 1. Here the whole state is passed in, so the lock is released completely in one step.

I'm not going to go through the release(arg) method here. If you are not familiar with it, see my earlier article introducing ReentrantLock and AQS.
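The effect of releasing the full state can be observed from the outside. In the sketch below (FullyReleaseDemo and its helpers are names made up for this illustration), a thread acquires a ReentrantLock twice and then calls await(). Another thread is able to take the lock while the first one waits, and the hold count is back at 2 once await() returns. Only public ReentrantLock methods (getHoldCount(), hasWaiters(), tryLock()) are used.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {

    /** Returns {holds before await, 1 if the lock was taken by another thread meanwhile, holds after await}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int[] result = new int[3];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                             // reentrant acquire: two holds
            try {
                result[0] = lock.getHoldCount();
                condition.await();                   // gives up both holds in one step
                result[2] = lock.getHoldCount();     // both holds are restored on return
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        long deadline = System.currentTimeMillis() + 5000;
        boolean signalled = false;
        while (!signalled && System.currentTimeMillis() < deadline) {
            if (lock.tryLock()) {                    // fails while the waiter still holds the lock
                try {
                    if (lock.hasWaiters(condition)) {
                        result[1] = 1;               // we own the lock while the waiter is waiting
                        condition.signal();
                        signalled = true;
                    }
                } finally {
                    lock.unlock();
                }
            }
            if (!signalled) {
                pause(10);
            }
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("holds before await: " + r[0]);
        System.out.println("lock taken by another thread while waiting: " + (r[1] == 1));
        System.out.println("holds after await: " + r[2]);
    }
}
```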

Here we go back to our await() method if the lock is released successfully:

The isOnSyncQueue method in the while loop is used to determine whether the Node where the current thread is located is in the AQS synchronization queue.

Note that this is a concurrent scenario, so another thread may already have woken up thread B. Being woken up does not mean the thread gets the lock directly; it has to compete for the lock, and if that fails it joins the AQS synchronization queue. That is why this check is needed: only if the node is not in the AQS synchronization queue can the current thread be suspended.

AQS#isOnSyncQueue(Node)



One particular point to note here is that a node in the Condition queue, once woken up by another thread calling signal(), has to compete for the lock, and if it fails to get the lock it may be added to the AQS synchronization queue. That is why the prev and next attributes are checked here.

There is one more point that may be hard to follow if you do not remember how the AQS synchronization queue is built. To make it easier, here is the enq code fragment from the AQS synchronization queue introduced in the previous article:



If line 597 succeeds but the CAS on line 598 fails, then node.prev != null even though the node failed to replace the tail node, which means it has not joined the AQS synchronization queue yet. So even when node.prev != null, it is still necessary to traverse from the tail node to be sure.

AQS#findNodeFromTail(Node)



This code should be easy to understand, so I won’t have to explain it too much.
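To make the "prev is set but the node is not queued yet" case concrete, here is a toy sketch of a CAS-based enqueue plus a search from the tail. It only mirrors the idea described above: ToySyncQueue, its Node class and the use of AtomicReference are assumptions made for this illustration, not the JDK implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

class ToySyncQueue {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends the node at the tail and returns its predecessor. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                Node sentinel = new Node("head");    // lazily create the sentinel head
                if (head.compareAndSet(null, sentinel)) {
                    tail.set(sentinel);
                }
            } else {
                node.prev = t;                       // step 1: prev is set before the CAS
                if (tail.compareAndSet(t, node)) {   // step 2: may fail under contention
                    t.next = node;                   // step 3: only now is the node linked in
                    return t;
                }
            }
        }
    }

    /** Walks backwards from the tail to check whether the node is really queued. */
    boolean findNodeFromTail(Node node) {
        for (Node t = tail.get(); t != null; t = t.prev) {
            if (t == node) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ToySyncQueue queue = new ToySyncQueue();
        Node a = new Node("ThreadA");
        Node pred = queue.enq(a);

        // Simulate a node that completed step 1 but has not won the CAS in step 2 yet.
        Node pending = new Node("ThreadB");
        pending.prev = a;

        System.out.println("predecessor of ThreadA: " + pred.name);
        System.out.println("ThreadA queued: " + queue.findNodeFromTail(a));
        System.out.println("ThreadB queued: " + queue.findNodeFromTail(pending));
    }
}
```

The pending node has a non-null prev, yet the search from the tail does not find it, which is exactly why a non-null prev alone is not treated as proof of being queued.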

Back to await() main method:

At this point, thread B is definitely not in the AQS synchronization queue when it first comes in, so execution continues to the next line and the current thread is suspended by park(). It stays suspended until another thread calls signal() to wake it up.
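The "check, then park" pattern can be sketched with LockSupport directly. In the example below, ParkUnparkDemo and the transferred flag are made up for this illustration; the flag merely stands in for the "node is on the synchronization queue" check and is not how AQS tracks it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {

    /** Parks a thread until a flag is set and it is unparked; returns true if it resumed. */
    static boolean run() {
        AtomicBoolean transferred = new AtomicBoolean(false); // stand-in for the queue check
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            while (!transferred.get()) {        // re-check after every wake-up
                LockSupport.park();             // may also return spuriously, hence the loop
            }
            resumed.set(true);
        });
        waiter.start();

        transferred.set(true);                  // change the state first ...
        LockSupport.unpark(waiter);             // ... then wake the parked thread

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + run());
    }
}
```

Setting the flag before calling unpark() matters: if unpark() happens before the thread parks, the permit is kept and the next park() returns immediately, and the loop then sees the updated flag.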

Condition.signal() source code interpretation

In the example above, when we call condition.signal(), we enter the signal() method in the AbstractQueuedSynchronizer class.

AQS#signal()



This method is relatively simple, just to make a simple judgment, let’s go into the doSignal(Node) method to see how to wake up other threads.

AQS#doSignal(Node)



The body of the loop moves firstWaiter to the second node in the current Condition queue (if there is no second node, lastWaiter is cleared as well) and then detaches the old first node from the Condition queue.

The main logic lies in transferForSignal(Node) in the while condition, which is the core code of the signal operation. Its job is to transfer the node from the Condition queue to the AQS synchronization queue so that it can compete for the lock.

After the wake-up, the firstWaiter node has been removed from the Condition queue and is added to the AQS synchronization queue by the transferForSignal method.

Note that a thread woken up by signal() does not acquire the lock directly; it must compete for it, so its node needs to be moved to the AQS synchronization queue first.
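This transfer can be observed through ReentrantLock's public monitoring methods. In the sketch below (SignalTransferDemo is a name made up for this illustration), the signalling thread keeps holding the lock right after signal() and inspects where the waiter is: it has left the Condition queue, it is queued for the lock, and it has not returned from await() yet.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {

    /**
     * Returns {waiting on condition before signal, waiting on condition after signal,
     *          queued for the lock after signal, returned from await before unlock}.
     */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        AtomicBoolean returned = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                returned.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean[] result = new boolean[4];
        long deadline = System.currentTimeMillis() + 5000;
        boolean signalled = false;
        while (!signalled && System.currentTimeMillis() < deadline) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    result[0] = true;
                    condition.signal();
                    result[1] = lock.hasWaiters(condition);    // no longer in the Condition queue
                    result[2] = lock.hasQueuedThread(waiter);  // now queued for the lock
                    result[3] = returned.get();                // cannot return: we still hold the lock
                    signalled = true;
                }
            } finally {
                lock.unlock();
            }
            if (!signalled) {
                pause(10);
            }
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("in Condition queue before signal: " + r[0]);
        System.out.println("in Condition queue after signal:  " + r[1]);
        System.out.println("in lock queue after signal:       " + r[2]);
        System.out.println("await() returned before unlock:   " + r[3]);
    }
}
```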

AQS#transferForSignal(Node)



Once the node has been added to the AQS synchronization queue, it normally waits there to compete for the lock, just like a thread blocked in lock(). However, if the previous tail node p is invalid, or its state cannot be updated, the thread of the current node is woken up right away. This is done to improve performance. Assume the AQS synchronization queue originally contains only one node (besides the head sentinel node), and this node p (the original tail) is invalid. Waking up the current node lets it try to grab the lock immediately, and if the thread holding the lock happens to release it at that moment, the acquisition may succeed right away.

Back to AQS#await()



Above, our thread was suspended on line 2062. Note that there are two ways it can be woken up:

  • Awakened by the signal() method
  • Awakened by interrupt()

So the first thing to do after waking up is to determine whether the thread was woken up by interrupt() or by signal().

AQS#checkInterruptWhileWaiting(Node)



The transferAfterCancelledWait(Node) method mainly determines which of cases 2 and 3 applies, that is, whether the interrupt happened before or after the node was signalled.

AQS#transferAfterCancelledWait(Node)



At this point we know whether the thread resumed because of interrupt() or because of signal(), and we return to the previous method.
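The two outcomes can be reproduced from user code. In the sketch below (AwaitInterruptDemo and the outcome strings are made up for this illustration), a waiter is interrupted either while it is still waiting on the condition or right after it has been signalled. For ReentrantLock, the first case ends in an InterruptedException, while in the second case await() returns normally with the interrupt flag set again.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitInterruptDemo {

    /** Interrupts a waiting thread, optionally signalling it first, and reports how await() ended. */
    static String run(boolean signalFirst) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        String[] outcome = {"did not finish"};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                outcome[0] = "returned normally, interrupt flag = "
                        + Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        long deadline = System.currentTimeMillis() + 5000;
        boolean done = false;
        while (!done && System.currentTimeMillis() < deadline) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    if (signalFirst) {
                        condition.signal();     // the waiter is signalled before the interrupt
                    }
                    waiter.interrupt();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                pause(10);
            }
        }
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupt before signal: " + run(false));
        System.out.println("interrupt after signal:  " + run(true));
    }
}
```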

Go back to AQS#await()



acquireQueued returns true if the thread was interrupted while competing for the lock. If that happens and interruptMode is not THROW_IE (for example, it is still 0), we need to change it to REINTERRUPT. What follows is simple: the cancelled nodes are cleared, and the reportInterruptAfterWait method responds to the interrupt according to interruptMode:

Conclusion

The Condition queue and the AQS synchronization queue share the same Node class, and their nodes are distinguished by different states. A Node can only exist in one queue at a time. The flow chart of a Node moving from the Condition queue to the AQS synchronization queue is as follows:



I will continue to analyze the implementation principles of other tools in JUC.