preface
In front of us through ReentranLock source code analysis of AQS exclusive mode of access and release, through the analysis of the source code we probably understand the realization of AQS exclusive mode function, this time we will talk about AQS shared mode. If you already understand the exclusivity we talked about earlier, it’s also easy to understand the shared.
AQS share mode vs. exclusive mode
Before I talk about AQS sharing mode, I want to explain my understanding of sharing mode and exclusive mode.
In fact, WHEN I first understood the shared mode, I was also a little difficult to understand, most people called the shared mode shared lock, but I tried to use the concept of shared lock to understand, many places did not make sense. When looking at the source code, with the concept of exclusive lock to substitute into the understanding of time can be understood, but with shared lock to substitute into the understanding of words, personal feeling to understand some uncomfortable, not particularly good to understand. I want to use a different concept for shared locks, conditional licenses.
We can think of a block of code similar to the above code modification as a lock, which has one or more permissions in the lock area. We know that ReenTranLock is exclusive mode, so we can understand that there is only one available license in ReenTranLock. When one thread has been granted permission, other threads can only be blocked. You can only attempt to obtain permission after the thread that has been granted permission has released it. Another important point is that in the exclusive mode, AQS will record the thread of possession license, while in the shared mode, it will not record this, and it will purely judge by using the state.
Semaphore and CountDownLatch are implemented in shared mode. Although both are shared mode, there are some differences. The value passed in by Semaphore during instantiation can be considered as the number of available permissions, as long as state is not equal to zero, threads can access the modified code block. CountDownLatch is a little different. The value we give when we initialize it does not represent available permissions, we can interpret it as unavailable permissions, because it is more like a valve. As long as state is not equal to zero, all threads block on the await() method only when the countDown method is called, Make state equal to 0 so that all blocked threads can be woken up to continue with the subsequent code.
Why is it understood as conditional permission? Because both the shared mode and the exclusive mode of AQS are established on a certain condition, which is the determination of the state value. In the exclusive mode, we cannot assign a value to state, because there is only one permission by default, but the same thread can be re-entered. In shared mode, we can assign a value to state. The value of state represents how many permissions are available to threads, and only the thread that obtains the permissions can access the protected code block. But the implementation of CountDownLatch is a little bit different. We can think of the state value in CountDownLatch as the number of unusable permits and only the number of unusable permits is zero in order for the thread to pass. One important point I want to emphasize here is that in shared mode, you don’t acquire the concept of locks, you acquire the concept of permissions, and this is my personal understanding of both concepts.
The following is a comparison of AQS shared mode and exclusive mode. Let’s look at the code in detail.
An exclusive lock | A Shared lock |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
– | doReleaseShared() |
Example: CoutDownLatch
Let’s take CoutDownLatch as an example to see how the shared mode can be implemented.
The structure of CountDownLath
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount(a) {
return getState();
}
protected int tryAcquireShared(int acquires) {... }
protected boolean tryReleaseShared(int releases) {... }}
private final Sync sync;
public CountDownLatch(int count) {... }public void await(a) throws InterruptedException {... }public boolean await(long timeout, TimeUnit unit). }public void countDown(a) {... }// Get the state value in AQS public long getCount(a) {... }Copy the code
We see the method through the source code is only a few, there is also an internal class inherited AQS, rewrite tryAcquireShared and tryReleaseShared, the implementation of the logic is quite simple, the core method is also two, one is countDown method, every call, The current state value is subtracted by one, and when the state value is 0, all waiting threads are woken up. The other is await method, which has two forms, one is blocking and one is timeout. If state is not 0, suspend the thread calling await method until state is 0, and wake up all waiting threads.
To obtain permission
await
This method blocks and waits in response to interrupts.
public void await(a) throws InterruptedException {
/ / acquireSharedInterruptibly acquireShared and () () method is not too much, one is to support the disruption, a does not support interrupt. We're not going to separate acquireShared ().
sync.acquireSharedInterruptibly(1);
}
Copy the code
Await () method is called directly the acquireSharedInterruptibly method in AQS
// This method is in AQS
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Copy the code
This method is the tryAcquireShared method in the AQS overwritten by CountdonwLatch.
protected int tryAcquireShared(int acquires) {
If state = 0, return 1; if state = 0, return -1.
return (getState() == 0)?1 : -1;
}
Copy the code
The tryAcquireShared method returns an int. The tryAcquireShared method returns 1 or -1 by checking whether the current state value is equal to 0. If tryAcquireShared returns a value less than 0, the current state value is not equal to 0. If tryAcquireShared returns a value less than 0, the current state value is not equal to 0.
doAcquireSharedInterruptibly
Here we can see that when tryAcquireShared return 1 shows the state value is not equal to zero, is executed doAcquireSharedInterruptibly method, thread encapsulated into the Node Node, add to the waiting queue.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
The acquireQueued method has two parameters, and addWaiter is added to the method.
// addWaiter as we mentioned in exclusive mode, this step is mainly to queue the thread that has not obtained the permission. But the thread is not suspended yet.
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// By using the addWaiter method, the blocked thread has been added to the queue as a Node Node
// Check whether the current node's precursor node is a header.
final Node p = node.predecessor();
if (p == head) {
If (p == head && tryAcquire(arg)) acquireQueued
TryAcquire returns a Boolean value.
// Since the node just added to the queue is the first in the blocking queue, we can check again to see if statede is equal to 0.
int r = tryAcquireShared(arg);
// If the value returned is greater than 0, the state value is 0. Subsequent threads can pass directly.
if (r >= 0) {
// If the return value of tryAcquireShared is greater than 0, state is already 0.
// This method is also an important method, singled out.
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return; }}// The current Node is not the first Node in the CLH queue or the current thread failed to obtain permission. Check if the thread has been interrupted and throw an exception if it has. This is what we said in the previous article, and we can refer to the exclusive mode above.
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw newInterruptedException(); }}finally {
If the code goes to cancelAcquire, it must throw InterruptedException. If the code returns InterruptedException, it must throw InterruptedException. Failed should be false and cancelAcquire will not be executed
if(failed) cancelAcquire(node); }}Copy the code
setHeadAndPropagate
By comparing the methods in the exclusive mode, we can see that in the exclusive mode, if the current node is the first one in the blocking queue, we point the head node to the current node, but we simply point the head node to the current node.
We know that the condition for calling setHeadAndPropagate is that the value returned by tryAcquireShared is greater than 0, and we know that tryAcquireShared is implemented by the concrete class that implements it, so we understand the meaning of the tryAcquireShared return, Depending on the implementation class, only AQS where tryAcquireShared returns a value greater than 0 can be interpreted as the number of permissions that can be obtained.
The setHeadAndPropagate method does doReleaseShared() under certain conditions in addition to the current node that the head points to. This method is supposed to only appear at the time of the release of permission, to wake up threads in the waiting queue, but it does here.
private void setHeadAndPropagate(Node node, int propagate) {
// Record the previous old header
Node h = head;
// Set the current node to the head node. Since state = 0 and Node is waiting for the first Node in the queue, Node can be set to the head.
setHead(node);
// Propagate is greater than 0.
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// If there is no other node behind the current node and the node is a shared node, doReleaseShared is executed
Node s = node.next;
if (s == null || s.isShared())
// Release the license and wake up the thread in the wait queue.doReleaseShared(); }}Copy the code
I don’t see why I should try to determine the state of the old header. Read his notes, did not want to understand why to judge it so.
In fact, I have another question, we know that the premise for calling setHeadAndPropagate is that Node is first in the queue, and the value returned by tryAcquireShared is greater than 0. I understand calling the setHeadAndPropagate method to set the Node Node as the head Node, but why call doReleaseShared to wake up the waiting thread when there is only one Node in the blocking queue and the Node is not suspended yet.
With that in mind, let’s move on to the doReleaseShared method, which we won’t explain in detail until we examine the release permissions below. The main thing I want to show here is why this method is called in the await () method.
// State == 0 when this method is called
private void doReleaseShared(a) {
for (;;) {
Node h = head;
if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue;
unparkSuccessor(h);
}else if (ws == 0&&! compareAndSetWaitStatus(h,0, Node.PROPAGATE))
continue;
}
if (h == head)
break; }}Copy the code
The doReleaseShared method found that the unparksucceeded is the wake up of the suspended threads. = null && h ! = tail, we initialize the queue by adding a Node Node to the queue when we call the addWaiter method. Tail = Node, as shown on the left.
When we call setHead(node), we change head = node to tail = head. The unpark antecedents will not be carried out.
Now h == head doesn’t change and we’re out of the loop. So in doAcquireSharedInterruptibly setHeadAndPropagate method, this method call just set the node node became head node, not perform unparkSuccessor (). So the setHead() method works just as well here as in exclusive mode.
Release the license
When the CountDownLatch’s countDown () method is executed, the counter, the state, is reduced by one, and when it reaches zero, the threads in the waiting queue are released. Let’s take a look
countDown
public void countDown(a) {
sync.releaseShared(1);
}
Copy the code
ReleaseShared (int ARG) is used to release permissions.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Copy the code
tryReleaseShared
DoReleaseShared () does not wake up the waiting thread. If the value of state is 0, return false. DoReleaseShared () does not wake up the waiting thread. Because state is already 0. Only if state-1 equals 0 will true be returned to wake up the threads in the wait queue.
protected boolean tryReleaseShared(int releases) {
// An infinite loop is used and exits when state == 0 or CAS successfully sets the value of state.
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0; }}
/** The tryReleaseShared method is an infinite loop that returns true unless an exception is thrown. protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; }} * /
Copy the code
doReleaseShared
The countDown method decreases state by 1 each time it is called, and if state is reduced to 0, the following method is called to wake up the thread in the blocking queue
/** if (tryReleaseShared(arg)) { doReleaseShared(); return true; } * /
// In CountDownLatch, tryReleaseShared(arG) returns true only if state is 0.
private void doReleaseShared(a) {
for (;;) {
Node h = head;
H == null: The blocking queue is empty
// 2. H == tail: indicates that the head node is either a newly initialized head node or a normal thread node, but the head node has been woken up and there are no other nodes in the blocking queue
// So there is no need to wake up the successor node in either case
if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
// Check whether the head node is in a normal state. Successor nodes need to be woken up
if (ws == Node.SIGNAL) {
// The CAS fails
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue; // loop to recheck cases
// Here, wake up the head's successor node. We did this in our last article, but we won't do it here.
unparkSuccessor(h);
}else if (ws == 0&&! compareAndSetWaitStatus(h,0, Node.PROPAGATE))
continue; }}}Copy the code
The method is a spin operation (for(;)) ), the only way to exit the method is with the final break statement:
if (h == head) // loop if head changed
break;
Copy the code
That is, exit only if the current head has not changed hands, otherwise the loop continues. How do I understand this?
For the sake of illustration, let’s assume that the sync Queue is currently in order
dummy node -> A -> B -> C -> D
Now assuming that A may succeed, it will become the new dummy node,
dummy node (A) -> B -> C -> D
So thread A gets woken up, wakes up and then the thread ends, wakes up and goes back to where it was suspended.
So here’s a for(;;) The only exit is from the end of the return. Thread A sets itself up as the head node.
Wake up the successor node B, which soon acquires the shared lock and becomes the new head node:
dummy node (B) -> C -> D
At this point, thread B will also call doReleaseShared. We’ll do doReleaseShared[B], which will wake up the subsequent node C, but remember that when doReleaseShared[B] is called, DoReleaseShared [A] is not finished yet. When it reaches if(h == head), it finds that the head node has now changed, so it continues back into the for loop. Meanwhile, doReleaseShared[B] also enters the for loop during execution. .
A large number of threads execute doReleaseShared at the same time, which greatly speeds up the wake up of subsequent nodes and improves efficiency. Meanwhile, the CAS operation in this method ensures that only one thread can successfully wake up a node when multiple threads wake up a node at the same time.
Wouldn’t method A exit before node B becomes the new head node when thread A finishes executing? Yes, but even then it doesn’t matter, because it has successfully woken up thread B, and even if thread A exits, it will be responsible for waking up subsequent nodes when thread B becomes the new head node.
Here we put the AQS sharing mode analysis again, partners can have a good grasp of this process.
conclusion
- The invocation framework of the shared mode is similar to that of the exclusive mode. We can think of the exclusive mode as internally allowing only one available license, while the shared mode can allow multiple available licenses.
- Another major difference between the shared and exclusive modes is that the exclusive mode uses a variable to record which thread is licensed, while the shared mode does not record this, depending entirely on the value of the state variable.
Write in the last
After the last write exclusive mode that article, I have been to work overtime, have not rest for two weeks before the May Day, May Day added two days, really feel there is no state recently, later also didn’t have the energy to write things, take time, at halting finished writing the article, if have write wrong place hope can help you with that I’ll fix it the first time.
Hope that after watching the harvest of the friends help to give a thumbs-up to encourage.