AQS, full name: AbstractQueuedSynchronizer, is to provide a synchronous frame JDK, internal maintains a bi-directional FIFO queue, the queue CLH synchronization.
AQS relies on it for synchronization state management (voliate modified state, used to indicate whether a lock is held). If the failure to obtain the synchronization state, the current thread and waiting information will be constructed into a Node, the Node into the FIFO queue, while blocking the current thread, when the thread will release the synchronization state, the FIRST section of the FIFO queue will wake up, so that it can obtain the synchronization state.
Many locks under JUC packages are implemented based on AQS, as shown in the following brain diagram:
Node
static final class Node {
/** Shared node */
static final Node SHARED = new Node();
/** exclusive node */
static final Node EXCLUSIVE = null;
/** The node will be set to the cancelled state because of timeout or interruption. The cancelled node will not participate in the competition and will remain cancelled
static final int CANCELLED = 1;
/** The successor node is in the wait state. If the current node releases synchronization or is cancelled, the successor node is notified so that it can run */
static final int SIGNAL = -1;
/** The thread of the node waits on condition. When another thread calls signal on condition, the node will enter the synchronization queue from the waiting queue to obtain synchronization status */
static final int CONDITION = -2;
/** * The next shared synchronization state acquisition is propagated unconditionally */
static final int PROPAGATE = -3;
/** Wait status */
volatile int waitStatus;
/** The precursor node */
volatile Node prev;
/** The successor node */
volatile Node next;
/** The thread that gets the synchronization status */
volatile Thread thread;
/** * The next conditional queue waits for the node */
Node nextWaiter;
final boolean isShared(a) {
return nextWaiter == SHARED;
}
final Node predecessor(a) throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread; }}Copy the code
FIFO structure
Exclusive synchronization state process
/** * get synchronization status exclusively */
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
Copy the code
tryAcquire
Attempt to acquire the lock, return true on success, false otherwise. This method is implemented by subclasses that inherit from AQS themselves. The design pattern of template method is adopted.
For example, ReentrantLock’s Sync inner class, Sync subclasses: NonfairSync and FairSync
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
Copy the code
addWaiter
private Node addWaiter(Node mode) {
// Create a Node
Node node = new Node(Thread.currentThread(), mode);
// Try adding a tail quickly
Node pred = tail;
if(pred ! =null) {
node.prev = pred;
// CAS sets the tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
returnnode; }}// If the above fails to be added, the loop attempts to add until it succeeds
enq(node);
return node;
}
Copy the code
enq
private Node enq(final Node node) {
// Continue the for loop until Node is inserted successfully
for (;;) {
Node t = tail;
if (t == null) {
// CAS sets the first node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS sets the tail
if (compareAndSetTail(t, node)) {
t.next = node;
returnt; }}}}Copy the code
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
// Indicates whether the operation succeeded
boolean failed = true;
try {
// Thread interrupt flag
boolean interrupted = false;
// Continuous spin loop
for (;;) {
// Prev of the current node
final Node p = node.predecessor();
// Check whether prev is a header && is in sync state
if (p == head && tryAcquire(arg)) {
// Set the current node as a header
setHead(node);
// Remove the prev node from the queue
p.next = null; // help GC
failed = false;
return interrupted;
}
// During the spin process, determine whether the current thread needs to block && block the current thread and check the thread interrupt status
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if (failed)
// Cancel obtaining the synchronization statuscancelAcquire(node); }}Copy the code
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// Get the status of the prev node of the current node
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/* * If the prev's status is signal, the current node is notified when the prev releases synchronization or cancels, so the current node can safely block (rather than sleep) */
return true;
if (ws > 0) {
/* * status > 0, which indicates that the cancelled node is removed from the queue * until a non-cancelled node is found */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* * Set prev status to signal */ via CAS
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
Copy the code
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt(a) {
// Block the current thread
LockSupport.park(this);
// Returns the interrupted status of the current thread
return Thread.interrupted();
}
Copy the code
selfInterrupt
static void selfInterrupt(a) {
// Failed to get synchronization state && Thread interrupt state is true, interrupt current thread
Thread.currentThread().interrupt();
}
Copy the code
Release the exclusive synchronization state
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if(h ! =null&& h.waitStatus ! =0)
// If the header is not null && and the header state is not 0
// Wake up the successor node of the header
unparkSuccessor(h);
return true;
}
return false;
}
Copy the code
tryRelease
Attempts to release synchronization, true on success, false otherwise. This method is implemented by subclasses that inherit from AQS themselves. The design pattern of template method is adopted.
For example, the tryRelease method of ReentrantLock’s Sync inner class.
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
Copy the code
unparkSuccessor
private void unparkSuccessor(Node node) {
/* * Get the current node status */
int ws = node.waitStatus;
// If the state of the current node is less than 0, use CAS to set it to 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * Gets the successor of the current node */
Node s = node.next;
/ / if the subsequent node is empty | | or subsequent node status > 0 (for cancellation status)
if (s == null || s.waitStatus > 0) {
s = null;
// Find available nodes whose state is not canceled from the tail node
for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
s = t;
}
if(s ! =null)
// Wake up the successor node
LockSupport.unpark(s.thread);
}
Copy the code
conclusion
A FIFO synchronization queue is maintained in AQS. When the thread fails to obtain the synchronization state, it will join the opposite end of the CLH synchronization queue and keep the spin all the time. The thread in the CLH synchronization queue will determine whether its precursor node is the leading node when spinning. If the leading node tries to obtain the synchronization status continuously, it will exit the CLH synchronization queue if it succeeds in obtaining the synchronization status. When the thread finishes executing the logic, it releases the synchronization state and wakes up subsequent nodes.
Shared state synchronization process
The main difference between the shared mode and the exclusive mode is that only one thread can acquire the synchronization state at the same time, while the shared mode can have multiple threads acquire the synchronization state at the same time. For example, read operations can be performed by multiple threads at the same time, while write operations can be performed by only one thread at a time. All other operations will be blocked.
AcquireShared Gets the synchronization state
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// Failed to obtain synchronization status
doAcquireShared(arg);
}
Copy the code
tryAcquireShared
Attempt to acquire the shared lock, return true on success, false otherwise. This method is implemented by subclasses that inherit from AQS themselves. The design pattern of template method is adopted.
For example, ReentrantReadWriteLock’s Sync internal class
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
Copy the code
doAcquireShared
private void doAcquireShared(int arg) {
// Add shared mode nodes to the queue
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// Spin to get synchronization status
for (;;) {
// The precursor of the current node
final Node p = node.predecessor();
// If the precursor node is the head node
if (p == head) {
// Try to get the shared synchronization status
int r = tryAcquireShared(arg);
if (r >= 0) {
// Set the current node as the head node and release the successor node that is also in shared mode
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return; }}if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}Copy the code
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// Actually release the shared synchronization state and wake up the next nodedoReleaseShared(); }}Copy the code
doReleaseShared
private void doReleaseShared(a) {
// Spin releases shared synchronization state
for (;;) {
Node h = head;
// If the header is not null && the header is not equal to the tail, there is a valid node
if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
// If the status of the header is signal, there is a successor node that needs to be woken up
if (ws == Node.SIGNAL) {
// Update the state of the header to 0 (the initial state), because the header is now useless
// continue To ensure that the replacement succeeds
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue; // loop to recheck cases
// Wake up the successor node
unparkSuccessor(h);
}
// If the state is the initial state 0, then set to PROPAGATE state
// Ensure that subsequent nodes are notified when synchronization state is released
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; }}Copy the code
unparkSuccessor
/** * wakes up the successor node */
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// If the current node state < 0, set the state to 0, the current node is useless
compareAndSetWaitStatus(node, ws, 0);
// Get the successor node of the current node
Node s = node.next;
/ / if there is no subsequent nodes | | state of subsequent node > 0 (cancelled)
// Find the last valid node from the tail node
if (s == null || s.waitStatus > 0) {
s = null;
for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
s = t;
}
// Wake up the node if it is not null
if(s ! =null)
LockSupport.unpark(s.thread);
}
Copy the code
If you have any IT questions, you can send me a private message in the wechat public account and I will answer IT. You can follow IT by scanning the code