An overview of the
Although Condition is not used in my work, I have not paid much attention to it. Recently, WHEN I started to read JDK source code, I found a large number of Condition is used in it, so I came to study the internal implementation principle.
Condition, an inner class of AQS, is aA single necklace watch with hands at the head and tail
By maintaining aWaiting queueAnd AQSSynchronous queueThe interaction between the two reaches a wake-up mechanism. The whole picture is that simple
public static void main(String[] args) throws Exception{# create a synchronization queue ReentrantLock lock =newReentrantLock(); Condition Condition = lock.newCondition();new Thread(()->{
try {
lock.lock();
System.out.println("Start blocking"); # await(' await '); # await(' await '); # await(' await '); Println () {system.out.println ();}"End of block");
}catch (Exception e){
}
finally {
lock.unlock();
}
}).start();
Thread.sleep(1000);
new Thread(()->{
try {
lock.lock();
System.out.println("Get locks and release resources"); Condition. Signal (); condition. Signal () Thread.sleep(1000);
System.out.println("Release over.");
}catch (Exception e){
} finally {
lock.unlock();
}
}).start();
System.in.read();
}
Copy the code
Waiting queue
ConditionObject, the internal class of AQS, is a wait queue, and a ReentrantLock can create multiple wait queues. ConditionObject (ConditionObject) ConditionObject (ConditionObject) ConditionObject (ConditionObject
Public class conditional conditional, java.io.Serializable {private transient Node firstWaiter; Private TRANSIENT Node lastWaiter; public ConditionObject() { } }Copy the code
await
The await method is to wrap the current thread into a Node and put it on await queue. Let’s see the implementation logic
public final void await(a) throws InterruptedException {
if (Thread.interrupted())
throw newInterruptedException(); Node = addConditionWaiter(); # Release AQS lockint savedState = fullyRelease(node);
int interruptMode = 0; Check if the AQS is in the sync queue, if so, exit the loopwhile(! IsOnSyncQueue (node)) {# The current thread is asleep, waiting to wake up locksupport. park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break; } # after waking up, I am already in the synchronization queueif(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null) // clean up if cancelled
unlinkCancelledWaiters();
if(interruptMode ! =0) reportInterruptAfterWait(interruptMode); } # create a queue node and insert it at the end of the queueprivate Node addConditionWaiter(a) {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if(t ! =null&& t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node =new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
Copy the code
signal
The wake up logic is simply to throw a waiting Node into a synchronous queue, focusing on the transferForSignal method
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
public final void signal(a) {
if(! isHeldExclusively())throw new IllegalMonitorStateException();
Node first = firstWaiter;
if(first ! =null)
doSignal(first);
}
final boolean transferForSignal(Node node) {
if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
return false; P = enq(Node);intws = p.waitStatus; # The following is the optimization speed logic, do not worry about itif (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
}
Copy the code
conclusion
The Condition calls await to release the lock and joins the wait queue. The signal method tells Node to queue synchronously to grab the lock resource.