What is AQS?

Most Java developers have heard of AQS, but what exactly is it? AQS stands for AbstractQueuedSynchronizer, an abstract class that serves as a framework for building queue-based synchronizers. You customize how the right to execute is acquired and released (that is, you redefine the acquire and release semantics) by overriding methods such as tryAcquire, tryRelease, tryAcquireShared and tryReleaseShared. Your own methods can also call the query methods that AQS provides when making decisions. AQS relies on CAS: before JDK 9 the underlying fields were updated through Unsafe's compareAndSwap* methods, and from JDK 9 onward through VarHandle, which replaced Unsafe in this class. Put bluntly, AQS uses VarHandle to guarantee that its updates are atomic.

In plain English: only one person may operate on a given thing at a time. If several people want to, they queue up and wait until the current operator finishes and notifies the next one in line. (This describes exclusive mode; shared mode is covered later.)
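To make the "override tryAcquire and tryRelease" idea concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The names SimpleMutexDemo, Mutex, lock, unlock and isLocked are made up for this illustration; only the AbstractQueuedSynchronizer methods come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example: a non-reentrant mutex. state 0 = free, 1 = held.
final class SimpleMutexDemo {

    static final class Mutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 0 -> 1; only one thread can win this race
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // tells AQS to wake the next queued thread
        }

        void lock()        { acquire(1); }
        void unlock()      { release(1); }
        boolean isLocked() { return getState() != 0; }
    }

    public static void main(String[] args) {
        Mutex mutex = new Mutex();
        mutex.lock();
        try {
            System.out.println("locked: " + mutex.isLocked());
        } finally {
            mutex.unlock();
        }
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

All queueing, parking and waking is inherited from AQS; the subclass only decides what "acquired" and "released" mean for its state value.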

AQS source code analysis

Preface

In the source code, AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer and so inherits its exclusiveOwnerThread field. This field records the owner of the synchronizer in exclusive mode, that is, the thread that currently holds the right to execute.

AQS has several key methods: acquire, acquireInterruptibly, tryAcquireNanos, release, acquireShared, acquireSharedInterruptibly, tryAcquireSharedNanos and releaseShared. They are analyzed one by one below.

Before analyzing the source code, it helps to understand the AQS synchronization queue and the waitStatus field of its Node class.

What are the waitStatus states

  • CANCELLED, value 1: the thread waiting in the synchronization queue timed out or was interrupted and must be removed from the queue. A node never leaves this state once it has entered it.
  • SIGNAL, value -1: the thread of the successor node is waiting. When the thread of the current node releases the synchronization state or is cancelled, it notifies the successor so that the successor's thread can run.
  • CONDITION, value -2: the node is in a condition queue, and its thread is waiting on a Condition.
  • PROPAGATE, value -3: the next acquisition of the shared synchronization state should propagate unconditionally.
  • Initial state, value 0. There is no named constant for it in Node.

acquire

Acquires in exclusive mode, ignoring interrupts.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The acquire method calls tryAcquire, addWaiter and acquireQueued in turn. tryAcquire is the method you override to define how the right to execute is acquired. For a quick look we use an existing AbstractQueuedSynchronizer implementation as the example. First, its tryAcquire method:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // state == 0 means nobody holds the synchronizer
    if (c == 0) {
        // No thread is queued ahead of us and the CAS 0 -> acquires succeeds
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // Record the current thread as the exclusive owner
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // Reentry: the current thread is already the owner
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

The core of tryAcquire is deciding whether the current thread may obtain the right to execute. I won't go into detail here, since the next article analyzes the ReentrantLock source. The focus below is on addWaiter and acquireQueued.

private Node addWaiter(Node mode) {
    // Create a new Node; the mode parameter selects shared or exclusive
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        // If tail is not null, link the new Node behind it
        if (oldTail != null) {
            // Point the new Node's prev at the current tail
            node.setPrevRelaxed(oldTail);
            // CAS tail from oldTail to the new Node
            if (compareAndSetTail(oldTail, node)) {
                // Point the old tail's next at the new Node
                oldTail.next = node;
                return node;
            }
        } else {
            // Initialize the synchronization queue
            initializeSyncQueue();
        }
    }
}

private final void initializeSyncQueue() {
    Node h;
    // CAS head from null to a new empty Node
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        // Plain assignment: tail refers to the same Node as head
        tail = h;
}

The core job of addWaiter and initializeSyncQueue is to create a Node and append it to the AQS synchronization queue; acquireQueued then works with that node. Here is what addWaiter does:

  1. The constructor creates a new Node, and the mode parameter specifies the mode of the Node, shared or exclusive. Here it is set to exclusive mode.

  2. Loop to establish the relationship between the new Node and the tail Node. Check whether tail is null. If tail is not null, go to Step 3. Otherwise, go to Step 4.

  3. If the tail Node is not null, first point the prev of the new Node at the current tail Node, then change the tail of the AQS to the new Node with a CAS through VarHandle. If the CAS succeeds, point the next of the previous tail (oldTail in the code) at the new Node. If it fails, another thread has changed the tail concurrently, and the loop repeats until the CAS succeeds.

  4. If the tail Node is null, the synchronization queue must be initialized by calling initializeSyncQueue. It sets the head of the AQS to a new empty Node through VarHandle and then assigns the same reference to tail. Note that head and tail now refer to the same node, so the prev of the Node enqueued next is the head node. This matters because the later check for obtaining the right to execute is whether the head of the AQS equals the prev of the current node.

    Because addWaiter runs in a loop and the new Node still has to be linked in after the queue is created, Step 3 is then performed on the next iteration.
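The enqueue loop described in the steps above can be sketched outside the JDK with AtomicReference in place of VarHandle. This is a simplified illustration, not the JDK source: EnqueueSketch, its Node class and the "dummy" label are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplification of the addWaiter loop, using AtomicReference for CAS.
final class EnqueueSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    Node addWaiter(String name) {
        Node node = new Node(name);
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail != null) {
                node.prev = oldTail;                     // link backwards first
                if (tail.compareAndSet(oldTail, node)) { // publish as the new tail
                    oldTail.next = node;                 // then link forwards
                    return node;
                }
                // CAS lost a race with another thread: retry with the new tail
            } else {
                // Empty queue: install a dummy head, then loop again to enqueue
                Node h = new Node("dummy");
                if (head.compareAndSet(null, h))
                    tail.set(h);
            }
        }
    }

    public static void main(String[] args) {
        EnqueueSketch q = new EnqueueSketch();
        Node a = q.addWaiter("A");
        Node b = q.addWaiter("B");
        System.out.println(a.prev.name + " <- " + a.name + " <- " + b.name);
    }
}
```

The prev link is set before the CAS and the next link after it, which is why a traversal that must not miss nodes walks backwards from tail, as unparkSuccessor does.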

After analyzing the addWaiter method, take a look at the acquireQueued method

final boolean acquireQueued(final Node node, int arg) {
    // Whether the thread was interrupted while waiting
    boolean interrupted = false;
    try {
        for (;;) {
            // Predecessor of the Node created by addWaiter
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // Make the current thread's Node the new head
                setHead(node);
                // Unlink the old head
                p.next = null; // help GC
                return interrupted;
            }
            // Check whether the thread may park
            if (shouldParkAfterFailedAcquire(p, node))
                // Park; after waking up, record whether the thread was interrupted.
                // |= is a logical OR assignment: the result is true if either side is true.
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

acquireQueued is also a core method. It is where threads are parked through LockSupport.park, and it is implemented as a loop, analyzed step by step below.

  1. It first gets the predecessor (prev) of the Node created by the current thread.

  2. It checks whether prev is the head node of the AQS and whether tryAcquire succeeds. If both conditions hold, go to Step 3; otherwise go to Step 4. Both conditions holding means that the thread corresponding to the head node has finished and performed its release.

  3. If the condition of Step 2 holds, the thread has obtained the right to execute, so the head is set to the Node created by the current thread and the method returns.

  4. If the condition in Step 2 does not hold, check whether the thread of this Node may park. This logic lives in shouldParkAfterFailedAcquire, shown next.

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // waitStatus > 0 means the predecessor was cancelled
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // Set the predecessor's status to SIGNAL (-1); the thread parks on a later pass
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire decides whether the thread of a node may park, according to these rules:

  1. Check whether the waitStatus of the current node's predecessor is SIGNAL. If so, return true, and the thread will park.
  2. Check whether the waitStatus of the predecessor is greater than 0, that is, 1 (CANCELLED). If so, the predecessor's thread has been cancelled, so the prev of the current node must be reassigned: loop backwards through the prev links, skipping cancelled nodes, until a node that is not cancelled is found. If the predecessor is not cancelled, its waitStatus is changed to SIGNAL instead. In both cases the method returns false, and the loop in acquireQueued runs once more.
  3. Why change the waitStatus of the current node's predecessor? Because the current thread may park only when its predecessor's waitStatus equals SIGNAL (-1). SIGNAL is the predecessor's promise to notify the current node when its thread releases the synchronization state or is cancelled.

That completes the analysis of acquire. Next comes release, followed by a summary of the whole flow from acquire to release.

release

release gives back the exclusive mode obtained through acquire, so that the thread of the following node in the AQS queue can obtain the right to execute.

public final boolean release(int arg) {
    // tryRelease: see steps 1 to 3 below
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release first calls tryRelease, which is implemented by subclasses. Taking ReentrantLock as the example, I won't show the code here and will only describe the logic:

  1. First subtract arg from state. state represents the number of reentries.
  2. If the result of Step 1 is 0, set the owner of the exclusive mode synchronizer to null and return true.
  3. If the result of Step 1 is not 0, store the new state and return false, indicating that the synchronizer cannot be released yet.
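The three steps above can be sketched as follows. This is an illustration written from the description, not the ReentrantLock source: the class name ReentrantSketch and the holdCount helper are assumptions, and the overflow check is omitted for brevity.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical reentrant synchronizer illustrating the tryRelease steps.
final class ReentrantSketch extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            setState(c + acquires); // reentry: only the owner reaches this line
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;     // Step 1: subtract arg from state
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null); // Step 2: fully released, clear the owner
        setState(c);                       // Step 3: store the new count
        return free;
    }

    int holdCount() { return getState(); }

    public static void main(String[] args) {
        ReentrantSketch sync = new ReentrantSketch();
        sync.acquire(1);
        sync.acquire(1);                      // reentrant acquire
        System.out.println(sync.release(1));  // false: still held once
        System.out.println(sync.release(1));  // true: fully released
    }
}
```

Only when tryRelease returns true does release go on to wake a successor, so a reentrant holder keeps waiters parked until its last release.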

If tryRelease succeeds, release then checks that the head of the AQS is not null and that its waitStatus is not 0, which means a successor may be waiting for a signal. It then calls unparkSuccessor to wake up the thread of the next Node.

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // If head's waitStatus < 0, reset it to 0
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // The next node of head is null or cancelled
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk backwards from tail, keeping the last node seen with waitStatus <= 0
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // Wake up the thread of that node
    if (s != null)
        LockSupport.unpark(s.thread);
}

Here is what unparkSuccessor does:

  1. First, if the waitStatus of head is less than 0, it is reset to 0.
  2. If the next node of head is null, or its waitStatus is greater than 0, that is, 1 (the thread of that node has been cancelled), the synchronization queue is traversed backwards starting from tail. The traversal runs all the way back to head and keeps the last node it sees with waitStatus <= 0, which is the non-cancelled node closest to head.
  3. If the condition in Step 2 does not hold (the next node of head is not null and its waitStatus is not 1), that next node is used directly. In either case, if a node was found, LockSupport.unpark is called to wake up its thread.

That is the whole of release, and it is quite simple. Its core job is to call unpark to wake up the thread of the next node in the AQS queue when the conditions are met. Next, let's go through the whole acquire and release process.

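Summary of acquire and release

Putting the two together: acquire calls tryAcquire. If that fails, addWaiter appends a Node for the thread to the queue, and acquireQueued parks the thread until its predecessor is head and tryAcquire succeeds. release calls tryRelease and, on success, uses unparkSuccessor to wake the first non-cancelled node after head.

The acquireInterruptibly method is the same as acquire except that it responds to interruption.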

acquireInterruptibly

acquireInterruptibly acquires the right to execute in a way that can be interrupted. Otherwise it is similar to acquire.

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

acquireInterruptibly first calls Thread.interrupted() to check whether the thread has been interrupted. If it has, InterruptedException is thrown; otherwise tryAcquire is called. If tryAcquire fails, doAcquireInterruptibly is called, which creates a new Node, appends it to the AQS queue (initializing the queue if necessary), and then proceeds much like acquire. Take a look at the source code:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    // Create a Node and append it to the AQS queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

The only difference from acquire is what happens after a parked thread wakes up. The thread wakes either because the thread of the head node called release or because it was interrupted while parked in LockSupport.park. If parkAndCheckInterrupt reports that the thread was interrupted, InterruptedException is thrown instead of the interrupt merely being recorded.

addWaiter and parkAndCheckInterrupt are not covered again here. If they are unclear, see the analysis of acquire above.
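The behaviour can be observed from outside through ReentrantLock, whose lockInterruptibly method is built on acquireInterruptibly. The sketch below is a small demonstration under that assumption; InterruptibleDemo and its helper methods are names invented for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a queued thread is interrupted while waiting for the lock.
final class InterruptibleDemo {

    // Returns true if the waiting thread saw InterruptedException.
    static boolean interruptWaiter() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        lock.lock(); // the calling thread owns the lock for the whole test
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // queues up behind the owner
                    lock.unlock();
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                }
            });
            waiter.start();
            // Spin until the waiter's Node is in the synchronization queue
            while (!lock.hasQueuedThread(waiter))
                Thread.onSpinWait();
            waiter.interrupt();
            joinQuietly(waiter);
        } finally {
            lock.unlock();
        }
        return sawInterrupt.get();
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter interrupted: " + interruptWaiter());
    }
}
```

With plain lock() in place of lockInterruptibly(), the waiter would stay queued until the owner unlocks and would only have its interrupt flag set afterwards.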

tryAcquireNanos

tryAcquireNanos acquires the right to execute with a timeout. The unit of the timeout is the nanosecond (ns): 1 second (s) = 1,000,000,000 nanoseconds (ns).

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

What comes to mind when you see tryAcquireNanos? The method declares throws InterruptedException, so, like acquireInterruptibly, it supports interruption. It first calls tryAcquire to obtain the right to execute and returns immediately if that succeeds. Otherwise it calls doAcquireNanos(arg, nanosTimeout), which creates a new Node, appends it to the AQS queue, and parks the node's thread for at most the remaining timeout.

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // A timeout of zero or less fails immediately
    if (nanosTimeout <= 0L)
        return false;
    // Current time in nanoseconds + timeout in nanoseconds = deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // Create a new Node in exclusive mode and append it to the queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            // The predecessor is head and the right to execute can be obtained
            if (p == head && tryAcquire(arg)) {
                // The new head keeps the queue linked to the nodes behind it
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // Remaining time = deadline - current time; <= 0 means the timeout has expired
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // Park only if allowed and more than 1000 ns remain; otherwise spin
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // Throw if the thread was interrupted
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

If you understood the analysis of acquire and release, this is easy to follow. Instead of repeating the methods already analyzed, here are only the differences:

  1. A timeout is added. It uses LockSupport.parkNanos(this, nanosTimeout), which is comparable to Object.wait(long timeoutMillis): once the timeout elapses, the waiting thread becomes runnable again.
  2. After the thread wakes up, whether through unpark or because the timeout elapsed, the remaining time is computed as the deadline minus the current time. If the result is less than or equal to 0, the timeout has expired and the method returns false. Otherwise the loop continues.
  3. Interrupts are supported: if the thread is interrupted, InterruptedException is thrown.
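The timeout path can be exercised through ReentrantLock.tryLock(long, TimeUnit), which is built on the timed exclusive acquire described above. The following is a small sketch under that assumption; TimedAcquireDemo and tryWhileHeld are names invented for the example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a timed acquire gives up once its deadline passes.
final class TimedAcquireDemo {

    // Runs tryLock(timeout) in a second thread while the caller holds the lock.
    static boolean tryWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(true);
        lock.lock();
        try {
            Thread t = new Thread(() -> {
                try {
                    boolean ok = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    acquired.set(ok);
                    if (ok)
                        lock.unlock();
                } catch (InterruptedException e) {
                    acquired.set(false);
                }
            });
            t.start();
            try {
                t.join(); // the lock stays held until the timed attempt has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("acquired within 50 ms: " + tryWhileHeld(50));
    }
}
```

Because the owner never unlocks while the second thread is waiting, the remaining time eventually drops to zero or below and the attempt returns false.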

Isn’t it easy? Focus on acquire and release, and the rest follows. Everything above covers acquisition in exclusive mode. Next comes shared mode.

acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

As you can see, acquireShared is not much different from acquire: the custom synchronizer must supply the code that decides whether the right to execute can be obtained, here by overriding tryAcquireShared. The ReentrantReadWriteLock source is not analyzed in this look at shared mode. doAcquireShared follows the same logic as acquireQueued.

private void doAcquireShared(int arg) {
    // Create a Node in shared mode and append it to the queue
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // The predecessor is head
            if (p == head) {
                int r = tryAcquireShared(arg);
                // r >= 0 means the right to execute was obtained
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // Check whether the thread may park
            if (shouldParkAfterFailedAcquire(p, node))
                // Park; after waking up, record whether the thread was interrupted
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

doAcquireShared first calls addWaiter to create a new Node. Inside it, the thread is parked through LockSupport.park. It is implemented as a loop, analyzed below:

  1. It first gets the predecessor (prev) of the Node created by the current thread.
  2. It checks whether the predecessor is the head node of the AQS. If so, go to Step 3; otherwise go to Step 4. The condition holding means that the thread corresponding to the head node has finished and performed its release.
  3. If the condition of Step 2 holds, check whether the current thread can obtain the right to execute. If it can, set the head of the AQS to the Node newly created by the current thread and propagate to the following nodes; otherwise go to Step 4.
  4. If the condition of Step 2 or Step 3 does not hold, check whether the thread of the Node may park, which depends on the predecessor's waitStatus. This logic is in shouldParkAfterFailedAcquire, described above.
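For shared mode, the subclass overrides tryAcquireShared and tryReleaseShared instead. A minimal sketch of a permit counter, in the style of a semaphore, is shown below. PermitSketch and its available helper are hypothetical names, and this is an illustration rather than the JDK's Semaphore source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical shared-mode synchronizer: state holds the number of free permits.
final class PermitSketch extends AbstractQueuedSynchronizer {

    PermitSketch(int permits) { setState(permits); }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Negative result: fail so that AQS enqueues the caller.
            // Otherwise claim the permits with a CAS and report what is left.
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases))
                return true; // tells AQS to wake queued threads
        }
    }

    int available() { return getState(); }

    public static void main(String[] args) {
        PermitSketch permits = new PermitSketch(2);
        permits.acquireShared(1);
        permits.acquireShared(1);
        System.out.println("free after two acquires: " + permits.available());
        permits.releaseShared(1);
        System.out.println("free after one release: " + permits.available());
    }
}
```

The return value of tryAcquireShared is what doAcquireShared tests with r >= 0: a negative value means failure, while zero or more means success.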

acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a Node in shared mode through addWaiter
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            // The predecessor is head
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // Park if allowed; after waking up, throw if the thread was interrupted
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

As you can see, acquireSharedInterruptibly differs little from acquireShared. The only difference comes after the thread has parked in parkAndCheckInterrupt: when it wakes up, whether because the thread owning the predecessor node called release or because of an interrupt, InterruptedException is thrown if the thread was interrupted.

tryAcquireSharedNanos

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // A timeout of zero or less fails immediately
    if (nanosTimeout <= 0L)
        return false;
    // Current time in nanoseconds + timeout in nanoseconds = deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // Create a Node in shared mode and append it to the queue
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            // The predecessor is head
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            // Remaining time = deadline - current time; <= 0 means the timeout has expired
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // Park only if allowed and more than 1000 ns remain; otherwise spin
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // Throw if the thread was interrupted
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

The doAcquireSharedNanos method is similar to the doAcquireNanos method. Compared with doAcquireShared, the differences are:

  1. A timeout is added. It uses LockSupport.parkNanos(this, nanosTimeout), which is comparable to Object.wait(long timeoutMillis): once the timeout elapses, the waiting thread becomes runnable again.
  2. After the thread wakes up, whether through unpark or because the timeout elapsed, the remaining time is computed as the deadline minus the current time. If the result is less than or equal to 0, the timeout has expired and the method returns false. Otherwise the loop continues.
  3. Interrupts are supported: if the thread is interrupted, InterruptedException is thrown.

Just focus on acquire and acquireShared, and the rest is easy.

releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake up the thread of the node after head
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

The releaseShared method is slightly different from the release method, so let’s look at it in detail:

  1. First try to free resources through tryReleaseShared. If the release succeeds, a resource is now free, so go to Step 2.
  2. Call doReleaseShared to wake up the following nodes. doReleaseShared runs a loop, and each iteration first reads the head node. If head is not null and is not equal to tail, its status is examined. If head is in the SIGNAL state, a successor needs to be woken up: the status is changed to 0 (the initial state) and unparkSuccessor wakes up the thread of the successor node. If the status was 0 to begin with, it is changed to PROPAGATE, which ensures that a later acquisition continues to propagate. The loop ends once head has not changed during the iteration.
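The effect of propagation is easiest to see with a one-shot gate: a single releaseShared lets every queued thread through. The sketch below is an illustration in the spirit of CountDownLatch, not its source; GateDemo, Gate, await, open and openAndCount are names invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: one releaseShared call wakes all waiting threads.
final class GateDemo {

    // state 0 = closed, 1 = open
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1; // positive: success, others may follow
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }

        void await() { acquireShared(1); }
        void open()  { releaseShared(1); }
    }

    // Starts the given number of waiters, opens the gate once, and counts who got through.
    static int openAndCount(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        // Spin until every waiter is in the synchronization queue
        while (gate.getQueueLength() < waiters)
            Thread.onSpinWait();
        gate.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + openAndCount(3));
    }
}
```

Each woken thread succeeds in tryAcquireShared, becomes the new head, and in turn wakes its own successor, so the single release ripples down the queue.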

Author: Seven mile Trip link: juejin.cn/post/684490… The copyright belongs to the author. Commercial reprint please contact the author for authorization, non-commercial reprint please indicate the source.