Opening
AQS (AbstractQueuedSynchronizer) is the cornerstone of the JUC (java.util.concurrent) package. This article explains how AQS implements an exclusive lock, using ReentrantLock as the example, and how it implements a shared lock, using Semaphore.
It relies on simple structure diagrams and detailed descriptions, so that the AQS workflow can be understood in detail in as little time as possible.
I. AQS waiting queue
Every thread that fails to acquire the lock enters the AQS wait queue, which is a doubly linked list, as shown in the figure below:
The head node is created when the queue is initialized. It only marks a position and does not represent a waiting thread. The nodes after the head node are the threads that failed to acquire the lock and entered the wait queue. The node class of AQS looks like this:
static final class Node {
    /** Marker for SHARED mode */
    static final Node SHARED = new Node();
    /** Marker for EXCLUSIVE mode */
    static final Node EXCLUSIVE = null;
    /** waitStatus value: the node has been CANCELLED */
    static final int CANCELLED = 1;
    /** waitStatus value: the successor's thread needs to be woken up */
    static final int SIGNAL = -1;
    /** waitStatus value: the thread is waiting on a condition */
    static final int CONDITION = -2;
    /** waitStatus value: the next acquireShared should propagate unconditionally */
    static final int PROPAGATE = -3;
    // Wait status
    volatile int waitStatus;
    // Predecessor node
    volatile Node prev;
    // Successor node
    volatile Node next;
    // The waiting thread
    volatile Thread thread;
    // Next node waiting on a condition, or the SHARED marker
    Node nextWaiter;
}
Let’s look at two other core attributes of AQS:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    // Synchronization state value (number of lock holds)
    private volatile int state;
    // Note: this field is inherited from the parent class AbstractOwnableSynchronizer;
    // it is shown here for convenience.
    // Records the thread that currently holds the lock
    private transient Thread exclusiveOwnerThread;
    // The Node class shown above
    static final class Node {
        // ...
    }
}
Next, let’s take a look at how a thread joins the wait queue and how the state value of each node in the queue changes through the lock and unlock process of ReentrantLock.
II. Lock (ReentrantLock.lock())
Let’s take a quick look at the core structure of ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
    // Inherits from AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // ...
    }
    // Unfair lock
    static final class NonfairSync extends Sync {
        // ...
    }
    // Fair lock
    static final class FairSync extends Sync {
        // ...
    }
}
As you can see, ReentrantLock does most of its work through inner classes that extend AQS, and it provides both an unfair and a fair lock. Let’s look at the locking process. The unfair lock is used as the example here; how the fair lock differs is discussed separately below.
Next, let’s walk through the source code in the order of the flow chart, again using the unfair lock as the example:
final void lock() {
    // Try to grab the lock quickly
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread()); // On success, record the thread that acquired the lock
    else
        acquire(1); // Otherwise go through the normal acquisition process
}
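The fast path depends on compare-and-set semantics: only one caller can move the state from 0 to 1. Here is a minimal sketch of that idea, using an AtomicInteger as a stand-in for the AQS state field. The class CasFastPathDemo and its methods are hypothetical names for illustration and are not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: AtomicInteger stands in for the volatile int state of AQS.
class CasFastPathDemo {
    private final AtomicInteger state = new AtomicInteger(0);

    // Mirrors the idea of compareAndSetState(0, 1): succeeds only if state is still 0.
    boolean tryFastLock() {
        return state.compareAndSet(0, 1);
    }

    int state() {
        return state.get();
    }

    public static void main(String[] args) {
        CasFastPathDemo demo = new CasFastPathDemo();
        System.out.println(demo.tryFastLock()); // first attempt wins the CAS
        System.out.println(demo.tryFastLock()); // second attempt sees state == 1 and fails
    }
}
```

In the real lock, the failing caller does not give up. It falls through to acquire(1), which is covered next.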
acquire() is the template method of AQS: tryAcquire() is implemented by each subclass, while addWaiter() and acquireQueued() are fixed.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
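To see the template method from the subclass side, here is a minimal sketch of a non-reentrant mutex built on AQS. The class SimpleMutex is a hypothetical example, not the ReentrantLock implementation. It only supplies tryAcquire() and tryRelease(), and the inherited acquire() and release() take care of queueing and waking threads.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = free, state 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if the lock is free; AQS queues the caller otherwise.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake a successor
        }

        boolean isHeld() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); } // template method, calls tryAcquire()
    void unlock()      { sync.release(1); } // template method, calls tryRelease()
    boolean isLocked() { return sync.isHeld(); }
}
```

Because this sketch does not check the owner in tryAcquire(), a thread that calls lock() twice would block itself. ReentrantLock avoids this with the reentrancy branch shown in the next listing.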
Step 1: tryAcquire() tries to take the lock. The unfair implementation is shown here.
protected final boolean tryAcquire(int acquires) {
    // Implementation for the unfair lock
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // State value
    if (c == 0) { // The lock is free
        if (compareAndSetState(0, acquires)) { // Try to grab the state quickly with CAS
            setExclusiveOwnerThread(current); // Record that the current thread holds the lock
            return true;
        }
    }
    // Reentrant acquisition (the same thread locks again, for example in a recursive call)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires; // Increase the state value
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc); // Set the state value
        return true;
    }
    return false;
}
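The reentrant branch can be observed from the outside through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The sketch below uses a hypothetical helper class ReentrancyDemo. It locks recursively and returns the hold count seen at the deepest level.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Acquires the lock `depth` times on the current thread (recursively)
    // and returns the hold count observed at the innermost level.
    static int nestedHoldCount(ReentrantLock lock, int depth) {
        lock.lock();
        try {
            if (depth <= 1)
                return lock.getHoldCount();
            return nestedHoldCount(lock, depth - 1);
        } finally {
            lock.unlock(); // every lock() is paired with one unlock()
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(nestedHoldCount(lock, 3)); // hold count at the innermost call
        System.out.println(lock.isLocked());          // all holds have been released by now
    }
}
```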
If step 1 fails to acquire the lock, the thread has to join the wait queue.
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // Create a new node for the current thread
    // First try once to append the new node to the tail quickly with CAS
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node); // If the quick attempt fails, append to the tail by spinning with CAS
    return node;
}
Now look at the spinning enqueue operation:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // If the tail is null, head and tail must be initialized first
            if (compareAndSetHead(new Node())) // Create a new placeholder node
                tail = head; // head and tail both point to the placeholder node
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { // Append the current thread's node to the tail with CAS
                t.next = node;
                return t;
            }
        }
    }
}
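To make the resulting structure concrete, here is a simplified model of the same enqueue loop. ToyWaitQueue is a hypothetical class, not AQS itself. It uses AtomicReference in place of the internal CAS helpers and a string name in place of the waiting thread, so it only illustrates the lazily created placeholder head and the prev/next links.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy model of the AQS wait queue.
class ToyWaitQueue {
    static final class Node {
        final String name;   // stands in for the waiting thread
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): initialize the placeholder on first use,
    // then retry the CAS on tail until the node is appended.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node("placeholder")))
                    tail.set(head.get());
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t; // the previous tail, as in enq()
                }
            }
        }
    }
}
```

After enqueueing two nodes A and B, the list reads placeholder, A, B from head to tail, which matches the description of the head node as a pure position marker.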
Finally, acquireQueued() is called to complete the process:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // Whether the node must be cancelled on exit
    try {
        boolean interrupted = false; // Whether the thread was interrupted while waiting
        for (;;) {
            final Node p = node.predecessor(); // Predecessor of the current thread's node
            // If the predecessor is head and the current thread acquires the lock
            if (p == head && tryAcquire(arg)) {
                setHead(node); // The current node becomes head (the new position marker)
                p.next = null; // help GC: drop the strong reference held by the old head
                failed = false;
                return interrupted;
            }
            // While spinning, decide whether the current thread should block. The thread
            // normally parks after at most about 3 iterations instead of spinning
            // indefinitely, unless its predecessor keeps being cancelled.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node); // Cancel the node, see below
    }
}
How does AQS determine whether the current thread needs to block?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // Wait status of the current node's predecessor
    if (ws == Node.SIGNAL)
        // ws == -1: the predecessor will wake this node up, so it can safely block
        return true;
    if (ws > 0) {
        // ws > 0: the predecessor has been cancelled, so walk backwards
        // until a node with waitStatus <= 0 is found
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; // That node becomes the predecessor of the current node
    } else {
        /*
         * Here the predecessor's ws can only be 0 or -3 (PROPAGATE). The current node needs
         * a signal to be woken up, so the predecessor is marked SIGNAL. The thread is not
         * parked immediately: it parks only if the next attempt to acquire the lock also fails.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
If blocking is required, the following method is executed:
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // Block the thread via LockSupport (implemented on top of Unsafe, not covered further here)
    return Thread.interrupted(); // Report whether the thread was interrupted, and clear the interrupt flag
}
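Two details of this method are easy to miss: LockSupport.park() returns immediately when the thread's interrupt flag is already set, and Thread.interrupted() clears the flag as it reads it. The sketch below uses a hypothetical class ParkInterruptDemo to show both on the current thread.

```java
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    // Returns the results of two consecutive Thread.interrupted() calls
    // made right after park() returns.
    static boolean[] parkWhileInterrupted() {
        Thread.currentThread().interrupt();    // set the interrupt flag before parking
        LockSupport.park();                    // does not block: the flag is already set
        boolean first = Thread.interrupted();  // reports the interrupt and clears the flag
        boolean second = Thread.interrupted(); // the flag was cleared by the previous call
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] result = parkWhileInterrupted();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

This is why acquireQueued() keeps its own interrupted variable: the flag is consumed inside parkAndCheckInterrupt(), and acquire() restores it later through selfInterrupt().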
Now let’s see what cancelling a node does:
private void cancelAcquire(Node node) {
    // A null node means there is nothing to do
    if (node == null)
        return;
    node.thread = null;
    // Skip predecessors that have already been cancelled
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // Remember the successor of the valid predecessor for the CAS operations below
    Node predNext = pred.next;
    // Mark the current node as CANCELLED
    node.waitStatus = Node.CANCELLED;
    // If the current node is the tail, make its predecessor the new tail and unlink the node
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If the successor needs a signal to be woken up, link it to the predecessor;
        // otherwise wake the successor up directly
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Wake up the next blocked node after the current node
            // (described in detail in the unlock section)
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
At this point, the locking process is complete. Let’s first summarize, stage by stage, how the ws (waitStatus) value of a node in the wait queue changes:
- A node is created: ws == 0
- A node is cancelled: ws == 1 (CANCELLED)
- After a node enters the wait queue, its ws may be set to -1 (SIGNAL) by the node queued behind it.
There is one more state, -3 (PROPAGATE), which is covered in the next part together with shared locks.
The walkthrough above used the unfair lock. ReentrantLock’s fair and unfair locks differ in one step:
static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // The difference is the hasQueuedPredecessors() check: if another thread is
            // already waiting in front, the current thread does not jump the queue and
            // enters the wait queue instead
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
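A fair lock is selected by passing true to the ReentrantLock constructor. The sketch below uses a hypothetical class FairOrderDemo. It queues several threads one at a time behind a held fair lock and records the order in which they later acquire it. With no other thread competing, queued threads are woken in queue order. What fairness adds on top is that a newly arriving thread cannot barge in ahead of them, and that part is not exercised here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

class FairOrderDemo {
    // Queues n workers behind a held fair lock, one at a time,
    // and returns the order in which they acquire the lock.
    static List<Integer> acquisitionOrder(int n) {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair lock
        List<Integer> order = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[n];

        lock.lock(); // hold the lock so that every worker has to enqueue
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                threads[i].start();
                // Wait until this worker is in the wait queue before starting the next one.
                while (lock.getQueueLength() < i + 1)
                    Thread.yield();
            }
        } finally {
            lock.unlock(); // wakes the first queued worker
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return order;
    }
}
```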
Let’s look at the process of releasing locks.
III. Unlock (ReentrantLock.unlock())
The process of releasing a lock is the same for fair and unfair locks. Let’s start with a flow chart.
Releasing the lock is relatively simple, so let’s go through the source code quickly:
// ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}
// AQS.release()
public final boolean release(int arg) {
    // If the lock is fully released, wake up the successor of the head node
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Here is how the lock itself is released:
protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // Decrease the state value
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // When the state reaches 0, clear the lock-holding thread
        // (after releasing a reentrant hold, c may still be greater than 0)
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
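Both branches of tryRelease() are visible through the public ReentrantLock API. The sketch below uses a hypothetical class ReleaseDemo to check two things: a lock taken twice is still held after one unlock(), and calling unlock() on a lock the current thread does not hold throws IllegalMonitorStateException.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    // Locks twice, unlocks once, and reports whether the lock is still held.
    static boolean heldAfterPartialRelease() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                       // state goes 2 -> 1, lock is not free yet
        boolean stillHeld = lock.isLocked();
        lock.unlock();                       // state goes 1 -> 0, lock is free
        return stillHeld;
    }

    // Reports whether unlocking without holding the lock is rejected.
    static boolean unlockWithoutHoldingThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }
}
```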
We continue with the operation mentioned above that wakes up the successor node’s thread:
private void unparkSuccessor(Node node) {
    // Reminder: this method wakes up the successor of the given node
    int ws = node.waitStatus;
    if (ws < 0) // If the node's ws is negative, reset it to 0
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Try to wake up the blocked thread of the successor, which is usually the next node
     */
    Node s = node.next; // The successor node
    if (s == null || s.waitStatus > 0) { // The next node is null or has been cancelled
        s = null;
        // Walk backwards from the tail to find the node closest to the current node
        // with ws <= 0, and wake up that node's thread
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // Wake up the thread via LockSupport (implemented on top of Unsafe)
}
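The wake-up itself is a plain LockSupport.unpark() call on the successor's thread. Here is a minimal sketch of that park/unpark handshake outside of AQS, using a hypothetical class UnparkDemo: a worker parks until a flag is set, and the main thread sets the flag and then unparks it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class UnparkDemo {
    // Starts a worker that parks until `released` is set, wakes it up,
    // and returns whether the worker ran to completion.
    static boolean wakeParkedWorker() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean finished = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!released.get())
                LockSupport.park();
            finished.set(true);
        });
        worker.start();

        released.set(true);         // publish the condition first
        LockSupport.unpark(worker); // then grant a permit; harmless if the worker has not parked yet

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }
}
```

AQS follows the same pattern: the woken thread does not assume it owns the lock, but goes back into the loop in acquireQueued() and calls tryAcquire() again.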
IV. Summary
This concludes the AQS exclusive lock workflow. It is straightforward once you understand the structure of the AQS wait queue (together with the two other core attributes: state and the lock-holding thread variable exclusiveOwnerThread) and the scenarios in which a node’s wait status (ws) changes.
This article mentioned the template method pattern. If you are not familiar with design patterns, it is worth reading up on them. The next part of this AQS series will use diagrams to explain the AQS shared lock implementation.