Preface
In the previous article, I mentioned that AQS (AbstractQueuedSynchronizer, hereafter AQS) and Condition are the real implementers behind the locking mechanism of the Lock interface. Condition also plays a very important role in the basic framework of the synchronization components as a whole. Since it is so important, let's look at how it works internally and at its concrete logic.
Before reading this article, I sincerely recommend reading "AQS for Concurrent Programming in Java" and "LockSupport for Concurrent Java Programming", because the internal mechanism and logic of Condition are inseparable from the knowledge covered in those two articles.
Introduction to the Condition interface methods
Before we formally introduce Condition, let's look at the methods it declares. The specific method declarations are shown in the following table:
From this table, we can see that it defines two kinds of methods: waiting (the methods starting with await) and notification (the methods starting with signal). They are similar to wait() and notify()/notifyAll() of Object, and are likewise used to block and wake up threads.
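A quick way to see both families of methods on a real JDK is to list them reflectively. The following is only a small sketch; the class name ConditionMethods is made up for this illustration.

```java
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.Condition;

class ConditionMethods {
    // Collects the distinct names of the public methods declared by Condition.
    static Set<String> names() {
        Set<String> names = new TreeSet<>();
        for (Method m : Condition.class.getMethods()) {
            names.add(m.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        // Names starting with "await" form the waiting family,
        // names starting with "signal" form the notification family.
        System.out.println(names());
    }
}
```

Overloads such as await() and await(long, TimeUnit) collapse into one name here, because only the names are collected.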
ConditionObject introduction
In practice, the implementation class of the Condition interface is ConditionObject, an inner class of AQS. It maintains a FIFO (first in, first out) queue (we call it the wait queue here; you can also call it a blocking queue), which works together with the synchronization queue in AQS to control the threads that acquire the shared resource.
Wait queue
The wait queue is a first in, first out (FIFO) queue inside ConditionObject. Each node in the queue holds a thread reference, which is the thread blocked on that ConditionObject. Note that the nodes in the wait queue reuse the Node class defined in AQS. In other words, the synchronization queue maintained in AQS and the wait queue maintained in ConditionObject both consist of AbstractQueuedSynchronizer.Node nodes (for an introduction to the AbstractQueuedSynchronizer.Node class, see the description in the article "AQS for Concurrent Programming in Java").
ConditionObject also defines firstWaiter and lastWaiter as pointers to the head and tail of the queue. When a thread calls one of the methods beginning with await, it is wrapped in a Node and appended to the tail of the wait queue. The basic structure of the wait queue is as follows:
Adding a node to the wait queue is also simple: the nextWaiter of the old tail node is pointed at the newly added node, and then lastWaiter is pointed at the newly added node.
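The structure described above can be sketched as a toy singly linked queue. This is a simplified model rather than the AQS source: ToyWaitQueue and its Node are hypothetical stand-ins that keep only the nextWaiter link and a name in place of the thread reference.

```java
import java.util.ArrayList;
import java.util.List;

class ToyWaitQueue {
    // Hypothetical stand-in for AbstractQueuedSynchronizer.Node.
    static final class Node {
        final String name;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter; // head of the wait queue
    Node lastWaiter;  // tail of the wait queue

    // Appends a node at the tail: old tail's nextWaiter, then lastWaiter.
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // Lists the node names from head to tail.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            out.add(n.name);
        }
        return out;
    }
}
```

No CAS is needed in this model for the same reason it is not needed in the real wait queue: a thread only touches the queue while it holds the lock.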
Mapping between synchronization queues and wait queues
As mentioned above, the Lock mechanism requires the synchronization queue in AQS to work with the wait queue in ConditionObject. The corresponding relationship is shown below:
Under the Lock mechanism, one lock has one synchronization queue and can have multiple wait queues, as opposed to the traditional Object monitor model, where one object has one synchronization queue and only one wait queue. In other words, a single Lock can have multiple Conditions.
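To make "one lock, several wait queues" concrete, here is a small sketch that parks one thread on the first of two conditions and then inspects both wait queues with ReentrantLock.getWaitQueueLength(Condition). The names TwoConditionsDemo, condA and condB are made up for the illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoConditionsDemo {
    // Returns {waiters on condA, waiters on condB} observed while one thread waits on condA.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condA = lock.newCondition();
        Condition condB = lock.newCondition();
        boolean[] released = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!released[0]) condA.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        int[] lengths = new int[2];
        try {
            for (;;) {
                lock.lock(); // getWaitQueueLength requires the lock to be held
                try {
                    lengths[0] = lock.getWaitQueueLength(condA);
                    lengths[1] = lock.getWaitQueueLength(condB);
                    if (lengths[0] == 1) {
                        released[0] = true;
                        condA.signal();
                        break;
                    }
                } finally {
                    lock.unlock();
                }
                Thread.sleep(10); // the waiter has not reached await() yet
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return lengths;
    }
}
```

The thread shows up only in the wait queue of the condition it awaited on; the second condition of the same lock stays empty.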
Basic use of Condition
To better understand the relationship between the synchronization queue and the wait queue, let's look at a bounded buffer class. This class is a special queue: when the queue is empty, a take operation blocks the taking thread until there is a new element in the queue; when the queue is full, a put operation blocks the putting thread until there is a vacancy in the queue. The specific code is as follows:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    // Index of the next put, index of the next take, and the number of elements in the array
    int putptr, takeptr, count;

    /**
     * Add an element.
     * (1) If the array is full, add the current "put" thread to the notFull wait queue and block it.
     * (2) Otherwise, put x into the array and wake up a waiting "take" thread.
     */
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) // If it is full, block the current "put" thread
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // Wake up a "take" thread
        } finally {
            lock.unlock();
        }
    }

    /**
     * Take an element.
     * (1) If the array is empty, add the current "take" thread to the notEmpty wait queue and block it.
     * (2) Otherwise, take an element out of the array and wake up a waiting "put" thread.
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) // If it is empty, block the current "take" thread
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // Wake up a "put" thread
            return x;
        } finally {
            lock.unlock();
        }
    }
}
As you can see from the code, this class creates two wait queues, notFull and notEmpty: notFull blocks "put" threads when the array is full, and notEmpty blocks "take" threads when the array is empty. Note that a Condition must be obtained through the newCondition() method of a Lock. We'll cover ReentrantLock in a future article.
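To see the blocking behaviour in action, the sketch below runs a producer and a consumer against a deliberately tiny buffer. SmallBuffer is a hypothetical, cut-down variant of the class above (int elements, capacity passed in), written only so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SmallBufferDemo {
    static final class SmallBuffer {
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
        private final int[] items;
        private int putptr, takeptr, count;

        SmallBuffer(int capacity) { items = new int[capacity]; }

        void put(int x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) notFull.await(); // full: producer waits
                items[putptr] = x;
                if (++putptr == items.length) putptr = 0;
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        int take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) notEmpty.await(); // empty: consumer waits
                int x = items[takeptr];
                if (++takeptr == items.length) takeptr = 0;
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

    // One producer puts 1..5 into a buffer of capacity 2; the caller takes five elements.
    static List<Integer> run() {
        SmallBuffer buffer = new SmallBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) taken.add(buffer.take());
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return taken;
    }
}
```

With a capacity of 2 the producer has to wait on notFull at least once, yet the elements still come out in the order they were put in.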
Blocking implementation: await()
Now that you know the basic structure of ConditionObject and its relation to the synchronization queue in AQS, let's look at its blocking implementation. Calling await() of ConditionObject (or any method beginning with await) makes the current thread enter the wait queue and release the synchronization state. Note that by the time the method returns, the current thread must have re-acquired the synchronization state: once the thread is signalled, its node is moved to the synchronization queue, and the thread returns from await() only after it has won the synchronization state there. Here we take the await() method as an example; the code is as follows:
public final void await() throws InterruptedException {
    // If the current thread has been interrupted, throw an exception
    if (Thread.interrupted())
        throw new InterruptedException();
    // (1) Add the current thread to the wait queue
    Node node = addConditionWaiter();
    // (2) Release the synchronization state (i.e., release the lock), remove the thread node
    // from the synchronization queue, and wake up the next node in the synchronization queue
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // (3) Check whether the current thread node is in the synchronization queue; if not, block the thread
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // (4) Once woken up, compete with the other threads in the synchronization queue to re-acquire the synchronization state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
From the overall code, the whole method is divided into the following four steps:
- (1) Use the addConditionWaiter() method to add the current thread node to the wait queue.
- (2) Use the fullyRelease(Node node) method to release the synchronization state (that is, release the lock), which removes the thread node from the synchronization queue and wakes up the next node in the synchronization queue.
- (3) Use the isOnSyncQueue(Node node) method to determine whether the current thread node is in the synchronization queue. If not, block the current thread with LockSupport.park(this).
- (4) When the thread is woken up, call the acquireQueued(node, savedState) method to compete with the other threads in the synchronization queue to re-acquire the synchronization state.
Because the logic in each step is a little involved, the methods used in the four steps above are introduced separately below.
addConditionWaiter() method
The addConditionWaiter() method adds the node of the thread that needs to block to the wait queue. The code is as follows:
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // (1) If the tail node has been cancelled (its waitStatus is no longer CONDITION),
    // remove all cancelled thread nodes from the wait queue
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // (2) Build the node for the wait queue
    Node node = new Node(Node.CONDITION);
    // (3) Append the thread node to the wait queue and update firstWaiter and lastWaiter
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
The logic of this method is also relatively simple, which is divided into the following three steps:
- (1) Get the tail node of the wait queue. If the tail node has been cancelled, use the unlinkCancelledWaiters() method to remove all cancelled thread nodes from the wait queue (for example, nodes whose threads were interrupted while waiting or failed to release the lock).
- (2) Construct the node for the wait queue with new Node(Node.CONDITION). Note that its state, Node.CONDITION, is not the same as the state of a thread node in the synchronization queue. (If the node states are unclear, see the article "AQS for Concurrent Programming in Java", which introduces them.)
- (3) Append the thread node to the wait queue and update the firstWaiter and lastWaiter pointers, so that thread nodes are added in FIFO (first in, first out) order.
unlinkCancelledWaiters() method
Next, let's look at the unlinkCancelledWaiters() method, which is called in step (1) of addConditionWaiter() to remove all cancelled thread nodes from the wait queue. The code is as follows:
private void unlinkCancelledWaiters() {
    // Get the head node of the wait queue
    Node t = firstWaiter;
    Node trail = null;
    // Traverse the wait queue and remove the cancelled thread nodes from it
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null) // Repoint lastWaiter at the last node that is still waiting
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
The specific process of this method is shown in the figure below:
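The relinking can also be traced on a toy list. The sketch below reruns the same traversal on hypothetical stand-in nodes (UnlinkSketch and its Node are not the AQS classes); only the two waitStatus constants are the values AQS itself uses.

```java
import java.util.ArrayList;
import java.util.List;

class UnlinkSketch {
    static final int CONDITION = -2; // value of Node.CONDITION in AQS
    static final int CANCELLED = 1;  // value of Node.CANCELLED in AQS

    static final class Node {
        final String name;
        final int waitStatus;
        Node nextWaiter;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    Node firstWaiter, lastWaiter;

    void add(String name, int waitStatus) {
        Node node = new Node(name, waitStatus);
        if (lastWaiter == null) firstWaiter = node;
        else lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    // Same traversal as above: "trail" is the last node known to be still waiting.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) out.add(n.name);
        return out;
    }
}
```

For a queue A, B, C, D, E in which A, C and E are cancelled, the traversal leaves B followed by D, with firstWaiter on B and lastWaiter on D.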
fullyRelease(Node node)
After the blocking thread has been added to the wait queue, its synchronization state is fully released (that is, the lock is released), its node leaves the synchronization queue, and the next node in the synchronization queue is woken up. The specific code is as follows:
final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}
The release(int arg) method releases the synchronization state of the current thread and wakes up the next thread node in the synchronization queue so that it can try to acquire the synchronization state. Because this method was already analyzed together with the unparkSuccessor(Node node) method in the article "AQS for Concurrent Programming in Java", it is not analyzed again here. Please refer to that article for details.
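The effect of saving and fully releasing the state is easiest to observe with a reentrant lock. In the sketch below (FullyReleaseDemo is a made-up name) a thread locks twice, awaits, and checks its hold count before and after; the main thread can only see the waiter in the wait queue if both holds were released at once.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {
    // Returns {hold count before await, hold count after await} of the waiting thread.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] holds = new int[2];      // written by the waiter, read after join()

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquire: the synchronization state is now 2
            try {
                holds[0] = lock.getHoldCount();
                while (!signalled[0]) cond.await(); // releases both holds, restores them on return
                holds[1] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        try {
            for (;;) {
                lock.lock();
                try {
                    // Seeing the waiter here while we hold the lock means it gave up both holds.
                    if (lock.hasWaiters(cond)) {
                        signalled[0] = true;
                        cond.signal();
                        break;
                    }
                } finally {
                    lock.unlock();
                }
                Thread.sleep(10);
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return holds;
    }
}
```

The hold count is 2 on both sides of await(), which matches the savedState value being handed to acquireQueued(node, savedState) when the thread re-acquires the lock.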
isOnSyncQueue(Node node)
This method is mainly used to determine whether the current thread node is in the synchronization queue. The specific code is as follows:
final boolean isOnSyncQueue(Node node) {
    // If waitStatus == Node.CONDITION or the node's predecessor is null, it is not in the synchronization queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If the current node has a next node, it is in the synchronization queue
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // If neither of the above applies, traverse the synchronization queue to check whether the node is in it
    return findNodeFromTail(node);
}
If you remember AQS, you will know that nodes in the synchronization queue are linked through the prev and next fields, whereas nodes in the wait queue are linked through the nextWaiter field. So a node whose waitStatus is still CONDITION or whose prev is null is certainly not in the synchronization queue, and a node whose next is not null certainly is. There is one special case, however: prev may already be set while the CAS that places the node at the tail of the synchronization queue has not yet succeeded, so the node is not actually in the queue yet. In that case we need to traverse the synchronization queue to determine whether the thread node is already there. The specific code is as follows:
private boolean findNodeFromTail(Node node) {
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}
If node.next != null, the node is already in the synchronization queue, so the case left to handle is node.next == null. Such a node, if it is in the queue at all, sits at or near the tail, which is why the traversal starts from the tail node.
acquireQueued(final Node node, int arg)
Once the thread has been woken up (it can return from await() only after signal() or a similar method has moved its node to the synchronization queue), this method is called so that the thread competes in the synchronization queue to re-acquire the synchronization state. This method is described in detail in "AQS for Concurrent Programming in Java", so I won't analyze it further here.
Blocking process
Now that we understand how blocking is implemented, let's summarize the process. The specific process is shown in the figure below:
- (1) Construct a new node for the blocking thread and add it to the wait queue.
- (2) Release the thread's synchronization state, so that its node leaves the synchronization queue.
Wake-up implementation: signal()
When a thread needs to be woken up, one of the methods starting with signal in ConditionObject is called. It takes the first thread node in the wait queue, moves it to the synchronization queue, and then wakes it up. Here we take the signal() method as an example; the specific code is as follows:
public final void signal() {
    // (1) Check whether the current thread has acquired the synchronization state (i.e., the lock)
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // (2) Get the first node in the wait queue, move it to the synchronization queue, then wake up the thread node
    if (first != null)
        doSignal(first);
}
The main logic of this method is divided into the following two steps:
- (1) Use the isHeldExclusively() method to determine whether the current thread has acquired the synchronization state (that is, the lock).
- (2) Use the doSignal(Node first) method to take the first node in the wait queue, move it to the synchronization queue, and then wake up the thread node.
The two methods mentioned above are described below.
isHeldExclusively() method
The isHeldExclusively() method is declared in AQS. Its default implementation simply throws UnsupportedOperationException, so it has to be overridden by subclasses. It is mainly used to determine whether the thread currently calling signal() holds the synchronization state exclusively. The default code in AQS is as follows:
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
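A subclass therefore has to supply its own answer. The sketch below is a minimal, non-reentrant exclusive synchronizer in the spirit of the Mutex example from the AQS documentation; HeldExclusivelyDemo and Sync are names chosen for this illustration, and the convention that state 1 means "locked" is an assumption of the sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

class HeldExclusivelyDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (!isHeldExclusively()) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // The override that signal() relies on.
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // True if signal() is rejected when the caller does not hold the synchronizer.
    static boolean signalWithoutLockThrows() {
        Condition cond = new Sync().newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // True if signal() is accepted while the caller holds the synchronizer.
    static boolean signalWithLockSucceeds() {
        Sync sync = new Sync();
        Condition cond = sync.newCondition();
        sync.acquire(1);
        try {
            cond.signal(); // the wait queue is empty, so nothing is moved
            return true;
        } finally {
            sync.release(1);
        }
    }
}
```

Calling signal() without holding the synchronizer fails step (1) of signal() and ends in IllegalMonitorStateException; with the synchronizer held, the same call passes the check.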
doSignal(Node first) method
Let's continue with the doSignal(Node first) method. The specific code is as follows:
private void doSignal(Node first) {
    do {
        // (1) Remove the first node of the wait queue from the wait queue and repoint firstWaiter
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // (2) Add the removed node to the synchronization queue and wake it up;
        // if that fails, try again with the new first node of the wait queue
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
This method is also very simple and is divided into two steps:
- (1) Remove the first node of the wait queue from the wait queue and point firstWaiter at the node after it. To make the logic of this step easier to understand, a concrete diagram is drawn here, as shown below:
- (2) Use the transferForSignal(Node node) method to add that node to the synchronization queue and then wake up the thread node. If the transfer fails because the node was cancelled, the loop tries the next node in the wait queue.
transferForSignal(Node node) method
The transferForSignal(Node node) method used in step (2) is more complicated, so it is explained in detail here. The specific code is as follows:
final boolean transferForSignal(Node node) {
    // (1) Reset the state of the thread node to the initial state (0). If this fails, the node has been cancelled
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    // (2) Put this node into the synchronization queue; enq returns the node's predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    // (3) If the predecessor is cancelled, or setting the predecessor's state to SIGNAL fails, wake up the thread directly
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
The method is divided into three steps:
- (1) Reset the state of the thread node to the initial state. If this fails, the node has been cancelled, so return false directly.
- (2) Use the enq(Node node) method to put the thread node into the synchronization queue.
- (3) After the thread node is in the synchronization queue, check the state of its predecessor (the node returned by enq). If the predecessor's waitStatus > 0 (cancelled), or compareAndSetWaitStatus(ws, Node.SIGNAL) fails to set the predecessor's state to SIGNAL, wake up the thread with LockSupport.unpark(node.thread). Otherwise the thread stays parked and is woken later, when its predecessor releases the synchronization state.
LockSupport.unpark(Thread thread) was covered in the article "LockSupport for Concurrent Java Programming", so here we focus on the enq(Node node) method. The specific code is as follows:
private Node enq(Node node) {
    for (;;) {
        // (1) Get the tail node of the synchronization queue
        Node oldTail = tail;
        // (2) If the tail node is not null, add the thread node to the synchronization queue
        if (oldTail != null) {
            // Point the prev of the current node at the old tail node
            U.putObject(node, Node.PREV, oldTail);
            // Point the tail pointer of the synchronization queue at the current node
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            // (3) If the synchronization queue is empty, construct the synchronization queue
            initializeSyncQueue();
        }
    }
}
Looking at the method, we see that it adds the node to the synchronization queue inside an infinite loop (you can also call it spinning). The method is divided into the following steps:
- (1) Get the tail node of the synchronization queue.
- (2) If the tail node is not null, add the thread node to the synchronization queue.
- (3) If the synchronization queue is empty, use initializeSyncQueue() to construct the synchronization queue.
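The same spin-and-CAS pattern can be reproduced outside AQS. The sketch below swaps the Unsafe calls for java.util.concurrent.atomic.AtomicReference, which is not what the AQS source does; EnqSketch and its Node are hypothetical names, and the nodes carry no thread or waitStatus.

```java
import java.util.concurrent.atomic.AtomicReference;

class EnqSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Spins until the node is linked at the tail; returns the node's predecessor.
    Node enq(Node node) {
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail != null) {
                node.prev = oldTail;                    // set prev first
                if (tail.compareAndSet(oldTail, node)) { // then publish as the new tail
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                Node h = new Node();                    // lazily create the dummy head
                if (head.compareAndSet(null, h)) tail.set(h);
            }
        }
    }

    // Walks backwards from the tail and counts the nodes, dummy head excluded.
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p.prev != null; p = p.prev) n++;
        return n;
    }

    // Enqueues from several threads at once and returns the resulting queue size.
    static int enqueueConcurrently(int threads, int perThread) {
        EnqSketch q = new EnqSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) q.enq(new Node());
            });
            ts[i].start();
        }
        try {
            for (Thread t : ts) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return q.size();
    }
}
```

Because prev is written before the CAS on the tail, a backwards walk from the tail never misses a node, which is the property findNodeFromTail(Node node) depends on.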
Let's take a closer look at step (2) of enq(Node node), which calls U.putObject(node, Node.PREV, oldTail). This statement points the prev field of the current node at the old tail node. The Node class in AQS (AbstractQueuedSynchronizer) has the following static variables and static initializer. I've omitted some of the non-essential code here. The specific code is as follows:
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// Omit some code
static final long PREV;
static {
    try {
        // Omit some code
        PREV = U.objectFieldOffset
            (Node.class.getDeclaredField("prev"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
The Node.class.getDeclaredField("prev") call is easy to understand: it looks up the prev field of the Node class, returning the corresponding Field if it exists and throwing NoSuchFieldException otherwise. The objectFieldOffset(Field f) method of Unsafe was described in a similar situation in "LockSupport for Concurrent Java Programming", so here is just a brief explanation. This method obtains the offset of a field relative to the starting address of a Java object. In other words, every field sits at a fixed offset inside the memory of its object, and our static PREV variable holds the offset of the prev field of Node.
Once we have the offset, we call U.putObject(node, Node.PREV, oldTail). The first parameter is the object to operate on, the second is the memory offset of the field, and the third is the value to write. With that, the prev field of the current node points at the tail node of the synchronization queue.
Once you understand this, it is easy to understand how the tail pointer of the synchronization queue is pointed at the current node (compareAndSetTail) and how the synchronization queue is constructed when it is empty (initializeSyncQueue). For lack of space I won't describe them in detail; I hope readers can read the source code and work it out by analogy. The code for these two methods looks like this:
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long HEAD;
private static final long TAIL;
static {
    try {
        STATE = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        HEAD = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        TAIL = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
    Class<?> ensureLoaded = LockSupport.class;
}

private final void initializeSyncQueue() {
    Node h;
    if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
        tail = h;
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return U.compareAndSwapObject(this, TAIL, expect, update);
}
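Application code normally cannot call sun.misc.Unsafe.getUnsafe() directly, so to experiment with the same idea the sketch below swaps in java.lang.invoke.VarHandle (Java 9 and later), which offers field-level writes and compare-and-set without raw offsets. This is a substitute technique for illustration, not the code shown above; VarHandleSketch and its members are hypothetical names.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VarHandleSketch {
    static final class Node {
        volatile Node prev;
    }

    volatile Node tail;

    // Handles play the role that the PREV and TAIL offsets play with Unsafe.
    private static final VarHandle PREV;
    private static final VarHandle TAIL;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            PREV = l.findVarHandle(Node.class, "prev", Node.class);
            TAIL = l.findVarHandle(VarHandleSketch.class, "tail", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Counterpart of U.putObject(node, Node.PREV, oldTail): a plain write to node.prev.
    static void setPrev(Node node, Node oldTail) {
        PREV.set(node, oldTail);
    }

    // Counterpart of compareAndSetTail(expect, update): succeeds only if tail == expect.
    boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }
}
```

As with the offset-based version, the field is looked up once in a static initializer and then reused for every operation.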
Wake-up process
Now that we understand the specific logic of waking up, let's summarize the process. The details are shown in the figure below:
- Move the thread node at the head of the wait queue to the synchronization queue.
- Once it has been moved to the synchronization queue, wake up the thread and let it compete for the synchronization state.
The whole process is not too complicated. Just note that the thread is woken up only after its node has been moved from the wait queue to the synchronization queue.
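The head-first order of the wake-up process can be observed from the outside. In the sketch below (WakeOrderDemo is a made-up name) the threads enter the wait queue one at a time, then signal() is called once per waiter, and each thread records its id after it returns from await(). A fair ReentrantLock is used here as a deliberate choice to keep lock re-acquisition in queue order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class WakeOrderDemo {
    // Returns the ids of the waiters in the order in which they returned from await().
    static List<Integer> run(int waiters) {
        ReentrantLock lock = new ReentrantLock(true);
        Condition cond = lock.newCondition();
        boolean[] go = {false};                  // guarded by lock
        List<Integer> order = new ArrayList<>(); // guarded by lock
        Thread[] threads = new Thread[waiters];
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        while (!go[0]) cond.await();
                        order.add(id); // runs only after the lock has been re-acquired
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        lock.unlock();
                    }
                });
                threads[i].start();
                // Wait until thread i sits in the wait queue before starting the next one.
                for (;;) {
                    lock.lock();
                    try {
                        if (lock.getWaitQueueLength(cond) == i + 1) break;
                    } finally {
                        lock.unlock();
                    }
                    Thread.sleep(5);
                }
            }
            lock.lock();
            try {
                go[0] = true;
                // Each call moves the current head of the wait queue to the synchronization queue.
                for (int i = 0; i < waiters; i++) cond.signal();
            } finally {
                lock.unlock();
            }
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

The signalled threads do not run at the moment signal() is called; they continue only after the signalling thread has unlocked, one after another as each re-acquires the lock from the synchronization queue.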
Finally
This article draws on the following book. By standing on the shoulders of giants, you can see further.
- The Art of Concurrent Programming in Java
Recommended reading
- Introduction to Java concurrent programming locking mechanism
- AQS for Concurrent Programming in Java
- LockSupport for Concurrent Java Programming