Preface
AbstractQueuedSynchronizer (abstract queued synchronizer), abbreviated as AQS, is a very important foundational component of the JUC (java.util.concurrent) package. Concurrency utilities such as ReentrantLock and CountDownLatch are implemented on top of AQS. To study the underlying principles of locks, it is therefore necessary to understand AQS first.
[liuzhirichard] Record technology, development and source code notes in work and study. From time to time, share what you’ve seen and heard in your life. Welcome to guide!
Introduction
We start with the AQS class diagram and the class-level source comment to get a rough understanding, and then study the underlying principles step by step in the source code.
“Source code comment
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc.) that rely on first-in, first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated through getState, setState, and compareAndSetState is tracked with respect to synchronization.
Subclasses should be defined as non-public internal helper classes that implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead, it defines methods such as acquireInterruptibly that concrete locks and related synchronizers can invoke as appropriate to implement their public methods.
This class supports an exclusive mode and a shared mode. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not “understand” these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play, for example in a ReadWriteLock. Subclasses that support only one mode need not define the methods supporting the unused mode.
This class defines the nested class AbstractQueuedSynchronizer.ConditionObject, which subclasses supporting exclusive mode can use as a Condition implementation. For this, isHeldExclusively reports whether synchronization is held exclusively by the current thread, release invoked with the current getState value fully releases the object, and acquire, given that saved state value, restores the object to its previous acquired state.
This class provides inspection and monitoring methods for the internal queue, as well as similar methods for condition objects. Classes that use AQS for their synchronization mechanics can export these as needed.
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState, and/or compareAndSetState:
tryAcquire, tryRelease, tryAcquireShared, tryReleaseShared, isHeldExclusively
”
The comment above gives a general impression:
- Internally, AQS relies on a first-in, first-out (FIFO) wait queue.
- A state field holds the synchronization status. It can only be read and updated through the getState, setState, and compareAndSetState methods.
- Exclusive mode and shared mode are both supported; each subclass decides which mode (or both) it implements.
- The nested class AbstractQueuedSynchronizer.ConditionObject can be used by subclasses as a Condition implementation.
- Subclasses redefine tryAcquire, tryRelease, tryAcquireShared, tryReleaseShared, and isHeldExclusively as applicable.
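To make these points concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The names SimpleMutex and Sync, and the convention that state 0 means unlocked and 1 means locked, are assumptions chosen for illustration; they are not taken from the JDK or from the subclasses discussed later.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example: a non-reentrant mutex built on AQS.
class SimpleMutex {
    // Non-public helper class, as the class comment recommends.
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Our own convention: state == 0 means unlocked, state == 1 means locked.
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1
                && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean locked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // queues and blocks if tryAcquire fails
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }           // wakes the next queued thread, if any
    boolean isLocked() { return sync.locked(); }
}
```

The subclass only decides what state means; all queuing and blocking is inherited from AQS through acquire and release.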
Queue node: Node
Node contains the following fields:
Field | Meaning |
---|---|
prev | Previous node |
next | Next node |
thread | The thread held by the node |
waitStatus | Node status |
nextWaiter | The next node waiting on a condition, or the special value SHARED |
The wait queue is as follows:
The fields of the wait queue's Node class are as follows:
static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // waitStatus value: the thread has been cancelled
    static final int CANCELLED = 1;
    // waitStatus value: the successor's thread needs to be woken up (unparked)
    static final int SIGNAL = -1;
    // waitStatus value: the thread is waiting on a condition
    static final int CONDITION = -2;
    // waitStatus value: the next acquireShared should propagate unconditionally
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the following values:
     * SIGNAL (-1): when the current node is released or cancelled, its successor must be unparked.
     * CANCELLED (1): the node was cancelled due to a timeout or interrupt. A node never leaves
     *   this state. In particular, a thread with a cancelled node never blocks again.
     * CONDITION (-2): the node is currently on a condition queue. It is not used as a
     *   synchronization queue node until transferred, at which time the status is set to 0.
     * PROPAGATE (-3): a releaseShared should be propagated to other nodes.
     * 0: none of the above.
     * The values are arranged numerically to simplify use: most of the time it is enough
     * to check the sign (greater than 0 means cancelled).
     */
    volatile int waitStatus;
    // Previous node
    volatile Node prev;
    // Next node
    volatile Node next;
    // The thread held by this node
    volatile Thread thread;
    // Link to the next node waiting on a condition, or the special value SHARED
    Node nextWaiter;

    // Returns true if the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the previous node; throws NullPointerException if it is null
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
Pay particular attention to waitStatus in Node:
- The default value is 0.
- waitStatus > 0 (CANCELLED, 1): the node timed out or was interrupted and needs to be removed from the queue.
- waitStatus = -1 (SIGNAL): if the predecessor of the current thread's node is in the SIGNAL state, the current thread should block (park); the predecessor will unpark it when it releases or is cancelled.
- waitStatus = -2 (CONDITION): the node is currently on a condition queue.
- waitStatus = -3 (PROPAGATE): a releaseShared should be propagated to other nodes; used in shared mode.
With the structure of Node understood, we turn to the structure of AQS itself, starting with its common methods and working gradually through the implementation logic.
AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    // Head of the wait queue, lazily initialized. Except for initialization, it is modified only via setHead.
    // Note: if head exists, its waitStatus is guaranteed not to be CANCELLED.
    private transient volatile Node head;
    // Tail of the wait queue, lazily initialized. Modified only via enq to add a new wait node.
    private transient volatile Node tail;
    // Synchronization state
    private volatile int state;

    // Returns the current state
    protected final int getState() {
        return state;
    }

    // Sets the state
    protected final void setState(int newState) {
        state = newState;
    }

    // Atomically updates the state if it currently equals the expected value
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}
The main fields in AQS are:
Field | Meaning |
---|---|
head | Head of the wait queue |
tail | Tail of the wait queue |
state | Synchronization state |
As the class comment explains, AQS operates in two modes, exclusive mode and shared mode. The source code is analyzed below from each of these two perspectives.
Operation | Meaning |
---|---|
acquire | Acquires in exclusive mode, ignoring interrupts. Implemented by invoking tryAcquire at least once, returning on success. Otherwise the thread is queued, possibly blocking and unblocking repeatedly, invoking tryAcquire until it succeeds. Can be used to implement Lock.lock. |
release | Releases in exclusive mode. Implemented by unblocking one or more threads if tryRelease returns true. Can be used to implement Lock.unlock. |
acquireShared | Acquires in shared mode, ignoring interrupts. Implemented by first invoking tryAcquireShared at least once, returning on success. Otherwise the thread is queued, possibly blocking and unblocking repeatedly, invoking tryAcquireShared until it succeeds. |
releaseShared | Releases in shared mode. Implemented by unblocking one or more threads if tryReleaseShared returns true. |
Both shared and exclusive modes use the addWaiter method to create a queued node for the current thread and mode.
Exclusive mode
Acquire exclusive resources
public final void acquire(int arg) {
    // Try to obtain state via tryAcquire; if that fails, enqueue the thread and keep trying
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
In exclusive mode, acquire first tries to obtain state through tryAcquire(arg); if that fails, it calls acquireQueued(addWaiter(Node.EXCLUSIVE), arg).
- tryAcquire(arg): the attempt to obtain state is implemented by subclasses. Different subclasses have different logic, which will be explained together with the subclass code.
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg): runs when tryAcquire fails. It can be split into two parts: addWaiter(Node.EXCLUSIVE) and acquireQueued(node, arg).
- addWaiter(Node.EXCLUSIVE) returns the newly created node.
- acquireQueued(node, arg): the thread failed to acquire the lock and has been placed in the wait queue by addWaiter(Node.EXCLUSIVE). It keeps trying until it succeeds, and returns true if the thread was interrupted while waiting, in which case acquire calls selfInterrupt().
Obtaining a resource takes three steps:
- Attempt to obtain the resource
- Enqueue
- Dequeue
The attempt, tryAcquire(arg), is implemented by subclasses, so the analysis below covers enqueueing and dequeueing.
Enqueue: addWaiter(Node.EXCLUSIVE)
The addWaiter(Node.EXCLUSIVE) method inserts a node into the queue as follows:
- Create a node for the given mode.
- Check whether the tail node exists. If it does not, call enq(node), which initializes the queue and then tries to insert at the tail.
- If the tail exists, try to insert at the tail directly. The insert uses CAS to guard against concurrent insertion; if the CAS fails, call enq(node), which spins until the insertion succeeds.
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: read the current tail of the queue
    Node pred = tail;
    if (pred != null) {
        // The new node's prev points to the current tail
        node.prev = pred;
        // Use CAS to set the tail: succeeds only if tail is still pred
        if (compareAndSetTail(pred, node)) {
            // On success, the old tail's next points to the new node
            pred.next = node;
            return node;
        }
    }
    // Tail not initialized, or the CAS race was lost: fall back to the spinning enq
    enq(node);
    return node;
}

// CAS the tail field from expect to update
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

// CAS the head field from null to update (used only by enq for initialization)
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // If the tail is null, the queue is uninitialized: install a dummy head node
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Queue is initialized: link behind the current tail, then CAS the tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
The code and comments alone can still be hard to follow, so the process is illustrated step by step with diagrams.
Two diagrams are used, depending on whether the tail node is initially empty:
- The first diagram shows a node being added for the first time, at which point head is lazily initialized.
- The second diagram shows a node being inserted into an existing queue.
- Note from the code that enq returns the previous tail, that is, the node that was last before the insertion.
- addWaiter returns the newly created node that was just inserted.
After the node is added to the queue, addWaiter returns it, and the next step is acquireQueued(addWaiter(Node.EXCLUSIVE), arg), which keeps trying to obtain the resource.
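The enqueue logic can be tried out in isolation with a simplified sketch. CasTailQueue and QNode are hypothetical names, and AtomicReference stands in for the Unsafe-based CAS that AQS performs on its own fields; the ordering of the three steps in the else branch is the part that matters.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy queue; not the real AQS code, which uses Unsafe on its own fields.
class CasTailQueue {
    static final class QNode {
        final String name;       // stands in for the waiting thread
        volatile QNode prev;
        volatile QNode next;
        QNode(String name) { this.name = name; }
    }

    private final AtomicReference<QNode> head = new AtomicReference<>();
    private final AtomicReference<QNode> tail = new AtomicReference<>();

    // Appends a node and returns the previous tail, like enq.
    QNode enq(QNode node) {
        for (;;) {
            QNode t = tail.get();
            if (t == null) {
                // Lazily install a dummy head; only one thread wins this CAS.
                if (head.compareAndSet(null, new QNode("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // 1. prev is set before the CAS
                if (tail.compareAndSet(t, node)) {  // 2. publish the node as the new tail
                    t.next = node;                  // 3. next is set only afterwards
                    return t;
                }
            }
        }
    }

    QNode head() { return head.get(); }
    QNode tail() { return tail.get(); }
}
```

Because prev is written before the node becomes visible as the tail, every published node can always be reached by walking prev links from the tail, even if the matching next link has not been written yet.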
Dequeue: acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
The method attempts to retrieve the resource through a loop until it succeeds. The code is as follows:
final boolean acquireQueued(final Node node, int arg) {
    // Whether acquisition failed; if still true in finally, the node is cancelled
    boolean failed = true;
    try {
        // Whether the thread was interrupted while waiting
        boolean interrupted = false;
        // Infinite loop
        for (;;) {
            // The predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head, this node is the first real data node
            // (head is a dummy node), so try to obtain the resource
            if (p == head && tryAcquire(arg)) {
                // On success, head points to the current node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p is not head, or tryAcquire failed (with an unfair lock, another thread may have barged in).
            // Decide whether the node should block and, if so, park it
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
- Each iteration checks whether this node's predecessor is head. Because head is a dummy node, a node whose predecessor is head is the first data node.
- The first data node keeps trying to obtain the resource. On success, head is set to point to the current node.
- If the predecessor is not head, or tryAcquire(arg) fails (which can happen with an unfair lock), the state of the predecessor determines whether the current node should block (that is, whether the predecessor's state is SIGNAL).
/**
 * Decides, based on the state of the predecessor, whether the current thread should block.
 * SIGNAL (-1): when the current node is released or cancelled, its successor must be unparked.
 * CANCELLED (1): the node was cancelled due to a timeout or interrupt. A node never leaves
 *   this state. In particular, a thread with a cancelled node never blocks again.
 * CONDITION (-2): the node is currently on a condition queue. It is not used as a
 *   synchronization queue node until transferred, at which time the status is set to 0.
 * PROPAGATE (-3): a releaseShared should be propagated to other nodes.
 * 0: none of the above.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // The wait status of the predecessor
    int ws = pred.waitStatus;
    // The predecessor is already set to unpark its successor, so it is safe to park
    if (ws == Node.SIGNAL)
        return true;
    // The predecessor is cancelled
    if (ws > 0) {
        do {
            // Skip over cancelled predecessors, unlinking them from the queue
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // waitStatus is 0 or PROPAGATE: set the predecessor to SIGNAL, but do not park yet
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
shouldParkAfterFailedAcquire checks the state of the predecessor and unlinks the cancelled nodes that sit in front of the current node in the queue.
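The effect of repeated calls is easier to see on a small hand-built list. The following single-threaded sketch mirrors the branches of shouldParkAfterFailedAcquire; ParkDecisionSketch and N are hypothetical names, and a plain field write replaces the CAS because no other thread is involved here.

```java
// Hypothetical single-threaded model of the park decision; not the real AQS code.
class ParkDecisionSketch {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class N {
        int waitStatus;
        N prev, next;
        N(int waitStatus) { this.waitStatus = waitStatus; }
    }

    // Returns true only when the predecessor is already in the SIGNAL state.
    static boolean shouldPark(N pred, N node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL) {
            return true;                       // predecessor will wake us: safe to park
        }
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;  // skip cancelled predecessors
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;          // real code uses CAS here
        }
        return false;                          // caller retries the acquire first
    }
}
```

With a queue of head, two cancelled nodes, and the current node, the first call unlinks the cancelled nodes, the second call marks head as SIGNAL, and only the third call returns true. This matches the loop in acquireQueued, which retries tryAcquire between these calls.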
In the finally block of acquireQueued, cancelAcquire(node) is called if the acquisition failed; this is the red box at the bottom of the flowchart above.
cancelAcquire(Node node)
final boolean acquireQueued(final Node node, int arg) {
    // Whether acquisition failed
    boolean failed = true;
    try {
        // ... omitted ...
    } finally {
        // If acquisition failed, cancel the current node
        if (failed)
            cancelAcquire(node);
    }
}

private void cancelAcquire(Node node) {
    // Ignore if the node does not exist
    if (node == null)
        return;
    // Detach the thread from the node
    node.thread = null;
    // Skip cancelled predecessors to find the nearest valid node before the current one
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // The successor of that valid predecessor
    Node predNext = pred.next;
    // Mark the current node as cancelled
    node.waitStatus = Node.CANCELLED;
    // If the current node is the tail, make pred the tail and set pred's next to null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // pred is not head && (pred's status is SIGNAL || pred's status was successfully set to SIGNAL)
        // && pred's thread is not null
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // The successor of the current node
            Node next = node.next;
            // If the successor exists and is not cancelled, link pred's next to it
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // pred is head, or the conditions above failed: wake up the next valid node after the current one
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

private void unparkSuccessor(Node node) {
    // Check the status of the current node
    int ws = node.waitStatus;
    if (ws < 0)
        // Reset the node status to 0
        compareAndSetWaitStatus(node, ws, 0);
    // The node to wake up is normally just the next node
    Node s = node.next;
    // However, the successor may be null or cancelled (waitStatus > 0 means CANCELLED)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Traverse backwards from the tail, recording every node with waitStatus <= 0.
        // The loop does not stop at the first match; it continues until it reaches node,
        // so s ends up as the valid node closest to node.
        // waitStatus <= 0 covers: 0, SIGNAL (-1, successor needs to be woken up),
        // CONDITION (-2, thread is waiting on a condition),
        // PROPAGATE (-3, releaseShared should propagate to other nodes)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Unpark the thread of the node to wake up
    if (s != null)
        LockSupport.unpark(s.thread);
}
Process analysis:
- Find pred, the nearest non-cancelled node before the current node.
- If the current node is the tail, set pred as the tail and set pred's next to null.
- If pred is not head && (pred's status is SIGNAL || pred's status was successfully set to SIGNAL) && pred's thread is not null, link pred's next to the current node's successor.
- In all other cases, wake up the current node's successor.
The following are the drawings:
Q: As the diagrams show, only the next pointer is manipulated, not the prev pointer. Why?
A: During dequeueing in acquireQueued(addWaiter(Node.EXCLUSIVE), arg), shouldParkAfterFailedAcquire checks the state of the predecessor and unlinks the cancelled nodes in front of the current node, and that is where prev is updated. Leaving prev alone here avoids pointing it at a node that has already been removed and keeps the prev chain stable. That stability is what makes it possible to walk the list from tail, as unparkSuccessor(node) does when it traverses from back to front.
Q: Why does unparkSuccessor(Node node) traverse from back to front?
A: addWaiter(Node.EXCLUSIVE) inserts new nodes at the tail. It sets node.prev = pred before the CAS on tail and sets pred.next = node only after the CAS succeeds (see the red box), so for a moment a node can be reachable through prev while its predecessor's next does not point to it yet.
Q: After node.next = node, won't the linked list be broken if head does not point to the latest node?
A: As described for acquireQueued, the loop keeps trying to obtain the resource and sets the node as head on success. In addition, shouldParkAfterFailedAcquire removes the cancelled nodes in front of the current node.
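The backward search can also be reproduced on a toy list in which some next links are deliberately left unset, imitating a node that was published as tail before its predecessor's next was written. SuccessorSearchSketch and N are hypothetical names used only for this sketch.

```java
// Hypothetical single-threaded model of the successor search in unparkSuccessor.
class SuccessorSearchSketch {
    static final class N {
        final String name;
        int waitStatus;          // > 0 means cancelled, as in AQS
        N prev, next;
        N(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    // Returns the non-cancelled node closest to 'node', or null if there is none.
    static N findSuccessor(N node, N tail) {
        N s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk prev links from the tail; keep overwriting s so the match
            // nearest to 'node' wins.
            for (N t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }
}
```

Relying on next alone would miss nodes whose next link is not set yet or would stop at a cancelled node, whereas the prev chain is complete for every node that has become the tail.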
Release exclusive resources: release
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Releases in exclusive mode. If tryRelease returns true, one or more threads are unblocked. This method can be used to implement Lock.unlock.
- tryRelease(arg) releases the resource; it is implemented by subclasses, as explained later. Returning true indicates that the resource is no longer held by any thread, so other nodes can try to acquire it.
- If the release succeeds and h != null && h.waitStatus != 0, unparkSuccessor(h) is executed.
- As long as tryRelease(arg) succeeds, release returns true regardless of whether unparkSuccessor(h) actually wakes a thread.
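The interplay between acquire and release can be observed with two threads. In the sketch below, ReleaseDemo, BinarySync, and run are hypothetical names, and the state convention (0 free, 1 held) is an assumption; isQueued is a public AQS inspection method.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: a queued thread obtains the resource after release.
class ReleaseDemo {
    // Minimal binary lock: state 0 = free, 1 = held (our own convention).
    static final class BinarySync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    // Returns true if release reported success and the waiter then acquired.
    static boolean run() {
        BinarySync sync = new BinarySync();
        AtomicBoolean acquired = new AtomicBoolean(false);
        sync.acquire(1);                    // the calling thread holds the resource

        Thread waiter = new Thread(() -> {
            sync.acquire(1);                // tryAcquire fails, so the thread is queued
            acquired.set(true);
            sync.release(1);
        });
        waiter.start();

        while (!sync.isQueued(waiter)) {    // wait until the waiter is in the queue
            Thread.yield();
        }
        boolean released = sync.release(1); // tryRelease succeeds; a parked successor is woken
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return released && acquired.get();
    }
}
```

The waiter may or may not have parked by the time release runs; in either case the loop in acquireQueued lets it obtain the resource afterwards.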
Shared mode
Acquire shared resources: acquireShared
public final void acquireShared(int arg) {
    // A value less than 0 means the resource could not be obtained
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    // Add a shared-mode node to the queue
    final Node node = addWaiter(Node.SHARED);
    // Whether acquisition failed; if still true in finally, the node is cancelled
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // The predecessor of the current node
            final Node p = node.predecessor();
            if (p == head) {
                // Try again to obtain the shared resource
                int r = tryAcquireShared(arg);
                // A non-negative value indicates success
                if (r >= 0) {
                    // Set the current node as head and try to wake up subsequent nodes
                    setHeadAndPropagate(node, r);
                    // Unlink the old head so it can be garbage collected
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
- tryAcquireShared(arg) tries to obtain the resource; it is implemented by subclasses.
- Its return value falls into three cases:
  - Less than 0: failure.
  - Equal to 0: the resource was obtained in shared mode, but subsequent shared-mode acquires cannot succeed.
  - Greater than 0: the resource was obtained in shared mode, and subsequent shared-mode acquires may also succeed. In this case, subsequent waiting threads must check availability.
- After a failure, doAcquireShared(arg) keeps trying to obtain the resource.
- final Node node = addWaiter(Node.SHARED) likewise creates a node.
- The loop keeps checking whether the predecessor is head and, if so, tries to obtain the resource.
- After the resource is obtained in shared mode, setHeadAndPropagate(node, r) sets the head node and wakes up subsequent nodes.
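The three return-value cases of tryAcquireShared can be illustrated with a small permit counter. PermitSync, probe, and permits are hypothetical names, and treating state as the number of free permits is an assumption of this sketch rather than anything AQS prescribes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical permit counter in shared mode; state holds the number of free permits.
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    // Negative: failure. Zero: success, but nothing is left for other threads.
    // Positive: success, and later shared acquires may succeed too.
    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases)) {
                return true;
            }
        }
    }

    // Non-blocking helper for experiments: one attempt, no queuing.
    int probe(int acquires) {
        return tryAcquireShared(acquires);
    }

    int permits() {
        return getState();
    }
}
```

Starting from two permits, successive probe(1) calls return 1, then 0, then a negative value; a caller going through acquireShared would be queued on that third attempt.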
Set the head node and propagate the wake-up to subsequent nodes: setHeadAndPropagate
// node is the current node
// propagate is the return value (>= 0) of tryAcquireShared from the previous step
// If it is greater than 0, the resource was obtained in shared mode and subsequent shared-mode
// acquires may also succeed, so subsequent waiting threads must check availability
private void setHeadAndPropagate(Node node, int propagate) {
    // Record the old head node
    Node h = head; // Record old head for check below
    // Set the incoming node as the head node
    setHead(node);
    // Wake up subsequent nodes if any of the following holds:
    // propagate > 0: resources remain for subsequent nodes
    // h == null: the old head was null
    // (h = head) == null: the current head is null
    // waitStatus < 0: a subsequent node needs to be woken up
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // If the successor is not known yet (null) or is waiting in shared mode, wake it up
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
doReleaseShared() releases shared resources
private void doReleaseShared() {
    // Loop
    for (;;) {
        // Start from the head
        Node h = head;
        // Check that the queue is neither uninitialized nor only just initialized
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // SIGNAL (-1): the successor needs to be woken up
            if (ws == Node.SIGNAL) {
                // Reset the wait status to 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake up the successor
                unparkSuccessor(h);
            }
            // If the status is 0, update it to PROPAGATE
            // PROPAGATE (-3): a releaseShared should be propagated to other nodes
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // Check whether head has changed. A change means another thread acquired
        // in the meantime, so keep looping; if it has not changed, stop
        if (h == head)                   // loop if head changed
            break;
    }
}
- h != null && h != tail means the queue has been initialized and contains at least one node besides head.
- If the head's status is SIGNAL (-1), its successor needs to be woken up.
- The code first resets the head's status to 0 with CAS; on success it wakes up the successor, on failure it loops again.
- If the head's status is 0, it is updated to PROPAGATE so that the release is propagated.
Release shared resources: releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // Wake up waiting nodes
        doReleaseShared();
        return true;
    }
    return false;
}
Releases in shared mode. If tryReleaseShared returns true, one or more threads are unblocked.
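A one-shot latch is a compact way to see releaseShared wake several waiters at once. The sketch below is modeled on the kind of latch example commonly shown for AQS; OneShotLatch, Sync, and demo are hypothetical names, and the convention that state 0 means closed and 1 means open is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: waiters block until signal() is called once.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Our own convention: state 0 = closed, 1 = open.
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1;   // positive lets later waiters pass too
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;                       // true triggers doReleaseShared()
        }

        boolean open() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void await()     { sync.acquireShared(1); }
    void signal()    { sync.releaseShared(1); }
    boolean isOpen() { return sync.open(); }

    // Starts the given number of waiting threads, opens the latch,
    // and returns how many of them got through.
    static int demo(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.await();
                woken.incrementAndGet();
            });
            threads[i].start();
        }
        latch.signal();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }
}
```

Because tryAcquireShared returns a positive value once the latch is open, each woken waiter passes through setHeadAndPropagate and wakes the next one, so a single signal() releases every queued thread.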
Conclusion
Q: What exactly is AQS? A: AQS internally maintains a first-in, first-out (FIFO) doubly linked wait queue built from Node objects, and provides the common enqueue and dequeue logic for exclusive mode and shared mode. What the state value means is defined by subclasses: tryAcquire, tryRelease, tryAcquireShared, tryReleaseShared, and so on are all defined and implemented by subclasses. AQS supplies the surrounding operations, including enqueueing nodes and the loop that retries acquisition.
Q: What happens if AQS fails to obtain resources? A: A thread that fails to obtain the resource is placed in the wait queue. There it retries in a loop, parking between attempts once its predecessor is in the SIGNAL state, so the thread is only waiting and can obtain the resource later.
Q: What is the data structure of the AQS wait queue? A: A first-in, first-out (FIFO) doubly linked wait queue that is a variant of the CLH lock queue. The CLH lock is a scalable, high-performance, fair spin lock based on a linked list. It is starvation-free and provides first-come, first-served fairness. A waiting thread spins only on a local variable: it keeps polling the state of its predecessor and stops spinning once it finds that the predecessor has released the lock.
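For comparison with the AQS variant, here is a minimal sketch of a classic CLH spin lock as described in the answer above. ClhSpinLock, QNode, and demo are hypothetical names. Unlike AQS, this version really spins on the predecessor's flag instead of parking the thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a classic CLH spin lock; AQS itself parks instead of spinning.
class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked;
    }

    // The queue is implicit: each thread only knows its own node and its predecessor's.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;                  // announce intent to hold the lock
        QNode pred = tail.getAndSet(node);   // enqueue atomically at the tail
        myPred.set(pred);
        while (pred.locked) {                // spin on the predecessor's flag only
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;                 // the successor sees this and proceeds
        myNode.set(myPred.get());            // reuse the predecessor's node next time
    }

    // Runs several threads that increment a shared counter under the lock.
    static int demo(int threads, int iterations) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

AQS keeps the idea of a queue in which each node watches its predecessor, but adds explicit prev and next links and replaces the spin with park and unpark driven by waitStatus.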
Q: How do nodes in the AQS wait queue obtain and release resources? A: See the walkthrough of exclusive mode above and trace through the code.
This article introduced the basic logic of AQS in exclusive mode and shared mode, working through the source code and diagrams to convey the basic ideas. It did not cover the business logic that subclasses must implement; that will be covered later when subclasses such as ReentrantLock and CountDownLatch are introduced.