What is CountDownLatch?

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

CountDownLatch allows synchronization assistance for one or more threads to wait until a set of operations performed in another thread is complete. CountDownLatch maintains a counter internally. Calling await() blocks the current thread. Calling countDown() after each thread has completed its operation subtracted the counter by one and blocks until the counter’s value goes to zero. Until the counter value reaches 0, all waiting threads are released;

Second, source code analysis

Inside the constructor, initialize a Sync(count)

//java.util.concurrent.CountDownLatch
public CountDownLatch(int count) {
   this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
       // The state value in AQS acts as a countersetState(count); }}Copy the code

1. Diagram the AQS framework

2. AQS inner classNodeAttribute is introduced

3.countDown()What’s going on in the method?

//java.util.concurrent.CountDownLatch
public void countDown() {
  sync.releaseShared(1);
}

//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    // tryReleaseShared in AQS needs subclass overwrite
    if (tryReleaseShared(arg)) {
       // When state is 0, wake up the threads in the waiting queue
       doReleaseShared();
       return true;
    }
    return false;
}

//java.util.concurrent.CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {
     for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        // Reduce the count
        int nextc = c - 1;
        //CAS change count
        if (compareAndSetState(c, nextc))
            // doReleaseShared() is executed when the count is 0
            return nextc == 0; }}Copy the code

Let’s take a look at the release of doReleaseShared() shared mode — sending a follow-up signal and ensuring propagation.

private void doReleaseShared() {
    for (;;) {
      Node h = head;
      // Not the tail node
      if(h ! =null&& h ! = tail) { int ws = h.waitStatus;if (ws == Node.SIGNAL) {
               // Avoid unpark twice
               if(! h.compareAndSetWaitStatus(Node.SIGNAL,0)) {continue; 
               }
               // Wake up the successor node of the head node
               unparkSuccessor(h);
             }else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)){
                // The wait state of the header node is 0
                // Use CAS to set the current node state to PROPAGATE to ensure that it can be passed on later
                continue; }}// Exit the loop with no change to the head node
          if (h == head)
             break; }}Copy the code

4.await()What did you do?

There are two methods of “await” : one is “await” (), the other is “await” (long Timeout, TimeUnit unit)

Let’s look at await(), another method I’ll leave you to learn on your own:

//java.util.concurrent.CountDownLatch
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())// Get and clear the thread interrupt flag bit
       // If the status is interrupted, throw InterruptedException
       throw new InterruptedException();
    // Only when the value is less than 0 will the queue be added
    if (tryAcquireShared(arg) < 0)
       doAcquireSharedInterruptibly(arg);
}

