CountDownLatch was analyzed based on AQS

“This is the sixth day of my participation in the First Challenge 2022. For details: First Challenge 2022”

Code case

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(() -> {
            try {
                Thread.currentThread().setName("Thread one");
                // Wait a moment
                System.out.println(Thread.currentThread().getName() + "Wait for thread 2 and thread 3 to complete before executing......");
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + "Executed.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            Thread.currentThread().setName("Thread two");
            System.out.println(Thread.currentThread().getName() + "Execute complete, execute countDown");
            countDownLatch.countDown();
        }).start();

        new Thread(() -> {
            Thread.currentThread().setName("Thread three");
            System.out.println(Thread.currentThread().getName() + "Execute complete, execute countDown"); countDownLatch.countDown(); }).start(); }}Copy the code

Code run result

Code interpretation

Thread 1 has to wait for thread 2 and Thread 3 to complete before it can execute, like a trip, until everyone has arrived

Source code analysis

Constructor analysis

Let’s take a look at this important code

CountDownLatch countDownLatch = new CountDownLatch(2);
Copy the code

Take a look at the constructor first

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Copy the code

There’s also the sync property variable, again the key AQS sync utility class, so let’s take a look at what that is

Sync(int count) {
    setState(count);
}
Copy the code

So I’m going to set the state, which we all know is the key marker for the lock, and if the state is not zero then some other thread is holding the lock and we’re passing in 2 here, so I’m going to set the state to 2

protected final void setState(int newState) {
    state = newState;
}
Copy the code

Await method analysis

The current thread is suspended, or the thread is waiting for the state value to be set to 0 before trying to get the lock

public void await(a) throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
Copy the code

Call await method inside the method call sync. AcquireSharedInterruptibly (1), enter inside and have a look

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
Copy the code

TryAcquireShared (arg) < 0 = tryAcquireShared(arg) < 0

protected int tryAcquireShared(int acquires) {
    return (getState() == 0)?1 : -1;
}
Copy the code

This method is to determine whether the current the state value is equal to zero, if it is 0, then return 1, if no conditions, at this time so returned to the state is equal to 2-1, so go doAcquireSharedInterruptibly (arg) method

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // The argument passed in is 1
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return; }}if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw newInterruptedException(); }}finally {
            if(failed) cancelAcquire(node); }}Copy the code

The addWaiter method is a bit long, so let’s take a look at the addWaiter method, which passes a Node.SHARED, which adds a Node to the AQS waiting queue

private Node addWaiter(Node mode) {
    // Create a Node
    Node node = new Node(Thread.currentThread(), mode);
    // The pred and tail Pointers are currently null
    Node pred = tail;
    if(pred ! =null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            returnnode; }}// Hence the way to join the team
    enq(node);
    return node;
}
Copy the code

Create a Node (tail, pred, tail); create a Node (tail, pred, tail, tail)

private Node enq(final Node node) {
    for (;;) {
        // T is empty, tail is also empty, so we need to initialize the queue
        Node t = tail;
        if (t == null) { // Must initialize
            // Create an empty Node, then point the empty head pointer to the Node, then tail the pointer to the head Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                returnt; }}}}Copy the code

T ==null; tail ==null; create an empty Node; then point the empty head pointer to the Node; then point the tail pointer to the head Node



The new Node points its precursor pointer to the head Node of the current queue, and its tail pointer points to the current queue Node. Next of t points to the current queued node and returns the head node, the style of the node in the queue



Now it’s time to go to the logic in the for loop below

for (;;) {
    // Get the precursor node of the current node, in which case the precursor is the head node
    final Node p = node.predecessor();
    // The condition is valid
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return; }}if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
}
Copy the code

Go to the logic below and try to get the shared mode lock tryAcquireShared and the value is passed in as 1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0)?1 : -1;
}
Copy the code

