Preface:
Before you can understand the source code of Condition, you need to have some knowledge of the source code of AQS, because Condition’s functions are based on AQS, which was analyzed in my last article.
The basic use of Condition
Condition is a multithreaded coordinated communication utility class that allows threads to wait together for a Condition (Condition) that will wake up only if the Condition is met.
- Case presentation
When the await method is called, the current thread releases the lock and waits, while the other thread calls the condition’s signal or SignalAll method to notify the blocked thread, and then releases the lock itself. The awakened thread acquires the previous lock and continues to execute, and finally releases the lock.
So, the two most important methods in condition are await and signal
- Await: block and suspend the current thread
- Signal: wakes up a blocked thread
Condition is similar to synchronized’s wait/notify, so we can see that Condition is the class J.U.C. uses to implement active blocking and wake up for threads.
Condition source code analysis
- Take a look at the structure of ConditionObject. There are only two properties that make up a one-way linked list
- FirstWaiter points to the header in the queue
- LastWaiter points to the last node in the queue
- ConditionObject is an inner class of AQS, so it automatically has a reference to the outer class because it was created from Sync in the ReentrantLock object, which inherits from the AQS class. So he shares an AQS queue with the ReentrantLock object.
Condition is called first, which requires the Lock Lock, so it means there will be an AQS synchronization queue. In the above case, two threads are running at the same time, so the AQS queue might look like this.The condition.await() method is then called
- Awai () source code analysis
Calling the await() method of Condition (or a method beginning with await) causes the current thread to queue and release the lock while the thread state changes to wait. When returned from await(), the current thread must have acquired the lock associated with Condition
public final void await(a) throws InterruptedException {
// await is allowed to be interrupted
if (Thread.interrupted())
throw new InterruptedException();
// Create a new node in condition with the same data structure as the linked list
Node node = addConditionWaiter();
// Release the current lock, get the lock status, and wake up a thread in the AQS queue
long savedState = fullyRelease(node);
// If the current node is not on the synchronization queue, that is, has not been signalled, the current thread will be blocked
int interruptMode = 0;
// Check whether this node is on the AQS queue. The first check is false because the lock has been released
while(! isOnSyncQueue(node)) {// Block the thread
LockSupport.park(this);
// When the thread wakes up, it will determine whether it was awakened by Single or by an interrupt operation
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
If wakened by single, the loop ends. If wakened by interrupt, the loop continues to block and adds to the AQS queue
break;
}
When the thread wakes up, it will attempt to acquire the lock, and acquireQueued returns false to acquire the lock.
// interruptMode ! = THROW_IE -> Indicates that the thread failed to enqueue node, but signal did enq it.
// Set this variable to REINTERRUPT.
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;// If the next waits for node are not null, the Condition queue is cleaned up.
// If null, there is nothing left to clean up.
if(node.nextWaiter ! =null) // clean up if cancelled
unlinkCancelledWaiters();
If the thread is interrupted, an exception needs to be thrown. Or do nothing
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
}
Copy the code
- The first and most important method is addConditionWaiter(), which wraps the current thread as a Node and adds it to the Condition queue. The queue here is no longer a two-way list, but a one-way list.
private Node addConditionWaiter(a) {
// Get the Node to which lastWaiter points
Node t = lastWaiter;
// If lastWaiter is not empty && the state of the node is not CONDITION, delete the node pointed to by lastWaiter and retrieve the node pointed to by lastWaiter again
if(t ! =null&& t.waitStatus ! = Node.CONDITION) {// Delete the node pointed to by lastWaiter and update the node pointed to by lastWaiter
unlinkCancelledWaiters();
t = lastWaiter;
}
// Create a Node for the current thread and set its state to CONDITION(-2).
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// If lastWaiter points to an empty Node,firstWaiter points to the Node of the current thread
if (t == null)
firstWaiter = node;
// If the lastWaiter is not updated for empty, it points to the Node of the current thread
else
t.nextWaiter = node;
// Update the node pointed to by lastWaiter
lastWaiter = node;
// Returns the node of the current thread
return node;
}
Copy the code
After executing the addConditionWaiter method, one of these condition queues is created
- FullRelease is a complete release of the lock, so if the lock has multiple reentrants, you only need to release it once to get all the reentrants back to zero.
final long fullyRelease(Node node) {
// Define the failed flag
boolean failed = true;
try {
// Get the number of reentries
long savedState = getState();
// Release the lock and wake up the next blocked thread in the synchronization queue
if (release(savedState)) {
// Modify the failed flag
failed = false;
// Returns the number of reentries
return savedState;
} else {
throw newIllegalMonitorStateException(); }}finally {
// If the thread fails to be awakened, change the status of the current thread node to CANCELLED(1)
if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code
- At this point, synchronization queues trigger lock release and re-contention. ThreadB acquires the lock.
- IsOnSyncQueue () is a bit of a hassle
final boolean isOnSyncQueue(Node node) {
/ / whether the current thread state of node to CONDITION | | his former a node is null
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// Determine his next node
if(node.next ! =null)
return true;
// Returns true if the current thread's node was found in the AQS queue, false otherwise
return findNodeFromTail(node);
}
Copy the code
Check whether the current node is in the synchronization queue. Return false to indicate that it is not, and true to indicate that it is
If it is not in the AQS synchronization queue, the current node is not waking up to claim the synchronization lock, so the current thread needs to be blocked until another thread calls signal to wake up.
If the queue is synchronized on the AQS, it means that it needs to compete for synchronization locks to obtain execution permission for the program
Why make this judgment? The reason is that nodes in the condition queue rejoin the AQS queue to compete for locks. When signal is called, the current node is moved from the condition queue to the AQS queue
Check whether ThreadA exists in the AQS queue
- If ThreadA’s waitStatus is CONDITION, it exists in the CONDITION queue, not the AQS queue. Because the AQS queue must not have CONDITION.
- If node.prev is null, it does not exist in the AQS queue either, because prev=null in the AQS queue, there is only one possibility, is that it is the head node, head node means that it is the node that acquired the lock.
- If node.next is not empty, it must be in the AQS queue, because only AQS queues have the relationship between next and prev.
- FindNodeFromTail: scans the AQS queue from the tail node. If the node in the AQS queue is found to be equal to the current node, the node must exist in the AQS queue.
- ThreadA is blocked and threadB has acquired the lock. Signal () is called
public final void signal(a) {
// Determine whether the current thread has acquired the lock
if(! isHeldExclusively())throw new IllegalMonitorStateException();
// Get the firstWaiter node
Node first = firstWaiter;
if(first ! =null)
// If not empty, the thread represented by the first node is added to the AQS queue
doSignal(first);
}
Copy the code
- DoSignal () method: For the first condition node starting from the head of the condition queue, the transferForSignal operation is executed to convert the node from the condition queue to the AQS queue, and at the same time modify the state of the original tail node in the AQS queue.
private void doSignal(Node first) {
do {
// Remove the first node from the condition queue and determine whether the condition queue is empty
if ( (firstWaiter = first.nextWaiter) == null)
// Set the lastWaiter node to null
lastWaiter = null;
// Set first's nextWaiter property to null
first.nextWaiter = null;
// Add to AQS queue
} while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
Copy the code
- The transferForSignal() method, in which CAS modifies the node state and, if successful, places the node in the AQS queue and wakes up the thread on the node. At this point, the node wakes up in the await method
final boolean transferForSignal(Node node) {
// The status of the updated node is 0, and if the update fails, there is only one possibility for the node to be CANCELLED
if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
return false;
// Call enq() to add the current node to the AQS queue
Node p = enq(node);
// Get the state of the current thread node
int ws = p.waitStatus;
// If the state of the previous node is cancelled, or an attempt to set the state of the previous node to SIGNAL fails (SIGNAL means that its next node needs to stop blocking)
// If successfully added to the AQS queue, return true, or false to end the loop
if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL))// Wake up the thread
LockSupport.unpark(node.thread);
return true;
}
Copy the code
- Graphic analysis
If the status of ThreadA’s preV is greater than 0, or if the SIGNAL fails to be set, Indicates that the node was set to CANCELLED. This will wake up the ThreadA thread. Otherwise, the AQS queue mechanism is used to wake ThreadA up after ThreadB releases the lock.
- CheckInterruptWhileWaiting () method, the front on the analysis of await approach, the thread will be blocked. By being awakened and then continue to return to the last execution code checkInterruptWhileWaiting this method is why? The condition queue is blocked, and the condition queue is blocked
private int checkInterruptWhileWaiting(Node node) {
// Determine whether the interrupt method was called before waking up the thread
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
Copy the code
-
If the current thread is interrupted, then call transferAfterCancelledWait method judging subsequent processing should be throw InterruptedException or interrupted again.
-
TransferAfterCancelledWait () method
final boolean transferAfterCancelledWait(Node node) {
// Use cas to change the node status. If the change succeeds, signal has not been called when the thread is interrupted.
// The thread is woken up not necessarily by locksupport.unpark at the Java level, but by calling interrupt(), which updates an interrupt flag and wakes up the blocked thread.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
// If cas succeeds, node is added to the AQS queue
return true;
}
// If cas fails, check whether the current node is already on the AQS queue. If not, let another thread execute
// When node triggers signal, node is added to the AQS queue
while(! isOnSyncQueue(node))// Loop to check whether node has been successfully added to AQS queue. If not, the CPU time slice is actively released by yield,
Thread.yield();
return false;
}
Copy the code
Note that if the CAS fails the first time, it is not possible to determine whether the current thread aborted or called signal first. It could be either aborted or aborted before CAS. At this moment need to do is to wait for the current thread after the node is added to the AQS queue, namely enq after the method returns, return false to tell checkInterruptWhileWaiting method returns REINTERRUPT (1), the follow-up to interrupt.
In simple terms, the return value of this method represents whether the current thread was awakened by an interrupt at park. If true, the interrupt was not executed before signal was called. Throw interruptedException when encountering an interrupt while await,
Returns true is to tell checkInterruptWhileWaiting return THROW_IE (1).
If false is returned, signal has already been executed and simply needs to respond to the interrupt again
Summary of await and signal
I represent the whole structure of the previous decomposition diagram by an overall structure diagram. AwaitThread first obtains the lock through lock.lock() method and then calls condition. Await method to enter the wait queue. The other thread, signalThread, calls condition. Signal or signalAll after acquiring the lock successfully, giving the awaitThread the opportunity to move to the synchronization queue. This gives the awaitThread a chance to obtain the lock after another thread releases the lock, so that the awaitThread can exit from the await method to perform further operations. If the awaitThread fails to acquire the lock, it is directly put into the synchronization queue.
- Block: In the await() method, after the thread releases the lock resource, blocks the current thread if the node is not in the AQS wait queue, or spins to wait to try to acquire the lock if it is in the wait queue
- Release: After signal(), the node moves from the Condition queue to the AQS wait queue and then enters the normal lock acquisition process