Preface
AQS (AbstractQueuedSynchronizer) is a framework for building locks and synchronizers (here "synchronization" means communication and cooperation between threads). The locks in java.util.concurrent.locks (such as ReentrantLock and ReentrantReadWriteLock) and several synchronizers in java.util.concurrent (such as CountDownLatch and Semaphore) are built directly on AQS, while CyclicBarrier relies on it indirectly through ReentrantLock and Condition. Understanding how AQS is implemented is therefore important, and it is also a common interview topic used to tell candidates apart. This article introduces AQS step by step. The table of contents is as follows
- Locking principle – semaphore vs monitor
- AQS implementation principle
- AQS source code analysis
- How to customize a mutex using AQS
Locking principle – semaphore vs monitor
In concurrent programming there are two core problems: mutual exclusion, which means that only one thread may access a shared resource at a time, and synchronization, which means how threads communicate and cooperate. Both problems can be solved with semaphores and with monitors.
Semaphore
A semaphore is a common inter-process communication mechanism provided by the operating system. It is mainly used to coordinate concurrent access to shared resources, and the operating system guarantees that semaphore operations are atomic. How does it work?
- A semaphore consists of a shared integer variable S and two atomic operations, P and V. S can only be changed through P and V.
- P operation: request a resource, which decrements S by 1. If S < 0 afterwards, no resource is available, and the thread enters the wait queue (synchronization queue) to wait.
- V operation: release a resource, which increments S by 1. If S is still less than or equal to 0 afterwards, there are threads in the wait queue, and one of them needs to be woken up.
The schematic diagram is as follows
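In code, the P and V operations described above can be sketched roughly as follows. This is a minimal illustration, not an operating-system implementation: the class name CountingSemaphore and the wakeups counter are assumptions introduced here, and Java's built-in monitor (synchronized, wait, notify) stands in for the atomicity that the operating system would provide.

```java
// Hypothetical teaching sketch of a classic semaphore whose counter may go negative.
final class CountingSemaphore {
    private int s;            // the shared integer S
    private int wakeups = 0;  // wake-ups granted by V but not yet consumed by a waiter

    CountingSemaphore(int initial) {
        this.s = initial;
    }

    // P: request a resource. S is decremented; if it drops below 0 the caller waits.
    synchronized void p() {
        s--;
        if (s < 0) {
            boolean interrupted = false;
            // Loop to guard against spurious wake-ups.
            while (wakeups == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true; // remember the interrupt and keep waiting
                }
            }
            wakeups--;
            if (interrupted) {
                Thread.currentThread().interrupt(); // re-assert the interrupt
            }
        }
    }

    // V: release a resource. S is incremented; if it is still <= 0, a waiter exists.
    synchronized void v() {
        s++;
        if (s <= 0) {
            wakeups++;
            notify();
        }
    }

    synchronized int value() {
        return s;
    }
}
```

With an initial value of 2, two calls to p() succeed without blocking and leave S at 0; a third call would block until another thread calls v().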
The semaphore mechanism solves process synchronization and mutual exclusion, but a large number of semaphore operations scattered across processes is hard to manage and may lead to deadlock. For example, if the P and V operations are issued in the wrong order in the producer-consumer problem, deadlock may occur (see the reference links at the end of this article). In addition, the more conditions there are, the more semaphores are needed and the more carefully they must be handled, otherwise deadlock is easy to cause.
Because of these hazards of semaphore programming, a more developer-friendly concurrent programming model was proposed: the monitor.
Monitor
Dijkstra proposed in 1971 that all operations by processes on a given critical resource should be centralized into a so-called secretary process. Any process that wants to access the critical resource must first report to the secretary, and the secretary enforces mutually exclusive use of that resource by all processes. This mechanism is called a monitor.
The monitor is an improved concurrent programming model built on the semaphore mechanism. It solves the problem of pairing P and V operations around critical regions by gathering the paired operations in one place, which greatly reduces the cost of using and understanding them.
A monitor consists of four parts:
- Shared variables inside the monitor.
- Condition variables inside the monitor.
- Procedures inside the monitor that can be called concurrently.
- Statements that set initial values for the shared data local to the monitor.
Thus, the monitor is an object monitor. Any thread that wants to access the resource (shared variable) queues up to enter the monitor's scope. After entering, it is checked against a condition; if the condition is not met, it waits until notified and then re-enters the monitor.
It is important to note that semaphores and monitors are equivalent: semaphores can implement monitors, and monitors can implement semaphores. They differ only in form, and monitors are more developer-friendly.
The difference between the two is as follows
To solve the problem of pairing P and V operations around critical regions, the monitor groups the paired operations together and adds the concept of condition variables, which makes synchronization between threads easier when several conditions are involved.
How should we understand concepts such as the entry wait queue, shared variables and condition variables in a monitor? Technical concepts can be hard to grasp, so a scene from daily life helps. Consider a visit to the doctor:
- After registering, the patient goes to the waiting room and waits to be called
- When called, the patient can enter the clinic
- There are two cases: in the first, the doctor quickly identifies the illness, makes a diagnosis, and then calls the next patient in. In the second, the doctor cannot determine the cause and the patient needs a blood test/CT examination, so the patient goes for the blood test/CT first
- After the blood test/CT, the patient takes a number again and waits to be called (re-enters the entry wait queue)
- When called again, the patient goes back to the doctor with the blood test/CT report
The whole process is as follows
So how does a monitor handle mutual exclusion and synchronization?
First, mutual exclusion. The doctor is the shared resource (the shared variable), the consultation is the critical section, and the patients are threads. Any patient who wants to enter the critical section must first obtain the shared resource (the doctor), and the entrance lets only one thread through at a time. If the shared resource is occupied and another thread wants it, that thread waits in the entry wait queue until the holder releases the resource; the waiting threads then compete for it. This solves mutual exclusion. In essence, the monitor guarantees mutual exclusion by encapsulating the shared resource together with thread-safe operations for acquiring and releasing it.
Synchronization is achieved through condition variables and their wait queues. There are two cases
- A patient enters the clinic and needs no blood test/CT. After completing the diagnosis, the doctor releases the shared resource (unlocks) and notifies (notifyAll) the next patient waiting in the entry queue, who can see the doctor after hearing the number called.
- A patient enters the clinic and needs a blood test/CT. The patient joins the blood test/CT queue (the condition queue), releases the shared variable (the doctor), and notifies the other patients (threads) in the entry wait queue so that one of them can obtain the shared variable. After that thread has run the critical section, it wakes the thread waiting on the condition variable and moves it back to the entry wait queue. Once that thread obtains the shared variable (the doctor) again, it can enter the critical section and continue.
In Java, locks are mostly implemented as monitors. Take the familiar built-in lock synchronized as an example; its implementation principle is as follows.
As you can see, synchronized is also implemented as a monitor, but it has exactly one condition variable (the lock object itself). This is one of the reasons the JDK provides the Lock interface: a Lock supports multiple condition variables.
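To make the "multiple condition variables" point concrete, here is a small sketch of a monitor with one lock and two conditions. The BoundedBuffer class and its field names are hypothetical and exist only for illustration; ReentrantLock, Condition, await and signal are the real JDK API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer: the lock is the monitor entrance, and each
// Condition is a separate waiting room (like the blood test/CT queue).
final class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // Re-check the condition in a loop after every wake-up.
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal(); // wake one thread waiting for an element
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal(); // wake one thread waiting for free space
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

With synchronized, producers and consumers would share the single wait set of the lock object and would typically need notifyAll; with two conditions, each side wakes only the threads that can actually make progress.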
Through this analogy, you should now have a clear picture of how monitors work. Why spend so much effort introducing them? On the one hand, the monitor is the master key to concurrency problems; on the other hand, AQS itself is easier to understand with the monitor model in mind. Next, let's look at how AQS works.
AQS implementation principle
AQS, short for AbstractQueuedSynchronizer, is a framework for building locks and synchronizers. It maintains a shared resource, state, and a FIFO wait queue (the entry wait queue of the monitor), and uses CAS underneath to make its operations atomic.
The main principles of AQS are as follows:
Take an exclusive lock as an example (the resource can be held by only one thread at a time). state is initialized to 0. With multiple threads, a thread must first acquire state before executing the critical section. When a thread acquires it successfully, state is increased by 1. When the holder finishes the critical section, it releases the lock (state is decreased by 1) and wakes the next waiting thread in the FIFO queue (the node after head) so that it can try to acquire state.
Because state is shared by multiple threads, it must be declared volatile to guarantee visibility. volatile guarantees visibility but not atomicity, so AQS also provides atomic, CAS-based methods for updating state to keep it thread-safe.
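The point about visibility versus atomicity can be illustrated with a CAS retry loop. This is a sketch only: CasCounter and its demo method are hypothetical names, and AtomicInteger is used here as a stand-in for the volatile state field plus compareAndSetState inside AQS.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter that updates a shared value with a CAS retry loop.
final class CasCounter {
    private final AtomicInteger state = new AtomicInteger(0);

    int increment() {
        for (;;) {
            int current = state.get();   // volatile read: always sees the latest value
            int next = current + 1;
            // The update succeeds only if nobody changed the value in between.
            if (state.compareAndSet(current, next)) {
                return next;
            }
            // Otherwise another thread won the race; read again and retry.
        }
    }

    int get() {
        return state.get();
    }

    // Runs several threads that each increment the counter perThread times.
    static int demo(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

A plain volatile int incremented with ++ could lose updates under the same load, because the read, add and write are three separate steps; the CAS loop makes the whole update appear atomic.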
In addition, the FIFO queue in AQS (a CLH queue) is implemented as a doubly linked list, represented by its head and tail nodes. The head node stands for the thread that currently holds the lock; the other nodes are threads queued because they cannot acquire the lock yet and are waiting for it to be released.
Therefore, it is not difficult to understand the following definition of AQS:
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // Head and tail of the doubly linked list that represents the entry wait queue
    private transient volatile Node head;
    private transient volatile Node tail;
    // The shared variable state
    private volatile int state;
    // CAS update of state, so that threads acquire and release the lock safely
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    // ...
}
AQS source code analysis
This article uses ReentrantLock as the entry point for the source code analysis and combines text with diagrams to explain how AQS is implemented. While reading, compare each step with the clinic example above; it will help.
ReentrantLock is an exclusive lock, and it has both fair and non-fair modes. What are exclusive and shared modes, and what are fair and non-fair modes?
What is the difference between an exclusive lock and a shared lock?
Exclusive lock: only one thread can hold the lock at a time, and other threads can compete for it only after the holder releases it (one doctor sees one patient at a time).
Shared lock: the shared resource can be held by multiple threads at the same time, until it is fully occupied (multiple doctors, multiple patients). Common shared-mode synchronizers include the read lock of ReadWriteLock and CountDownLatch. The differences between the two are as follows
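As a small illustration of shared mode, the sketch below uses java.util.concurrent.Semaphore, which is built on AQS in shared mode. The class SharedModeDemo and the variable name doctors are hypothetical; tryAcquire is the real non-blocking API.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo: two "doctors" (permits) can be held at the same time.
final class SharedModeDemo {
    static boolean[] acquireThree() {
        Semaphore doctors = new Semaphore(2);
        boolean first = doctors.tryAcquire();   // a permit is free
        boolean second = doctors.tryAcquire();  // the second permit is free
        boolean third = doctors.tryAcquire();   // no permit left, fails without blocking
        return new boolean[] { first, second, third };
    }
}
```

The first two attempts succeed and the third fails, which is the "held by multiple threads until fully occupied" behaviour; an exclusive lock behaves like the same structure with a single permit plus an owner.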
What are fair locks and non-fair locks?
Take the clinic again. With a fair lock, everyone takes a number and waits in the waiting room to be called on a first-come, first-served basis. With a non-fair lock, a newly arrived patient (thread) is pushy: without taking a number or queueing, it goes straight to the doctor and tries to take the slot (not necessarily successfully).
This article focuses on the source code of the exclusive, non-fair mode. It does not analyze shared mode or Condition; their principles are similar, so they are not hard to follow once the exclusive lock is understood.
Let's first look at how ReentrantLock is used
// 1. Initialize the reentrant lock
private ReentrantLock lock = new ReentrantLock();
public void run() {
    // Lock
    lock.lock();
    try {
        // 2. Execute critical section code (assumed here to contain a call that may throw InterruptedException)
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 3. Unlock
        lock.unlock();
    }
}
The first step is to initialize the reentrant lock. As you can see, the non-fair mode is used by default
public ReentrantLock() {
    sync = new NonfairSync();
}
Of course, you can also specify a fair lock using the following constructor:
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
Note: FairSync and NonfairSync are inner classes of ReentrantLock that implement the fair and non-fair modes, respectively. Internally, ReentrantLock's lock and unlock delegate to the FairSync or NonfairSync instance.
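The two constructors can be checked from the outside with ReentrantLock.isFair(). The FairnessDemo class below is a hypothetical wrapper used only for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of the two ReentrantLock constructors.
final class FairnessDemo {
    static boolean[] modes() {
        ReentrantLock byDefault = new ReentrantLock();  // non-fair by default
        ReentrantLock fair = new ReentrantLock(true);   // explicitly fair
        return new boolean[] { byDefault.isFair(), fair.isFair() };
    }
}
```

The non-fair default usually gives higher throughput because a newly arriving thread may take a free lock without queueing, at the cost of possible waiting-time unfairness for queued threads.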
The relationships of several classes are as follows:
Let's first examine NonfairSync, starting with the second step of the example above: lock. Since the default is the non-fair lock, let's analyze how the non-fair lock acquires the lock
You can see that the lock method has two main steps
- Use CAS to acquire the state resource. If state is successfully set to 1, the lock has been acquired.
- If the CAS fails to set state to 1 (the lock was not acquired), acquire(1) is executed. This method is provided by AQS, as follows
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire analysis
tryAcquire is called first to try to obtain state; if it succeeds, the following steps are skipped. If it fails, acquireQueued is executed to add the thread to the CLH wait queue.
tryAcquire is a template method defined by AQS and implemented by the concrete AQS subclass (Sync). Since we are following the non-fair lock logic, the following code is executed:
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // c == 0 means the resource is free (the lock has been released), so use CAS to acquire it
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // The thread that already holds the lock is acquiring it again, so the count is increased; this is what makes ReentrantLock reentrant
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
This code shows two main paths for acquiring the lock
- When state is 0, the lock has been released and can be acquired, so CAS is used to acquire it; if the CAS succeeds, the thread has won the lock. This also shows why the implementation is non-fair: both queued threads and newly arriving threads can attempt the CAS, and a new thread does not need to queue
- If state is not 0, the lock is held. If current == getExclusiveOwnerThread() is true, the holder is acquiring the lock again (reentry), so state is increased. setState does not need a CAS here, because the lock is owned by the current thread and no other thread can reach this code, so updating state is thread-safe at this point.
Assume that state = 0, that is, the lock is free, and that three threads T1, T2 and T3 compete for it
1. Assume T1 wins. After T1 acquires the lock for the first time, state is 1
2. If T1 acquires the lock again, state increases by 1 to 2, which means the lock has been entered twice. Each further acquisition by T1 keeps increasing state
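The accumulation of state on reentry can be observed through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The ReentryDemo class is a hypothetical wrapper for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the hold count follows the lock/unlock calls of one thread.
final class ReentryDemo {
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentry by the same thread
        lock.unlock();
        counts[2] = lock.getHoldCount(); // one hold released, lock still owned
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }
}
```

The lock becomes available to other threads only after the last unlock, which is why every lock() call must be paired with an unlock().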
acquireQueued analysis
If tryAcquire(arg) fails, the lock was not acquired, and acquireQueued is executed to add the thread to the FIFO wait queue
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
Before acquireQueued runs, addWaiter(Node.EXCLUSIVE) enqueues a Node containing the current thread. Node.EXCLUSIVE means that this node is in exclusive mode
How does addWaiter work?
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // If the tail is not null, use CAS to enqueue the thread that failed to acquire the lock
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // If the tail is null (or the CAS failed), fall back to enq
    enq(node);
    return node;
}
The logic is fairly clear: first get the tail node of the FIFO queue. If the tail exists, use CAS to enqueue the waiting thread; if the tail is null (or the CAS fails), execute enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // If the tail is null, the FIFO queue has not been initialized: create the head first
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // The tail is not null: enqueue the thread's node
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
First, check whether tail is null. If it is, the head and tail of the FIFO queue have not been built yet, so build the head node first and then use CAS to enqueue this thread's node
Note that when the head node is created with CAS, it is simply new Node(); unlike other nodes, it does not record a thread
Why? Because head is a virtual node: it only indicates that some thread currently holds the lock. Which thread that is was already recorded in tryAcquire by setExclusiveOwnerThread(current), in the exclusiveOwnerThread attribute.
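The enqueue logic above can be reproduced in isolation. The sketch below is a simplified, hypothetical MiniQueue that mirrors the shape of enq with AtomicReference instead of the Unsafe-based CAS helpers in AQS; it is not the JDK implementation and omits waitStatus, cancellation and parking.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified version of the AQS wait queue (enqueue only).
final class MiniQueue {
    static final class Node {
        final Thread thread;   // null for the virtual (dummy) head
        volatile Node prev;
        volatile Node next;

        Node(Thread thread) {
            this.thread = thread;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like enq in AQS.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazy initialization: install a dummy head that records no thread.
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                     // step 1: link backwards
                if (tail.compareAndSet(t, node)) { // step 2: publish as the new tail
                    t.next = node;                 // step 3: link forwards
                    return t;
                }
            }
        }
    }
}
```

Because prev is set before the CAS and next only after it, a node reachable from tail always has a valid prev chain, while next may briefly lag behind. This ordering matters later when the queue is traversed to find a thread to wake up.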
After addWaiter returns, the thread has been enqueued. Now comes the last and most important method, acquireQueued. It is a little hard to follow, so let's first simulate the previous steps with three threads
1. Assume T1 acquires the lock. The FIFO queue has not been initialized at this point, so the head node is created first when a thread needs to be enqueued
2. T2 or T3 then fails to acquire state and is enqueued, as shown in the figure below:
Should T2 and T3 block immediately after being enqueued? Blocking immediately means the thread goes from running to blocked, which involves a switch from user mode to kernel mode, and another switch back when it is woken up, which is relatively expensive. So AQS first lets an enqueued thread spin and try for the lock, as shown below
But you may have spotted a problem: if the lock is exclusive and T1 holds it for a long time, it makes little sense for T2 and T3 to keep spinning; that occupies the CPU and hurts performance. A better approach is to spin only once or twice and, if the lock is still not acquired, block and wait for the predecessor node to wake the thread after releasing the lock.
In addition, if the thread is interrupted while waiting, or the wait times out, its node should be put in the "cancelled" state.
AQS defines a waitStatus field on each Node for the states a node can be in, and acts on the node according to its value. Let's look at the possible values; it is time to look at the attributes of Node
static final class Node {
    static final Node SHARED = new Node(); // Marks a node waiting in shared mode
    static final Node EXCLUSIVE = null;    // Marks a node waiting in exclusive mode
    static final int CANCELLED = 1;  // The node was cancelled because of a timeout or an interrupt
    static final int SIGNAL = -1;    // The successor of this node needs to be woken up (unpark) when this node releases the lock or is cancelled
    static final int CONDITION = -2; // The thread is waiting on a condition variable (it acquired the lock, joined the condition queue and released the lock; it can return only after the condition is met and the lock is reacquired)
    static final int PROPAGATE = -3; // The wake-up should be propagated to subsequent nodes; used in shared mode
    // Wait status: CONDITION for condition nodes; otherwise 0 by default. Updated atomically with CAS
    volatile int waitStatus;
    // ...
}
From these state definitions we can guess how AQS handles the spinning thread: if the predecessor of the current node is not head and the predecessor's state is SIGNAL, the node is blocked. Let's check the code to verify the guess:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // If the predecessor is head, try to acquire the lock
            if (p == head && tryAcquire(arg)) {
                // Make the current node the new head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // If the predecessor is not head, or the attempt failed, decide whether to block
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // Called if the thread ultimately fails to acquire the lock while spinning, because of an exception or another reason
            cancelAcquire(node);
    }
}
Consider the first case: the predecessor of the current node is the head node and tryAcquire succeeds
As you can see, the main work is to point head at the current node and take the original head out of the queue; since the original head is then unreachable, it will be garbage collected.
Notice how setHead is implemented
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
After head is set to the current node, its thread and prev are set to null. As analyzed earlier, head is a virtual node that stores nothing except waitStatus, so thread and prev are cleared. The thread holding the lock is already recorded in exclusiveOwnerThread; if head recorded the thread as well, that would be redundant, and releasing the lock would require an extra step to clear head's thread.
In the second case, the predecessor is not head, or the attempt to acquire the lock failed. shouldParkAfterFailedAcquire is called first to decide whether the thread should stop spinning and block:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 1. If the predecessor's state is SIGNAL, the current node can be blocked
        return true;
    if (ws > 0) {
        // 2. Skip over predecessors that are in the cancelled state
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 3. ws is 0 or PROPAGATE here: set the predecessor's state to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
This code is a bit convoluted, so let's go through it step by step
1. According to the comments in the Node class, if the predecessor node is SIGNAL, the current node can be blocked.
As shown in the figure, the waitStatus of the predecessors of T2 and T3 is SIGNAL, so T2 and T3 can be blocked at this point.
2. If the predecessor node is cancelled, it needs to be removed. The code does this neatly: it skips over all consecutive cancelled nodes in front of the current node and links the current node to the first non-cancelled one. In the figure, the cancelled nodes T2 and T3 become unreachable and will be garbage collected.
3. If the predecessor's waitStatus is less than or equal to 0 (and not SIGNAL), set it to SIGNAL first. As mentioned, a node may block only if its predecessor is SIGNAL; on the next spin, step 1 will then return true.
When shouldParkAfterFailedAcquire returns true, the thread may block, and the next step, parkAndCheckInterrupt, blocks it
private final boolean parkAndCheckInterrupt() {
    // Block the thread
    LockSupport.park(this);
    // Return whether the thread was interrupted, and clear the interrupt flag (the interrupt is re-asserted after the lock is acquired)
    return Thread.interrupted();
}
Blocking the thread is easy to understand, but why check whether the thread was interrupted? If the thread receives an interrupt while blocked, acquireQueued returns true after the thread wakes up and acquires the lock, and the interrupt then has to be re-asserted, as shown below:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // If the thread was interrupted while waiting, re-assert the interrupt after acquiring the lock
        selfInterrupt();
}
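The interplay of park, Thread.interrupted() and the later self-interrupt can be tried out in isolation. The sketch below is a hypothetical demo class, ParkInterruptDemo, that uses the real LockSupport and Thread APIs; it is not AQS code, it only mimics the pattern.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: a parked thread is woken by an interrupt, clears the flag,
// and re-asserts it afterwards, as acquire does through selfInterrupt().
final class ParkInterruptDemo {
    static boolean[] run() {
        boolean[] seen = new boolean[3];
        Thread waiter = new Thread(() -> {
            boolean interrupted = false;
            // park may return for no reason, so loop until the interrupt is observed.
            while (!interrupted) {
                LockSupport.park();
                interrupted = Thread.interrupted(); // reads AND clears the flag
            }
            seen[0] = interrupted;
            seen[1] = Thread.currentThread().isInterrupted(); // flag was cleared
            Thread.currentThread().interrupt();               // re-assert the interrupt
            seen[2] = Thread.currentThread().isInterrupted(); // flag is set again
        });
        waiter.start();
        waiter.interrupt(); // wakes the thread if parked, or makes the next park return at once
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Clearing the flag inside the loop matters: park returns immediately while the interrupt flag is set, so a waiter that left the flag set could no longer block and would spin.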
The lock acquisition process has now been analyzed, but one question remains: when is a Node set to the cancelled state?
Look back at acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // spin/acquire code omitted
    } finally {
        if (failed)
            // Called if the thread ultimately fails to acquire the lock while spinning, because of an exception or another reason
            cancelAcquire(node);
    }
}
Look at the cancelAcquire call at the end: it is invoked if the thread ultimately fails to acquire the lock while spinning, because of an exception or another reason
private void cancelAcquire(Node node) {
    // If the node is null, return directly
    if (node == null)
        return;
    // The node is about to be cancelled, so clear its thread
    node.thread = null;
    // Point node.prev at the first non-cancelled predecessor (skip all cancelled nodes); waitStatus > 0 means cancelled
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // The successor of the filtered pred; used as the expected value in the CAS operations below
    Node predNext = pred.next;
    // Mark the current node as cancelled
    node.waitStatus = Node.CANCELLED;
    // If the cancelled node is the tail, CAS the tail to pred and, on success, set pred.next to null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // Otherwise pred.next should point to the successor of the cancelled node. As noted earlier, a node is woken up only if its predecessor is in SIGNAL state, so make sure pred is SIGNAL before linking it to the successor
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // If pred is head, or pred's state could not be set to SIGNAL, wake the successor so that it competes for the lock
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
This code is also a little convoluted, so let's go through the cases one by one
1. First, if there are cancelled nodes in front of the current node, the logic is as follows
2. If the current node is neither the successor of head nor the tail, then after step 1 the final result is as follows
In this case, the current node and the cancelled nodes in front of it cannot be garbage collected yet. Recall the following code from the analysis of the spin
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // omit extraneous code
    if (ws > 0) {
        // Skip over predecessors that are in the cancelled state
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
    return false;
}
This code points the node's prev at the nearest preceding node whose waitStatus is not CANCELLED
So when T4 executes this code, the result looks like this
As you can see, the two CANCELLED nodes in the middle are no longer reachable and will be garbage collected
3. If the current node is the tail, the result is as follows. In this case the current node is unreachable and will be garbage collected
4. If the current node is the successor of head
The CANCELLED node likewise becomes unreachable once the tail node calls shouldParkAfterFailedAcquire while spinning, as follows
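Cancellation is easiest to trigger from user code with a timed lock attempt. The sketch below uses the real ReentrantLock.tryLock(long, TimeUnit); the class TimedLockDemo and the 50 ms timeout are assumptions chosen for illustration. In the JDK 8 sources, a timed attempt that gives up is what causes the waiter's queue node to be cancelled, but the demo only observes the visible result, not the queue internals.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a second thread gives up waiting after a timeout.
final class TimedLockDemo {
    static boolean timedAttemptWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = { true };
        lock.lock(); // the calling thread holds the lock for the whole attempt
        try {
            Thread waiter = new Thread(() -> {
                try {
                    // Queues behind the holder, then abandons the wait after 50 ms.
                    acquired[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                    if (acquired[0]) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }
}
```

The method returns false because the lock is held for the entire timed attempt; the abandoned waiter must not be left in the queue as a candidate for wake-up, which is the job of the cancellation logic discussed above.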
Now that we have analyzed the lock acquisition process, let’s look at how the lock is released.
Release the lock
Whether the lock is fair or non-fair, the following AQS template method is ultimately called to release the lock
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
    // Check whether the lock is released successfully
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
The tryRelease method is defined in Sync, a subclass of AQS
// java.util.concurrent.locks.ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // Only the thread holding the lock can release it; if the current thread is not the holder, an exception is thrown
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // The thread has released all its holds on the lock, so exclusiveOwnerThread is cleared
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
After the lock is released, the node after the head is awakened to compete for the lock
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
    // Check whether the lock is released successfully
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // After the lock is released, wake up the node after head to compete for the lock
            unparkSuccessor(h);
        return true;
    }
    return false;
}
The wake-up condition is h != null && h.waitStatus != 0.
- If h == null, there are two possibilities. Either only one thread ever competed for the lock and it has now released it, so there is no successor to wake; or another thread is in the middle of competing for the lock but has not initialized the head node yet, and since that thread is running, no wake-up is needed
- If h != null and h.waitStatus == 0, the successor of head is still spinning for the lock, that is, its thread is running and does not need to be woken up.
- If h != null and h.waitStatus < 0, waitStatus may be SIGNAL or PROPAGATE; both indicate that the successor may be blocked and needs to be woken up
Let's look at unparkSuccessor:
private void unparkSuccessor(Node node) {
    // Read head's waitStatus (say it is SIGNAL) and CAS it to 0. waitStatus = SIGNAL (-1) or PROPAGATE (-3) is just a flag meaning the successor can be woken up; since the successor is being woken up now, the flag can be reset to 0. If the CAS fails, the wake-up is not affected
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Find the first non-cancelled node after node and wake it up
    Node s = node.next;
    // If s is null or cancelled, s is not a valid node, so the logic in the if is executed
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk backwards from the tail; the loop ends with the non-cancelled node closest to node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
Why is the first non-cancelled node searched from back to front? Because enqueueing a node is not an atomic operation
A thread that enqueues a node first sets node.prev = pred, then CASes the tail, and only then sets pred.next = node. If unparkSuccessor runs between these steps, following next from head cannot find the successor, whereas following prev from tail can, as follows
How to customize a mutex using AQS
By managing state and the FIFO queue, AQS gives us a general set of low-level building blocks for locks. Defining a lock basically means defining a subclass of AQS inside the lock class and calling AQS's methods. Because AQS already handles state and queue management, implementing a lock is relatively easy. For example, a non-reentrant mutex based on AQS can be implemented as shown below
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Mutex {
    private Sync sync = new Sync();
    public void lock() {
        sync.acquire(1);
    }
    public void unlock() {
        sync.release(1);
    }
    private static class Sync extends AbstractQueuedSynchronizer {
        // Acquire succeeds only if state can be changed from 0 (free) to 1 (held)
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }
        // Release simply resets state to 0 (free)
        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }
}
As you can see, it takes only a few lines of code, which is really convenient.
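For comparison, here is a slightly stricter variant with a usage example. It is a sketch under stated assumptions, not the mutex from this article: the class name CheckedMutex, the owner check in tryRelease and the demo method are additions made here, while setExclusiveOwnerThread and getExclusiveOwnerThread are real methods inherited from AbstractOwnableSynchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex that records its owner and rejects foreign unlocks.
final class CheckedMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the owner may release; this also rejects unlock() on a free mutex.
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write, performed last so the release is published
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    // Usage example: several threads increment a plain int under the mutex.
    static int demo(int threads, int perThread) {
        CheckedMutex mutex = new CheckedMutex();
        int[] counter = { 0 };
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Like the version above it is non-reentrant: a thread that calls lock() twice without unlocking blocks itself, because tryAcquire never succeeds while state is 1.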
Conclusion
This article used diagrams to walk through how AQS is implemented, and you should now have a deeper understanding of it. The key is to understand the principles of locking first, namely semaphores and monitors. Once you have the monitor model in mind, you have a conceptual picture of AQS, and reading the source code with that model in hand is much easier. It also helps to compare with scenes from everyday life, such as the visit to the doctor; learning by analogy makes the design ideas behind a technology easier to grasp.
Shoulders of giants
- "Operating system" producer-consumer problem: blog.csdn.net/liushall/ar…
- time.geekbang.org/column/arti…
- www.cnblogs.com/binarylei/p…
- zhuanlan.zhihu.com/p/150573153
- blog.csdn.net/anlian523/a…
- tech.meituan.com/2019/12/05/…