Attribute analysis
Inside the Condition is a Condition Queue, which is a single-linked list associated with Node nextWaiter. The following are the head and tail nodes. Node is the same as the total WaitQueue Node type.
private transient Node firstWaiter;
private transient Node lastWaiter;
// The following two are interrupt modes.
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
Copy the code
Analysis of construction method
There’s nothing to say about a no-parameter construction.
public ConditionObject(a) {}Copy the code
Analysis of common methods
signal
Wake up
Wakes one, transferring the head node in the Condition Queue to the WaitQueue
public final void signal(a) {
// Check whether the thread currently holding the error is the current thread
if(! isHeldExclusively())throw new IllegalMonitorStateException();
// get the firstWaiter in the condition queue
Node first = firstWaiter;
if(first ! =null)
doSignal(first);
}
Copy the code
doSignal
To move a node from the Condition queue to the Wait queue, starting with the primary node. If the status of the current node changes during this period, so that the type of the node is not Condition, the Condition queue will move the pointer back.
private void doSignal(Node first) {
do {
// If the header is null, then the queue is empty, so lastWaiter is null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// Clear the reference relationship
first.nextWaiter = null;
// Move first from condition queue to waitQueue.
// If not, assign first to the header. Note that firstwaiter has already been moved later.
} while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
Copy the code
AQS#transferForSignal
Move node from condition Queue to WaitQueue
Change the Node waitStatus from CONDITION to 0, because the initial state of the Node inserted into the WaitQueue must be 0.
When a node is added to the waitQueue through the ENQ method of AQS, the enQ method returns the precursor node that the current node is inserted into the waitQueue.
This then changes the waitStatus of the precursor node. If the state is CANCELLED or fails to change the precursor node, the thread of the current node is directly woken up.
final boolean transferForSignal(Node node) {
// Change the state to 0, or return false if it was not CONDITION before.
// Normally, the node that calls this method is CONDITION. CONDITION is set to 0 in order to get the waitQueue operation later.
if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
return false;
Enq enq enqueues this node to the WaitQueue.
// This method returns the precursor node of the currently inserted node.
// When the waitQueue is not initialized, the node is the header node. It's going to put the Condition queue in the waitQueue, because it's in the waitQueue
// Set WaitStatus to 0.
Node p = enq(node);
// Get the first node waitStatus
int ws = p.waitStatus;
// ws > 0: the node status is canceled
/ /! CompareAndSetWaitStatus (p, ws, node. SIGNAL) : change the status of the head Node to SIGNAL,
// If the state of the precursor Node is cancelled, or the state of the precursor Node is changed to Node.SIGNAL, the thread of the current Node is woken up.
if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL))// Wake up the park thread.
// the state of the firstWaiter has changed because the firstWaiter has already been set by another thread
LockSupport.unpark(node.thread);
return true;
}
Copy the code
signalAll
Wakes up all
Move all the nodes in the Condition Queue to the WaitQueue
public final void signalAll(a) {
if(! isHeldExclusively())throw new IllegalMonitorStateException();
Node first = firstWaiter;
if(first ! =null)
doSignalAll(first);
}
Copy the code
doSignalAll
Wake up all the nodes
Move all the nodes in the Condition Queue to the WaitQueue.
The transferForSignal method has been analyzed above
private void doSignalAll(Node first) {
// Set the first and last nodes of the list to null
lastWaiter = firstWaiter = null;
// Start iterating through the loop. In general, first is the head node
do {
// Get the next node
Node next = first.nextWaiter;
// Clear the reference relationship
first.nextWaiter = null;
//
transferForSignal(first);
first = next;
} while(first ! =null);
}
Copy the code
await
Waiting for the
There are several steps to implement waiting
- Build the Condition Queue.
- Releases the lock.
- Park thread, waiting to wake up.
- After wake up, handle interrupts if there are interrupts, otherwise everything is fine.
This is the main method, where the code logic should be combined with the signal. To make it easier to understand, draw a diagram.
Here’s a picture of the code below, and then how the while loop is combined with signal.
How do we combine await and signal?
-
The first thing to know is that there are two queues, a waitQueue (which is used as a lock) and a Condition Queue (which is used as a Condition). Different conditions are different Condition queues.
-
The await operation builds the node and adds it to the Condition Queue. The node is not added to the waitQueue until there is no signal. So the isOnSyncQueue in the while loop is in CONDITION before signal, and it is not in a Wait Queue, so it keeps returning false.
-
At signal time, both signal and signalAll (which are the difference between one and multiple) unbind nodes from the Condition Queue and add them to the Wait Queue. It also changes the WaitStatus of the current node to 0 and the status of the current node’s precursor in the WaitQueue to -1. (Does not include node cancellation).
-
After the above operation, the current node is in the WaitQueue, so it goes through the normal lock set. If the precursor wakes up the current node after releasing the lock, the current node will wake up in the while. Of course, there is one step missing to determine whether there is an interruption during park. If the while condition is not met, proceed with the following code (obtain the lock, determine the interrupt pattern)
Know that Park can respond to interrupts, and interrupts do not report errors, just change the interrupt flag.
public final void await(a) throws InterruptedException {
// Determine whether the current thread is interrupted, if interrupted, directly throw an exception
if (Thread.interrupted())
throw new InterruptedException();
ConditionQueue (ConditionQueue
Node node = addConditionWaiter();
// Release the currently held lock
int savedState = fullyRelease(node);
// Interrupt mode defaults
// This is used to judge interrupts. Different interrupts have different modes
int interruptMode = 0;
// If there is a signal operation,
// If you wake it up, you can't enter it.
// Because the current node is already in the waitQueue,
SignalAll will move all the nodes in the Condition Queue to the WaitQueue and will change the Condition Queue to the WaitQueue
// Change waitStatus of the node to 0 before changing its precursor to -1.
// Therefore, the current node must not be in the waitQUeue until ignal is called.
while(! isOnSyncQueue(node)) {// If not, park the current thread
LockSupport.park(this);
// As mentioned earlier, the pakr thread can respond to interrupts,
// If everything were normal, we wouldn't be here, and the while loop would exit.
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
}
// Reacquire the lock and interruptMode! = THROW_IE
// acquireQueued returns the value true. Indicates that the current thread broke while acquiring the lock.
// Return false to indicate that the current thread is not interrupted.
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE)// If lock acquisition is interrupted again and interruptMode is not THROW_IE, reassign to REINTERRUPT.
interruptMode = REINTERRUPT;
ConditionQueue conditionQueue conditionQueue conditionQueue conditionQueue conditionQueue conditionQueue conditionQueue conditionQueue
if(node.nextWaiter ! =null)
// If not
unlinkCancelledWaiters();
// Use interrupt mode to do the operation,
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
}
Copy the code
addConditionWaiter
Add a new waiter to the Wait Queue
private Node addConditionWaiter(a) {
/ / end nodes
Node t = lastWaiter;
// If lastWaiter is not null and waitStatus of lastWaiter is not CONDITION,
if(t ! =null&& t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters();//
t = lastWaiter;
}
// Create a node in CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/ / team
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
Copy the code
unlinkCancelledWaiters
I don’t really understand this, I don’t really understand what this is. Why in addWaiter and Node.nextwaiter! = null, what is the case when todo is supplemented
private void unlinkCancelledWaiters(a) {
/ / head node
Node t = firstWaiter;
Node trail = null;
// Start the loop
while(t ! =null) {
Node next = t.nextWaiter;
// t (on the first loop, that is, the head node) is not the condition
if(t.waitStatus ! = Node.CONDITION) {// Nextwaiter becomes null, eliminating the reference relationship
t.nextWaiter = null;
if (trail == null)
// Change the head node to t and get next (the first loop is the head node's next node)
firstWaiter = next;
else
// Trail is connected
trail.nextWaiter = next;
if (next == null)
// If next is empty, copy trail to lastWaiter
lastWaiter = trail;
}
else
// trail = t
trail = t;
// Move the pointer backt = next; }}Copy the code
AQS#fullyRelease
Call the release method in AQS and pass in the current lock state as an argument. Return state, otherwise an error is reported, and the node must change waitStatus to CANCELLED.
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
If the lock is released, the lock will be returned to its original state
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw newIllegalMonitorStateException(); }}finally {
// If this fails, waitStatus for the current Node will be changed to Node.cancelled
if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code
AQS#isOnSyncQueue
Check whether the current node is in the WaitQueue.
Either the node is in CONDITION, or the node has no precursor (note that the precursor is in WaitQueue). It’s not in the waitQUeue.
If a node has a successor, it is already in the waitQueue
If there is a precursor node but no successor node, it is the tail node, so the next step is to start from the tail node, traversal judgment
final boolean isOnSyncQueue(Node node) {
// If waitStatus is CONDITION or has no precursor (it must not be in the waitQueue)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
If there is a successor node, it must already be in the queue
if(node.next ! =null)
return true;
// If it's not in the waitQueue, or if it's at the end of the waitQueue, and waitStatus is not CONDITION, it's going to go there and search backwards.
return findNodeFromTail(node);
}
Copy the code
AQS#findNodeFromTail
Search backwards, if the node is in a synchronized queue, only isOnSyncQueue. Method can be called.
private boolean findNodeFromTail(Node node) {
// Search from back to front
Node t = tail;
for (;;) {
/ / death cycle
/ / found
if (t == node)
return true;
/ / to find out
if (t == null)
return false;
// The pointer moves forwardt = t.prev; }}Copy the code
checkInterruptWhileWaiting
Check whether the current node is interrupted when it is in park. If not ceased, it returns 0, otherwise to transferAfterCancelledWait judgment.
Return THROW_IE if the excursion is preceded, and REINTERRUPT if the excursion is followed.
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
Copy the code
AQS#transferAfterCancelledWait
Determine whether the currently interrupted thread was interrupted before or after the wake (signalAll or Signal) operation
If the waitStatus of a node before signal is CONDITION, it will change waitStatus to 0 and rejoin the queue.
The node after the signal, whose waitStatus is definitely not CONDITION, will continue to check in the while loop to see if the current node is in the waitQueue. If it is not, the current thread will relinquish CPU usage. Waiting for the signal. Wait for signal and return. This is the interrupt after the wake (signalAll or signal).
final boolean transferAfterCancelledWait(Node node) {
// The node must be in CONDITION before signal. If it is not CONDITION, it is after signal.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// rejoin the team
enq(node);
return true;
}
// check whether the current node is in the queue. If not, loop out CPU usage and wait for a while
while(! isOnSyncQueue(node)) Thread.yield();return false;
}
Copy the code
reportInterruptAfterWait
To judge by the interrupt mode identified above, throw an exception or call the interrupt method of the current thread according to the interrupt mode.
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
Copy the code
awaitNanos
Await with timeout
The basic function is the same as await, as is the tryLock (with timeout) implementation of ReentrantLock, with a timeout added to the await method.
A typical timeout is written as the current implementation + timeout = expiration. And then wake up with the expiration time – the current time, if less than 0, it has timed out.
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// Expiration time
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while(! isOnSyncQueue(node)) {// If the value is less than or equal to 0, awaitNanos does not need to wait, and directly adds the current node to the waitQueue to acquire the lock again
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// Again spinForTimeoutThreshold, familiar feeling
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// Check interrupt
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// this is the same as await
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
unlinkCancelledWaiters();
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
// Return the remaining time
return deadline - System.nanoTime();
}
Copy the code
awaitUntil
await(long time, TimeUnit unit)
The ontology is the same as the await for these timeout times. All it does is add a timeout.
awaitUninterruptibly
Await without interrupt, without break
Local still await, but no interrupt mode judgment. Whenever an interrupt occurs (whether in the Condition queue or wait Queue), the current thread’s interrupt method is called.
public final void awaitUninterruptibly(a) {
// Build the node
Node node = addConditionWaiter();
// Release the lock and return the original state
int savedState = fullyRelease(node);
//
boolean interrupted = false;
while(! isOnSyncQueue(node)) {// If the lock is unlocked, the lock must be unlocked
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// If an interrupt occurs while the lock is being acquired and the current thread is being acquired while in park, call its own interrupt method.
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
Copy the code
hasWaiters
Determine if “waiter” is still on the condition
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if(! (conditioninstanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
// This is called synchronizer, directly look
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
Copy the code
AQS#hasWaiters
public final boolean hasWaiters(ConditionObject condition) {
if(! owns(condition))throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
Copy the code
ConditionObject#hasWaiters
Loop through the Condition queue to see if any of the nodes are condition
protected final boolean hasWaiters(a) {
if(! isHeldExclusively())throw new IllegalMonitorStateException();
for(Node w = firstWaiter; w ! =null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
Copy the code
getWaitQueueLength
Check the number of Condition queues in the current Condition. The code logic here is very simple and will not be shown here. The essence is to iterate through the Condition queue.
getWaitingThreads
Check the number of threads waiting in the current Condition. The code logic here is very simple, so I won’t show it here. The essence is to iterate through the Condition queue and check the Condition.
As for the blog, I take it as my notes, and there are a lot of contents in it that reflect my thinking process. As my thinking is limited, there are inevitably some discrepancies. If there are any questions, please point them out. Discuss together. thank you