AbstractQueuedSynchronizer
1. Introduction
AbstractQueuedSynchronizer (AQS), the abstract queued synchronizer, is located in the java.util.concurrent.locks package and serves as a framework for implementing locks and other synchronizers. Most synchronizers and locks in the JUC packages are built on it.
- An int field, state, represents the synchronization state, and three methods (getState, setState, and compareAndSetState) operate on it.
- A FIFO queue serves as the synchronization queue, which manages queued threads and wake-up notification: threads that fail to obtain the synchronization state are enqueued, blocked, and left waiting for the predecessor node to wake them up.
2. Use
AQS is an abstract class and is used mainly through inheritance (a private inner subclass is recommended). A custom synchronizer only has to implement acquiring and releasing the state, that is, the template methods; the thread wait queue is already handled by AQS. The template methods are written in terms of just three methods: getState, setState, and compareAndSetState. The template methods are as follows:
- tryAcquire: exclusive mode, obtain the synchronized resource
- tryRelease: exclusive mode, release the synchronized resource
- tryAcquireShared: shared mode, obtain the synchronized resource
- tryReleaseShared: shared mode, release the synchronized resource
- isHeldExclusively: whether the current thread holds the resource exclusively
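As a rough illustration of the template methods listed above, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its lock, tryLock, unlock and isLocked methods are assumptions made for this example, and state 0 is taken to mean unlocked and state 1 locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex (state 0 = unlocked, 1 = locked).
final class SimpleMutex {
    // The synchronizer is a private inner subclass of AQS.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS the state from 0 to 1; on success record the owner thread.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the owning thread may release.
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // queues and blocks on contention
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        mutex.unlock();
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

Only the three state methods appear in the overrides; queueing, parking and waking are left entirely to acquire and release.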
2.1 ReentrantLock Uses AQS
Next, take a look at how ReentrantLock uses AQS. ReentrantLock does not extend AQS itself: its inner class Sync extends AQS and overrides the tryRelease method, while Sync's subclasses NonfairSync (non-fair) and FairSync (fair) each override the tryAcquire method.
abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();

    // Non-fair acquisition
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0 means that no thread currently holds the synchronization state
        if (c == 0) {
            // Try to take the synchronization state with a CAS
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // Reentrant acquisition by the thread that already owns the lock
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Release the synchronization state. Because the lock is reentrant, one call may
    // release it only partially. The same code serves fair and non-fair mode.
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // Only the thread that holds the lock exclusively may release it
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // The lock is fully released once the count drops to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // omit non-critical code
}
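The partial release performed by tryRelease can be observed from the outside with the public ReentrantLock API. The following is a small sketch; the class name ReentrantReleaseDemo and the run method are made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the lock is only free once every lock() has a matching unlock().
final class ReentrantReleaseDemo {
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                            // reentrant: state goes from 1 to 2
        int holds = lock.getHoldCount();        // number of holds by the current thread
        lock.unlock();                          // state 2 -> 1, tryRelease returns false
        boolean lockedAfterOne = lock.isLocked();
        lock.unlock();                          // state 1 -> 0, tryRelease returns true
        boolean lockedAfterTwo = lock.isLocked();
        return holds + " " + lockedAfterOne + " " + lockedAfterTwo;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "2 true false"
    }
}
```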
With the template methods overridden, the lock methods of NonfairSync and FairSync can call the AQS acquire method directly to obtain the synchronization state, that is, to lock.
static final class NonfairSync extends Sync {
    /**
     * Non-fair lock: try a CAS immediately when locking, and take the lock if it succeeds.
     * This barging is what makes ReentrantLock non-fair.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    /** Override the AQS tryAcquire template method */
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /** Override the AQS tryAcquire template method */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Try the CAS only if no thread is queued ahead of this one,
            // that is, only if !hasQueuedPredecessors()
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // Reentrant acquisition
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
2.2 acquire method
The acquire method attempts to obtain the synchronization state. If that fails, the current thread is added to the synchronization queue and suspended until it is woken up.
public final void acquire(int arg) {
    // Call the tryAcquire method implemented by the subclass.
    // tryAcquire returns false -> build a Node and enqueue it -> suspend
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
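The queueing behaviour of acquire can be watched through ReentrantLock's monitoring methods. This is a sketch only: the class name AcquireQueueDemo and the run method are invented for the example, and the busy-wait with Thread.yield is just a simple way to wait until the second thread has been enqueued.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a thread that fails tryAcquire shows up in the synchronization queue.
final class AcquireQueueDemo {
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                              // the calling thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                          // tryAcquire fails -> enqueued -> suspended
            lock.unlock();
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter))     // wait until the waiter's node is queued
            Thread.yield();
        boolean queuedWhileHeld = lock.hasQueuedThreads();
        lock.unlock();                            // release wakes up the queued thread
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        // After the waiter has acquired and released, nothing is queued or locked.
        return queuedWhileHeld && !lock.hasQueuedThreads() && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("queue behaved as expected: " + run());
    }
}
```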
3. Implementation
AQS and its use have been introduced above. Next, let’s look at how AQS is implemented.
3.1 Data structure and member variables
3.1.2 Node class
Node is the node class of the synchronization wait queue. When a thread fails to obtain the synchronization state, AQS wraps the thread in a Node and appends it to the tail of the synchronization queue. Here are the main member variables of the Node class:
// The status of the current node in the synchronization queue. The initial value is 0
volatile int waitStatus;
// The predecessor node
volatile Node prev;
// The successor node
volatile Node next;
// The thread that this node represents
volatile Thread thread;
// Points to the next node waiting on a CONDITION queue
Node nextWaiter;
Possible values of waitStatus:
// The request for the synchronization state has been cancelled
static final int CANCELLED = 1;
// The successor node needs to be woken up. When a node joins the queue, the status of its predecessor is set to SIGNAL
static final int SIGNAL = -1;
// The current node is in a Condition queue
static final int CONDITION = -2;
// The next acquireShared should propagate unconditionally (discussed in 3.3.3)
static final int PROPAGATE = -3;
3.1.3 Member Variables
// Head of the wait queue
private transient volatile Node head;
// Tail of the wait queue
private transient volatile Node tail;
// Synchronization state
private volatile int state;
The FIFO queue in AQS is a doubly linked list of Node objects, anchored by the two Node member variables head and tail.
3.2 Exclusive lock implementation
The implementation of an exclusive lock mainly involves the following methods:
- acquireQueued: nodes in the queue acquire the lock through this method
- shouldParkAfterFailedAcquire: decides whether the thread should be suspended after it fails to acquire the lock
- cancelAcquire: cancels the attempt to acquire the lock
3.2.1 acquire method
The acquire method does the following:
- Calls the tryAcquire method implemented by the subclass
- If the synchronization state cannot be obtained, wraps the current thread in a Node and enqueues it
- Once in the wait queue, checks whether the current node is the successor of the head node and, if so, tries to acquire the lock. If the attempt fails, the thread is suspended.
/** Exclusive mode: acquire the lock */
public final void acquire(int arg) {
    // Call the tryAcquire method implemented by the subclass.
    // tryAcquire returns false -> build a Node and enqueue it -> suspend
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
3.2.2 addWaiter method & enq method
After the tryAcquire method returns false, the addWaiter method is called to add a node to the end of the wait queue.
/** Wrap the current thread in a Node (waitStatus = 0) and add it to the queue */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure.
    // The fast path is a single CAS on the tail of the queue.
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The queue is empty or the fast path failed: fall back to enq
    enq(node);
    return node;
}

/** Enqueue: retry the CAS in a loop until the node has joined the queue */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // If the queue has no head yet, initialize it
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Point the new node's prev at the current tail, then CAS tail to the new node
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                // The CAS succeeded: point the old tail's next at the new node
                t.next = node;
                return t;
            }
        }
    }
}
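One detail in addWaiter and enq deserves attention: in both methods the prev pointer of the new node is assigned before compareAndSetTail is called, and the next pointer of the old tail is assigned only after the CAS succeeds. Appending a node takes three steps: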
- Set the prev of the current node to the previous tail node
- Point the tail pointer to the current node
- Point the next of the previous tail node to the current node
Imagine what would happen if the order of execution were different:
- If the CAS on tail came first and the prev and next pointers were set afterwards, there would be a moment when both prev and next of the tail node are null, and the queue would be incomplete.
- If the next pointer of the old tail were set before the CAS on tail, a failed CAS would leave the old tail's next pointing at a node that has not actually joined the queue.
In summary, it makes sense to set the prev pointer before the CAS in the if condition.
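The three-step order can be tried out in isolation. The sketch below is not the AQS code: it assumes a simplified queue with a pre-allocated dummy head and an AtomicReference for tail, and the names EnqueueOrderSketch, enqueue, countBackward and run are invented. It only demonstrates that, with prev set before the CAS, a walk from tail through prev reaches every enqueued node.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified queue that mirrors the enqueue order used by addWaiter/enq.
final class EnqueueOrderSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    // Assumption: a dummy head exists up front, so tail is never null.
    private final Node head = new Node();
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                        // step 1: prev of the new node
            if (tail.compareAndSet(t, node)) {    // step 2: swing tail with a CAS
                t.next = node;                    // step 3: next of the old tail
                return;
            }
        }
    }

    // Walks from tail to head through prev pointers and counts the nodes.
    int countBackward() {
        int n = 0;
        for (Node t = tail.get(); t != head; t = t.prev)
            n++;
        return n;
    }

    static int run(int threads, int perThread) {
        EnqueueOrderSketch q = new EnqueueOrderSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    q.enqueue(new Node());
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers)
                w.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return q.countBackward();
    }

    public static void main(String[] args) {
        System.out.println("nodes reachable from tail: " + run(4, 1000));
    }
}
```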
3.2.3 acquireQueued method & shouldParkAfterFailedAcquire method
After a node is enqueued, the lock is acquired using the acquireQueued method. The main steps are as follows:
- Check whether the predecessor of the current node is head. If so, try to acquire the lock, and return if the acquisition succeeds.
- If the predecessor of the current node is not head, or the acquisition fails, enter the shouldParkAfterFailedAcquire method.
The main logic of shouldParkAfterFailedAcquire is to set the status of the current node's predecessor to SIGNAL, skipping any predecessors that are CANCELLED. When this method returns true, the parkAndCheckInterrupt method is called and the current thread is suspended until it is woken up (at this point, the status of the current node's predecessor is SIGNAL).
/** Nodes in the queue acquire the lock through this method */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // Interrupt flag
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // If the predecessor is head, the current node is eligible to try for the lock.
            // On success, the current node becomes the new head.
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // The lock was not acquired; decide whether to suspend.
            // shouldParkAfterFailedAcquire decides whether the current thread should park.
            // parkAndCheckInterrupt parks the thread and, after wake-up, reports whether it was interrupted.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // A predecessor in SIGNAL state wakes up its successor when it releases the lock,
    // so the current node can safely block
    if (ws == Node.SIGNAL)
        return true;
    // ws > 0 means the predecessor is CANCELLED
    if (ws > 0) {
        // Walk backwards from the predecessor to the first node that is not CANCELLED
        // and link the current node behind it
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
    // The predecessor is not CANCELLED: set its status to SIGNAL
    else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
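The park-and-recheck loop of acquireQueued can be imitated with LockSupport alone. This is a sketch under assumptions, not the AQS code: the condition being waited for is a plain AtomicBoolean rather than a lock, and the names ParkLoopSketch, awaitFlag and run are invented. Like parkAndCheckInterrupt, it uses Thread.interrupted() to read and clear the interrupt status after every wake-up, so that the next park call can block again.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of "park, note interrupts, recheck the condition".
final class ParkLoopSketch {
    // Blocks until the flag is set; returns whether an interrupt arrived meanwhile.
    static boolean awaitFlag(AtomicBoolean flag) {
        boolean interrupted = false;
        while (!flag.get()) {
            LockSupport.park(flag);        // returns on unpark, interrupt, or spuriously
            if (Thread.interrupted())      // read AND clear the interrupt status
                interrupted = true;
        }
        // Also report an interrupt that arrived right before the flag was observed.
        return interrupted || Thread.interrupted();
    }

    static boolean[] run() {
        AtomicBoolean flagA = new AtomicBoolean();
        AtomicBoolean flagB = new AtomicBoolean();
        boolean[] result = new boolean[2];
        Thread a = new Thread(() -> result[0] = awaitFlag(flagA));
        Thread b = new Thread(() -> result[1] = awaitFlag(flagB));
        a.start();
        b.start();
        flagA.set(true);
        LockSupport.unpark(a);             // plain wake-up, no interrupt
        b.interrupt();                     // interrupt first ...
        flagB.set(true);
        LockSupport.unpark(b);             // ... then satisfy the condition
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;                     // {false, true}
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("a interrupted: " + r[0] + ", b interrupted: " + r[1]);
    }
}
```

The interrupt does not end the wait by itself; it is only recorded, which is the same reason acquire calls selfInterrupt after the lock has finally been obtained.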
3.2.4 cancelAcquire method
The cancelAcquire method cancels a node's attempt to acquire the lock. This method is private and is only called from finally blocks, when the attempt to acquire the lock ends with an exception.
/** Cancel the attempt to acquire the lock */
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    // Walk backwards and skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // Remember pred.next for the CAS operations below
    Node predNext = pred.next;
    // Set the wait status of the current node to CANCELLED
    node.waitStatus = Node.CANCELLED;
    // First case: the current node is tail, and the CAS that moves tail back to the predecessor succeeds
    if (node == tail && compareAndSetTail(node, pred)) {
        // Set next of the predecessor to null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // Second case, which requires three conditions:
        // 1. The predecessor is not the head node
        // 2. The predecessor's status is SIGNAL, or the CAS that sets it to SIGNAL succeeds
        // 3. The thread of the predecessor is not null
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // If the successor of the current node is not null and not CANCELLED,
            // link the predecessor to that successor.
            // next.prev is not pointed at pred here. That happens later, when other threads
            // call cancelAcquire or shouldParkAfterFailedAcquire and skip CANCELLED nodes
            // through the prev pointers.
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        }
        // Third case: node is the successor of head, so wake up its successor directly
        else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
Note how cancelAcquire handles the following three cases differently:
- The current node is tail
- The current node is not tail and is not a successor to head
- The current node is the successor of head
First case: the current node is tail. A CAS moves tail back to the predecessor of the current node; if it succeeds, the predecessor's next is set to null.
Second case: the current node is neither tail nor the successor of head. In this case, the predecessor of the current node is linked to its successor. Note that the code only points the predecessor's next at the successor and does not set the successor's prev. (The predecessor here is the one found after skipping CANCELLED nodes.) The successor's prev is fixed later, when other threads call cancelAcquire or shouldParkAfterFailedAcquire and skip CANCELLED nodes through the prev pointers.
Third case: the current node is the successor of head. In this case the unparkSuccessor method is called directly to wake up the successor node.
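One way to trigger a cancellation from application code is to interrupt a thread that waits in lockInterruptibly: in JDK 8 the interruptible acquire loop throws InterruptedException, and its finally block calls cancelAcquire. The following sketch shows the visible effect; the class name CancelDemo and the run method are assumptions for this example, and the node in question is the tail, so it corresponds to the first case.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupted waiter cancels its node and leaves the queue.
final class CancelDemo {
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                                 // keep the lock so the waiter must queue
        boolean[] sawInterrupt = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock();
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;              // the acquire attempt was abandoned
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter))        // wait until the waiter is enqueued
            Thread.yield();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        boolean stillQueued = lock.hasQueuedThreads();
        lock.unlock();
        return sawInterrupt[0] && !stillQueued;
    }

    public static void main(String[] args) {
        System.out.println("waiter cancelled cleanly: " + run());
    }
}
```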
3.2.5 unparkSuccessor method
The unparkSuccessor method wakes up the successor of the current node. If the successor is null or CANCELLED, the method walks from the tail node towards the head and picks the non-CANCELLED node closest to the current node. Why walk from back to front? As shown in 3.2.2, enqueuing in the addWaiter and enq methods is not an atomic operation. So the following situation is possible: the current node is the tail, a new node is being enqueued at that moment, and only the second step described in 3.2.2 has completed (the CAS on tail is done, but the next pointer of the current node is still null). A front-to-back traversal would then miss the newly enqueued node.
/** Wakes up the successor of the current node */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // If the status is negative, try to reset it to 0 before signalling.
    // It is fine if this CAS fails.
    if (ws < 0) {
        compareAndSetWaitStatus(node, ws, 0);
    }
    // Get the successor of the current node, which is about to be woken up
    Node s = node.next;
    // The successor is null or CANCELLED.
    // s == null does not mean that node is tail: a new node may be joining the queue,
    // with the CAS on tail completed but the next pointer not yet updated
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Start at tail and walk towards the head to find a node that is not CANCELLED.
        // In addWaiter and enq, joining the queue is not atomic
        // (3 steps: 1. prev pointer, 2. tail pointer, 3. next pointer),
        // so a front-to-back walk might miss a node whose next link is still null.
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Wake up
    if (s != null)
        LockSupport.unpark(s.thread);
}
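The reason for the back-to-front walk can be reproduced with three hand-linked nodes. The sketch below freezes the queue at the moment described above (prev set and tail moved, next not yet written). The class BackwardScanSketch and its simplified Node are assumptions for illustration and contain no concurrency at all.

```java
// Hypothetical sketch: a queue frozen between step 2 and step 3 of the enqueue.
final class BackwardScanSketch {
    static final class Node {
        Node prev;
        Node next;
    }

    // Returns {nodes found walking forward via next, nodes found walking backward via prev}.
    static int[] run() {
        Node head = new Node();
        Node a = new Node();
        Node b = new Node();

        // Node a is fully linked behind head.
        a.prev = head;
        head.next = a;

        // Node b is mid-enqueue: step 1 (b.prev) and step 2 (tail = b) are done,
        // but step 3 (a.next = b) has not happened yet.
        b.prev = a;
        Node tail = b;

        int forward = 0;
        for (Node n = head.next; n != null; n = n.next)
            forward++;                 // sees only a, because a.next is still null

        int backward = 0;
        for (Node n = tail; n != head; n = n.prev)
            backward++;                // sees b and a

        return new int[] { forward, backward };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("forward: " + r[0] + ", backward: " + r[1]); // forward: 1, backward: 2
    }
}
```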
3.2.6 After the thread is woken up
After being woken up, the thread returns from the parkAndCheckInterrupt method, reports whether it was interrupted, and goes back into the loop of the acquireQueued method. The awakened node checks whether its predecessor is head; if so, it tries to acquire the lock by calling tryAcquire. If the predecessor is not head, the node enters the shouldParkAfterFailedAcquire method, skips the CANCELLED nodes, and goes around the loop again, at which point its predecessor is head.
3.2.7 Releasing the Exclusive Lock
/** Release the synchronization state */
public final boolean release(int arg) {
    // Call the template method implemented by the subclass to release the synchronization state
    if (tryRelease(arg)) {
        /*
         * head may be one of the following:
         * 1. null: there has been no contention, so head is not initialized
         * 2. the node of the current thread
         * 3. the node of another thread that acquired the lock after tryRelease
         *    and was installed by setHead (in acquireQueued)
         */
        Node h = head;
        // Wake up the successor node
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
3.3 Shared lock implementation
The implementation of shared locks mainly involves the following methods:
- acquireShared: shared mode, acquire the lock
- doAcquireShared
- addWaiter: adds the current thread to the wait queue
- setHeadAndPropagate
- shouldParkAfterFailedAcquire: decides whether the thread should be suspended after it fails to acquire the lock
The addWaiter and shouldParkAfterFailedAcquire methods were annotated in detail above and are not analyzed again.
3.3.1 acquireShared method
The acquireShared method does the following:
- Call the tryAcquireShared method implemented by the subclass. A result >= 0 means the shared lock was obtained
- If the acquisition fails, call the doAcquireShared method to enter the wait queue
The return value of the tryAcquireShared method means the following:
- Negative: failed to obtain the shared lock
- Zero: the acquisition succeeded, but no free resources remain
- Positive: the acquisition succeeded and free resources remain, so waiting threads can be woken up
/** Shared mode: acquire the lock */
public final void acquireShared(int arg) {
    // Call the template method implemented by the subclass. A result < 0 means the lock
    // was not obtained, and doAcquireShared takes over to handle the wait queue
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
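The three kinds of return value can be seen in a small permit counter. This is a sketch in the style of a non-fair semaphore, not the Semaphore source; the class name PermitSync and the helper tryOnce are invented, and state is assumed to hold the number of free permits.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical shared-mode synchronizer: state = number of free permits.
final class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Negative: not enough permits, report failure without touching the state.
            // Otherwise CAS the new count and report how many permits are left.
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases))
                return true;           // every successful release may wake up waiters
        }
    }

    // Convenience wrapper so callers outside the class can make a single attempt.
    int tryOnce() {
        return tryAcquireShared(1);
    }

    public static void main(String[] args) {
        PermitSync sync = new PermitSync(2);
        System.out.println(sync.tryOnce()); // 1: success, permits remain
        System.out.println(sync.tryOnce()); // 0: success, nothing left
        System.out.println(sync.tryOnce()); // -1: failure
    }
}
```

A blocking caller would use acquireShared(1) instead of tryOnce, which falls through to doAcquireShared when the result is negative.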
3.3.2 doAcquireShared method
The doAcquireShared method does the following:
- Adds the current node to the wait queue
- Try to acquire the lock and determine if it should be suspended (this logic is similar to the acquireQueued method for an exclusive lock)
The main difference from the exclusive lock is that setHeadAndPropagate is called after the shared lock has been acquired successfully.
private void doAcquireShared(int arg) {
    // Add the current thread to the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // The predecessor is head: try to acquire the lock
            if (p == head) {
                int r = tryAcquireShared(arg);
                // The acquisition succeeded
                if (r >= 0) {
                    // Set the new head and wake up successor nodes
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // Same as for the exclusive lock
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // Same as for the exclusive lock
            cancelAcquire(node);
    }
}
3.3.3 setHeadAndPropagate method and doReleaseShared method
The setHeadAndPropagate method mainly does the following:
- Set the node of the current thread as head
- Decide whether the successor node needs to be woken up and, if so, call the doReleaseShared method
The doReleaseShared method is called in two places:
- releaseShared calls it when a thread successfully releases the shared lock
- The thread of head's successor calls it through setHeadAndPropagate when it succeeds in acquiring the shared lock (whether or not it had blocked first)
If the head state is SIGNAL, doReleaseShared wakes up the successor node. If the head state is 0 at that moment, the successor does not need to be woken up yet, and the state is set to PROPAGATE to make sure the wake-up is propagated later.
/**
 * @param node      the node that successfully obtained the shared lock
 * @param propagate the return value of tryAcquireShared: > 0 means the acquisition succeeded
 *                  and resources remain, so subsequent threads may also obtain the shared
 *                  lock; = 0 means it succeeded but subsequent threads will fail
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // The old head
    Node h = head; // Record old head for check below
    setHead(node);
    // propagate > 0 -> a subsequent thread can also obtain the shared lock
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

/** Propagating wake-up in shared mode */
private void doReleaseShared() {
    for (;;) {
        // When called from setHeadAndPropagate, head is the node that just obtained the shared lock
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // If head is in SIGNAL state, try to set its state to 0. On success, wake up the successor
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            // If the state of head is 0 (the successor does not need to be woken up yet),
            // try to set it to PROPAGATE so the wake-up is propagated later.
            // This corresponds to the h.waitStatus < 0 check in setHeadAndPropagate
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // If head did not change, the work is done and the loop exits.
        // If head changed, for example because another thread acquired the lock,
        // retry so that the wake-up keeps propagating
        if (h == head) // loop if head changed
            break;
    }
}
Pay particular attention to the if condition in the setHeadAndPropagate method:
- propagate > 0: shared resources are still available, so the successor node needs to be woken up
- h == null || h.waitStatus < 0: here h is the old head. It is not null (unless it has been garbage collected), and its state may be 0 or -3
  - State 0: the head state was set from -1 to 0 when another thread released the lock and woke up the current node. Alternatively, the current thread acquired the lock before ever blocking, so the head state was never changed from 0 to -1
  - State -3: the current node has been woken up but has not yet set itself as head, and the old head state is 0. Another thread releases a resource, calls doReleaseShared, reads the old head state as 0, and changes it from 0 to -3
- (h = head) == null || h.waitStatus < 0: here h is the new head. Its state may be 0, -1, or -3
  - State 0: there is no successor node, or the successor has just joined the queue and is not yet blocked (that is, it has not yet changed the state of its predecessor to -1)
  - State -1: the current node has a successor, which has already changed the state to -1
  - State -3: another thread releases the lock, calls doReleaseShared, reads the new head state as 0, and changes it from 0 to -3
Why must the successor still be woken up according to waitStatus < 0 even when propagate is not greater than 0? The PROPAGATE state was introduced to fix a bug in which a waiting thread could hang when a Semaphore was released concurrently. The code that reproduces the bug is shown below.
import java.util.concurrent.Semaphore;

public class TestSemaphore {
    private static Semaphore sem = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            sem.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}
The original AQS code had no PROPAGATE state, as shown below:
private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}
So consider this situation:
- At some point t1 and t2 are queued. t3 releases a resource, wakes up t1 (the successor of head), and changes the head state from -1 to 0. t1 obtains the resource, and its tryAcquireShared call returns 0
- t4 then releases a resource, reads the head state as 0 (this is still the old head), and does not wake up any successor
- t1 reaches the setHeadAndPropagate method, finds that no resources are left (propagate = 0), and does not wake up its successor
- After that nobody wakes up t2, and it hangs
With the PROPAGATE state in place, the problem is solved as follows:
- At some point t1 and t2 are queued. t3 releases a resource, wakes up t1 (the successor of head), and changes the head state from -1 to 0. t1 obtains the resource, and its tryAcquireShared call returns 0
- t4 then releases a resource, reads the head state as 0 (this is still the old head), and sets the head state to PROPAGATE, that is, -3. This logic lives in the doReleaseShared method
- t1 reaches the setHeadAndPropagate method and finds that no resources are left (propagate = 0), but the state of the old head is -3, which is less than 0, so it calls doReleaseShared. The doReleaseShared method reads the new head (now t1), whose state is -1, and wakes up the successor node t2
3.3.4 Releasing the Shared Lock
The releaseShared method does the following:
- Call the tryReleaseShared implementation of the subclass to release the synchronization state
- Call the doReleaseShared method to wake up the successor of head
Note that the release logic of ReentrantReadWriteLock differs from that of Semaphore. In ReentrantReadWriteLock, tryReleaseShared returns true only when all read locks have been released. In Semaphore, it returns true whenever the release succeeds.
/** Release the shared lock */
public final boolean releaseShared(int arg) {
    // Call tryReleaseShared to release the synchronization state
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
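Because releaseShared simply returns the result of tryReleaseShared, the "true only when fully released" style can be observed directly. The sketch below is a simplified hold counter, not the ReentrantReadWriteLock source; the class name ReadCountSync is invented, and state is assumed to be the number of outstanding shared holds.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical shared-mode synchronizer: state = number of shared holds.
final class ReadCountSync extends AbstractQueuedSynchronizer {
    @Override
    protected int tryAcquireShared(int unused) {
        for (;;) {
            int c = getState();
            if (compareAndSetState(c, c + 1))
                return 1;              // always succeeds in this simplified sketch
        }
    }

    @Override
    protected boolean tryReleaseShared(int unused) {
        for (;;) {
            int c = getState();
            if (c == 0)
                throw new IllegalMonitorStateException();
            int next = c - 1;
            if (compareAndSetState(c, next))
                return next == 0;      // true only once the last hold is released
        }
    }

    public static void main(String[] args) {
        ReadCountSync sync = new ReadCountSync();
        sync.acquireShared(1);
        sync.acquireShared(1);
        System.out.println(sync.releaseShared(1)); // false: one hold remains
        System.out.println(sync.releaseShared(1)); // true: fully released
    }
}
```

In the first release doReleaseShared is skipped, since no waiter could make progress yet; a semaphore-style tryReleaseShared would return true both times.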