Main reference: the article directory of the blog series "JUC framework source code analysis" (based on JDK 8).
AbstractQueuedSynchronizer
Overview
AQS relies heavily on optimistic locking (CAS plus spinning). It implements a FIFO wait queue for threads waiting to obtain the synchronization state, while the logic for acquiring and releasing that state is left to the subclass.
Although AQS is an abstract class, it has no abstract methods. Abstract methods would be a poor fit: a subclass does not necessarily use everything AQS offers (exclusive and shared modes), yet it would be forced to implement every abstract method. Defining them as ordinary methods with empty bodies would spare subclasses that work, but a method that silently does nothing is unclear. Instead, AQS implements these methods to throw UnsupportedOperationException. A subclass overrides the methods it needs; a method it does not override throws an exception when called.
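The design choice above can be observed directly. The following is a minimal sketch, not code from the JDK: the class name ExclusiveOnlySync and the helper sharedModeUnsupported are made up for illustration. The subclass overrides only the exclusive-mode hooks, so a shared-mode call falls through to the AQS default and throws.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical synchronizer that supports exclusive mode only.
class ExclusiveOnlySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1); // 0 = free, 1 = held
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    // Returns true if a shared-mode acquire is rejected by the AQS default implementation.
    static boolean sharedModeUnsupported() {
        ExclusiveOnlySync sync = new ExclusiveOnlySync();
        try {
            sync.acquireShared(1); // calls tryAcquireShared, which was not overridden
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ExclusiveOnlySync sync = new ExclusiveOnlySync();
        sync.acquire(1);   // exclusive mode works
        sync.release(1);
        System.out.println(sharedModeUnsupported()); // true
    }
}
```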
AQS defines two resource sharing modes: Exclusive (only one thread can proceed at a time, as in ReentrantLock) and Shared (multiple threads can proceed at the same time, as in Semaphore and CountDownLatch).
Simple use case (implementing a shared lock):
A custom synchronizer mainly overrides the following methods:
- isHeldExclusively(): whether the current thread holds the resource exclusively. You only need to implement it if you use Condition.
- tryAcquire(int): exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
- tryRelease(int): exclusive mode. Attempts to release the resource; returns true on success and false on failure.
- tryAcquireShared(int): shared mode. Attempts to acquire the resource. A negative number indicates failure; 0 indicates success with no resources left over; a positive number indicates success with resources still available.
- tryReleaseShared(int): shared mode. Attempts to release the resource; returns true if waiting successor nodes may be woken after the release, false otherwise.
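import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

// A shared lock that at most two threads can hold at the same time.
public class MyLock implements Lock {
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) throw new IllegalArgumentException();
            setState(count); // state = number of permits still available
        }
        @Override
        protected int tryAcquireShared(int acquireCount) {
            while (true) {
                int cur = getState();
                int newCount = cur - acquireCount;
                // Negative result: fail without CAS. Otherwise retry until the CAS succeeds.
                if (newCount < 0 || compareAndSetState(cur, newCount)) {
                    return newCount;
                }
            }
        }
        @Override
        protected boolean tryReleaseShared(int releaseCount) {
            while (true) {
                int cur = getState();
                int newCount = cur + releaseCount;
                if (compareAndSetState(cur, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() { sync.acquireShared(1); }
    @Override
    public void unlock() { sync.releaseShared(1); }

    // The original listing shows only lock() and unlock(). The remaining Lock
    // methods below are added so that the class compiles.
    @Override
    public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    @Override
    public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }
    @Override
    public Condition newCondition() { throw new UnsupportedOperationException(); }
}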
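For comparison with the shared-mode listing above, here is a minimal sketch of the exclusive-mode hooks (tryAcquire, tryRelease, isHeldExclusively). It is a non-reentrant mutex in the spirit of the example in the AQS Javadoc; the names SimpleMutex and countWithMutex are made up for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = free, 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {          // CAS: several threads may race here
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);                             // plain write: only the owner releases
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    void unlock() { sync.release(1); }
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

    // Increments a plain int from several threads under the mutex and returns the total.
    static int countWithMutex(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try { counter[0]++; } finally { mutex.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 10_000)); // 40000
    }
}
```

tryRelease uses setState rather than CAS because only the thread that holds the mutex is expected to call it.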
Important attributes
state
- state is declared volatile, which guarantees its visibility across threads.
- Use compareAndSetState to modify state whenever several threads may change it concurrently.
- If no concurrent modification is possible (for example, when ReentrantLock releases the lock: the thread already holds the exclusive lock, so CAS is unnecessary), use setState.
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
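The "CAS plus spin" pattern behind compareAndSetState can be sketched outside AQS. In the example below an AtomicInteger stands in for the AQS state field (an assumption made so the code runs on its own), and the names CasSpinDemo, addWithCas and runConcurrently are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSpinDemo {
    // Optimistic update: read, compute, CAS; retry if another thread changed the value first.
    static int addWithCas(AtomicInteger state, int delta) {
        for (;;) {
            int cur = state.get();
            int next = cur + delta;
            if (state.compareAndSet(cur, next)) {
                return next;
            }
            // CAS failed: someone else updated state in between, so loop and try again.
        }
    }

    // Runs addWithCas(state, 1) from several threads and returns the final value.
    static int runConcurrently(int threads, int perThread) {
        AtomicInteger state = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) addWithCas(state, 1);
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return state.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000)); // 40000
    }
}
```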
Waiting queue
AQS implements a FIFO wait queue for us as a doubly linked list. Since not all threads can obtain the synchronizer state at the same time, each thread that has to wait is wrapped in a node and queued. When the condition for obtaining the state is met, the thread in the node is woken so that it can try to obtain the state again.
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus; // wait status of the thread this node represents
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter; // marks whether this node's thread wants the shared or the exclusive lock

    final boolean isShared() {
        return nextWaiter == SHARED;
    }
}

private transient volatile Node head; // always a dummy node: its thread member is always null
private transient volatile Node tail; // a thread that fails to acquire the lock is wrapped in a node and appended here
- The thread member of the head node is null. You can think of its thread as having been moved into the exclusiveOwnerThread property of AQS.
- Even if there is only one waiting thread, the wait queue contains two nodes, because the first node is always the dummy node.
acquire(int arg)
Process
- First call the subclass's tryAcquire to try to acquire the exclusive lock once. "Try" means exactly one attempt, which either succeeds or fails.
- If it fails, call addWaiter(Node.EXCLUSIVE) to append the thread to the tail of the wait queue, marked as exclusive mode.
- acquireQueued makes the thread wait in the queue for the resource, possibly blocking and waking several times, and returns once the resource has been obtained. It returns true if the thread was interrupted at any point during the wait, false otherwise.
- If the thread is interrupted while waiting, it does not respond. But when acquireQueued returns true, the interrupt status has been detected and consumed inside that function (by Thread.interrupted()), so the interrupt status must be set again before acquire exits.
/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented by invoking at least once
 * tryAcquire, returning on success. Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking tryAcquire until success.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/**
 * The default implementation throws UnsupportedOperationException; the concrete
 * implementation is supplied by the synchronizer class that extends AQS. AQS only defines
 * the common framework here. The method is not abstract because exclusive mode only needs
 * tryAcquire/tryRelease and shared mode only needs tryAcquireShared/tryReleaseShared.
 * If they were all abstract, every synchronizer would also have to implement the methods
 * of the mode it does not use.
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
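The "ignore the interrupt, then restore it" behaviour of acquire can be observed through ReentrantLock.lock(), which is built on acquire. The sketch below is illustrative only; the class and method names are hypothetical. A waiting thread is interrupted while queued, does not throw, and still sees its interrupt status set once it holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class UninterruptibleAcquireDemo {
    // Returns the waiter's interrupt status as observed right after lock() returned.
    static boolean interruptStatusAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] sawInterrupt = {false};
        lock.lock();                                   // main thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                               // queues up; an interrupt does not abort this call
            try {
                sawInterrupt[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            while (!lock.hasQueuedThread(waiter)) Thread.yield(); // wait until the waiter is queued
            waiter.interrupt();                        // interrupt it while it waits
        } finally {
            lock.unlock();                             // now let it acquire the lock
        }
        try { waiter.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptStatusAfterLock()); // true
    }
}
```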
addWaiter(Node)
- Wrap the current thread in a node (Node.EXCLUSIVE for exclusive mode, Node.SHARED for shared mode).
- Try the fast path first: append the node to the tail of the queue with a single CAS.
- If the CAS fails or the queue is empty, fall back to enq(node), which also initializes the wait queue when necessary.
- In enq(node), if the queue is empty, an empty node is first installed as head with compareAndSetHead(new Node()). The method then spins until the node has been appended to the tail. The loop exits only when compareAndSetTail(t, node) succeeds, which guarantees that enq eventually puts the node parameter at the tail. Even if only one thread is enqueued, the queue has two nodes. The first is called the dummy node because its thread member is null. The second is the actual head of the line, and its thread member is not null. The dummy node is created empty, so all its members hold default values: thread is null and waitStatus is 0. Later you will notice that the waitStatus of the tail node is always 0, because that is its default initial value.
- Return the node that holds the current thread.
Points to note
With several threads racing, more than one node may have node.prev pointing to the same tail, but CAS guarantees that only one of them becomes the new tail and gets linked through tail.next. The other nodes join the queue on a later spin.
Validity of prev: there is an intermediate state in which node.prev already points to the old tail but the old tail's next does not yet point to node. If another thread traverses the queue through the next pointers at that moment, it misses the last node. Traversing the wait queue from tail through prev never misses a node.
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
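To make the "dummy node" and "prev is always valid" points concrete, here is a simplified model of enq. It is a sketch under assumptions, not the JDK code: AtomicReference replaces the Unsafe-based CAS, and the names EnqModel, countFromTail and nodesAfter are made up. With n enqueued threads the queue holds n + 1 nodes, and walking from tail through prev reaches all of them.

```java
import java.util.concurrent.atomic.AtomicReference;

class EnqModel {
    static final class Node {
        final Thread thread;            // null for the dummy head
        volatile Node prev, next;
        Node(Thread thread) { this.thread = thread; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq in AQS: lazily create the dummy head, then spin until the tail CAS succeeds.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = t;                       // prev is set BEFORE the CAS
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                   // next is set AFTER the CAS
                    return t;
                }
            }
        }
    }

    // Counts nodes by walking backwards from tail, the direction that never misses a node.
    int countFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null; t = t.prev) n++;
        return n;
    }

    // Enqueues one node per thread and returns the total number of nodes in the queue.
    static int nodesAfter(int waiters) {
        EnqModel q = new EnqModel();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            ts[i] = new Thread(() -> q.enq(new Node(Thread.currentThread())));
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return q.countFromTail();
    }

    public static void main(String[] args) {
        System.out.println(nodesAfter(1)); // 2: the dummy head plus one waiter
        System.out.println(nodesAfter(8)); // 9
    }
}
```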
acquireQueued
- Each iteration checks whether the thread may try to acquire the lock (its predecessor p is head) and, if so, calls tryAcquire(arg).
- If it may not try, or the attempt fails, the thread is blocked in parkAndCheckInterrupt, which also checks the thread's interrupt status.
- If the thread is unparked or interrupted, it returns from park, then from parkAndCheckInterrupt(), and continues with the next iteration.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // Not reached on the normal path, which always sets failed = false before returning.
        // It runs only if something inside the loop (for example tryAcquire) throws.
        if (failed) cancelAcquire(node);
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
shouldParkAfterFailedAcquire
Node states
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
- CANCELLED indicates that the thread has cancelled its wait.
- SIGNAL indicates that the thread of this node's successor is blocked or about to block. When the current node becomes head and releases the lock, it wakes its successor because of SIGNAL. In other words, SIGNAL is the signal to wake the successor node.
- When a node is created, its waitStatus is initialized to 0 by default.
Explanation
This function is executed after a failed attempt to acquire the lock, which happens in one of two cases:
- p == head is false: the predecessor of the current thread's node is not head.
- p == head is true, but the current thread failed to acquire the lock even though it is first in line.
parkAndCheckInterrupt is executed only if this function returns true.
Effect: skip cancelled predecessors until a valid one is found (valid means not CANCELLED) and set that predecessor's status to SIGNAL. Once the predecessor is already SIGNAL, return true to indicate that the thread may block right away. Setting SIGNAL on the predecessor is like setting an alarm clock: the predecessor wakes the current node when it releases the lock.
After compareAndSetWaitStatus(pred, ws, Node.SIGNAL), the function returns false and the caller goes around its loop again. The if (ws == Node.SIGNAL) branch can only be taken on a later call, so the function runs at least twice before the thread parks. The caller's infinite loop guarantees that the predecessor is eventually set to SIGNAL (assuming the current thread keeps failing to acquire the lock).
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) return true;
    if (ws > 0) {
        do {
            // ws > 0 means CANCELLED: the predecessor cancelled itself because it timed out
            // or responded to an interrupt, so it must be skipped.
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // change the predecessor's status to SIGNAL
    }
    return false;
}
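The "at least two calls before parking" behaviour can be traced with a single-threaded model. This is a sketch under assumptions: a plain field write stands in for compareAndSetWaitStatus, and the names ShouldParkModel and callsUntilPark are hypothetical. It counts how many calls are needed before the function returns true, with and without a cancelled predecessor.

```java
class ShouldParkModel {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        int waitStatus;   // 0 by default, as in AQS
        Node prev, next;
    }

    // Same control flow as shouldParkAfterFailedAcquire; a plain write replaces the CAS.
    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL) return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;      // skip cancelled predecessors
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;              // set the "alarm clock" on the predecessor
        }
        return false;
    }

    // Builds head <- [cancelled] <- me and counts calls until shouldPark returns true.
    static int callsUntilPark(boolean withCancelledPred) {
        Node head = new Node();
        Node me = new Node();
        if (withCancelledPred) {
            Node cancelled = new Node();
            cancelled.waitStatus = CANCELLED;
            head.next = cancelled; cancelled.prev = head;
            cancelled.next = me;   me.prev = cancelled;
        } else {
            head.next = me; me.prev = head;
        }
        int calls = 1;
        while (!shouldPark(me.prev, me)) calls++;
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsUntilPark(false)); // 2: set SIGNAL, then return true
        System.out.println(callsUntilPark(true));  // 3: skip cancelled, set SIGNAL, return true
    }
}
```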
parkAndCheckInterrupt
After calling LockSupport.park(this), the current thread blocks until another thread unparks or interrupts it. Thread.interrupted() returns true if the current thread was interrupted by another thread while blocked. Note that Thread.interrupted() consumes the interrupt status: it returns true the first time and false the second time.
If another thread unparks the current thread, Thread.interrupted() returns false. If another thread interrupts the current thread, Thread.interrupted() returns true.
Going back to the acquireQueued logic: once the current thread has been interrupted, parkAndCheckInterrupt returns true, so interrupted = true is executed and the local variable stays true from then on. (The flag is sticky and is used to restore the user's interrupt in the outermost acquire.)
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
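That Thread.interrupted() consumes the interrupt status can be checked with a few lines. This is a minimal sketch; the class name InterruptedFlagDemo and the method probe are hypothetical. The thread interrupts itself first, so park returns without waiting for an unpark.

```java
import java.util.concurrent.locks.LockSupport;

class InterruptedFlagDemo {
    // Returns {first, second}: the results of two consecutive Thread.interrupted() calls.
    static boolean[] probe() {
        Thread.currentThread().interrupt();     // set the interrupt status on ourselves
        LockSupport.park();                     // returns promptly because the status is already set
        boolean first = Thread.interrupted();   // reads AND clears the status
        boolean second = Thread.interrupted();  // the status was cleared by the first call
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println(r[0] + " " + r[1]); // true false
    }
}
```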
Points to note
Interrupts are ignored
An interrupt sent by the user is ignored until acquireQueued has finished: the thread is not removed from the synchronization queue just because it was interrupted before obtaining the synchronization state. selfInterrupt is then used to restore the user's interrupt.
Why is p == head the precondition for tryAcquire(arg)?
head is a dummy node, and the first real node comes right after it. If the predecessor of node is head (final Node p = node.predecessor()), node is the first real node in the queue, and the first in line is naturally entitled to try to obtain the lock.
Looking back at the whole call sequence: tryAcquire already failed once when acquire called it at the very beginning, yet the first iteration of the loop may call tryAcquire again straight away (may, because p == head has to hold). Is this second tryAcquire certain to fail? Not necessarily. Suppose thread 1 holds the lock and the queue is empty, and thread 2 fails to acquire the lock and finishes addWaiter. If thread 1 releases the lock just as thread 2 enters acquireQueued, thread 2's node must be the successor of head, so thread 2 calls tryAcquire and succeeds.
Which thread executes acquireQueued? Always the thread stored in the thread member of the node parameter, although it may be blocked and woken along the way.
Why can the current thread's node, which addWaiter has only just put at the tail of the queue, later turn out to be right behind head?
When addWaiter runs, the queue may already hold many nodes. The nodes between head and the current thread's node are each executing acquireQueued themselves, and each will eventually succeed (p == head && tryAcquire(arg) is true). Each success moves head one node toward the tail, so once they have all succeeded, the current thread's node is the successor of head. After the last of those in-between nodes succeeds in acquireQueued (its thread holds the lock and its node has become head), the current thread is still blocked. Only when that thread releases the exclusive lock is unparkSuccessor(head) executed and the current thread woken.
Will the finally block execute cancelAcquire(node)?
Although this function is said to ignore interrupts, that is only true from the AQS user's point of view. If a thread is blocked in parkAndCheckInterrupt and another thread interrupts it, it wakes up immediately and continues the loop. The loop can only be left through return interrupted, and failed = false has been set by then, so on that path cancelAcquire(node) is never executed. It can run only if something inside the loop, such as the subclass's tryAcquire, throws.
release(int arg)
This is the top-level entry point for a thread to release the shared resource in exclusive mode. It releases the specified amount of the resource, and if the resource is released completely (i.e. state = 0), it wakes another thread in the wait queue so that it can acquire the resource.
Releasing a lock makes no distinction between fair and unfair, interruptible and uninterruptible, or timed and untimed. Those differences only exist in how the lock is acquired; once the lock is held, they no longer matter.
Details
- If s == null, we have hit an intermediate state in which the next pointer has not been set yet. If s.waitStatus > 0, the successor of head has just been cancelled. In both cases the search has to start from the tail and follow prev.
- Note the loop condition t != null && t != node: the loop walks backwards from the tail until t is null or t has reached node. Normally t == null does not occur, so the loop is sure to find the first node after node that is not cancelled.
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // If only one thread has used the lock from start to finish, the queue was never
        // initialized and head is null.
        // If the queue holds only the dummy node, its status is 0 and unparkSuccessor(h) is not executed.
        // If the status of head is SIGNAL, the successor of head has set its alarm clock,
        // so unparkSuccessor(h) is executed.
        if (h != null && h.waitStatus != 0) unparkSuccessor(h);
        return true;
    }
    return false;
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    /*
     * The successor of head can usually be found directly through next, but only prev is
     * guaranteed to be valid.
     * If next is null, search backwards from the tail through prev.
     * If next is cancelled, search backwards from the tail through prev as well.
     */
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) s = t;
    }
    if (s != null) LockSupport.unpark(s.thread);
}
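The backward search in unparkSuccessor can be tried on a hand-built list. The following single-threaded model is a sketch, not the JDK code: the Node class is reduced to a name, a status and two links, and the names UnparkSuccessorModel, findSuccessor, skipCancelled and nextNotYetLinked are made up. It covers both cases from the details above: a cancelled successor, and a next pointer that has not been set yet.

```java
class UnparkSuccessorModel {
    static final class Node {
        final String name;
        final int waitStatus;   // > 0 means cancelled
        Node prev, next;
        Node(String name, int waitStatus) { this.name = name; this.waitStatus = waitStatus; }
    }

    // Same selection logic as unparkSuccessor, returning the node instead of unparking its thread.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk from the tail towards node; the last match is the one closest to node.
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) s = t;
        }
        return s;
    }

    static void link(Node pred, Node succ) { pred.next = succ; succ.prev = pred; }

    // head -> A (cancelled) -> B -> C: the cancelled successor is skipped.
    static String skipCancelled() {
        Node head = new Node("head", -1);
        Node a = new Node("A", 1);
        Node b = new Node("B", 0);
        Node c = new Node("C", 0);
        link(head, a); link(a, b); link(b, c);
        return findSuccessor(head, c).name;
    }

    // head <- A: A.prev is set, but head.next has not been written yet.
    static String nextNotYetLinked() {
        Node head = new Node("head", -1);
        Node a = new Node("A", 0);
        a.prev = head;
        return findSuccessor(head, a).name;
    }

    public static void main(String[] args) {
        System.out.println(skipCancelled());    // B
        System.out.println(nextNotYetLinked()); // A
    }
}
```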
acquireInterruptibly(int arg)
As with acquire, this method begins with a tryAcquire attempt. Before that, however, acquireInterruptibly checks Thread.interrupted() and throws InterruptedException if the thread has already been interrupted.
acquireInterruptibly may be fair or unfair, depending on the tryAcquire implementation (that is, on how the ReentrantLock was constructed).
If an interrupt is detected while waiting, the thread first returns from LockSupport.park(), throws InterruptedException, and then executes cancelAcquire to remove its node from the wait queue. Depending on the situation, cancelAcquire also calls unparkSuccessor for the successor node.
doAcquireInterruptibly does not need a return value, because it simply throws an exception when it detects the interrupt status.
The finally block of doAcquireInterruptibly can reach cancelAcquire(node), whereas acquireQueued does not reach it on its normal path. In doAcquireInterruptibly, if a thread is blocked in parkAndCheckInterrupt and another thread interrupts it, the blocked thread wakes up and throws an exception. The method then exits with failed still true, so the finally block executes cancelAcquire(node).
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

private void cancelAcquire(Node node) {
    if (node == null) return;
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0) node.prev = pred = pred.prev; // skip CANCELLED nodes
    // After the loop, pred points to a valid predecessor of node.
    Node predNext = pred.next;
    // Once the next statement has run, other threads skip this node.
    // Until it has run, other threads still treat this node as valid.
    node.waitStatus = Node.CANCELLED;
    // If node is the tail, make pred the new tail and set the successor of pred to null.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head
                && ((ws = pred.waitStatus) == Node.SIGNAL
                    || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
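The interruptible path can be exercised through ReentrantLock.lockInterruptibly(), which delegates to acquireInterruptibly. The sketch below is illustrative and its names are hypothetical. A queued waiter is interrupted, receives InterruptedException, and is no longer counted in the queue afterwards, which is consistent with cancelAcquire having run.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleAcquireDemo {
    // Returns true if the waiter got InterruptedException and the wait queue is empty afterwards.
    static boolean interruptedAndDequeued() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] threw = {false};
        lock.lock();                                    // main thread keeps the lock throughout
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();           // queues up behind the main thread
                    lock.unlock();                      // not expected to be reached
                } catch (InterruptedException e) {
                    threw[0] = true;                    // the wait was abandoned
                }
            });
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) Thread.yield(); // wait until it is queued
            waiter.interrupt();
            try { waiter.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
            return threw[0] && lock.getQueueLength() == 0;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptedAndDequeued()); // true
    }
}
```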
tryAcquireNanos(int arg, long nanosTimeout)
tryAcquireNanos is the timed counterpart of acquire, which does not respond to interrupts. Again, the method begins with a tryAcquire attempt, but tryAcquireNanos first checks Thread.interrupted() and throws InterruptedException if the thread has already been interrupted.
Note that tryAcquireNanos can be either fair or unfair, depending on the tryAcquire implementation (that is, on how the ReentrantLock was constructed).
Differences
Each iteration checks whether the deadline has been reached. When the remaining time is no more than spinForTimeoutThreshold, LockSupport.parkNanos is not called: the interval is too short for the blocking time to be controlled accurately, so it is better to keep spinning for the remaining time. LockSupport.parkNanos returns not only when another thread unparks or interrupts the current thread, but also, most importantly, by itself when the time is up. Whatever the reason for waking, the interrupt status is checked afterwards, once per iteration.
If the thread has been interrupted, cancelAcquire is executed as well.
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L) return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted()) throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}
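The timed path is what ReentrantLock.tryLock(long, TimeUnit) uses. A minimal sketch, with hypothetical class and method names: one thread keeps the lock while another tries to obtain it with a timeout, so the timed attempt gives up and returns false once the deadline has passed.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedAcquireDemo {
    // Tries to take a lock that another thread holds for the whole duration of the attempt.
    static boolean tryLockWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = {true};                    // overwritten with the tryLock result
        lock.lock();                                    // main thread holds the lock
        try {
            Thread waiter = new Thread(() -> {
                try {
                    acquired[0] = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (acquired[0]) lock.unlock();     // not expected here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // not expected: nobody interrupts the waiter
                }
            });
            waiter.start();
            try { waiter.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        } finally {
            lock.unlock();                              // released only after the waiter gave up
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(tryLockWhileHeld(50)); // false
    }
}
```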
acquireShared
Differences between shared and exclusive locks
- An exclusive lock is held by one thread at a time. AQS records that thread in the exclusiveOwnerThread member.
- A shared lock is shared among threads. Several threads can hold it at the same time, but AQS has no member that records the threads holding a shared lock.
- If a thread has just acquired the shared lock, there is a good chance that the thread waiting behind it can acquire it too. This does not apply to an exclusive lock, precisely because it is exclusive.
- Of course, when a thread has just released a lock, exclusive or shared, it needs to wake the thread waiting behind it.
Process
- The nodes created are different. A shared lock uses addWaiter(Node.SHARED), which creates a node that wants the shared lock. An exclusive lock uses addWaiter(Node.EXCLUSIVE).
- The follow-up work after a successful acquire is different. A shared lock uses setHeadAndPropagate(node, r): since the thread behind a successful shared acquirer is also likely to succeed, the successor of head is woken under certain conditions. An exclusive lock uses setHead(node).
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
}

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

/**
 * setHead simply turns the node that has just become head into a dummy node.
 * setHeadAndPropagate also calls setHead, but under certain conditions it additionally
 * calls doReleaseShared: "if a thread has just acquired the shared lock, there is a good
 * chance that the thread waiting behind it can acquire it too."
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * Restores the interrupt status, although this version of the function does not respond
 * to interrupts. When acquireQueued returns true, the interrupt status has been detected
 * and consumed, so it must be set again before acquire exits.
 */
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
releaseShared(int arg)
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// doReleaseShared may also be called after the shared lock has been acquired successfully.
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // If the queue was never initialized (head is null) or head is tail,
        // go straight to the head-changed check below.
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                continue;
            }
        }
        /*
         * Exit the loop if head has not changed. If a thread has acquired the lock in the
         * meantime and installed a new head through setHead, loop again so that
         * unparkSuccessor(h) runs for the new head, that is, wakes the first waiting
         * thread in the queue.
         */
        if (h == head) break;
    }
}
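Shared-mode propagation can be seen with a one-shot latch built directly on acquireShared and releaseShared. The sketch below follows the BooleanLatch idea from the AQS Javadoc; the names OneShotLatchDemo, Latch and releasedWaiters are made up. A single releaseShared call lets every queued waiter through, because each woken waiter passes the wake-up on to its successor.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotLatchDemo {
    // state 0 = closed, 1 = open
    static final class Latch extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignore) {
            return getState() != 0 ? 1 : -1;   // positive: success, and later waiters may succeed too
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;                        // true: waiting successors may be woken
        }
    }

    // Parks the given number of waiters on the latch, opens it once, and counts who got through.
    static int releasedWaiters(int waiters) {
        Latch latch = new Latch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] ts = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            ts[i] = new Thread(() -> {
                latch.acquireShared(1);         // blocks until the latch is open
                passed.incrementAndGet();
            });
            ts[i].start();
        }
        while (latch.getQueueLength() < waiters) Thread.yield(); // wait until all are queued
        latch.releaseShared(1);                 // one release call
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(releasedWaiters(3)); // 3
    }
}
```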