AbstractQueuedSynchronizer (AQS) source
The previous article used ReentrantLock to give a general picture of the AQS source code, and explained the difference between fair and unfair locks and how reentrancy is implemented. This article focuses on the AQS source code itself: acquiring the exclusive lock, acquiring the shared lock, releasing locks, and the principles behind each.
The overall structure of AQS
As the cornerstone of the JUC package, the importance of AQS is self-evident, and the previous article already illustrated it, so I will keep this short and start the source code journey right away.
In the previous article, when discussing the fair lock, we met hasQueuedPredecessors(), a method used while acquiring the lock. Its purpose is to determine whether the current thread needs to queue. Queuing implies entering a queue and, eventually, leaving it. So what does this queue look like? Is it singly linked or doubly linked? Let's read on with these questions in mind:
Member variables in AQS:
/** Head node of the synchronization queue */
private transient volatile Node head;
/** Tail node of the synchronization queue */
private transient volatile Node tail;
/** Synchronization state. 0: free; greater than 0: locked */
private volatile int state;
// Get the state
protected final int getState() {
    return state;
}
// Set the state
protected final void setState(int newState) {
    state = newState;
}
AQS has three core member variables: the head node of the synchronization queue, the tail node of the synchronization queue, and the synchronization state. Note that all three are declared volatile, which guarantees that a change made by one thread is visible to the other threads.
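To make the role of state concrete, here is a minimal sketch. The class StateProbe and its method names are hypothetical and exist only for illustration; the calls it wraps (getState(), setState() and compareAndSetState()) are the protected accessors that AQS offers to subclasses. The volatile field gives visibility, and the CAS is what lets exactly one of several competing threads move the state from 0 to 1.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical helper: exposes the protected state accessors of AQS for illustration only. */
final class StateProbe extends AbstractQueuedSynchronizer {
    /** Reads the volatile state field. */
    int read() {
        return getState();
    }

    /** Atomically moves state from expected to update; returns false if the state was not expected. */
    boolean cas(int expected, int update) {
        return compareAndSetState(expected, update);
    }

    /** Plain volatile write; in a real lock this is only safe for the thread that already owns it. */
    void write(int value) {
        setState(value);
    }
}
```

With a fresh StateProbe, cas(0, 1) succeeds once and a second cas(0, 1) fails, which is the "free" to "locked" transition a lock implementation relies on.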
We can roughly picture the synchronization queue as follows, with each Node standing for one waiting thread:
Here is the structure of each node in the queue:
static final class Node {
    /** Marker indicating that a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker indicating that a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    /** waitStatus value: the thread has been cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value: -1 means the successor is (or soon will be) waiting. It is set on this node by its successor, so that this node wakes the successor when it releases the lock. From the successor's point of view: I can sleep peacefully only once my predecessor's state is -1, because then it will wake me to retry when it releases the lock */
    static final int SIGNAL = -1;
    /** waitStatus value: the node is queued in a condition queue, waiting on a condition */
    static final int CONDITION = -2;
    /** waitStatus value: in shared mode, the next releaseShared should be propagated unconditionally. Earlier JDK releases did not have this state; it was introduced to fix bug 6801020, a thread hang caused by concurrent release of shared locks (each such fix makes the code harder to follow) */
    static final int PROPAGATE = -3;
    // One of 1, -1, -2, -3 above, or 0 (the default)
    volatile int waitStatus;
    // Previous node
    volatile Node prev;
    // Next node
    volatile Node next;
    // The thread wrapped by this node
    volatile Thread thread;
    // Next node waiting on the condition
    Node nextWaiter;
    // ... constructors and helper methods omitted
}
Node is an inner class of AbstractQueuedSynchronizer (AQS). It carries several fields: the predecessor node, the successor node, the thread it wraps, and that node's wait status. (If you are familiar with how queues are implemented in a data-structures course, this is essentially a doubly linked list with a few extra fields.)
With this, we can refine the diagram above:
The diagram above fills in the details of the synchronization queue. The AQS member variable head points to the head node (a sentinel node), and tail points to the tail node. Each Node holds its thread; prev points to the previous Node, next points to the next Node, and waitStatus records the node's status. Pay particular attention to -1.
Since Node is an inner class of AQS, let's also use a class diagram to show the relationship between AbstractQueuedSynchronizer (AQS) and Node:
AQS also maintains ConditionObject, the wait queue for condition variables. This article does not cover it, so it is not drawn.
Exclusive mode lock acquisition
That covers the overall structure of AQS. Next we focus on acquiring the exclusive lock, acquiring the shared lock, and releasing locks.
When we use a lock, the code looks like this:
private Lock lock = new ReentrantLock();
public void method() {
    lock.lock(); // acquire the lock
    try {
        System.out.println("Perform some operations...");
    } finally {
        lock.unlock(); // release the lock
    }
}
The lock() method calls acquire() in AQS:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire(arg) is an attempt to acquire the lock, and its details are implemented by subclasses; my previous article on ReentrantLock and the AQS source code walks through ReentrantLock's implementation. If the method returns true, the lock was acquired; if it returns false, the acquisition failed. There are two cases:
- When tryAcquire(arg) returns true, !tryAcquire(arg) is false, so the && short-circuits: neither addWaiter() nor acquireQueued() executes, and acquire() returns with the lock held.
- When tryAcquire(arg) returns false, the lock was not acquired and !tryAcquire(arg) is true. Execution continues: addWaiter(Node.EXCLUSIVE) runs first, and once it has returned its result, acquireQueued() runs with that result.
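To show how a subclass plugs into acquire(), here is a minimal sketch of a non-reentrant mutex, in the spirit of the Mutex example in the AQS Javadoc. The names SimpleMutex, Sync, and countWithMutex are hypothetical; this is not how ReentrantLock itself is written (it also tracks a hold count and supports fairness). The subclass only decides whether the state can be taken; queuing and parking are left to acquire() and release().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex built on AQS; a sketch, not ReentrantLock's implementation. */
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // 0 -> 1 means "free -> held"; a failed CAS means another thread holds the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so release() may wake the successor
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }              // tryAcquire first, queue on failure
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }

    /** Increments a shared counter from several threads under the mutex and returns the total. */
    static int countWithMutex(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

Calling SimpleMutex.countWithMutex(4, 1000) exercises both cases: most lock() calls succeed directly in tryAcquire(), and the contended ones fall through to addWaiter() and acquireQueued().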
So what happens in addWaiter(Node.EXCLUSIVE) after the lock acquisition fails? The Node.EXCLUSIVE argument marks the node as waiting for an exclusive lock.
private Node addWaiter(Node mode) {
    //1 Wrap the current thread in a node
    Node node = new Node(Thread.currentThread(), mode);
    //2 Assign the tail node to the variable pred
    Node pred = tail;
    //3 Check whether tail is null, i.e. whether the queue has been initialized
    if (pred != null) {
        //4 Point node's prev at pred, the current tail of the queue
        node.prev = pred;
        //5 CAS node in as the new tail
        if (compareAndSetTail(pred, node)) {
            //6 Point the old tail's next at node
            pred.next = node;
            //7 Return node
            return node;
        }
    }
    //8 We can roughly guess that enq() has two jobs:
    // 8.1: initialize the queue when it is empty
    // 8.2: keep trying to set node as tail when compareAndSetTail(pred, node) fails
    enq(node);
    return node;
}
If tail is null, the queue has not been initialized and enq() is executed; enq() initializes the queue, as we will see shortly. For now, consider only the case where the queue already contains nodes:
After the code at 4, 5, and 6 has run, the queue looks like this:
I trust the code at 1 through 7 raises no questions by now. If it does, this article has failed.
When tail really is null, the queue has not been initialized, and enq() is executed to initialize it. But initialization is only one of its jobs: enq() also sets node as the tail when compareAndSetTail(pred, node) returned false, that is, when the CAS at 5 above failed. We know that a failed CAS is normally retried in a spin loop, yet so far we have seen no loop, only the call to enq(). Is the spin inside enq()? Let's go in and check our guess:
private Node enq(final Node node) {
    for (;;) {
        //1 Get the tail node of the synchronization queue
        Node t = tail;
        //2 If the tail is null, the queue has not been initialized
        if (t == null) {
            //3 Initialize: CAS in a new, empty head node
            if (compareAndSetHead(new Node()))
                //4 Point tail at the newly constructed head
                tail = head;
        } else {
            //5 Point node's prev at the current tail
            node.prev = t;
            //6 CAS node in as the new tail
            if (compareAndSetTail(t, node)) {
                //7 Point the old tail's next at node (now the tail)
                t.next = node;
                return t;
            }
        }
    }
}
Sure enough, enq() starts with for(;;), an infinite loop. It first reads the tail node of the synchronization queue. If the tail is null, the queue has not been initialized, and the code at 3 initializes it. After the code at 1, 2, 3, and 4 has run, the queue looks like this:
Note that compareAndSetHead(new Node()) CASes in a brand-new Node as the head, not the node that was passed in. This is the clever part of the queue design: the head is a sentinel node. With a sentinel, enqueuing and dequeuing need no special case for an empty queue or for the head, which is why the synchronization queue is a linked structure with a sentinel node.
Notice that the code at 1, 2, 3, and 4 does not exit the loop. The loop continues with the head now constructed, and the next pass appends node behind the head. After the code at 5, 6, and 7, the queue looks like this:
The code at 5, 6, and 7 is also what runs when compareAndSetTail(pred, node) in addWaiter() returned false. Its logic is the same as at 4, 5, and 6 in addWaiter() above, so I won't repeat it. To summarize what enq() does:
- When compareAndSetTail(pred, node) returned false, it keeps retrying the CAS (spinning) until node has been inserted as the tail
- It calls compareAndSetHead(new Node()) to initialize the head of the linked synchronization queue
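The two jobs of enq() can be tried out in isolation. The sketch below is a simplified model, not the AQS code: MiniSyncQueue, its Node, and sizeFromTail() are hypothetical names, a String stands in for the waiting thread, and AtomicReference replaces the internal compareAndSetHead()/compareAndSetTail() calls. The ordering is the same, though: link prev first, CAS the tail, and set next last.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified model of the sync queue: lazy sentinel head plus CAS on the tail. */
final class MiniSyncQueue {
    static final class Node {
        final String name;   // stands in for the waiting thread
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Same shape as enq(): initialize on first use, then retry the tail CAS until it wins. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a sentinel head, then loop again to append node.
                if (head.compareAndSet(null, new Node("sentinel"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // link backwards first
                if (tail.compareAndSet(t, node)) {  // publish node as the new tail
                    t.next = node;                  // forward link is set last
                    return t;                       // old tail, i.e. node's predecessor
                }
            }
        }
    }

    /** Counts real (non-sentinel) nodes by walking prev pointers from the tail. */
    int sizeFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }
}
```

Enqueuing a first node on an empty MiniSyncQueue takes two passes through the loop: one to create the sentinel and one to append the node, so the value returned is the sentinel itself.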
We have not touched waitStatus yet, so at this point the node has only joined the queue; it has not suspended itself. It still gets one more attempt to acquire the lock, as we will see below.
enq() has a return value, but addWaiter() does not use it; addWaiter() returns the current node. Next comes the acquireQueued() method:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //1 Get the predecessor of the given node
            final Node p = node.predecessor();
            //2 Too much to explain in a comment; see the explanation of 2 below
            if (p == head && tryAcquire(arg)) {
                //3 After acquiring the lock, setHead(node) does three things:
                //3.1 Set node (the current node) as the head: head = node;
                //3.2 Clear the node's thread: node.thread = null;
                //3.3 Clear the node's predecessor: node.prev = null;
                setHead(node);
                //4 Clear the old head's next pointer to help garbage collection
                p.next = null;
                // Mark the acquisition as successful
                failed = false;
                // Return whether the thread was interrupted while waiting; the loop ends here
                return interrupted;
            }
            //5 Decide whether the thread really needs to suspend
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
Explanation of the code at 2:
if (p == head && tryAcquire(arg))
If the predecessor is the head node, the current node is the first node waiting in the queue (the head itself does not count). Why not? The head stands for the thread that currently holds the lock and is running its business logic, and nobody knows when it will finish. Think of queuing for train tickets: there is one person in front of you, and that person is at the counter buying a ticket right now. You are the first person waiting in line. You may be looking down at your phone, but you still need to look up from time to time to check whether the person in front has finished. So if the current node is the first waiting node in the synchronization queue, it tries to acquire the lock.
At the start of the method, the queue might look like this:
The predecessor of node T1 is the head node, so T1 is the first queued node. T1 then calls tryAcquire(arg) to try to acquire the lock. If that succeeds, after the code at 2, 3, and 4 the queue looks like this:
This is the ideal case: the predecessor of the current node is the head and the attempt to acquire the lock succeeded. If the predecessor is not the head, or the attempt fails, the code at 5 runs. Look first at shouldParkAfterFailedAcquire():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //1 Get the waitStatus of the predecessor
    int ws = pred.waitStatus;
    //2 Check whether it is SIGNAL (-1). If so, the predecessor will wake the current node, which can safely suspend
    if (ws == Node.SIGNAL)
        //3 The predecessor is already SIGNAL: return true, and parkAndCheckInterrupt() suspends the thread, since the lock has not been released yet
        return true;
    //4 waitStatus > 0 means the predecessor has been cancelled
    if (ws > 0) {
        //5 Skip over predecessors whose waitStatus is greater than 0 (cancelled)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //6 waitStatus is now 0, -2, or -3. -2 belongs to the condition queue and -3 to shared mode, and we are in exclusive mode, so only 0 (the default, no status) remains. Set the predecessor to -1 so that the current node can safely suspend on a later pass
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
Note that waitStatus = SIGNAL (-1) describes not the node itself but the node behind it. When a node's state is set to SIGNAL, its successor is suspended (or about to be). So when a node whose status is SIGNAL releases or gives up the lock, it performs one extra step: waking its successor.
The code at 6 tries to set the predecessor's waitStatus to -1, and the method then returns false whether or not the CAS succeeded. The loop in acquireQueued() simply runs again; once the predecessor is SIGNAL, the next call returns true.
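The three outcomes of shouldParkAfterFailedAcquire() are easier to follow on a toy model. The sketch below is a single-threaded simplification: ParkDecision, its Node, and link() are hypothetical names, the fields are plain rather than volatile, and an ordinary assignment replaces compareAndSetWaitStatus(). It is only meant to show the pointer and status changes, not the concurrency.

```java
/** Hypothetical single-threaded model of shouldParkAfterFailedAcquire; plain writes replace the CAS. */
final class ParkDecision {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        int waitStatus;
        Node prev;
        Node next;
    }

    /** Appends node after pred and returns node, to keep setup code short. */
    static Node link(Node pred, Node node) {
        node.prev = pred;
        pred.next = node;
        return node;
    }

    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL) {
            return true;               // predecessor has promised to wake us
        }
        if (ws > 0) {
            // Skip cancelled predecessors until a live one is found.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;  // ask for a wake-up, then let the caller retry
        }
        return false;
    }
}
```

For a queue of head, one cancelled node, and the current node, the first call unlinks the cancelled node, the second marks the head SIGNAL, and only the third returns true. In AQS, each of those calls is one pass of the acquireQueued() loop.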
When shouldParkAfterFailedAcquire() returns true, parkAndCheckInterrupt() is executed:
private final boolean parkAndCheckInterrupt() {
    // Suspend the current thread
    LockSupport.park(this);
    // Report whether the thread was interrupted, clearing the interrupt flag
    return Thread.interrupted();
}
LockSupport is a utility class; the park() operation itself is provided by sun.misc.Unsafe.
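The behaviour of park(), unpark(), and Thread.interrupted() can be checked with a small sketch. ParkDemo and its method names are hypothetical. It assumes only the documented LockSupport contract: park() returns after an unpark(), after an interrupt, or spuriously, which is why both workers re-check their condition in a loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical demo of the two usual ways a parked thread wakes up. */
final class ParkDemo {
    /** Parks a worker until it is interrupted; returns what two successive Thread.interrupted() calls saw. */
    static boolean[] interruptWakesPark() {
        boolean[] seen = new boolean[2];
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so re-check the condition in a loop.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            seen[0] = Thread.interrupted(); // reports the interrupt and clears the flag
            seen[1] = Thread.interrupted(); // the flag has already been cleared
        });
        worker.start();
        worker.interrupt();
        join(worker);
        return seen;
    }

    /** Parks a worker until a flag is set and unpark() is called; returns true once it has finished. */
    static boolean unparkWakesPark() {
        AtomicBoolean go = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!go.get()) {
                LockSupport.park();
            }
        });
        worker.start();
        go.set(true);
        LockSupport.unpark(worker); // hands the worker a permit, so a later park() returns at once
        join(worker);
        return !worker.isAlive();
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The first method illustrates why parkAndCheckInterrupt() returns Thread.interrupted(): the call both reports the interrupt and clears it, and acquireQueued() remembers the result in its interrupted variable so that acquire() can restore the flag with selfInterrupt().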
Here we summarize what the acquireQueued() method does:
- If the predecessor of the current node is the head and the attempt to acquire the lock succeeds, the method returns. In other words, the current thread gets one more chance to acquire the lock
- Otherwise (the predecessor is not the head, or the attempt failed), the thread suspends: it first sets the predecessor's waitStatus to -1 and then suspends itself by calling park()
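The queuing described above can be observed from the outside with ReentrantLock's monitoring methods hasQueuedThread(), getQueueLength(), and isLocked(). The sketch below uses a hypothetical class QueueObservation; it holds the lock in one thread, starts a second thread that has to queue, and checks that the second thread shows up in the synchronization queue.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo: observes a waiter sitting in the sync queue while the lock is held. */
final class QueueObservation {
    /** Returns true if the waiter was seen queued while the lock was held and the lock is free at the end. */
    static boolean waiterIsQueuedWhileLockHeld() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // tryAcquire fails, so this goes through addWaiter and acquireQueued
            lock.unlock();
        });
        boolean queued;
        try {
            waiter.start();
            // Spin until the waiter has been linked into the queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            queued = lock.getQueueLength() == 1;
        } finally {
            lock.unlock(); // release() wakes the queued waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return queued && !lock.isLocked();
    }
}
```

getQueueLength() is documented as an estimate, so it is only compared against 1 here because the demo has a single waiter that is known to be queued.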
Exclusive mode lock release
Calling unlock() invokes the release() method of AQS:
public final boolean release(int arg) {
    // Try to release the lock
    if (tryRelease(arg)) {
        // Get the head node
        Node h = head;
        // The head is not null and its waitStatus is not 0 (usually it is -1)
        if (h != null && h.waitStatus != 0)
            // Wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    // Get the waitStatus of the head node
    int ws = node.waitStatus;
    // If it is less than 0, reset the head's status to 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Get the node after the head
    Node s = node.next;
    // If the successor is null or has been cancelled
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk backwards from the tail to find the non-cancelled node closest to the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Wake up the first non-cancelled node after the given node
    if (s != null)
        LockSupport.unpark(s.thread);
}
The lock release code is relatively simple. That completes acquiring and releasing the exclusive lock.
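The backward scan in unparkSuccessor() is the least obvious part of the release path, so here is a toy model of just that search. SuccessorSearch, chain(), and findSuccessor() are hypothetical names, and the fields are plain rather than volatile. The scan starts from the tail because, during enqueue, prev is linked before the tail CAS and next only afterwards, so prev is the pointer that can be relied on.

```java
/** Hypothetical model of the successor search in unparkSuccessor; no threads are involved. */
final class SuccessorSearch {
    static final class Node {
        final String name;
        int waitStatus;   // > 0 means cancelled, as in AQS
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    /** Links the given nodes in order (both prev and next) and returns the last one as the tail. */
    static Node chain(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
            nodes[i - 1].next = nodes[i];
        }
        return nodes[nodes.length - 1];
    }

    /** Returns the live node closest to head, scanning backwards from tail when head.next is unusable. */
    static Node findSuccessor(Node head, Node tail) {
        Node s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != head; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;   // keep overwriting: the last hit is the one nearest to head
                }
            }
        }
        return s;
    }
}
```

With a queue of head, A (cancelled), B, and C, the search returns B, and it still returns B if head.next has been cleared.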
Summary
Acquiring a lock
Release the lock
That is the hard part done. Having read this far, it was not especially difficult, was it? If you agree, then the article did its job. I stayed up late again today; writing articles is not easy. With source code, the obstacle is usually fear rather than real difficulty: put in the effort, make use of spare moments, and it becomes manageable. The AbstractQueuedSynchronizer (AQS) class looks long, but strip out the author's comments and the duplicated code and there is not much left. I find the Spring source code easier than the JUC source code, although Spring's classes are heavily interrelated. You can also check out my Spring source code series, which I will keep updating once the concurrent programming series is finished.
Shared-mode lock acquisition
The shared mode has a great deal in common with the exclusive mode and repeats much of its code, so I do not plan a separate article; let's look at it briefly here. When we call readLock().lock() on a ReentrantReadWriteLock, it internally calls:
public final void acquireShared(int arg) {
    //1 Try to acquire the lock; the return value differs from the exclusive lock
    if (tryAcquireShared(arg) < 0)
        //2 Executed if the lock could not be acquired
        doAcquireShared(arg);
}
The return value of tryAcquireShared(arg) falls into three cases:
- Negative: the lock was not acquired
- Zero: the current thread acquired the lock, but subsequent shared acquisitions cannot succeed
- Positive: the current thread acquired the lock, and subsequent shared acquisitions may also succeed
When a subclass implements tryAcquireShared(), its return value must follow this convention.
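A small permit counter shows the return-value convention in action. PermitSync and probe() are hypothetical names; the approach (use state as the number of remaining permits and return what is left after the acquire) is similar in shape to what a counting semaphore does, but this is a sketch under that assumption, not the code of any JDK class.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical shared-mode synchronizer: state holds the number of remaining permits. */
final class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Negative: fail without touching state. Otherwise CAS and report what is left.
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases)) {
                return true; // tells releaseShared() to wake queued waiters
            }
        }
    }

    /** Exposes the protected hook so the three return cases can be inspected directly. */
    int probe(int acquires) {
        return tryAcquireShared(acquires);
    }
}
```

Starting from two permits, three probe(1) calls return 1 (acquired, more available), 0 (acquired, nothing left for others), and -1 (not acquired), matching the three cases above.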
If the lock fails to be acquired, the doAcquireShared() method is executed:
private void doAcquireShared(int arg) {
    // addWaiter() works the same as for the exclusive lock, so I won't repeat it here
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //1 Get the predecessor node
            final Node p = node.predecessor();
            //2 Check whether it is the head node
            if (p == head) {
                //3 If it is the head, try again to acquire the lock
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //4 The lock was acquired; propagate the state to subsequent nodes
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    //5 If an interrupt arrived while the thread was blocked, respond to it here
                    // Interrupts will be discussed separately in a later article
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // If the thread needs to suspend, it does so in parkAndCheckInterrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
doAcquireShared() first calls addWaiter() to wrap the current thread in a node and append it to the tail of the synchronization queue. This is the same enqueue process described for exclusive mode, so we skip it here.
Once the node is in the queue, it checks whether its predecessor is the head. The head stands for the thread that is currently holding the lock, so the node right behind it is next in line. The current node therefore does not suspend immediately but tries once more to acquire the lock. If the thread in front has just released the lock, the attempt succeeds. If not, shouldParkAfterFailedAcquire() is called, which sets the head's state to SIGNAL; only when its predecessor's state is SIGNAL can the current node safely suspend itself. All threads are suspended inside parkAndCheckInterrupt(). This logic is the same as for the exclusive lock.
If the current node acquires the lock, setHeadAndPropagate() is called:
private void setHeadAndPropagate(Node node, int propagate) {
    // Record the old head node
    Node h = head;
    // Set the current node as the head node
    setHead(node);
    // propagate > 0 means further shared acquires may succeed; otherwise check the status of the old and the new head
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // Get the successor of the current node
        Node s = node.next;
        // If the successor is null or is waiting in shared mode
        if (s == null || s.isShared())
            // Wake up the successor node
            doReleaseShared();
    }
}
Besides setting the head, the main job of this method is to wake the successor node. Exclusive mode has no such step: there the node only sets itself as the head.
private void doReleaseShared() {
    for (;;) {
        // Get the head node
        Node h = head;
        if (h != null && h != tail) {
            // Get the wait status of the head node
            int ws = h.waitStatus;
            // If the head is SIGNAL, a node is queued behind it
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    // CAS failed: loop and recheck
                    continue;
                // Wake up the successor; explained under the exclusive lock, so I won't repeat it
                unparkSuccessor(h);
            }
            // If the head's status is 0, no successor has asked for a signal yet; change it to PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                // CAS failed: loop and recheck
                continue;
        }
        // Stop once the head has not changed during this pass
        if (h == head)
            break;
    }
}
By this point setHeadAndPropagate() has already changed the head to the current node. unparkSuccessor(h) is only called when the head is not null and is not equal to the tail, as in the figure below, and the head's waitStatus is SIGNAL. SIGNAL means a node is waiting behind the head, and if there is such a node, it is woken.
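Propagation is what lets a single release wake a whole line of shared waiters. The sketch below is modelled on the BooleanLatch example in the AQS Javadoc; OneShotLatch, await(), open(), and releaseAll() are hypothetical names. Several threads queue up in shared mode, one releaseShared() call opens the latch, and each woken thread wakes the next through setHeadAndPropagate() and doReleaseShared().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical one-shot latch: one release lets every shared waiter through. */
final class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1;  // positive: later waiters may pass as well
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;                       // lets releaseShared() run doReleaseShared()
        }
    }

    private final Sync sync = new Sync();

    void await() { sync.acquireShared(1); }
    void open() { sync.releaseShared(1); }

    /** Queues the given number of waiters, opens the latch once, and returns how many got through. */
    static int releaseAll(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        // Wait until every waiter has been linked into the sync queue.
        while (latch.sync.getQueueLength() < waiters) {
            Thread.yield();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }
}
```

OneShotLatch.releaseAll(3) returns only after all three waiters have passed, even though open() was called once. In exclusive mode the same single release would wake just one waiter.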
This method is also the one that releases the shared lock, so releasing shared locks is not discussed separately. That completes acquiring and releasing the exclusive lock and acquiring and releasing the shared lock in AQS. This article took several days to write, and I really hope it helps. If you have read this far, you must have found it worthwhile; a thumbs up would be appreciated.