Compared with exclusive and shared locks, the condition queue in AbstractQueuedSynchronizer (AQS) probably gets less attention, but it plays an important role in the implementation of blocking queues. If you want to understand AQS, the condition queue is something you have to learn.
The original address: www.jianshu.com/p/3f8b08ca2…
This article builds on the exclusive and shared lock modes of AQS. If you are familiar with them, go ahead and read on. Otherwise, I suggest reviewing my two earlier articles first: "Exclusive lock mode of AQS" and "Shared lock mode of AQS".
I. Introduction to usage scenarios
Unlike the topics of the previous two articles, this part of AQS is something many people have probably not paid much attention to, so let's first look at a scenario where the condition queue is used:

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();

    // How the condition is used
    public E take() throws InterruptedException {
        // Lock first
        takeLock.lockInterruptibly();
        try {
            // If the queue is empty, wait (the real source does this in a while loop)
            notEmpty.await();
            // Take the element...
            // If elements remain, wake up another thread waiting for an element
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // After taking the element, wake up a thread waiting to put an element
    }
The snippet above is taken from LinkedBlockingQueue, one of Java's commonly used blocking queues. As the code shows, a condition queue is built on top of a lock, and that lock must be an exclusive lock (the source code analysis below shows why).
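To make the usage pattern concrete, here is a minimal sketch of a blocking buffer built on one ReentrantLock and one Condition. The class TinyBlockingBuffer and its demo() method are hypothetical names invented for this illustration; this is not the JDK's LinkedBlockingQueue, which uses two locks and a capacity bound.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical minimal blocking buffer; an illustration, not the JDK implementation. */
final class TinyBlockingBuffer<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<E> items = new ArrayDeque<>();

    public void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            // Move one waiting consumer (if any) to the sync queue
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // Always re-check the condition in a loop after waking up
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    /** Starts a consumer, produces one element, and returns what the consumer received. */
    static String demo() {
        TinyBlockingBuffer<String> buffer = new TinyBlockingBuffer<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = buffer.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        buffer.put("hello");
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The sketch works whether the consumer starts waiting before or after put() runs, because take() checks the buffer before it ever calls await().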
II. Overview of the implementation process
The process of waiting on a condition:
- A thread must hold the exclusive lock before it can operate on the condition queue. If it cannot get the lock, it is already suspended at the lock acquisition step.
- After acquiring the exclusive lock, if the condition is not yet met, the thread is suspended in the condition queue of that lock and releases the lock it holds at the same time. Consider what would happen if the lock were not released.
- When woken up, the thread checks whether it can acquire the exclusive lock again. If it cannot, it stays suspended.
The wake-up process once the condition is met (taking the wake-up of a single node as an example; multiple nodes can also be woken):
- The first valid node in the condition queue (a cancelled node is invalid) is moved to the sync queue, where it waits to be woken by its predecessor. If the predecessor has been cancelled, the node is woken immediately so that it can either try to acquire the lock again or suspend properly in the sync queue.
Note: AQS as a whole has two kinds of queues, the sync queue and the condition queue. Only nodes in the sync queue can acquire the lock. The enqueuing described in the two earlier articles on exclusive and shared locks refers to the sync queue. "Waking up" a node in the condition queue really means moving it from the condition queue to the sync queue, which gives it a chance to acquire the lock.
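The move between the two queues can be observed from the outside with ReentrantLock's monitoring methods getWaitQueueLength(Condition) and hasQueuedThread(Thread). The following is a small sketch; the class TwoQueuesDemo and its helper methods are hypothetical names used only here.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo: signal() moves a waiter from the condition queue to the sync queue. */
final class TwoQueuesDemo {

    /** Returns true once some thread is waiting on the given condition. */
    private static boolean isWaiting(ReentrantLock lock, Condition c) {
        lock.lock();
        try {
            return lock.hasWaiters(c);
        } finally {
            lock.unlock();
        }
    }

    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false}; // guarded by lock
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!isWaiting(lock, ready)) {
            Thread.yield();
        }

        String result;
        lock.lock();
        try {
            int conditionBefore = lock.getWaitQueueLength(ready);
            boolean syncBefore = lock.hasQueuedThread(waiter);
            flag[0] = true;
            ready.signal();
            // We still hold the lock, so the waiter cannot leave the sync queue yet
            int conditionAfter = lock.getWaitQueueLength(ready);
            boolean syncAfter = lock.hasQueuedThread(waiter);
            result = "condition queue: " + conditionBefore + " -> " + conditionAfter
                    + ", sync queue: " + syncBefore + " -> " + syncAfter;
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the signalling thread keeps the lock while it inspects both queues, the waiter is guaranteed to still be sitting in the sync queue at that moment.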
III. In-depth analysis of the source code
The code below is somewhat complicated because it also handles interrupts. I chose this method to stay consistent with the code snippet at the beginning of this article. If you only want the core logic, I recommend reading the source of awaitUninterruptibly() instead.

    public final void await() throws InterruptedException {
        // Throw an exception if the current thread has been interrupted
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add the current thread as a node to the condition queue
        Node node = addConditionWaiter();
        // Release the lock held by the current thread and remember the state
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Keep suspending as long as the node is not in the sync queue
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            // Interrupt handling: the other way to leave the loop
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // The node is now in the sync queue, either through signal or through an interrupt.
        // The same acquire method as for an exclusive lock is called here, which again
        // shows that the condition queue can only be used with an exclusive lock
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Remove cancelled nodes from the condition queue
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Handle the interrupt according to its mode
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

The process is fairly complex, so let's analyze it step by step. First, the code that adds a node to the condition queue:
    // Notes:
    // 1. Unlike the sync queue, the head and tail pointers of the condition queue
    //    are firstWaiter and lastWaiter
    // 2. The condition queue is only operated on after the lock is acquired, that is,
    //    inside the critical section, so concurrency is not a concern in many places
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If the last node has been cancelled, remove the cancelled nodes from the queue
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Create a node in CONDITION state for the current thread and append it
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Remove cancelled nodes: a plain deletion from a singly linked list
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
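The list handling above can be tried out in isolation. The following toy model is a sketch only: ToyConditionQueue, its Node class, and the names() helper are hypothetical, threads are replaced by plain names, and the status constants are local copies used for illustration. It shows that cancelled nodes are cleaned up lazily, when a new waiter finds a cancelled tail.

```java
/** Hypothetical single-threaded model of the condition queue's linked list. */
final class ToyConditionQueue {
    static final int CONDITION = -2; // node is waiting on the condition
    static final int CANCELLED = 1;  // node gave up waiting

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;

    Node addConditionWaiter(String name) {
        Node t = lastWaiter;
        // Clean up only when the tail itself is cancelled
        if (t != null && t.waitStatus != CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(name);
        if (t == null) firstWaiter = node;
        else t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node seen that is still waiting
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) firstWaiter = next;
                else trail.nextWaiter = next;
                if (next == null) lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    /** Names of all linked nodes, in queue order. */
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(',');
            sb.append(n.name);
        }
        return sb.toString();
    }

    static String demo() {
        ToyConditionQueue q = new ToyConditionQueue();
        q.addConditionWaiter("A");
        Node b = q.addConditionWaiter("B");
        Node c = q.addConditionWaiter("C");
        b.waitStatus = CANCELLED;
        c.waitStatus = CANCELLED; // cancelled tail triggers cleanup on the next add
        q.addConditionWaiter("D");
        return q.names();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "A,D"
    }
}
```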
After the node has been added to the condition queue, the next step is to release the lock:
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // Read the current state and release all of it in one go.
            // This again shows that the lock must be an exclusive lock
            int savedState = getState();
            // The lock is released the same way as an exclusive lock
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                // If the release fails, throw an exception
                throw new IllegalMonitorStateException();
            }
        } finally {
            // If the release failed, mark the node as cancelled
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
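The effect of releasing the whole saved state, and of passing it back to acquireQueued() later, is visible with a reentrant lock. In this sketch (FullReleaseDemo and its helpers are hypothetical names), a thread that holds the lock twice calls await, the lock becomes completely free while it waits, and its hold count is back at 2 once await returns.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo: await releases every hold on the lock and restores them afterwards. */
final class FullReleaseDemo {

    private static boolean isWaiting(ReentrantLock lock, Condition c) {
        lock.lock();
        try {
            return lock.hasWaiters(c);
        } finally {
            lock.unlock();
        }
    }

    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};       // guarded by lock
        int[] holdsAfterAwait = {-1};   // read only after join()
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold count is now 2
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly();
                }
                holdsAfterAwait[0] = lock.getHoldCount();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        while (!isWaiting(lock, ready)) {
            Thread.yield();
        }
        // The waiter is parked in the condition queue; nobody owns the lock now
        boolean freeWhileWaiting = !lock.isLocked();

        lock.lock();
        try {
            flag[0] = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "free while waiting: " + freeWhileWaiting
                + ", hold count after await: " + holdsAfterAwait[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If await released only one of the two holds, the signalling thread could never acquire the lock and the demo would deadlock, which is the answer to the earlier question of what happens when the lock is not released.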
At this point the node has been added to the condition queue and the lock has been released, so it is time to suspend (ignore the interrupt handling for now and look only at the suspend logic):
    // Keep suspending as long as the node is not in the sync queue
    // (signal moves the node to the sync queue)
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // Interrupt handling, analyzed later
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // Check whether the node is in the sync queue
    final boolean isOnSyncQueue(Node node) {
        // Quick check 1: a node still in CONDITION state, or without a predecessor,
        // is not in the sync queue
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // Quick check 2: the next field is used only in the sync queue;
        // the condition queue uses nextWaiter
        if (node.next != null)
            return true;
        // Fall back to the full search
        return findNodeFromTail(node);
    }

    // Search for the node starting from the tail of the sync queue.
    // If it is found, the node is in the sync queue
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

Once the thread has been woken and its node has been moved to the sync queue, it acquires the lock through acquireQueued(), the same method used for exclusive acquisition in the sync queue. Finally, let's look at the interrupt logic and the clean-up code:
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // The thread may have been woken by a normal signal or by an interrupt
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // 1. If the thread was not interrupted, return 0
    // 2. If it was interrupted: return THROW_IE when the interrupt put the node into the
    //    sync queue, or REINTERRUPT when signal put it there
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

    // Change the node's status and add it to the sync queue.
    // Returns true if the node entered the sync queue because of the interrupt,
    // false if signal put it there
    final boolean transferAfterCancelledWait(Node node) {
        // Try to set the node's status to 0; on success, add it to the sync queue
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        // If the CAS failed, signal has already claimed the node. Since signal
        // will enqueue it, we only need to spin until that has happened
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
With the interrupt check after wake-up covered, let's look at the last piece of logic in await():
    // Before the interrupt is handled, the node must first acquire the lock
    // from the sync queue
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // An interrupt may have changed the node's status while it is still linked in the
    // condition queue, so if it has a successor, remove the cancelled nodes.
    // If it has no successor, then by the analysis above it will be removed when the
    // next node is added
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

    // Either throw the exception or re-assert the interrupt, depending on the mode
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            // Equivalent to Thread.currentThread().interrupt()
            selfInterrupt();
    }
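Both interrupt modes can be reproduced from application code. The sketch below uses hypothetical names (InterruptModesDemo, run, isWaiting) and interrupts a waiting thread either before or after it has been signalled. A single unguarded await() is used only to keep the demo short; real code should wait in a loop on a condition predicate.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo of the two outcomes: exception thrown vs. interrupt flag re-asserted. */
final class InterruptModesDemo {

    private static boolean isWaiting(ReentrantLock lock, Condition c) {
        lock.lock();
        try {
            return lock.hasWaiters(c);
        } finally {
            lock.unlock();
        }
    }

    /** If signalFirst is true, the waiter is signalled before it is interrupted. */
    static String run(boolean signalFirst) {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        String[] outcome = new String[1];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                ready.await();
                outcome[0] = "returned normally, interrupt flag set: "
                        + Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                outcome[0] = "threw InterruptedException";
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!isWaiting(lock, ready)) {
            Thread.yield();
        }
        lock.lock();
        try {
            if (signalFirst) {
                ready.signal(); // node is moved to the sync queue first
            }
            waiter.interrupt();
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt only:        " + run(false));
        System.out.println("signal then interrupt: " + run(true));
    }
}
```

An interrupt that arrives before any signal corresponds to THROW_IE, while an interrupt that arrives after the signal corresponds to REINTERRUPT: await returns normally and the thread's interrupt status is set again.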
At this point, the await operation of the condition queue has been fully analyzed. The signal() method is simpler by comparison:
    // Entry point for waking up a node in the condition queue
    public final void signal() {
        // Throw an exception if the current thread does not hold the lock exclusively,
        // which again shows that the condition queue only works with an exclusive lock
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // If the condition queue is not empty, wake up its first node
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    // Transfer the first node to the sync queue. If that fails, try the next node
    // in the condition queue, until the queue is empty
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    // Move a node from the condition queue to the sync queue
    final boolean transferForSignal(Node node) {
        // If the status cannot be changed, the node has been cancelled
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append the node to the sync queue; enq returns its predecessor
        Node p = enq(node);
        int ws = p.waitStatus;
        // If the predecessor is cancelled or its status cannot be set to SIGNAL,
        // wake up the current node immediately.
        // At this point the current node is already in the sync queue
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
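The exclusive-lock requirement can also be checked from the public API. In the following sketch (SignalOwnershipDemo is a hypothetical name), signal() is called without and with the lock held, and a Condition is requested from the read lock of a ReentrantReadWriteLock, which is a shared lock and does not support conditions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical demo: conditions require an exclusively held lock. */
final class SignalOwnershipDemo {

    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();

        // 1. signal() without holding the lock
        String withoutLock;
        try {
            ready.signal();
            withoutLock = "no exception";
        } catch (IllegalMonitorStateException e) {
            withoutLock = "IllegalMonitorStateException";
        }

        // 2. signal() while holding the lock; with no waiters it simply does nothing
        String withLock;
        lock.lock();
        try {
            ready.signal();
            withLock = "ok";
        } finally {
            lock.unlock();
        }

        // 3. Asking a shared (read) lock for a Condition
        String sharedLock;
        try {
            new ReentrantReadWriteLock().readLock().newCondition();
            sharedLock = "no exception";
        } catch (UnsupportedOperationException e) {
            sharedLock = "UnsupportedOperationException";
        }

        return withoutLock + " / " + withLock + " / " + sharedLock;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```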
IV. Summary
Compared with exclusive and shared locks, the condition queue probably receives the least attention, but since it is a key component of blocking queue implementations, it is worth understanding how it works. I think there are two key points. First, a condition queue is built on top of a specific lock. Second, the condition queue and the sync queue are two different queues: nodes in the former are woken when the condition is signalled, nodes in the latter are woken when the lock is released.
At this point, all three modes of the Java synchronizer AQS have been analyzed. I have tried to think things through and write as clearly as I can, but my knowledge is limited, so if you find any mistakes, corrections are welcome. Tomorrow is the National Day holiday, and I plan to travel abroad and relax. I wish everyone a happy National Day in advance.