Analyzing CountDownLatch Based on AQS
Code example
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) {
        // The latch opens once countDown() has been called twice
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(() -> {
            try {
                Thread.currentThread().setName("Thread one");
                System.out.println(Thread.currentThread().getName() + " waits for thread two and thread three to complete before executing......");
                // Block here until the count reaches 0
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + " executed.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            // Give thread one time to reach await() first
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            Thread.currentThread().setName("Thread two");
            System.out.println(Thread.currentThread().getName() + " execution complete, calling countDown");
            countDownLatch.countDown();
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Thread three");
            System.out.println(Thread.currentThread().getName() + " execution complete, calling countDown");
            countDownLatch.countDown();
        }).start();
    }
}
Code run result
Code interpretation
Thread one has to wait for thread two and thread three to complete before it can continue, like a group trip that only sets off once everyone has arrived.
Source code analysis
Constructor analysis
Let’s take a look at this important code
CountDownLatch countDownLatch = new CountDownLatch(2);
Take a look at the constructor first
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
The constructor assigns the sync field. Sync is the inner class that extends AQS and does the real synchronization work, so let's take a look at its constructor
Sync(int count) {
    setState(count);
}
So the constructor sets state. In an exclusive lock, state is the key marker for the lock: a non-zero value means some thread is holding it. In CountDownLatch, state instead holds the remaining count. We passed in 2 here, so state is set to 2
protected final void setState(int newState) {
    state = newState;
}
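The value stored in state can be observed from outside through getCount(), which CountDownLatch exposes publicly. The following is a minimal sketch; the class LatchStateDemo and the helper remainingAfter are illustrative names that do not appear in the JDK or in the example above.

```java
import java.util.concurrent.CountDownLatch;

class LatchStateDemo {
    // Hypothetical helper: returns the remaining count after calling
    // countDown() the given number of times on a fresh latch.
    static long remainingAfter(int initialCount, int countDowns) {
        CountDownLatch latch = new CountDownLatch(initialCount);
        for (int i = 0; i < countDowns; i++) {
            latch.countDown();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfter(2, 0)); // 2: the value set by the constructor
        System.out.println(remainingAfter(2, 1)); // 1
        System.out.println(remainingAfter(2, 5)); // 0: the count never drops below zero
        try {
            new CountDownLatch(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```
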
await method analysis
await suspends the calling thread until the state value reaches 0, at which point the shared acquire succeeds
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
Inside await, sync.acquireSharedInterruptibly(1) is called. Let's step inside and have a look
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
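The first check in acquireSharedInterruptibly means that await() fails fast when the calling thread already has its interrupt flag set. The sketch below illustrates this; AwaitInterruptDemo and its helper method are names invented for this example.

```java
import java.util.concurrent.CountDownLatch;

class AwaitInterruptDemo {
    // Hypothetical helper: returns true if await() threw InterruptedException
    // because the interrupt flag was set before the call, and the flag was
    // cleared in the process.
    static boolean awaitThrowsWhenInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt(); // set the interrupt flag
        try {
            latch.await();
            return false; // not reached: the count is still 1
        } catch (InterruptedException e) {
            // Thread.interrupted() clears the flag before the exception is thrown
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitThrowsWhenInterrupted()); // true
    }
}
```
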
The key check is tryAcquireShared(arg) < 0, so let's look at how CountDownLatch implements tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
This method checks whether the current state value is 0. If it is, it returns 1; otherwise it returns -1. At this point state is 2, so it returns -1, and we go on into doAcquireSharedInterruptibly(arg)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // The argument passed in is 1
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
This method is a bit long, so let's start with addWaiter. It is passed Node.SHARED and adds a shared-mode Node for the current thread to the AQS waiting queue
private Node addWaiter(Node mode) {
    // Create a Node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // tail is currently null, so pred is null as well
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The fast path was skipped, so fall back to enq
    enq(node);
    return node;
}
A Node is created for the current thread. Because tail is null, pred is null too, so the fast path is skipped and the node is added to the queue through enq
private Node enq(final Node node) {
    for (;;) {
        // tail is null on the first pass, so t is null and the queue must be initialized
        Node t = tail;
        if (t == null) { // Must initialize
            // Create an empty Node, point head at it, then point tail at the same Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
On the first iteration t == null because tail == null, so an empty Node is created, head is pointed at it, and then tail is pointed at the same Node
On the second iteration t is no longer null. The new Node's prev pointer is set to t (the head Node), tail is moved to the new Node by CAS, t.next is pointed at the new Node, and t (here the head Node) is returned. The queue now consists of the empty head Node followed by the Node for thread one
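The two passes through enq can be reproduced outside AQS with a toy queue. This is only a sketch of the same shape, not the JDK implementation: ToyQueue, its Node class and the use of AtomicReference in place of AQS's internal CAS helpers are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class ToyQueue {
    // Hypothetical stand-in for the AQS Node; only the links are modeled.
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq: initialize lazily, then CAS the node onto the tail.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First pass: install an empty dummy node as head, then point tail at it
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                // Later pass: link behind the current tail and move tail to the new node
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t; // the previous tail
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue q = new ToyQueue();
        Node first = new Node("thread-one");
        Node previousTail = q.enq(first);
        System.out.println(previousTail == q.head.get()); // true: the dummy head is returned
        System.out.println(q.head.get().next == first);   // true
        System.out.println(q.tail.get() == first);        // true
    }
}
```
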
Now it’s time to go to the logic in the for loop below
for (;;) {
    // Get the predecessor of the current node; in this case it is the head node
    final Node p = node.predecessor();
    // The condition holds
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
}
Inside the if, the thread tries a shared-mode acquire through tryAcquireShared, with 1 passed in as the argument
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
State is still 2, so the result is still -1 and the acquire fails again. We fall through to the parking logic, which we have seen many times
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // On the first call the waitStatus of the head node is 0
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // Skip over cancelled predecessors
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Set the waitStatus of the head node to -1 (SIGNAL)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
On the first call the waitStatus of the head node is still 0, so we end up in the else branch, which sets the waitStatus of the head node to -1 and returns false. The queue then consists of the head node (waitStatus -1) followed by the node for thread one
After the acquire fails a second time, shouldParkAfterFailedAcquire is entered again. This time the waitStatus of the head node is -1, so the first if succeeds and the method returns true. Execution then moves on to the next method, parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
The thread is suspended inside this method. In short, calling await adds the thread to the AQS queue and then parks it
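The park and unpark primitives used here can be tried in isolation. The following sketch assumes nothing about AQS itself: ParkDemo, the released flag and the 100 ms delay are illustrative choices, and the delay is only a best-effort way of letting the worker park first.

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    static volatile boolean released = false;

    // Hypothetical helper: parks a worker until the flag is set,
    // then reports whether the worker finished.
    static boolean parkThenUnpark() {
        released = false;
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!released) {
                LockSupport.park();
            }
        });
        worker.start();
        try {
            Thread.sleep(100);          // give the worker time to park
            released = true;            // change the condition first...
            LockSupport.unpark(worker); // ...then wake the worker
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(parkThenUnpark()); // true
    }
}
```

AQS follows the same pattern: the woken thread does not assume it may proceed, it goes around the for loop and calls tryAcquireShared again.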
countDown method analysis
Once the other threads have called countDown twice in total, the thread suspended in await can run again. What does this method do?
public void countDown() {
    sync.releaseShared(1);
}
It calls the releaseShared(1) method on sync
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
Let's look at tryReleaseShared(arg) first
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        // Get state, which is 2 on the first call
        int c = getState();
        if (c == 0)
            return false;
        // Subtract one
        int nextc = c - 1;
        // Set the new state value with a CAS operation
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
After the first call to countDown, state is reduced by one, from 2 to 1, but tryReleaseShared returns false, so doReleaseShared() is not called. On the second call, nextc == 0 is true, tryReleaseShared returns true, and doReleaseShared() is called. What does that do?
private void doReleaseShared() {
    for (;;) {
        // h points to the head node of the AQS queue
        Node h = head;
        // h is not null and is not the tail node, so the condition is true
        if (h != null && h != tail) {
            // The waitStatus of the head node is -1
            int ws = h.waitStatus;
            // So this condition is true
            if (ws == Node.SIGNAL) {
                // Change the waitStatus of the head node from -1 to 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake up the successor node so it can try to acquire
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
We have analyzed this method several times before; see the comments in the code above and the figure below
Next, see how the thread waiting in the AQS queue is woken up
private void unparkSuccessor(Node node) { // The head node is passed in
    // doReleaseShared has already reset the waitStatus of the head node to 0,
    // so the branch below is skipped
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // s points to the successor of the node passed in
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Search backwards from tail for the nearest non-cancelled node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Wake up the thread of s when s is not null
    if (s != null)
        LockSupport.unpark(s.thread);
}
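At this point the AQS queue still holds the head node, whose waitStatus is now 0, followed by the node for thread one, whose thread has just been unparked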
Analyze the logic after awakening
Where was the thread suspended? In parkAndCheckInterrupt, inside the acquire loop of doAcquireSharedInterruptibly
That is where it was suspended, so that is where it resumes. The thread was not interrupted, so parkAndCheckInterrupt returns false and the loop goes around again to retry the acquire. p points to the head node, and because state is now 0, tryAcquireShared returns 1, so r is equal to 1. At this point we enter setHeadAndPropagate(node, r)
private void setHeadAndPropagate(Node node, int propagate) {
    // h points to the old head node
    Node h = head;
    // Make the current node the new head
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
Setting the head node
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
Here propagate is 1, so propagate > 0 holds and the if body is entered. The next node of node is null, so s == null holds as well and doReleaseShared() is called. What does this method do this time?
private void doReleaseShared() {
    for (;;) {
        // h points to head, which is now the node of thread one
        Node h = head;
        // h is not null, but in this example h == tail (it is the only node left),
        // so the body of this if is skipped
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // With more waiters queued, a SIGNAL head would wake its successor here
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // Otherwise, a waitStatus of 0 would be changed to -3 (PROPAGATE)
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
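With a single waiter, this second doReleaseShared call has nothing left to wake. The propagation step matters when several threads are parked on the same latch, since each woken thread passes the wake-up along to its successor. The sketch below only observes the outcome through the public API; MultiWaiterDemo, the helper name and the 100 ms delay are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MultiWaiterDemo {
    // Hypothetical helper: starts `waiters` threads that block on one latch,
    // opens it with a single countDown(), and returns how many were released.
    static int releasedWaiters(int waiters) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    gate.await();     // parks in the AQS queue
                    done.countDown(); // reached only after the gate opens
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        try {
            Thread.sleep(100);        // let the waiters queue up (best effort)
            gate.countDown();         // state goes from 1 to 0
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waiters - (int) done.getCount();
    }

    public static void main(String[] args) {
        System.out.println(releasedWaiters(3)); // 3
    }
}
```
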
Back in doAcquireSharedInterruptibly, the remaining statements clean up the other pointers
p.next = null; // help GC
failed = false;
return;
At this point await returns: the shared acquire has succeeded, and the thread continues with its own logic
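To tie the pieces together, here is a minimal latch built directly on AbstractQueuedSynchronizer, using the same two hook methods analyzed above. It is a sketch for study purposes rather than a replacement for CountDownLatch: the class name SimpleLatch and the remaining() accessor are invented here, while the AQS methods it calls are part of the public JDK API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int remaining() { return getState(); }

        @Override
        protected int tryAcquireShared(int acquires) {
            // Succeed only once the count has reached zero
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;           // already open
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;              // true only for the final countDown
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync = new Sync(count);
    }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    void countDown() { sync.releaseShared(1); }

    int getCount() { return sync.remaining(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        latch.countDown();
        System.out.println(latch.await(50, TimeUnit.MILLISECONDS)); // false: count is 1
        latch.countDown();
        System.out.println(latch.await(50, TimeUnit.MILLISECONDS)); // true: count is 0
    }
}
```

Everything else, including the queueing, parking and wake-up propagation, is inherited from AQS.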