What is CountDownLatch?
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
CountDownLatch allows synchronization assistance for one or more threads to wait until a set of operations performed in another thread is complete. CountDownLatch maintains a counter internally. Calling await() blocks the current thread. Calling countDown() after each thread has completed its operation subtracted the counter by one and blocks until the counter’s value goes to zero. Until the counter value reaches 0, all waiting threads are released;
Second, source code analysis
Inside the constructor, initialize a Sync(count)
//java.util.concurrent.CountDownLatch
public CountDownLatch(int count) {
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
// The state value in AQS acts as a countersetState(count); }}Copy the code
1. Diagram the AQS framework
2. AQS inner classNode
Attribute is introduced
3.countDown()
What’s going on in the method?
//java.util.concurrent.CountDownLatch
public void countDown() {
sync.releaseShared(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
// tryReleaseShared in AQS needs subclass overwrite
if (tryReleaseShared(arg)) {
// When state is 0, wake up the threads in the waiting queue
doReleaseShared();
return true;
}
return false;
}
//java.util.concurrent.CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
// Reduce the count
int nextc = c - 1;
//CAS change count
if (compareAndSetState(c, nextc))
// doReleaseShared() is executed when the count is 0
return nextc == 0; }}Copy the code
Let’s take a look at the release of doReleaseShared() shared mode — sending a follow-up signal and ensuring propagation.
private void doReleaseShared() {
for (;;) {
Node h = head;
// Not the tail node
if(h ! =null&& h ! = tail) { int ws = h.waitStatus;if (ws == Node.SIGNAL) {
// Avoid unpark twice
if(! h.compareAndSetWaitStatus(Node.SIGNAL,0)) {continue;
}
// Wake up the successor node of the head node
unparkSuccessor(h);
}else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)){
// The wait state of the header node is 0
// Use CAS to set the current node state to PROPAGATE to ensure that it can be passed on later
continue; }}// Exit the loop with no change to the head node
if (h == head)
break; }}Copy the code
4.await()
What did you do?
There are two methods of “await” : one is “await” (), the other is “await” (long Timeout, TimeUnit unit)
Let’s look at await(), another method I’ll leave you to learn on your own:
//java.util.concurrent.CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())// Get and clear the thread interrupt flag bit
// If the status is interrupted, throw InterruptedException
throw new InterruptedException();
// Only when the value is less than 0 will the queue be added
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
// If the last node is not empty, it is initialized
if(oldTail ! =null) {
//Unsafe.putObject(Object o, int offset, Object x)
// Set the node precursor to oldTail
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
//oldTail's successor node is set to node
oldTail.next = node;
returnnode; }}else {
// Initialize the synchronization queue so that the first and last nodes point to the same new Node instanceinitializeSyncQueue(); }}}private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// Create a shared mode node and add it to the queue
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// Return the precursors of the current node
final Node p = node.predecessor();
if (p == head) {
// Return 1 is no longer blocking, exit the queue, -1 still blocking
int r = tryAcquireShared(arg);
if (r >= 0) {
// There is an analysis
setHeadAndPropagate(node, r);
p.next = null;
return; }}// There is an analysis
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
thrownew InterruptedException(); }}}catch (Throwable t) {
// There is an analysis
cancelAcquire(node);
throwt; }}/** * Set the head node of the queue to be synchronized and determine if the current node's subsequent node is the one that shares the mode. * If the propagate is greater than 0 or the waitStatus of the node is propagate * then the resource is released in the share mode */
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// Set node to the head node
setHead(node);
/ / the propagate greater than 0 | | head node to null | | head node status to not cancel the | | once again head node is null | | get head node status to not cancel again
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// Successor node ==null or nodes in shared mode
if (s == null || s.isShared())
doReleaseShared();// Turn it up}}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// The precursor Node is set to node.signal successfully and is waiting to be released by the release call. The subsequent Node can safely block
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
// If waitStatus is greater than 0, it indicates that the precursor node has been canceled
} while (pred.waitStatus > 0);
// Find a non-cancelled node and re-connect the node with the current shared mode via the next reference
pred.next = node;
} else {
// Set all the pre-drive nodes to node.signal
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
// Block the current thread, get and reset the thread's interrupt flag bit
private final boolean parkAndCheckInterrupt() {
// Here comes the key method: block the thread implementation, relying on the Unsafe API
LockSupport.park(this);
return Thread.interrupted();
}
Copy the code
CancelAcquire (node) cancelAcquire(node)
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
if (node == null)
return;
// At this point, the thread of the node has been interrupted and cancelled, and the thread of the node is empty
node.thread = null;
Node pred = node.prev;
//(skip the canceled node) Get the last non-canceled node of the current node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Save the node.prev non-cancelled node's successors
Node predNext = pred.next;
Update the current node status = Cancel
node.waitStatus = Node.CANCELLED;
// If the current node is the tail node, set the previous non-cancelled node of the current node as the tail node
// If the update fails, go to else, and if the update succeeds, set tail's successor node to NULL
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
int ws;
// If the current node is not the successor of head
// 1: determine whether the current node precursor is SIGNAL,
// 2: If not, set the precursor node to SINGAL to see if it succeeds
// If one of the two values is true, the thread of the current node is null
// If all of the above conditions are met, the pointer of the predecessor node of the current node to the successor node of the current node
if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread ! =null) {
Node next = node.next;
if(next ! =null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// If the above conditions are not met, wake up the successor node of the current node
unparkSuccessor(node);
}
node.next = node;// help GC}}private void unparkSuccessor(Node node) {
// Get the head node waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// Get the next node from the current node
Node s = node.next;
// If the next node is null or cancelled, find the non-cancelled node at the beginning of the queue
if (s == null || s.waitStatus > 0) {
s = null;
// Start at the end of the queue and find the first node whose waitStatus is <0.
for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
s = t;
}
// Unpark the current node if the next node is not empty and the status is <=0
if(s ! =null)
LockSupport.unpark(s.thread);
}
Copy the code
Where cancelAcquire() is called:
2. An exception occurs in acquire process 3. The remaining timeout time of API call of timeout version is less than or equal to zero
The main function of cancelAcquire() is to move the cancelled nodes out of the synchronous waiting queue, and if the conditions analyzed in the code above are met, it will wake up the unparkacquire (node) successors.
How LockSupport implements blocking and unblocking?
Click to view the JDK11 LockSupport document address
Take a look at the following code snippets:
// Example 1:
Log.d(TAG,"001")
LockSupport.park(this)
Log.d(TAG,"002") ------------------- Output:001Jam...// Example 2:
LockSupport.unpark(Thread.currentThread())
Log.d(TAG,"001")
LockSupport.park(this)
Log.d(TAG,"002")... Log.d(TAG,"Execution done.") ------------------- Output:001
002after// Example 3:
val thread = Thread.currentThread()
cacheThreadPool.execute{
Log.d(TAG,"A time-consuming asynchronous task executing...")
Thread.sleep(1500)
// Provide permission to unblock
LockSupport.unpark(thread)
}
Log.d(TAG,"001")
// Block the current thread
LockSupport.park(this)
Log.d(TAG,"002")... Log.d(TAG,"Execution done.") ------------------- Output:001A time-consuming asynchronous task is executing...002afterCopy the code
Look at the above example, do you understand? 🙈🙈 LockSupport has two park methods:
//java.util.concurrent.locks.LockSupport
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false.0L);
setBlocker(t, null);
}
public static void park() {
U.park(false.0L);
}
/** * Retrieve the parkBlocker field object of Thread class by reflection. * Retrieve the memory offset of parkBlocker */ through the objectFieldOffset method on the sun.misc.unsafe object
private static void setBlocker(Thread t, Object arg) {
U.putObject(t, PARKBLOCKER, arg);
}
static {
PARKBLOCKER = U.objectFieldOffset
(Thread.class.getDeclaredField("parkBlocker"));
}
Copy the code
With who? Still use question 🤔? It is recommended to use the Park (Blocker) method.
Let’s take a look at the parkBlocker in Thread source code:
/** * The argument supplied to the current call to * java.util.concurrent.locks.LockSupport.park. * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker * Accessed using java.util.concurrent.locks.LockSupport.getBlocker */
volatile Object parkBlocker;
Copy the code
The parkBlocker object is used to keep track of the thread being blocked by whom and is used by thread monitoring and analysis tools to locate the cause. LockSupport gets the blocked object via getBlocker and is primarily used to monitor and analyze threads.
Park blocks, unpark unblocks, and finally calls the implementation of the corresponding Native method inside UnSafe. Click to view the source code for UnSafe
Iii. Usage Scenarios
1.ARouter loads the class collection under the specified package name
//com.alibaba.android.arouter.utils.ClassUtils
fun getFileNameByPackageName(a):Set<String>{
val classNames: Set<String> = HashSet()
val paths = getSourcePaths(context)
val parserCtl = CountDownLatch(paths.size())
for (path in paths) {
DefaultPoolExecutor.getInstance().execute{
try{
// Time-consuming loading data....
}finnaly{
parserCtl.countDown()
}
}
}
parserCtl.await()
return classNames
}
Copy the code
2. Suppose there are three threads: A/B/C, and we start C when A/B is complete or partially complete
class TaskThread(private val taskName:String, private val countDownLatch: CountDownLatch, private val testSleep:Long): Thread() {
override fun run(a) {
Log.d(TAG,"Start" execution${taskName}Tasks, from:${currentThread().name}")
sleep(testSleep)
Log.d(TAG,"[end] to execute${taskName}Tasks, from:${currentThread().name}"Countdownlatch.countdown ()}}val countDownLatch = CountDownLatch(2)
val taskA = TaskThread("A",countDownLatch,100)
val taskB = TaskThread("B",countDownLatch,500)
taskA.start()
taskB.start()
countDownLatch.await()
Thread {
Log.d(TAG, "Task C: Start a small business.${Thread.currentThread().name}")}. The start () -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- output: "start" to perform A task, from: Thread -2Start to execute task B from: Thread-3[End] Execute task A, from: Thread-2[End] Execute task B, from: Thread-3Task C: Start a small business4
Copy the code