What is AQS?
The core idea of AQS (AbstractQueuedSynchronizer) is this: if the requested shared resource is free, the requesting thread becomes the active worker thread and the shared resource is marked as locked. If the requested shared resource is occupied, a mechanism is needed to block the waiting threads and to allocate the lock when they are woken up. AQS implements this mechanism with a CLH queue lock: threads that cannot acquire the lock for the moment are placed in a queue.
The CLH (Craig, Landin, and Hagersten) queue is a virtual doubly linked queue: there is no queue instance, only the links between nodes. AQS wraps each thread that requests the shared resource in a node (Node) of the CLH lock queue and uses that queue to allocate the lock.
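To make the queue-lock idea concrete, here is a minimal sketch of the textbook CLH spin lock. It is not the AQS queue itself: AQS uses a modified variant that blocks threads instead of spinning and keeps explicit prev/next links. The names ClhSpinLockDemo, ClhSpinLock, QNode and run are made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class ClhSpinLockDemo {
    /** Textbook CLH spin lock; a simplified illustration, not the AQS implementation. */
    static final class ClhSpinLock {
        static final class QNode {
            volatile boolean locked; // true while the owner holds or is waiting for the lock
        }

        // Tail of the implicit queue; starts as a dummy node that is already released.
        private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
        private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
        private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

        void lock() {
            QNode node = myNode.get();
            node.locked = true;
            QNode pred = tail.getAndSet(node); // enqueue with one atomic swap on the tail
            myPred.set(pred);
            while (pred.locked) {              // wait on the predecessor's flag only
                Thread.yield();
            }
        }

        void unlock() {
            QNode node = myNode.get();
            node.locked = false;               // the successor, if any, sees this and proceeds
            myNode.set(myPred.get());          // reuse the predecessor's node next time
        }
    }

    /** Increments a plain counter from several threads under the lock and returns the total. */
    static int run(int threads, int incrementsPerThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

The queue is "virtual" in the sense described above: no collection object exists, and each thread only knows the node of the thread ahead of it.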
The CLH queue is shown below:
The queue synchronizer AbstractQueuedSynchronizer (hereinafter "the synchronizer") is the basic framework for building locks and other synchronization components. It uses an int member variable, state, to represent the synchronization state, and a built-in FIFO queue to queue the threads that are waiting to acquire the resource.
The synchronizer is mainly used through inheritance. A subclass manages the synchronization state by extending the synchronizer and overriding its designated protected methods. Those implementations inevitably change the synchronization state, and they must do so through the three methods the synchronizer provides, getState(), setState(int newState) and compareAndSetState(int expect, int update), because these guarantee that state changes are safe.
It is recommended to define the subclass as a static inner class of the custom synchronization component. The synchronizer itself does not implement any synchronization interface; it only defines a number of methods for acquiring and releasing the synchronization state, for use by the custom component. The synchronizer supports both exclusive and shared acquisition of the synchronization state, which makes it easy to implement different kinds of synchronization components (ReentrantLock, ReentrantReadWriteLock, CountDownLatch, etc.).
The synchronizer is the key to implementing a lock. A lock implementation aggregates a synchronizer and uses it to realize the lock's semantics. The relationship can be understood this way: the lock faces the user, defining the interface through which the user interacts with the lock while hiding implementation details; the synchronizer faces the lock implementor, simplifying the implementation by hiding low-level operations such as synchronization state management, thread queuing, waiting and wake-up. Together, locks and synchronizers cleanly separate the concerns of users and implementors.
The synchronizer is designed around the template method pattern: the implementor extends the synchronizer and overrides the designated methods, then aggregates the synchronizer in the custom synchronization component and calls the template methods the synchronizer provides, which in turn call the overridden methods.
When overriding a method specified by the synchronizer, you need to use the following three methods provided by the synchronizer to access or modify the synchronization state:
- getState(): gets the current synchronization state.
- setState(int newState): sets the current synchronization state.
- compareAndSetState(int expect, int update): sets the current state using CAS. This method guarantees that the state update is atomic.
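The three accessors are protected, so they can only be called from a subclass. The following minimal sketch re-exposes them through a hypothetical subclass (StateAccessDemo and ExposedSync are names invented here) just to show how a compare-and-set behaves when the expected value does or does not match.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class StateAccessDemo {
    /** Hypothetical subclass that only re-exposes the protected state accessors. */
    static final class ExposedSync extends AbstractQueuedSynchronizer {
        int state() {
            return getState();
        }

        void state(int newState) {
            setState(newState);
        }

        boolean cas(int expect, int update) {
            return compareAndSetState(expect, update);
        }
    }

    public static void main(String[] args) {
        ExposedSync sync = new ExposedSync();
        System.out.println(sync.state());   // 0: the initial state
        System.out.println(sync.cas(0, 1)); // true: the state was 0, it is now 1
        System.out.println(sync.cas(0, 2)); // false: the state is 1, not the expected 0
        sync.state(5);                      // unconditional write
        System.out.println(sync.state());   // 5
    }
}
```

A real synchronizer would not expose the state like this; it would use these calls inside tryAcquire, tryRelease and their shared counterparts.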
The methods of the synchronizer that can be overridden are shown in the following table.
Method | Description |
---|---|
protected boolean tryAcquire(int arg) | Acquires the synchronization state exclusively. An implementation needs to read the current state, check whether it is as expected, and then set the synchronization state with CAS |
protected boolean tryRelease(int arg) | Releases the synchronization state exclusively. Threads waiting for the synchronization state then get a chance to acquire it |
protected int tryAcquireShared(int arg) | Acquires the synchronization state in shared mode. A return value greater than or equal to 0 means the acquisition succeeded; otherwise it failed |
protected boolean tryReleaseShared(int arg) | Releases the synchronization state in shared mode |
protected boolean isHeldExclusively() | Whether the synchronizer is held in exclusive mode. This method usually indicates whether the synchronizer is held exclusively by the current thread |
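As an illustration of the two shared-mode hooks in the table, here is a sketch of a one-shot latch: threads calling await() block until some thread calls open(). The class names (OneShotLatchDemo, OneShotLatch) and the encoding of the state (0 = closed, 1 = open) are choices made for this example, not something the synchronizer prescribes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotLatchDemo {
    static final class OneShotLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected int tryAcquireShared(int ignored) {
                // >= 0 means success: succeed only once the latch is open
                return getState() == 1 ? 1 : -1;
            }

            @Override
            protected boolean tryReleaseShared(int ignored) {
                setState(1); // open the latch
                return true; // true tells the synchronizer to wake up waiting threads
            }

            boolean isOpen() {
                return getState() == 1;
            }
        }

        private final Sync sync = new Sync();

        void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        void open() {
            sync.releaseShared(1);
        }

        boolean isOpen() {
            return sync.isOpen();
        }
    }

    /** Starts the given number of waiting threads, opens the latch, and returns how many got through. */
    static int run(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        if (passed.get() != 0) {
            throw new IllegalStateException("a thread passed a closed latch");
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 3
    }
}
```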
When a custom synchronization component is implemented, it calls the template methods provided by the synchronizer, as shown in the figure below.
The template methods provided by the synchronizer fall into three categories: exclusive acquisition and release of the synchronization state, shared acquisition and release of the synchronization state, and querying the status of the waiting threads in the synchronization queue. Custom synchronization components implement their synchronization semantics using these template methods.
Implementation of Mutex exclusive lock
You need to understand how the synchronizer works in order to understand the other components of the concurrency package in depth, so let's take a closer look at how it works through an exclusive lock example.
As the name implies, an exclusive lock can be held by only one thread at a time; other threads that try to acquire it can only wait in the synchronization queue. Only when the thread holding the lock releases it can a subsequent thread acquire it.
Mutex code implementation is as follows:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    // Static inner class, custom synchronizer
    private static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -4387327721959839431L;

        // Whether the lock is currently held
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquire the lock if the state is 0
        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Release the lock and set the state to 0
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Return a Condition; each Condition contains its own condition queue
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // Simply delegate the work to Sync
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}
In the example above, the exclusive lock Mutex is a custom synchronization component that allows only one thread to hold the lock at a time. Mutex defines a static inner class that extends the synchronizer and implements exclusive acquisition and release of the synchronization state. In tryAcquire(int acquires), if the CAS succeeds (the synchronization state is set to 1), the synchronization state has been acquired; tryRelease(int releases) simply resets the synchronization state to 0. A user of Mutex never deals with the internal synchronizer directly and only calls the methods Mutex provides. Inside Mutex, taking lock() as an example, it is enough to call the template method acquire(int arg): if the calling thread fails to acquire the synchronization state, it is added to the synchronization queue to wait. This greatly lowers the threshold for implementing a reliable custom synchronization component.
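A short usage sketch may help here. So that it compiles on its own, it uses TinyMutex, a trimmed stand-in for the Mutex above (only lock, tryLock and unlock); the class and method names are invented for this example. It shows two things: the lock protects a plain counter against lost updates, and, because tryAcquire only succeeds when the state is 0, the lock is not reentrant.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MutexUsageDemo {
    /** Trimmed stand-in for the Mutex in the article: same Sync logic, fewer methods. */
    static final class TinyMutex {
        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int acquires) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int releases) {
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
        }

        private final Sync sync = new Sync();

        void lock() {
            sync.acquire(1);
        }

        boolean tryLock() {
            return sync.tryAcquire(1);
        }

        void unlock() {
            sync.release(1);
        }
    }

    /** Increments a plain counter from several threads under the mutex and returns the total. */
    static int run(int threads, int incrementsPerThread) {
        TinyMutex mutex = new TinyMutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    /** Tries to take the mutex a second time from the thread that already holds it. */
    static boolean secondTryLockSucceeds() {
        TinyMutex mutex = new TinyMutex();
        mutex.lock();
        try {
            return mutex.tryLock();
        } finally {
            mutex.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));          // prints 40000
        System.out.println(secondTryLockSucceeds()); // prints false: the mutex is not reentrant
    }
}
```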
AQS principle
Next, the synchronizer is analyzed from the implementation perspective: its core data structure and template methods, including the synchronization queue, exclusive acquisition and release of the synchronization state, shared acquisition and release of the synchronization state, and timed acquisition.
Synchronization queue
The synchronizer relies on an internal synchronization queue (a doubly linked FIFO queue) to manage the synchronization state. When the current thread fails to acquire the synchronization state, the synchronizer wraps the thread, its wait status and related information in a node (Node), appends the node to the synchronization queue, and blocks the thread. When the synchronization state is released, the thread in the first waiting node (the successor of the head node) is woken up so that it can try to acquire the synchronization state again.
A node in the synchronization queue stores the reference of the thread that failed to acquire the synchronization state, its wait status, and its predecessor and successor nodes. The attribute types, names and descriptions of a node are shown below.
A node can be in the following wait states:
// The node was cancelled because of a timeout or an interrupt. A cancelled node no longer competes for the synchronization state, and its status never changes again
static final int CANCELLED = 1;
// The thread of the successor node is waiting. When the thread of the current node releases the synchronization state or is cancelled, it notifies the successor so that the successor's thread can run
static final int SIGNAL = -1;
// The node is in a waiting (condition) queue and its thread is waiting on a Condition. When another thread calls signal() on the Condition, the node is transferred from the waiting queue to the synchronization queue and takes part in acquiring the synchronization state
static final int CONDITION = -2;
// Indicates that the next shared synchronization state acquisition will be propagated unconditionally
static final int PROPAGATE = -3;
Node also defines the following fields:
// Successor node
volatile Node next;
// Predecessor node, set when the node joins the synchronization queue (appended at the tail)
volatile Node prev;
// The thread that is waiting to acquire the synchronization state
volatile Thread thread;
// Successor node in the waiting (condition) queue. If the current node is in shared mode, this field holds the SHARED constant; in other words, the node mode (exclusive or shared) and the successor in the waiting queue share one field
Node nextWaiter;
The node is the basis of the synchronization queue. The synchronizer holds a head node (head) and a tail node (tail). A thread that fails to acquire the synchronization state becomes a node appended at the tail of the queue.
The synchronizer holds two node references, one to the head node and one to the tail node. Once one thread has acquired the synchronization state, other threads cannot acquire it; they are instead wrapped in nodes and appended to the synchronization queue. Appending must be thread-safe, so the synchronizer provides a CAS-based method for setting the tail node: compareAndSetTail(Node expect, Node update). It takes the node that the current thread believes to be the tail and the current node; only after the CAS succeeds is the current node formally linked to the previous tail.
The synchronization queue follows FIFO. The head node is the node that has successfully acquired the synchronization state. When the thread of the head node releases the synchronization state, it wakes up its successor, and the successor sets itself as the head node once it has acquired the synchronization state, as shown in the following figure.
Setting the head node is done by the thread that has just acquired the synchronization state. Because only one thread can acquire it, setting the head node does not need CAS: it only has to make the old head's successor the new head and clear the old head's next reference.
Exclusive synchronization state acquisition and release
The synchronization state is acquired by calling the synchronizer's acquire(int arg) method. This method is insensitive to interrupts: if the thread fails to acquire the synchronization state and enters the synchronization queue, a later interrupt does not remove it from the queue. The code of acquire is as follows:
public final void acquire(int arg) {
    // Try to acquire the lock
    if (!tryAcquire(arg) &&
        // On failure, enqueue the node; acquireQueued returns true
        // if the thread was interrupted while waiting
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // If it was interrupted, call the current thread's interrupt() to restore the flag
        selfInterrupt();
}
The code above covers acquiring the synchronization state, constructing the node, joining the synchronization queue, and spinning in the queue. The main logic is as follows. First, the tryAcquire(int arg) method implemented by the custom synchronizer (the subclass) is called; this method guarantees thread-safe acquisition of the synchronization state. If the acquisition fails, a synchronization node is constructed (in exclusive mode, Node.EXCLUSIVE, where only one thread at a time can acquire the synchronization state) and appended to the tail of the synchronization queue by addWaiter(Node node). Finally, acquireQueued(Node node, int arg) is called so that the node keeps trying to acquire the synchronization state in an "infinite loop". If it cannot, LockSupport.park(this) is called to block the node's thread. A blocked thread is woken up mainly when its predecessor leaves the queue or when the blocked thread is interrupted.
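The interrupt behavior can be observed from outside with ReentrantLock, whose lock() delegates to acquire(1) and whose lockInterruptibly() delegates to acquireInterruptibly(1). The sketch below is an illustration under that assumption; InterruptDemo and its method names are invented here, and the 200 ms wait is only there to observe that the thread is still blocked.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptDemo {
    /**
     * A thread blocked in lock() is interrupted. Returns true if it stayed blocked
     * and later, after acquiring the lock, still saw its interrupt flag set.
     */
    static boolean lockIgnoresInterrupt() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagSeen = new boolean[1];
        lock.lock();
        Thread t = new Thread(() -> {
            lock.lock(); // not interruptible: the thread stays queued
            try {
                flagSeen[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        t.start();
        try {
            while (!lock.hasQueuedThread(t)) {
                Thread.yield(); // wait until t is in the synchronization queue
            }
            t.interrupt();
            t.join(200);
            boolean stillWaiting = t.isAlive();
            lock.unlock();
            t.join();
            return stillWaiting && flagSeen[0];
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** A thread blocked in lockInterruptibly() is interrupted. Returns true if it gave up. */
    static boolean lockInterruptiblyAborts() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] aborted = new boolean[1];
        lock.lock();
        try {
            Thread t = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock();
                } catch (InterruptedException e) {
                    aborted[0] = true; // left the queue without the lock
                }
            });
            t.start();
            while (!lock.hasQueuedThread(t)) {
                Thread.yield();
            }
            t.interrupt();
            t.join();
            return aborted[0];
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(lockIgnoresInterrupt());    // prints true
        System.out.println(lockInterruptiblyAborts()); // prints true
    }
}
```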
The individual steps are analyzed below, starting with constructing the node and adding it to the synchronization queue.
private Node addWaiter(Node mode) {
    // Wrap the current thread in a Node; mode is Node.EXCLUSIVE for an exclusive lock
    Node node = new Node(Thread.currentThread(), mode);
    // tail is the tail of the AQS queue. It is null at first, in which case enq(node) is used
    Node pred = tail;
    if (pred != null) {
        // Point the new node's prev at the current tail
        node.prev = pred;
        // CAS the new node in as the tail
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // First enqueue (tail == null) or the CAS lost a race: fall back to enq
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // Lock-free: loop + CAS
    for (;;) {
        Node t = tail;
        // tail == null: the queue has not been initialized yet
        if (t == null) {
            // CAS in an empty head node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Same append logic as in addWaiter. The else branch is required:
            // when several threads race, only one wins the initialization CAS,
            // and every thread (including the winner, on its next iteration)
            // still has to append its own node here.
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
The code above uses compareAndSetTail(Node expect, Node update) to ensure that nodes are appended in a thread-safe way.
In enq(final Node node), the synchronizer guarantees that the node is added correctly by means of an "infinite loop": the current thread can return from the method only after CAS has set its node as the tail; otherwise it keeps retrying. In other words, enq(final Node node) uses CAS to serialize concurrent requests to add nodes.
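The loop-plus-CAS append can be modelled outside the synchronizer with AtomicReference. The sketch below is a simplified stand-alone model, not the AQS code: Node, CasQueue and run are invented names, and the nodes carry an int instead of a thread. It follows the same shape as enq: lazily install an empty head, set prev first, CAS the tail, then set next.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasEnqueueDemo {
    static final class Node {
        final int value;
        volatile Node prev;
        volatile Node next;

        Node(int value) {
            this.value = value;
        }
    }

    static final class CasQueue {
        private final AtomicReference<Node> head = new AtomicReference<>();
        private final AtomicReference<Node> tail = new AtomicReference<>();

        /** Same shape as enq: retry until the CAS on the tail succeeds. */
        void enqueue(Node node) {
            for (;;) {
                Node t = tail.get();
                if (t == null) {
                    // Queue not initialized: only one thread wins this CAS
                    Node empty = new Node(-1);
                    if (head.compareAndSet(null, empty)) {
                        tail.set(empty);
                    }
                } else {
                    node.prev = t; // prev is set before the node becomes reachable
                    if (tail.compareAndSet(t, node)) {
                        t.next = node; // next is set only after the CAS succeeded
                        return;
                    }
                }
            }
        }

        /** Counts the nodes by walking prev links from the tail back to the empty head. */
        int size() {
            int n = 0;
            Node h = head.get();
            for (Node t = tail.get(); t != null && t != h; t = t.prev) {
                n++;
            }
            return n;
        }
    }

    /** Enqueues from several threads at once and returns the resulting queue length. */
    static int run(int threads, int nodesPerThread) {
        CasQueue queue = new CasQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < nodesPerThread; n++) {
                    queue.enqueue(new Node(n));
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1_000)); // prints 4000: no node was lost
    }
}
```

The size check walks prev links because those are always set before a node is published; next links are filled in slightly later, which is the same reason unparkSuccessor in the synchronizer searches backwards from the tail.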
After a node enters the synchronization queue, it enters a spin process in which each node (that is, each thread) watches its own condition. When the condition is met and the synchronization state is acquired, the node leaves the spin; otherwise it stays in it (with the node's thread blocked). The synchronizer's acquireQueued method is shown below.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // If the predecessor is the head, call tryAcquire again
            if (p == head && tryAcquire(arg)) {
                // Acquired the lock: make this node the head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // The lock was not acquired.
            // shouldParkAfterFailedAcquire: returns whether the current thread should block
            // parkAndCheckInterrupt: blocks the current thread and, when it wakes up,
            // returns whether it was interrupted
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Record the interrupt
                interrupted = true;
        }
    } finally {
        if (failed)
            // The acquire ended abnormally (for example, tryAcquire threw):
            // cancel the node, which sets its waitStatus to CANCELLED
            cancelAcquire(node);
    }
}

/**
 * After a failed acquire, checks and updates the predecessor's waitStatus.
 * Returns true if the thread should block.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // The predecessor's waitStatus is SIGNAL
    if (ws == Node.SIGNAL)
        /*
         * A node in the SIGNAL state wakes up its successor when it releases the lock,
         * so this thread can safely block.
         */
        return true;
    // The predecessor's thread was cancelled (CANCELLED = 1)
    if (ws > 0) {
        // Skip cancelled nodes until a non-cancelled predecessor is found
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // In all other cases, use CAS to set the predecessor's status to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// Called when shouldParkAfterFailedAcquire returns true; blocks the current thread
private final boolean parkAndCheckInterrupt() {
    // Block the current thread
    LockSupport.park(this);
    // Return whether the thread was interrupted. Thread.interrupted() clears the
    // interrupt flag, which is why the caller records it and restores it later
    return Thread.interrupted();
}
In acquireQueued(final Node node, int arg), the current thread tries to acquire the synchronization state in an "infinite loop", and only a node whose predecessor is the head node may make the attempt, for two reasons:
- The head node is the node that has acquired the synchronization state. When its thread releases the synchronization state, it wakes up its successor; after waking up, the successor's thread must check whether its predecessor is the head node.
- It maintains the FIFO principle of the synchronization queue.
The spin behavior of nodes acquiring the synchronization state in this method is shown in the figure below.
A thread whose node is not the head returns from the waiting state either because its predecessor has left the queue or because it was interrupted. It then checks whether its predecessor is the head node and, if so, tries to acquire the synchronization state. Nodes essentially do not communicate with each other while looping; each one simply checks whether its own predecessor is the head node. This keeps the release order consistent with FIFO and also makes it easy to handle premature notification (a thread whose predecessor is not the head node being woken up by an interrupt).
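The "wake up, recheck, block again" pattern can be tried in isolation with LockSupport. In the sketch below (ParkLoopDemo and run are invented names, and an AtomicBoolean stands in for "my predecessor is the head and tryAcquire succeeded"), the waiter is deliberately woken once before its condition holds, which plays the role of a premature notification.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class ParkLoopDemo {
    /** Returns how many times the waiter returned from park() before its condition held. */
    static int run() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicInteger wakeUps = new AtomicInteger();
        Thread waiter = new Thread(() -> {
            while (!ready.get()) {   // recheck the condition after every wake-up
                LockSupport.park();
                wakeUps.incrementAndGet();
            }
        });
        waiter.start();
        try {
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.yield();      // wait until the waiter is parked
            }
            LockSupport.unpark(waiter); // premature wake-up: the condition is still false
            while (wakeUps.get() < 1) {
                Thread.yield();      // the waiter noticed it and went around the loop
            }
            ready.set(true);            // now make the condition true ...
            LockSupport.unpark(waiter); // ... and wake the waiter for real
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return wakeUps.get();
    }

    public static void main(String[] args) {
        System.out.println(run() >= 1); // prints true
    }
}
```

Because the condition is rechecked in a loop, an early or spurious return from park() is harmless: the thread simply parks again.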
Process for obtaining exclusive synchronization status:
Each thread that fails to acquire the lock wraps itself in a Node and joins the tail of the queue. It then checks whether its predecessor is the head node. If so, it tries to acquire the lock, and on success it sets itself as the head node. If the predecessor is not the head node or the attempt fails, it checks the predecessor's status. If the status is SIGNAL(-1), the current thread blocks (the predecessor will wake it up, which avoids the cost of spinning). If the status is CANCELLED(1), the cancelled predecessors are skipped and the node is linked to the nearest non-cancelled predecessor. Otherwise the predecessor's status is set to SIGNAL(-1) with CAS. In the last two cases the thread goes around the loop again, so it blocks only once its predecessor is in the SIGNAL state.
So the whole process is sometimes spinning and sometimes blocking until the lock is acquired or cancelled.
For related diagrams, see https://www.jianshu.com/p/b6efbdbdc6fa.
After the current thread has acquired the synchronization state and executed its logic, it needs to release the synchronization state so that subsequent nodes can acquire it. The synchronization state is released by calling the synchronizer's release(int arg) method, which wakes up the successor node after releasing the state (so that it tries to acquire the synchronization state again). The release method of the synchronizer is shown below.
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // If the status of the current node is less than 0, use CAS to set it to 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Get the successor of the current node
    Node s = node.next;
    // If the successor is null or its status is > 0 (cancelled)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Search backwards from the tail for the non-cancelled node closest to the current node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the successor
        LockSupport.unpark(s.thread);
}
When executed, this method wakes up the thread of the head node's successor; unparkSuccessor(Node node) uses LockSupport to wake up the waiting thread.
Summary
When acquiring the synchronization state, the synchronizer maintains a synchronization queue. A thread that fails to acquire the synchronization state is added to the queue and spins there. The condition for leaving the queue (or stopping the spin) is that the node's predecessor is the head node and the synchronization state has been acquired successfully. When releasing the synchronization state, the synchronizer calls tryRelease(int arg) to release it and then wakes up the successor of the head node.
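The FIFO hand-over described in this summary can be observed with a fair ReentrantLock, which is built on the synchronizer. The following sketch makes sure each waiter is already queued before the next one starts, so the queue order is known; FifoOrderDemo and acquisitionOrder are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FifoOrderDemo {
    /** Queues the given number of threads one by one and returns the order in which they got the lock. */
    static List<Integer> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock(true); // fair: no barging past the queue
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        lock.lock(); // the main thread holds the lock while the others queue up
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock(); // release wakes the next node in the queue
                    }
                });
                threads.add(t);
                t.start();
                while (!lock.hasQueuedThread(t)) {
                    Thread.yield(); // wait until t is in the synchronization queue
                }
            }
        } finally {
            lock.unlock(); // wakes the first queued thread
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3)); // prints [0, 1, 2]
    }
}
```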
Shared synchronization state acquisition and release
The main difference between shared and exclusive acquisition is whether more than one thread can hold the synchronization state at the same time. Take reading and writing a file as an example. While a program is reading a file, all write operations on that file are blocked, but other read operations can proceed at the same time. Write operations require exclusive access to the resource, while read operations can share access. How the two access modes behave on the same file or resource at a given moment is shown in the following figure.
In the left half of the figure, the resource is held in shared mode: other shared access is allowed, while exclusive access is blocked. In the right half, the resource is held exclusively: all other access is blocked.
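The same compatibility rules can be checked with ReentrantReadWriteLock, where the read lock is a shared acquisition and the write lock an exclusive one. This is a small sketch; SharedVsExclusiveDemo, tryOnOtherThread and run are names invented here, and tryLock is used so that nothing blocks.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedVsExclusiveDemo {
    /** Calls tryLock() on another thread, releases the lock again if it was obtained, and returns the result. */
    static boolean tryOnOtherThread(Lock lock) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                result[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    /** Returns {read while read held, write while read held, read while write held, write while write held}. */
    static boolean[] run() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] results = new boolean[4];

        rw.readLock().lock();   // shared access is held
        results[0] = tryOnOtherThread(rw.readLock());
        results[1] = tryOnOtherThread(rw.writeLock());
        rw.readLock().unlock();

        rw.writeLock().lock();  // exclusive access is held
        results[2] = tryOnOtherThread(rw.readLock());
        results[3] = tryOnOtherThread(rw.writeLock());
        rw.writeLock().unlock();

        return results;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, false, false, false]
    }
}
```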
The synchronization state can be acquired in shared mode by calling the synchronizer's acquireShared(int arg) method, as shown below.
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * propagate > 0: after this thread acquired the shared lock, more shared
     * acquisitions can still succeed, so the successors need to be woken up.
     * propagate == 0: nothing is left to acquire, so logically no wake-up is needed.
     * However, h.waitStatus < 0 also covers the case where the head's waitStatus was 0
     * and was then set to PROPAGATE by compareAndSetWaitStatus(h, 0, PROPAGATE)
     * in doReleaseShared, which means another thread released the shared lock
     * in the meantime. SIGNAL is negative as well, so this check is conservative
     * and may cause an unnecessary wake-up.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // Wake up the successor if it is a shared node or if the successor is not known.
        // Only a successor that is explicitly known to be exclusive is left alone
        if (s == null || s.isShared())
            // Wake up the successor of the shared node
            doReleaseShared();
    }
}
In acquireShared(int arg), the synchronizer calls tryAcquireShared(int arg) to try to acquire the synchronization state. tryAcquireShared(int arg) returns an int; a return value greater than or equal to 0 means the synchronization state was acquired. Therefore, during shared spin acquisition, the condition for successfully acquiring the synchronization state and leaving the spin is that tryAcquireShared(int arg) returns a value greater than or equal to 0.
In doAcquireShared(int arg), if the predecessor of the current node is the head node, the node tries to acquire the synchronization state. If the return value is greater than or equal to 0, the synchronization state has been acquired and the node leaves the spin.
As with exclusive acquisition, shared acquisition also needs to release the synchronization state, which is done by calling releaseShared(int arg). The code is as follows.
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // If the head is not null and is not the tail, there is a waiting node
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // If the head's status is SIGNAL(-1), a successor needs to be woken up
            if (ws == Node.SIGNAL) {
                // Reset the head's status to 0 (the initial state)
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                // Wake up the successor
                unparkSuccessor(h);
            }
            // If the head is in the initial state 0, set it to PROPAGATE(-3) so that
            // the release is propagated to subsequent nodes
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // If the head did not change, the work is done and the loop exits.
        // If the head changed (for example, another thread acquired the lock),
        // retry so that the wake-up is passed on
        if (h == head) // loop if head changed
            break;
    }
}
After releasing the synchronization state, this method wakes up the subsequent waiting nodes. For concurrency components that support simultaneous access by multiple threads (such as Semaphore), the main difference from exclusive mode is that tryReleaseShared(int arg) must release the synchronization state (or resource count) in a thread-safe way, usually with a loop and CAS, because release operations can come from several threads at the same time.
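A counting-permits component makes the loop-and-CAS requirement concrete. The sketch below is a simplified model in the spirit of a non-fair semaphore, not the Semaphore source: CountingPermitsDemo, Permits and run are invented names. Both tryAcquireShared and tryReleaseShared read the state, compute the new value, and retry if another thread changed the state in between.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class CountingPermitsDemo {
    static final class Permits {
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int permits) {
                setState(permits);
            }

            @Override
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    // A negative result means failure; otherwise the CAS must succeed
                    if (remaining < 0 || compareAndSetState(available, remaining)) {
                        return remaining;
                    }
                }
            }

            @Override
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    // Several threads may release at once, so retry until the CAS wins
                    if (compareAndSetState(current, current + releases)) {
                        return true;
                    }
                }
            }

            int available() {
                return getState();
            }
        }

        private final Sync sync;

        Permits(int permits) {
            sync = new Sync(permits);
        }

        void acquire() {
            sync.acquireShared(1);
        }

        void release() {
            sync.releaseShared(1);
        }

        int available() {
            return sync.available();
        }
    }

    /** Returns {maximum number of threads inside at once, permits available afterwards}. */
    static int[] run(int permits, int threads) {
        Permits p = new Permits(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                p.acquire();
                try {
                    maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.yield();
                    inside.decrementAndGet();
                } finally {
                    p.release();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return new int[] { maxInside.get(), p.available() };
    }

    public static void main(String[] args) {
        int[] result = run(3, 8);
        System.out.println(result[0] <= 3); // prints true: never more than 3 threads inside
        System.out.println(result[1]);      // prints 3: every permit was returned
    }
}
```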
Note: setHeadAndPropagate runs when a thread in the waiting queue has successfully acquired the shared lock; at that point it needs to wake up the shared nodes behind it. When releaseShared releases a shared lock, however, threads waiting for either an exclusive lock or a shared lock can be woken up to try to acquire it.
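The effect of propagation can be seen with java.util.concurrent.Semaphore, whose acquireUninterruptibly() goes through acquireShared(1). In the sketch below (PropagationDemo and wokenByOneRelease are invented names), several threads queue up on an empty semaphore and a single release call hands out enough permits for all of them. The demo only observes the outcome, that every waiter gets through; it cannot show which thread issued each wake-up.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class PropagationDemo {
    /** Queues the given number of waiters, performs one release call, and returns how many got through. */
    static int wokenByOneRelease(int waiters) {
        Semaphore permits = new Semaphore(0);
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                permits.acquireUninterruptibly(); // shared acquisition of one permit
                woken.incrementAndGet();
            });
            threads[i].start();
        }
        while (permits.getQueueLength() < waiters) {
            Thread.yield(); // wait until every waiter is in the synchronization queue
        }
        permits.release(waiters); // one release call that makes all permits available
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(wokenByOneRelease(3)); // prints 3
    }
}
```

If wake-ups were not passed along the queue, the waiters behind the first one could stay parked even though permits are available; the join calls returning shows that this does not happen.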
Summary
Compared with an exclusive lock, the main characteristic of a shared lock is that when a node in the waiting queue successfully acquires the shared lock, then, because the lock is shared, it must in turn wake up the nodes behind it that can share the lock with it. Those nodes must themselves be waiting for a shared lock (this is the key premise: a node waiting for an exclusive lock cannot acquire it while a shared node holds the lock). When a shared lock is released, take a read-write lock as an example: once the read lock is released, threads waiting for either the read lock or the write lock can compete for the resource.
Conclusion
At the source level, the difference between an exclusive lock and a shared lock is:
Exclusive locks wake up the successor node in the CLH wait queue only when the lock holder explicitly calls release.
Shared locks have a setHeadAndPropagate method, through which the next shared node in the wait queue can be woken up without an explicit call to releaseShared.