private Node addWaiter(Node mode) {
    Node node = new Node(mode);
    for (;;) {
       Node oldTail = tail;
       // If the last node is not empty, it is initialized
       if(oldTail ! =null) {
          //Unsafe.putObject(Object o, int offset, Object x)
          // Set the node precursor to oldTail
          U.putObject(node, Node.PREV, oldTail);
          if (compareAndSetTail(oldTail, node)) {
              //oldTail's successor node is set to node
              oldTail.next = node;
              returnnode; }}else {
           // Initialize the synchronization queue so that the first and last nodes point to the same new Node instanceinitializeSyncQueue(); }}}private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
   // Create a shared mode node and add it to the queue
   final Node node = addWaiter(Node.SHARED);
   try {
      for (;;) {
        // Return the precursors of the current node
        final Node p = node.predecessor();
        if (p == head) {
          // Return 1 is no longer blocking, exit the queue, -1 still blocking
          int r = tryAcquireShared(arg);
          if (r >= 0) {
             // There is an analysis
             setHeadAndPropagate(node, r);
             p.next = null; 
             return; }}// There is an analysis
         if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
            thrownew InterruptedException(); }}}catch (Throwable t) {
       // There is an analysis
       cancelAcquire(node);
       throwt; }}/** * Set the head node of the queue to be synchronized and determine if the current node's subsequent node is the one that shares the mode. * If the propagate is greater than 0 or the waitStatus of the node is propagate * then the resource is released in the share mode */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    // Set node to the head node
    setHead(node);
    / / the propagate greater than 0 | | head node to null | | head node status to not cancel the | | once again head node is null | | get head node status to not cancel again
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // Successor node ==null or nodes in shared mode
        if (s == null || s.isShared())
            doReleaseShared();// Turn it up}}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // The precursor Node is set to node.signal successfully and is waiting to be released by the release call. The subsequent Node can safely block
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
                // If waitStatus is greater than 0, it indicates that the precursor node has been canceled
            } while (pred.waitStatus > 0);
            // Find a non-cancelled node and re-connect the node with the current shared mode via the next reference
            pred.next = node;
        } else {
            // Set all the pre-drive nodes to node.signal
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
    
    // Block the current thread, get and reset the thread's interrupt flag bit
    private final boolean parkAndCheckInterrupt() {
        // Here comes the key method: block the thread implementation, relying on the Unsafe API
        LockSupport.park(this);
        return Thread.interrupted();
    }
Copy the code

CancelAcquire (node) cancelAcquire(node)

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
        if (node == null)
            return;
        // At this point, the thread of the node has been interrupted and cancelled, and the thread of the node is empty
        node.thread = null;
        Node pred = node.prev;
        //(skip the canceled node) Get the last non-canceled node of the current node
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
           
        // Save the node.prev non-cancelled node's successors
        Node predNext = pred.next;
        Update the current node status = Cancel
        node.waitStatus = Node.CANCELLED;
        // If the current node is the tail node, set the previous non-cancelled node of the current node as the tail node
        // If the update fails, go to else, and if the update succeeds, set tail's successor node to NULL
        if (node == tail && compareAndSetTail(node, pred)) {
            pred.compareAndSetNext(predNext, null);
        } else {
            int ws;
            // If the current node is not the successor of head
            // 1: determine whether the current node precursor is SIGNAL,
            // 2: If not, set the precursor node to SINGAL to see if it succeeds
            // If one of the two values is true, the thread of the current node is null
            // If all of the above conditions are met, the pointer of the predecessor node of the current node to the successor node of the current node
            if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread ! =null) {
                Node next = node.next;
                if(next ! =null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                // If the above conditions are not met, wake up the successor node of the current node
                unparkSuccessor(node);
            }
            node.next = node;// help GC}}private void unparkSuccessor(Node node) {
	// Get the head node waitStatus
	int ws = node.waitStatus;
	if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
	// Get the next node from the current node
	Node s = node.next;
	// If the next node is null or cancelled, find the non-cancelled node at the beginning of the queue
	if (s == null || s.waitStatus > 0) {
            s = null;
            // Start at the end of the queue and find the first node whose waitStatus is <0.
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                     s = t;
	}
	// Unpark the current node if the next node is not empty and the status is <=0
	if(s ! =null)
            LockSupport.unpark(s.thread);
}
Copy the code

Where cancelAcquire() is called:

2. An exception occurs in acquire process 3. The remaining timeout time of API call of timeout version is less than or equal to zero

The main function of cancelAcquire() is to move the cancelled nodes out of the synchronous waiting queue, and if the conditions analyzed in the code above are met, it will wake up the unparkacquire (node) successors.

How LockSupport implements blocking and unblocking?

Click to view the JDK11 LockSupport document address

Take a look at the following code snippets:

// Example 1:
Log.d(TAG,"001")
LockSupport.park(this)
Log.d(TAG,"002") ------------------- Output:001Jam...// Example 2:
LockSupport.unpark(Thread.currentThread())
Log.d(TAG,"001")
LockSupport.park(this)
Log.d(TAG,"002")... Log.d(TAG,"Execution done.") ------------------- Output:001 
002after// Example 3:
val thread = Thread.currentThread()
cacheThreadPool.execute{
    Log.d(TAG,"A time-consuming asynchronous task executing...")
    Thread.sleep(1500)
    // Provide permission to unblock
    LockSupport.unpark(thread)
}
Log.d(TAG,"001")
// Block the current thread
LockSupport.park(this)
Log.d(TAG,"002")... Log.d(TAG,"Execution done.") ------------------- Output:001A time-consuming asynchronous task is executing...002afterCopy the code

Look at the above example, do you understand? 🙈🙈 LockSupport has two park methods:

//java.util.concurrent.locks.LockSupport
public static void park(Object blocker) {
  Thread t = Thread.currentThread();
  setBlocker(t, blocker);
  U.park(false.0L);
  setBlocker(t, null);
}
public static void park() {
   U.park(false.0L);
}

/** * Retrieve the parkBlocker field object of Thread class by reflection. * Retrieve the memory offset of parkBlocker */ through the objectFieldOffset method on the sun.misc.unsafe object
private static void setBlocker(Thread t, Object arg) {
  U.putObject(t, PARKBLOCKER, arg);
}

static {
    PARKBLOCKER = U.objectFieldOffset
                (Thread.class.getDeclaredField("parkBlocker"));
}
Copy the code

With who? Still use question 🤔? It is recommended to use the Park (Blocker) method.

Let’s take a look at the parkBlocker in Thread source code:

/** * The argument supplied to the current call to * java.util.concurrent.locks.LockSupport.park. * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker * Accessed using java.util.concurrent.locks.LockSupport.getBlocker */  
volatile Object parkBlocker;
Copy the code

The parkBlocker object is used to keep track of the thread being blocked by whom and is used by thread monitoring and analysis tools to locate the cause. LockSupport gets the blocked object via getBlocker and is primarily used to monitor and analyze threads.


Park blocks, unpark unblocks, and finally calls the implementation of the corresponding Native method inside UnSafe. Click to view the source code for UnSafe

Iii. Usage Scenarios

1.ARouter loads the class collection under the specified package name

//com.alibaba.android.arouter.utils.ClassUtils
fun getFileNameByPackageName(a):Set<String>{
     val classNames: Set<String> = HashSet()
    val paths = getSourcePaths(context)
    val parserCtl = CountDownLatch(paths.size())
    for (path in paths) {
       DefaultPoolExecutor.getInstance().execute{
          try{
            // Time-consuming loading data....
          }finnaly{
              parserCtl.countDown()
          }
       }
    }
    parserCtl.await()
    return classNames
}
Copy the code

2. Suppose there are three threads: A/B/C, and we start C when A/B is complete or partially complete

class TaskThread(private val taskName:String, private val countDownLatch: CountDownLatch, private val testSleep:Long): Thread() {
    override fun run(a) {
       Log.d(TAG,"Start" execution${taskName}Tasks, from:${currentThread().name}")
       sleep(testSleep)
       Log.d(TAG,"[end] to execute${taskName}Tasks, from:${currentThread().name}"Countdownlatch.countdown ()}}val countDownLatch = CountDownLatch(2)
val taskA = TaskThread("A",countDownLatch,100)
val taskB = TaskThread("B",countDownLatch,500)
taskA.start()
taskB.start()
countDownLatch.await()
Thread {
   Log.d(TAG, "Task C: Start a small business.${Thread.currentThread().name}")}. The start () -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- output: "start" to perform A task, from: Thread -2Start to execute task B from: Thread-3[End] Execute task A, from: Thread-2[End] Execute task B, from: Thread-3Task C: Start a small business4
Copy the code