1. A few words first
To be honest, I had planned to write a technical article on the design, implementation, and use of AQS, but after finishing the first draft I found that my own understanding was still vague. So I bit the bullet and started over, this time using the Java 8 source code as the basis for the walkthrough.
2. Introduction to AQS
In the java.util.concurrent.locks package, there are two such classes:
- AbstractQueuedSynchronizer
- AbstractQueuedLongSynchronizer
The only difference between these two classes is:
- AbstractQueuedSynchronizer maintains its internal state variable as an int
- AbstractQueuedLongSynchronizer maintains its internal state variable as a long
What we usually call AQS refers to these two classes, the abstract queued synchronizers.
AbstractQueuedSynchronizer (hereinafter "AQS") is a skeleton for building locks and other synchronization components. It reduces the amount of code each component has to write and takes care of many details that come up when implementing a synchronizer, such as ordering waiting threads in a FIFO queue. Each synchronizer can also define its own flexible criteria for deciding whether a thread should pass or wait.
AQS adopts the template method pattern. It maintains a number of template methods internally, and subclasses only need to implement a few specific methods (not abstract methods! Not abstract methods! Not abstract methods!) to get the behavior they need.
AQS-based components include:
- ReentrantLock: reentrant lock (supports fair and unfair acquisition)
- Semaphore: counting semaphore
- ReentrantReadWriteLock: read-write lock
- …
AQS is one of Doug Lea's great works. While looking him up on Wikipedia, I accidentally discovered that Pops seems to like red or light pink shirts?
3. Design ideas of AQS
AQS internally maintains an int member variable that represents the synchronization state, and uses a built-in first-in-first-out (FIFO) synchronization queue to control the threads that acquire the shared resource.
We can guess that AQS does a few things:
- Maintenance management of synchronization state
- Maintenance and management of wait queues
- Thread blocking and wake up
Of course, AQS also maintains a ConditionObject inner class, which is used for collaboration and communication between threads.
The int state maintained internally by AQS can be used to indicate any state!
- ReentrantLock uses it to record how many times the lock-holding thread has re-acquired the lock. For any other thread, a state greater than 0 means the lock cannot be acquired, so the thread is wrapped in a Node and added to the synchronization wait queue.
- Semaphore uses it to record the number of remaining permits. When the number of permits is zero, threads that try to obtain a permit enter the synchronization wait queue and block until some thread releases its permit (state+1), and then they compete for the freed permits.
- FutureTask, in the older versions that were built on AQS (it was later rewritten to manage its own state field), used it to indicate the status of a task (not started, running, completed, cancelled).
- ReentrantReadWriteLock uses state a bit differently: the high 16 bits count read-lock holds, and the low 16 bits count write-lock holds.
- CountDownLatch uses state as the remaining count. If state is greater than 0, a thread that waits on the latch is added to the synchronization wait queue and blocked; when state reaches 0, the threads in the wait queue are woken up one by one.
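To make the ReentrantReadWriteLock case concrete, here is a minimal sketch of how one int can carry two counters. The constant and method names are modelled on the JDK 8 source of ReentrantReadWriteLock, but the class `StateBits` itself is hypothetical and exists only for illustration.

```java
// Sketch: packing two counters into one int, in the style of
// ReentrantReadWriteLock. StateBits is a hypothetical demo class.
class StateBits {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adding this means "one more read hold"
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // selects the low 16 bits

    // Read (shared) holds live in the high 16 bits.
    static int sharedCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Write (exclusive) holds live in the low 16 bits.
    static int exclusiveCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int readers = 2 * SHARED_UNIT; // two read holds, no writer
        int writer = 3;                // one writer that re-entered three times
        System.out.println(sharedCount(readers) + " read holds, " + exclusiveCount(readers) + " write holds");
        System.out.println(sharedCount(writer) + " read holds, " + exclusiveCount(writer) + " write holds");
    }
}
```

Because both counters share one int, a single CAS on state can check and update them together.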
3.1 Pseudo-code lock acquisition:
boolean acquire() throws InterruptedException {
    while (the current state does not allow the acquire operation) {
        if (the acquire request needs to block) {
            if the current thread is not in the queue, insert it into the queue
            block the current thread
        }
        else
            return failure
    }
    possibly update the state of the synchronizer
    if the current thread is in the queue, remove it from the queue
    return success
}
3.2 Pseudo-code lock release:
void release() {
    update the state of the synchronizer
    if (the new state allows a blocked thread to acquire successfully)
        unblock one or more threads in the queue
}
That is the rough idea.
3.3 Methods provided
3.3.1 Common method
The following three methods, all protected final, can be called by any class that inherits AQS.
- protected final int getState(): gets the synchronization state
- protected final void setState(int newState): sets the synchronization state
- protected final boolean compareAndSetState(int expect, int update): if the current state value equals the expected value, atomically sets the synchronization state to the given update value and returns true; otherwise returns false
3.3.2 Methods that subclasses need to implement
The following five methods are not implemented within AQS, but are handed over to subclasses to implement, and then AQS calls the implementation methods of subclasses to complete the logical processing.
- protected boolean tryAcquire(int): attempts to acquire in exclusive mode. The implementation should check whether the state allows an exclusive acquire, and acquire if so.
- protected boolean tryRelease(int): attempts to release the synchronization state in exclusive mode
- protected int tryAcquireShared(int): attempts to acquire in shared mode
- protected boolean tryReleaseShared(int): attempts to release in shared mode
- protected boolean isHeldExclusively(): returns whether the thread calling this method is the holder of the exclusive lock
Subclasses do not need to implement all of the methods above. They can override just the ones they need, as long as the implementation logic stays complete. Depending on which methods are overridden, an implementation follows either the exclusive lock strategy or the shared lock strategy.
This is why the methods above are not declared abstract. If they were, every subclass would have to implement all five, even the ones it never uses.
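As an illustration of the exclusive strategy, here is a minimal sketch of a non-reentrant mutex, loosely following the Mutex example in the AbstractQueuedSynchronizer Javadoc. The class name `SimpleMutex` and its helper methods are made up for this article. It overrides only the exclusive-mode methods, and the last helper shows what happens when a method that was not overridden gets called.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a non-reentrant mutex: state 0 = unlocked, state 1 = locked.
// SimpleMutex is a hypothetical demo class, not part of the JDK.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Succeeds only if nobody holds the lock (state 0 -> 1).
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }              // template method, calls tryAcquire
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }            // template method, calls tryRelease
    boolean isLocked() { return sync.isLocked(); }

    // tryAcquireShared was not overridden, so the AQS default implementation
    // throws UnsupportedOperationException.
    boolean sharedModeUnsupported() {
        try {
            sync.acquireShared(1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        System.out.println("second tryLock: " + mutex.tryLock());
        mutex.unlock();
        System.out.println("shared mode unsupported: " + mutex.sharedModeUnsupported());
    }
}
```

The subclass never touches the queue itself; it only answers "can this acquire or release succeed?", and the template methods acquire and release do the rest.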
Exclusive locks:
- ReentrantLock
- ReentrantReadWriteLock.WriteLock
Implementation strategy:
- tryAcquire(int)
- tryRelease(int)
- isHeldExclusively()
Shared locks:
- CountDownLatch
- ReentrantReadWriteLock.ReadLock
- Semaphore
Implementation strategy:
- tryAcquireShared(int)
- tryReleaseShared(int)
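For the shared strategy, a minimal sketch could be a one-shot latch in the spirit of the BooleanLatch example from the AQS Javadoc. `OneShotLatch` and its `demo` helper are hypothetical names used only here. It overrides just tryAcquireShared and tryReleaseShared.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a one-shot latch: state 0 = closed, state 1 = open.
// OneShotLatch is a hypothetical demo class, not part of the JDK.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive: success, and later acquirers may succeed too. Negative: fail.
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch; returning true lets AQS wake the waiters
            return true;
        }
    }

    private final Sync sync = new Sync();

    boolean isOpen() { return sync.isOpen(); }
    void open() { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Starts the given number of waiters, opens the latch, and reports how many got through.
    static int demo(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiters released: " + demo(3));
    }
}
```

A single open() lets every waiter through, which is exactly what separates shared mode from exclusive mode.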
AQS has many internal template methods. I will not list them all here; some of them appear in the source code walkthrough below, along with some cheeky comments.
4. Internal attributes of AQS
4.1 The CLH queue
AQS controls the threads that acquire shared resources through a built-in first-in-first-out (FIFO) synchronization queue. The CLH queue is a doubly linked FIFO queue, and the synchronization mechanism of AQS is built on it. Each node in the queue has a pointer to its predecessor and a pointer to its successor.
The head is not in the blocking queue!
The Node source code:
static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;
    // waitStatus value: the thread has cancelled
    static final int CANCELLED = 1;
    // waitStatus value: the successor's thread needs to be woken up (unparked)
    static final int SIGNAL = -1;
    // waitStatus value: the thread is waiting on a condition, i.e. it is in the condition queue
    static final int CONDITION = -2;
    // waitStatus value: the next acquireShared should propagate unconditionally
    static final int PROPAGATE = -3;
    /**
     * CANCELLED = 1  // The thread cancelled because of a timeout or an interrupt. This is a terminal state.
     * SIGNAL = -1    // The successor's thread is blocked or about to block, and must be woken when this
     *                // node releases or cancels. A successor usually sets this state on its predecessor.
     * CONDITION = -2 // The thread is in the condition queue.
     * PROPAGATE = -3 // Passes the wake-up on to subsequent threads; introduced to complete and
     *                // strengthen the wake-up mechanism for shared locks.
     * 0              // None of the above (the initial value).
     */
    volatile int waitStatus;
    // The predecessor node
    volatile Node prev;
    // The successor node
    volatile Node next;
    // The thread that enqueued this node; set on construction and nulled out after use
    volatile Thread thread;
    // The next node waiting on the condition, or the special value SHARED
    Node nextWaiter;

    // Returns true if the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the predecessor of the current node; throws NullPointerException if it is null
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish the initial head or the SHARED marker
    }

    // Constructor taking a thread and a mode
    Node(Thread thread, Node mode) { // Used by addWaiter
        // SHARED and EXCLUSIVE indicate whether the node waits in shared or exclusive mode
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // Constructor taking a thread and a wait status
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
4.2 volatile state
This is the most important property. As mentioned above, this integer can be used to represent any kind of state.
4.2 volatile head & volatile tail
head is a virtual node that logically represents the thread holding the lock. The head node stores neither thread information nor a predecessor.
tail is the tail node. Every new node is appended at the end of the queue, so the tail has no successor.
- These two fields are lazily initialized. They are only initialized once a first thread holds the lock and a second thread fails to acquire it. In other words, as long as every thread acquires the lock without contention, head and tail stay null. Once initialized, even if no thread holds the lock later on, head and tail still refer to the node of the last thread that took the lock from the queue (head and tail both point to the same node).
- When a thread fails to acquire the lock and joins the synchronization queue, CAS is used to set tail to the Node of that thread.
- AQS performs its CAS operations through the Unsafe class; since Java 9, Unsafe has been replaced there by the VarHandle class.
Both fields are volatile (which guarantees ordering and visibility).
4.3 spinForTimeoutThreshold
This is the spin timeout threshold, used in timed methods such as doAcquireSharedNanos().
- If the remaining wait time exceeds this threshold, the thread blocks with a timed park. It returns true if it is woken up in time and tryAcquireShared succeeds, and false if the wait times out.
- If the remaining wait time is less than or equal to this threshold, the thread does not block. It keeps looping (spinning) until another thread releases the synchronization state or the time runs out, and then returns the corresponding result.
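The park-or-spin decision can be sketched outside of AQS. The class below is a simplified illustration and not the AQS code: `TimedWaitSketch` and `awaitFlag` are hypothetical names, the wait condition is a plain AtomicBoolean instead of tryAcquireShared, and interrupt handling is left out. The threshold of 1000 nanoseconds matches the value of spinForTimeoutThreshold in the Java 8 source.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Sketch of the "park for long waits, spin for short waits" idea.
// TimedWaitSketch is a hypothetical demo class.
class TimedWaitSketch {
    // Same value as spinForTimeoutThreshold in the Java 8 AQS source.
    static final long SPIN_THRESHOLD_NANOS = 1000L;

    // Waits until the flag becomes true or the timeout elapses.
    // Returns true if the flag was seen as true, false on timeout.
    static boolean awaitFlag(AtomicBoolean flag, long nanosTimeout) {
        final long deadline = System.nanoTime() + nanosTimeout;
        for (;;) {
            if (flag.get())
                return true;
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L)
                return false; // timed out
            if (remaining > SPIN_THRESHOLD_NANOS)
                LockSupport.parkNanos(remaining); // long wait: block; may return early, so the loop re-checks
            // short wait: fall through and loop again without parking
        }
    }

    public static void main(String[] args) {
        System.out.println("flag already set: " + awaitFlag(new AtomicBoolean(true), 1_000_000L));
        System.out.println("flag never set:   " + awaitFlag(new AtomicBoolean(false), 1_000_000L));
    }
}
```

The reasoning behind the threshold is that parking and unparking a thread costs more than a very short wait, so spinning is cheaper for the last few hundred nanoseconds.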
4.4 exclusiveOwnerThread
AQS obtains this property by inheriting from the AbstractOwnableSynchronizer class. It represents the thread that currently holds the synchronizer in exclusive mode.
5. Specific implementation of AQS
5.1 Implementation of exclusive Lock
5.1.1 Acquiring the lock: ReentrantLock.lock()
/**
 * Acquires the exclusive lock, ignoring interrupts.
 * First tries to acquire the lock and returns immediately on success. Otherwise the current thread is wrapped in a Node and appended to the end of the queue. In the queue, the thread checks whether it is the direct successor of head and, if so, tries to acquire the lock.
 * If that fails, the current thread is blocked through LockSupport until the releasing thread wakes it up or it is interrupted. It then tries to acquire the lock again, and so on, until it succeeds and the caller's code resumes.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// tryAcquire() must be implemented by subclasses; ReentrantLock implements
// fair and unfair locking by overriding this method.
/**
 * Inserts a node into the synchronization wait queue.
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // Check whether the tail node is null
    if (pred != null) {
        node.prev = pred;
        // Insert the current node at the end of the queue through CAS
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // If the tail is null or the CAS failed, enq inserts the node at the end of the queue, initializing the queue if necessary
    enq(node);
    return node;
}
/**
 * Inserts the node into the queue with an infinite loop plus CAS, and returns once the insert succeeds.
 * The queue is initialized first if necessary.
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // Initialize head and tail
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            /*
             * CAS tail to node, then link the old tail's next to node.
             * If head and tail were just initialized by this thread, the old tail
             * is the dummy head, so it is head.next that gets linked to node:
             *   head (old tail) <-> node (new tail)
             */
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
/**
 * Nodes already in the queue acquire the lock through this method, ignoring interrupts.
 * This method is very important. When the acquire above fails, the thread has been wrapped in a Node and appended to the synchronization queue; the comments in the code explain what happens next.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /*
             * Check whether the predecessor of the current node is head, which decides whether to try for the lock.
             * If it is, call tryAcquire to try to acquire the lock.
             * On success, head is set to the current node, and next of the original head is set to null to help GC.
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /*
             * If the lock was not acquired, decide whether to block based on the predecessor node.
             * If the thread is interrupted while blocked, the interrupted flag is set to true.
             * When the predecessor's status is not SIGNAL, shouldParkAfterFailedAcquire returns false and the loop retries the acquire.
             * When shouldParkAfterFailedAcquire returns true, the current thread is blocked and then checks whether it was interrupted.
             */
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/**
 * Decides whether the current thread needs to block, based on the waitStatus of the predecessor node.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * The predecessor is already set to SIGNAL, so it will wake its successor when it releases the lock.
         * The successor (that is, the current node) can therefore safely block.
         */
        return true;
    if (ws > 0) {
        /*
         * The predecessor is cancelled. Walk backward and point the current node's prev at the nearest non-cancelled node.
         * The current thread then goes back to the loop and tries to acquire the lock again.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus is 0 or PROPAGATE (-3): set the predecessor's wait state to SIGNAL,
         * then go back to the loop and try to acquire the lock again.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
 * Cancels a node's ongoing attempt to acquire the lock.
 */
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors: point node.prev at the first non-cancelled node in front.
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // Remember pred's current successor as predNext; the CAS operations below use it.
    Node predNext = pred.next;
    // Mark the node as CANCELLED so that other nodes can skip over it from now on.
    node.waitStatus = Node.CANCELLED;
    // If the current node is the tail, set the tail to its predecessor
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // The node has a successor: link pred to the next non-cancelled successor.
        int ws;
        if (pred != head
            && ((ws = pred.waitStatus) == Node.SIGNAL
                || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
            && pred.thread != null) {
            Node next = node.next;
            /*
             * If next is not cancelled, use CAS to set pred's successor to node's successor.
             * It does not matter if the condition is false or the CAS fails: that only means
             * several threads are cancelling or linking at once, and one of them will succeed.
             */
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        /*
         * For GC purposes, pointing next at the node itself has the same effect as setting it to null.
         */
        node.next = node;
    }
}
The process of acquiring an exclusive lock is roughly as follows. Assume thread A has held the lock for long enough that thread B and thread C both fail to acquire it.
Thread B:
- 1. Thread B is wrapped in a Node (BN for short) and joins the synchronization wait queue. At this point BN's waitStatus is 0.
- 2. The tail is set to BN and linked to the head node, forming a linked list.
- 3. The head node is a virtual node that stands for the thread holding the lock (but stores no thread information); the tail node is BN.
- 4. Thread B enters the "infinite loop", checks whether BN's predecessor is the head node (true), and tries again to acquire the lock (false, the acquire fails).
- 5. Thread B enters shouldParkAfterFailedAcquire, which sets the waitStatus of BN's predecessor (the head node) to SIGNAL (-1) and returns false.
- 6. Because this is a loop, thread B enters shouldParkAfterFailedAcquire again. The waitStatus of BN's predecessor (the head node) is now SIGNAL (-1), so the method returns true directly.
- 7. parkAndCheckInterrupt is called, and thread B blocks, waiting to be woken up.
Thread C:
- 1. Thread C is wrapped in a Node (CN for short) and joins the synchronization wait queue. At this point CN's waitStatus is 0.
- 2. The tail is set to CN and linked to the original tail node (BN).
- 3. Thread C enters the "infinite loop" and checks whether CN's predecessor is the head node (false).
- 4. Thread C enters shouldParkAfterFailedAcquire, which sets the waitStatus of CN's predecessor (BN) to SIGNAL (-1) and returns false.
- 5. Because this is a loop, thread C enters shouldParkAfterFailedAcquire again. The waitStatus of CN's predecessor (BN) is now SIGNAL (-1), so the method returns true directly.
- 6. parkAndCheckInterrupt is called, and thread C blocks, waiting to be woken up.
The final queue looks like this:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | BN | | tail |
| AN | ---> | | ---> | (CN) |
+------+ +------+ +------+
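The A, B, C scenario above can be observed from the outside with the public monitoring methods of ReentrantLock. The following is a minimal sketch; `QueueDemo` and `queuedWhileHeld` are hypothetical names, and the main thread plays the role of thread A.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch: watch two threads queue up behind a held ReentrantLock.
// QueueDemo is a hypothetical demo class.
class QueueDemo {
    // Returns the queue length observed while the caller (thread "A") holds the lock.
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        Runnable lockThenUnlock = () -> {
            lock.lock(); // fails at first, so the thread is enqueued and parked
            lock.unlock();
        };
        Thread b = new Thread(lockThenUnlock, "B");
        Thread c = new Thread(lockThenUnlock, "C");
        int observed;
        lock.lock(); // the calling thread plays the role of thread A
        try {
            b.start();
            c.start();
            // getQueueLength() is an estimate, so poll until both threads are queued.
            while (lock.getQueueLength() < 2)
                Thread.yield();
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first queued thread, which later wakes the next
        }
        try {
            b.join();
            c.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("threads queued while A held the lock: " + queuedWhileHeld());
    }
}
```

Which of B and C ends up first in the queue depends on scheduling; the sketch only shows that both are parked in the queue until A releases the lock.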
5.1.2 Releasing the lock: ReentrantLock.unlock()
To release an exclusive lock, the tryRelease(int) method is called. This method is implemented by subclasses. Once the lock is fully released, the releasing thread wakes up its successor, which then competes for the lock (with an unfair lock, it may lose to a newly arriving thread).
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // The head is not null and a successor needs to be woken up
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Releasing an exclusive lock goes roughly as follows (assuming there is a successor node to wake up):
- Set the head node's waitStatus to 0
- Wake up the successor node
- When the successor's thread wakes up and acquires the lock, the successor node becomes the head, and its prev and thread fields are set to null
- The original head node's next pointer is set to null, and the original head waits to be reclaimed by GC
+------+ +------+ +------+
| old | <-X- | new | <--- | |
| head | | head | | tail |
| AN | -X-> | BN | ---> | (CN) |
+------+ +------+ +------+
As shown above, the AN node (the original head) is now waiting to be garbage collected.
5.2 Shared Lock Implementation Roadmap
5.2.1 Acquiring the lock
The key difference from an exclusive lock is that a shared lock can be held by multiple threads at once.
To implement a shared lock on top of AQS, the tryAcquireShared() method must follow this contract:
- Returning a negative number means the acquire failed
- Returning 0 means the acquire succeeded, but subsequent contending threads will not succeed
- Returning a positive number means the acquire succeeded, and subsequent contending threads may succeed as well
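The three return cases show up naturally in a small counting synchronizer. The sketch below follows the same pattern as the non-fair Semaphore, but `TinySemaphore` and its wrapper methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a counting semaphore: state = number of remaining permits.
// TinySemaphore is a hypothetical demo class, not the JDK Semaphore.
class TinySemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        int available() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // remaining < 0: not enough permits, fail without touching state.
                // Otherwise CAS the new count in and return it:
                //   0 means success with nothing left, > 0 means others may succeed too.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases))
                    return true;
            }
        }
    }

    private final Sync sync;

    TinySemaphore(int permits) { sync = new Sync(permits); }

    void acquire() { sync.acquireShared(1); }                      // blocks if no permit is left
    int tryAcquireResult(int n) { return sync.tryAcquireShared(n); } // exposes the raw return value
    void release(int n) { sync.releaseShared(n); }
    int available() { return sync.available(); }

    public static void main(String[] args) {
        TinySemaphore sem = new TinySemaphore(2);
        System.out.println("first acquire returns  " + sem.tryAcquireResult(1));
        System.out.println("second acquire returns " + sem.tryAcquireResult(1));
        System.out.println("third acquire returns  " + sem.tryAcquireResult(1));
        sem.release(1);
        System.out.println("available after release: " + sem.available());
    }
}
```

The AQS side of shared acquisition, which consumes this return value, looks like this: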
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // Once the shared acquire succeeds, set the new head and wake up subsequent threads
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/**
 * This method does two things:
 * 1. After the shared lock is acquired, it sets the head node.
 * 2. It uses the value returned by tryAcquireShared and the wait state of the head to decide whether to wake up subsequent threads.
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // Keep the old head in a local variable for the checks below
    Node h = head;
    setHead(node);
    /*
     * propagate is the return value of tryAcquireShared and is one of the conditions for propagating the wake-up.
     * h.waitStatus being SIGNAL or PROPAGATE (negative) is another one.
     * Whether to wake up also depends on whether the next node is waiting in shared mode.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
/**
 * This is the core wake-up method for shared locks. Its main job is to wake up the next thread or to set the propagation state.
 * A thread that is woken up tries to acquire the shared lock. If it succeeds, it calls setHeadAndPropagate again and passes the wake-up on.
 * The method ensures that waiting nodes in the queue still get woken up when acquires and releases race with each other.
 */
private void doReleaseShared() {
    /*
     * The loop below wakes a successor thread if the queue has one.
     * If the head's wait state is 0 and no unparkSuccessor is needed,
     * the state is set to PROPAGATE so that the wake-up is still passed on reliably.
     * A thread that then acquires the lock reads PROPAGATE in setHeadAndPropagate
     * and goes on to release the waiting threads behind it.
     */
    for (;;) {
        Node h = head;
        // If there is a successor thread in the queue
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            // If the state of h is 0, set it to PROPAGATE so that the wake-up propagates
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // Check whether h is still the head. If not, loop again.
        if (h == head)
            break;
    }
}
5.2.2 Releasing the lock
doReleaseShared() is used both when releasing and when acquiring a shared lock.
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // doReleaseShared is explained above, in the section on acquiring shared locks
        doReleaseShared();
        return true;
    }
    return false;
}
I think everyone can follow this by now, but let me summarize briefly anyway (doge~):
In the synchronization wait queue, once a thread that blocked on a failed shared acquire is woken up, it in turn wakes up its own successor! And so on down the queue.
Put another way:
Suppose a write lock has caused several threads that want the read lock to block. When the write lock is released, the first successor thread is woken up. If the node behind it also blocked on a failed read-lock acquire, the woken thread wakes that one up as well. This continues until every waiting reader has obtained the read lock or a node that wants the write lock is reached.
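The read-lock scenario can be reproduced with ReentrantReadWriteLock. The following is a minimal sketch with hypothetical names (`ReadLockPropagationDemo`, `readersInsideTogether`): the main thread holds the write lock while several readers queue up, then releases it, and a CountDownLatch checks that all readers end up inside the read lock at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: readers queued behind a writer are all released after one write unlock.
// ReadLockPropagationDemo is a hypothetical demo class.
class ReadLockPropagationDemo {
    // Returns how many readers were inside the read lock at the same time.
    static int readersInsideTogether(int readers) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        CountDownLatch allInside = new CountDownLatch(readers);
        AtomicInteger together = new AtomicInteger();
        Thread[] threads = new Thread[readers];

        rw.writeLock().lock(); // readers started below must queue up behind the writer
        try {
            for (int i = 0; i < readers; i++) {
                threads[i] = new Thread(() -> {
                    rw.readLock().lock();
                    try {
                        allInside.countDown();
                        // Passes only if every reader holds the read lock at once.
                        if (allInside.await(5, TimeUnit.SECONDS))
                            together.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        rw.readLock().unlock();
                    }
                });
                threads[i].start();
            }
            // Wait until every reader is parked in the synchronization queue.
            while (rw.getQueueLength() < readers)
                Thread.yield();
        } finally {
            rw.writeLock().unlock(); // a single release; the wake-up then propagates reader to reader
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return together.get();
    }

    public static void main(String[] args) {
        System.out.println("readers inside together: " + readersInsideTogether(4));
    }
}
```

The writer calls unlock only once, yet all queued readers get in, because each reader that acquires the lock passes the wake-up to the node behind it.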
6. Going further
6.1 PROPAGATE has to be mentioned
There is a bug about AQS that is really worth checking out
In the shared lock acquire and release operations, there is one particular waitStatus value that I think really needs to be discussed: PROPAGATE. As described earlier, this value is used to pass the wake-up on to subsequent threads, and it was introduced to complete and strengthen the wake-up mechanism for shared locks.
I have read many articles about AQS, but very few of them cover this state value. Even the book “Java Concurrent Programming Practice” does not mention it. Eventually I found a blogger who explained the PROPAGATE state in detail, which gave me a lot of inspiration.
Yes, when I first read the AQS source code, I simply skipped over the PROPAGATE value. And it is not only readers of the source who overlook it: even Doug Lea himself did not realize during development what would happen without this state value. Only after the bug linked above was reported did he add this state and fix the bug completely.
Code to reproduce this bug:
import java.util.concurrent.Semaphore;

public class TestSemaphore {
    private static Semaphore sem = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            sem.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}
With the old code, a thread occasionally hangs while this program is running.
Let's look at what the setHeadAndPropagate method looked like back then.
private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}
Semaphore.release() then calls AQS releaseShared.
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Take a look at Node back then:
static final class Node {
    // Unrelated code omitted; only the waitStatus values are shown
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
}
Back then, the setHeadAndPropagate and releaseShared methods were also very simple in design.
The waitStatus of a Node had no PROPAGATE = -3 value.
For your convenience, here is also the source code of the unparkSuccessor method from that time:
private void unparkSuccessor(Node node) {
    // Set node's waitStatus to 0
    compareAndSetWaitStatus(node, Node.SIGNAL, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
Next, let's take it slowly~
PS: Seriously, my boss sits not far from me. My work was finished days ahead of schedule, but I am still a little nervous~ I will take the risk and keep writing!
When AQS acquires a shared lock, a thread that is blocked in the synchronization wait queue can be woken up in two ways:
- Another thread releases the semaphore and calls unparkSuccessor (in the releaseShared method)
- Another thread successfully acquires the shared lock and wakes up the subsequent node through the propagation mechanism (in the setHeadAndPropagate method).
The reproduction example simply creates four threads over and over in a loop: the first two acquire the semaphore, the last two release it, and the main thread waits for all four to finish before printing.
Before the last two threads release the semaphore, the synchronization wait queue inside AQS looks like this:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | t1 | | t2 |
| | ---> | | ---> | |
+------+ +------+ +------+
- 1. t3 releases the semaphore and calls releaseShared; in the process, the head's waitStatus changes to 0
- 2. t1 is woken up and calls Semaphore.NonfairSync's tryAcquireShared method, which returns 0
- 3. t4 releases the semaphore and calls releaseShared. The head read inside releaseShared is still the original head, but its waitStatus has already changed to 0, so unparkSuccessor is not called
- 4. t1, now awake, does not call unparkSuccessor either, because the tryAcquireShared call in step 2 returned 0
At this point both wake-up paths are blocked, no thread is left to wake up t2, and t2 hangs…
PS: Doug Lea with the confused question-mark face, haha~
To fix this bug, Pops made the following improvements:
- 1. Added a new waitStatus value, PROPAGATE
- 2. Extracted doReleaseShared() out of the releaseShared method. As shown above, in doReleaseShared, if the state of the head node is 0, it is set to PROPAGATE so that the wake-up keeps propagating.
- 3. Added more checks to the setHeadAndPropagate method: when the waitStatus of the head node is less than 0 (PROPAGATE = -3), the subsequent node is woken up.
With the improved code, let's go through the scenario again:
- 1. t3 releases the semaphore and calls releaseShared; in the process, the head's waitStatus changes to 0
- 2. t1 is woken up and calls Semaphore.NonfairSync's tryAcquireShared method, which returns 0
- 3. At the same time as step 2, t4 releases the semaphore and calls releaseShared. The head read in doReleaseShared is still the original head, but its waitStatus has changed to 0, so the head's waitStatus is set to PROPAGATE (-3)
- 4. t1, now awake, calls setHeadAndPropagate, which sets t1 as the head. The condition is satisfied, so it enters the branch and calls doReleaseShared, which wakes up the thread of the t2 node.
6.2 A few thoughts on unparkSuccessor
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Normally, the thread to wake up is the successor of the current node.
     * However, if that successor is cancelled, walk backward from the tail of the queue
     * to find the non-cancelled node closest to the current node.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
In the unparkSuccessor method, if the successor of the current node is cancelled, the method walks backward from the tail of the queue until it finds the non-cancelled node closest to the current node.
Why start at the tail node and walk backward?
Suppose the CLH queue looks like the following figure:
+------+ +------+ +------+
| | <--- | | <--- | |
| head | | t1 | | tail |
| | ---> | | ---> | |
+------+ +------+ +------+
t1.waitStatus = 1 and tail.waitStatus = 1
head tries to wake up its successor t1 and finds that t1 is cancelled. It moves on to t1's next node, tail, which is also cancelled, and tail.next == null.
At the same moment, a new node is being appended to the end of the queue, but the old tail's next has not yet been pointed at the new node.
In other words, if the walk followed next pointers and reached the old tail between step 1 and step 2 below, the old tail's next would still be null and the traversal would stop short, missing the new node. The prev pointer, by contrast, is set before the CAS, so a walk backward from tail reaches every enqueued node.
An excerpt from the addWaiter code:
node.prev = pred;
// Insert the current node at the end of the queue through CAS
if (compareAndSetTail(pred, node)) { // step 1
    pred.next = node;                // step 2
    return node;
}
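The window between step 1 and step 2 can be frozen in a single-threaded sketch. The code below is a simplified model, not the AQS implementation: `TailWalkSketch` and its bare `Node` class are hypothetical, and the "CAS" is just a plain assignment. It compares what a forward walk and a backward walk can see while the enqueue is half done.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: why walking backward from tail is safe while an enqueue is half done.
// TailWalkSketch and its Node are hypothetical, single-threaded stand-ins.
class TailWalkSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Follows next pointers starting at head.
    static List<String> walkForward(Node head) {
        List<String> seen = new ArrayList<>();
        for (Node n = head; n != null; n = n.next)
            seen.add(n.name);
        return seen;
    }

    // Follows prev pointers starting at tail.
    static List<String> walkBackward(Node tail) {
        List<String> seen = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev)
            seen.add(n.name);
        return seen;
    }

    // Builds head <-> t1, then stops an enqueue of "new" between step 1 and step 2.
    // Returns { head, tail }.
    static Node[] halfEnqueued() {
        Node head = new Node("head");
        Node t1 = new Node("t1");
        head.next = t1;
        t1.prev = head;
        Node tail = t1;

        Node fresh = new Node("new");
        fresh.prev = tail; // done before the CAS, as in addWaiter
        tail = fresh;      // step 1: the tail CAS has succeeded
        // step 2 (t1.next = fresh) has NOT happened yet
        return new Node[] { head, tail };
    }

    public static void main(String[] args) {
        Node[] queue = halfEnqueued();
        System.out.println("forward from head:  " + walkForward(queue[0]));
        System.out.println("backward from tail: " + walkBackward(queue[1]));
    }
}
```

The forward walk stops at t1 and never sees the new node, while the backward walk sees all three nodes.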
6.3 Why call tryAcquire again in acquireQueued?
In exclusive mode, here is how I think about it:
Time 1: Thread B tries to acquire the lock, but the lock is held by thread A, so thread B is about to call addWaiter to enqueue itself (it is not yet linked to the head node).
Time 1 (simultaneously): Thread A releases the lock. It enters the release method, calls the subclass's tryRelease(), and sets the hold-count state to 0 (the lock is now held by no thread). It then finds no successor node to wake up (the new node has not been enqueued yet), so it wakes no thread. At this point thread A has finished releasing the lock.
Time 2: Thread B finishes calling addWaiter. It has joined the queue and is now linked to the head node.
Time 3: Thread B calls the acquireQueued method (shown in the code below). If this method did not call tryAcquire, the lock would be free while thread B went to sleep with nobody left to wake it, leaving the whole synchronization queue stuck.
So tryAcquire is called again to keep the synchronization queue from stalling when the head node releases the lock before the new node has been enqueued.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Guards against the case where the head released the lock before the new node
            // was enqueued, which would otherwise leave the whole synchronization queue stuck
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
The end
Reading the AQS source code is a great help in learning and mastering the components built on AQS.
Its design concepts and ideas in particular are what we should focus on studying!
Doug Lea's AQS paper: friends who are comfortable with English may as well give it a read