At this point it’s still -1, so once again the fetch fails, and we go down to the suspended logic, which we’ve seen many times

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // The first time the waitStatus of the head node is 0 or null
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Set the waitStatus of the head node to -1
          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
Copy the code

So when you first come in, the waitStatus of the head node is still 0 or null, so you get here the first time and you set the waitStatus of the head node to -1, and then the queue looks like this



The second after acquiring a lock failed again again again into the shouldParkAfterFailedAcquire waitsStatus this method when the head node is 1 first if logic is successful, And returns true, and then goes to the next method parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt(a) {
    LockSupport.park(this);
    return Thread.interrupted();
}
Copy the code

The thread is suspended in this method, and the thread is adding itself to the AQS queue by calling the await method

CountDown method analysis

After another thread calls the countDown method twice, the thread suspended from the await method can execute again. What does this method do

public void countDown(a) {
    sync.releaseShared(1);
}
Copy the code

The releaseShared(1) method of the Sync utility class is called

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
Copy the code

TryReleaseShared (ARG)

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        // Get state, which is 2
        int c = getState();
        if (c == 0)
            return false;
        // Subtract one
        int nextc = c-1;
        // The cas operation sets the state value
        if (compareAndSetState(c, nextc))
            return nextc == 0; }}Copy the code

After a call to the countDown method, the value of state is reduced by one, so that the state is changed from 2 to 1, but false is returned. The doReleaseShared() method is not used, and when the countDown method is called the second time, Return nexTC == 0 is true and doReleaseShared() is used. What does that do

private void doReleaseShared(a) {
    for (;;) {
        // define a head pointer to the head node of the AQS queue
        Node h = head;
        // h is not empty and is not the last node, so the condition is true
        if(h ! =null&& h ! = tail) {// The waitStatus of the head node is -1
            int ws = h.waitStatus;
            // If this is true
            if (ws == Node.SIGNAL) {
                // Change the waitStatus of the head node from -1 to 0
                if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                    continue;            // loop to recheck cases
                // Wake up subsequent nodes to acquire locks
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break; }}Copy the code

We have analyzed this method several times, look at the comments of the code above, and the figure below



See how to wake up the AQS waiting queue thread

 private void unparkSuccessor(Node node) {// a header node is passed in
        // The waitStatus of the head node has been changed to greater than 0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
		// Define an s pointer to the next node after the preceding node
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
     // The thread wakes up when s is not null
        if(s ! =null)
            LockSupport.unpark(s.thread);
    }
Copy the code

Look at what the AQS queue looks like so far

Analyze the logic after awakening

Where does the thread get suspended, I think it gets suspended when it gets the lock let’s move on, okay



That’s right, that’s where the suspension takes place, so that’s where we wake up and let’s see what we do, go to the next loop and get the lock again, define a P to point to the head node, and then we get it because the state is zero, and then we return one, so r is equal to one, At this point we enter the setHeadAndPropagate(node, R) code block

private void setHeadAndPropagate(Node node, int propagate) {
	// Define h pointer to head node
    Node h = head;
    // Set the head node
    setHead(node);
	
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null|| s.isShared()) doReleaseShared(); }}Copy the code

Setting the head node

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
Copy the code



Propagate > 0 = propagate > 0 = propagate > 0 = propagate > 0 = propagate > 0 = propagate > 0 = propagate > 0 = propagate > 0 However, the next node of node is null, so s==null is valid. If you go to doReleaseShared(), what does this method do

  private void doReleaseShared(a) {
        for (;;) {
            // define an h to point to head
            Node h = head;
            // h is not null and h is not tail
            if(h ! =null&& h ! = tail) {// Now the waitStatus of h has been changed to 0
                int ws = h.waitStatus;
                / /
                if (ws == Node.SIGNAL) {
                    if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // change the waitStatus of h to -3
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break; }}Copy the code



Keep looking at the logic and go down and change the other Pointers

p.next = null; // help GC
failed = false;
return;
Copy the code



At this point the method is done, has acquired the lock, and continues with its own logic