This is the 8th day of my participation in the First Challenge 2022. For details: First Challenge 2022.
Analysis of the working principle of CountDownLatch
A general introduction
This article analyzes how CountDownLatch works, based on the JDK 1.8 source code.
Get to know CountDownLatch
What is a CountDownLatch?
- "CountDown" literally means counting down, and "Latch" means a door latch.
- CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
- CountDownLatch has no fair or non-fair variants. It contains a single static nested class, Sync, and its public methods basically delegate to sync.xxx methods.
- CountDownLatch maintains a counter internally (conceptually, a pool of permits). While the counter is not zero, threads that call await() block; they are released once the counter reaches zero.
The state field behind CountDownLatch
- The implementation of CountDownLatch makes good use of the state variable that its Sync class inherits from its parent, AQS.
- The counter is initialized to some value N. Each call to countDown() by any thread reduces it by 1, and when it reaches 0 the waiting threads are released.
- In short: the threads of group A wait until the threads of group B have all finished their work.
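As a minimal sketch of the counter behaviour described above, the following records getCount() after each countDown(). The class and method names (LatchCounterDemo, countsWhileCountingDown) are made up for this illustration; only CountDownLatch itself comes from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

class LatchCounterDemo {
    /** Counts a latch down from n and records getCount() after every step. */
    static long[] countsWhileCountingDown(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        long[] seen = new long[n + 2];
        seen[0] = latch.getCount();      // initial value of state: n
        for (int i = 1; i <= n; i++) {
            latch.countDown();           // state becomes n-1, n-2, ..., 0
            seen[i] = latch.getCount();
        }
        latch.countDown();               // already zero: nothing happens
        seen[n + 1] = latch.getCount();
        return seen;
    }

    public static void main(String[] args) {
        // prints [3, 2, 1, 0, 0]
        System.out.println(Arrays.toString(countsWhileCountingDown(3)));
    }
}
```

Note that the counter never goes below zero: an extra countDown() on an exhausted latch is simply ignored.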
Important methods
Creates a latch initialized with the given count
public CountDownLatch(int count)
Joins the wait queue and blocks until the counter reaches 0
public void await() throws InterruptedException
Decrements the counter by 1; if the counter reaches 0, all waiting threads are released
public void countDown()
Returns the current counter value
public long getCount()
Design and implement pseudocode
Acquire the shared lock:
- If the thread's interrupted status is already set, an InterruptedException is thrown.
- Otherwise, try to acquire the shared lock (tryAcquireShared is implemented by the AQS subclass).
- If the attempt fails, the thread is added to the wait queue as a shared node and, after spinning, is blocked via LockSupport.park until the counter reaches zero.
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
Release the shared lock:
- Try to release the shared lock, i.e. decrement the counter (tryReleaseShared is implemented by the AQS subclass).
- If this attempt brings the counter to zero, the blocked threads are woken up in a spin loop.
public void countDown() {
    sync.releaseShared(1);
}
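The two paths above can be sketched as a tiny latch built directly on AbstractQueuedSynchronizer. This is an illustrative reconstruction, not the JDK class itself: SimpleLatch, its Sync and the demo() helper are names invented here, while setState, getState, compareAndSetState, acquireSharedInterruptibly and releaseShared are real AQS methods.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }           // the counter lives in AQS state
        int count() { return getState(); }

        @Override protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;           // succeed only when the counter is zero
        }

        @Override protected boolean tryReleaseShared(int ignored) {
            for (;;) {                                 // CAS retry loop
                int c = getState();
                if (c == 0) return false;              // already open, nothing to do
                int next = c - 1;
                if (compareAndSetState(c, next)) return next == 0;
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync = new Sync(count);
    }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    void countDown() { sync.releaseShared(1); }
    long getCount() { return sync.count(); }

    /** A worker counts down twice; returns the count seen once await() has returned. */
    static long demo() {
        SimpleLatch latch = new SimpleLatch(2);
        new Thread(() -> { latch.countDown(); latch.countDown(); }).start();
        try {
            latch.await();                             // blocks until the counter is zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());                    // prints 0
    }
}
```

All of the queueing, parking and waking is inherited from AQS; the subclass only decides when an acquire succeeds and when a release should wake the waiters.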
CountDownLatch in everyday life
A 100-meter race is a handy everyday illustration of how CountDownLatch works:
1. Scene: ten runners take part in a 100-meter race, and a judge at the finish line keeps count.
2. The starting signal is given and the ten runners race for the finish line; it is an exciting few seconds.
3. When a runner reaches the finish line, that runner's part is done and they are free to go; the judge reduces the count by one.
4. As the runners cross the line one by one, the judge's count eventually shows 0 runners remaining, meaning that everyone has arrived.
5. The judge then happily takes the results off to be entered into the computer.
6. In this sequence, the judge is the group A thread waiting on the work of the other group of threads; once the counter reaches zero, A goes on to do other things.
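The race can be sketched in code roughly as follows. This is one possible mapping, under the assumption that the starting signal is itself modelled as a second latch with a count of 1; RaceDemo and runRace are names made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class RaceDemo {
    /** Runs a race and returns how many runners had finished when the judge woke up. */
    static int runRace(int runners) {
        CountDownLatch startSignal = new CountDownLatch(1);       // the starting gun (assumption)
        CountDownLatch finishLine = new CountDownLatch(runners);  // the judge's counter
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < runners; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();         // wait for the gun
                    finished.incrementAndGet();  // "run" to the finish line
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finishLine.countDown();      // the judge ticks off one runner
                }
            }).start();
        }

        startSignal.countDown();                 // fire the gun: all runners start
        try {
            finishLine.await();                  // the judge waits until the count is 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();                   // the judge records the result
    }

    public static void main(String[] args) {
        System.out.println(runRace(10) + " runners finished");
    }
}
```

Because actions before countDown() happen-before a successful return from await(), the judge is guaranteed to see every runner's result.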
CountDownLatch source code analysis
CountDownLatch constructor
Constructor source code:
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
The constructor creates the synchronizer object with the given count. The count must not be negative (zero is allowed), and it is stored in the AQS state.
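A short check of the constructor contract stated in the Javadoc above: a negative count is rejected, while a count of zero is accepted and produces a latch that is already open. The class and method names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;

class LatchConstructorDemo {
    /** Returns true if a negative count is rejected with IllegalArgumentException. */
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** Returns true if await() on a zero-count latch returns without blocking. */
    static boolean zeroCountDoesNotBlock() {
        CountDownLatch latch = new CountDownLatch(0);
        try {
            latch.await();      // state is already 0, so this returns immediately
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNegativeCount());   // prints true
        System.out.println(zeroCountDoesNotBlock());  // prints true
    }
}
```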
Sync synchronizer
AQS –> Sync
The operations of CountDownLatch are all delegated to Sync, a static nested class that extends AQS. If you look closely, you'll see that most methods of CountDownLatch are simply calls of the form sync.xxx.
await()
The source code
/**
* Causes the current thread to wait until the latch has counted down to
* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
*
* // That is: the current thread waits until the count is reduced to zero, or until the thread is interrupted.
*
* <p>If the current count is zero then this method returns immediately.
*
* <p>If the current count is greater than zero then the current
* thread becomes disabled for thread scheduling purposes and lies
* dormant until one of two things happen:
* <ul>
* <li>The count reaches zero due to invocations of the
* {@link #countDown} method; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
- When await() is called while the counter is not zero, the calling thread ends up blocked in LockSupport.park.
- The thread stops waiting once the counter value (state) reaches 0, or when the thread is interrupted.
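The second way out of await(), interruption, can be demonstrated with a latch that is never counted down. This is a sketch with invented names (AwaitInterruptDemo, awaitIsInterruptible); the extra `started` latch is only there so that the interrupt is sent after the waiter thread is actually running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class AwaitInterruptDemo {
    /** Returns true if a thread blocked in await() is released by an interrupt. */
    static boolean awaitIsInterruptible() {
        CountDownLatch never = new CountDownLatch(1);   // nobody will count this down
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean();

        Thread waiter = new Thread(() -> {
            started.countDown();
            try {
                never.await();                          // would block forever
            } catch (InterruptedException e) {
                sawInterrupt.set(true);                 // released by the interrupt instead
            }
        });
        waiter.start();

        try {
            started.await();
            waiter.interrupt();   // lands just before or during await(); both cases throw
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(awaitIsInterruptible());     // prints true
    }
}
```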
acquireSharedInterruptibly(int)
Source:
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
    if (Thread.interrupted()) // Check whether the thread was interrupted before the call
        throw new InterruptedException(); // If so, throw InterruptedException
    if (tryAcquireShared(arg) < 0) // A negative value means the acquire failed; tryAcquireShared is implemented by the concrete AQS subclass
        doAcquireSharedInterruptibly(arg); // Enqueue the thread and wait for the shared lock
}
Because CountDownLatch implements a counter, tryAcquireShared returns a negative value whenever the counter has not yet reached zero, so the first call normally fails and the thread goes straight into doAcquireSharedInterruptibly to wait. The reason is that tryAcquireShared is implemented as (getState() == 0) ? 1 : -1.
tryAcquireShared(int)
The source code
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // Counter is zero: the acquire succeeds (1); otherwise it fails (-1)
}
This method tries to acquire the shared lock. In CountDownLatch, a return value less than zero (the counter is not yet zero) means the thread is queued and blocked; a return value greater than zero (the counter is zero) means the acquire succeeds, so await() returns without blocking, or a queued thread stops waiting.
doAcquireSharedInterruptibly(int)
The source code
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
    // addWaiter creates a node in the given mode: Node.EXCLUSIVE or Node.SHARED
    final Node node = addWaiter(Node.SHARED); // Create a shared-mode node and enqueue it
    boolean failed = true;
    try {
        for (;;) { // Spin in an infinite loop
            final Node p = node.predecessor(); // Get this node's predecessor
            if (p == head) { // The predecessor is the head, so this node is first in line
                int r = tryAcquireShared(arg); // Try again, in case the counter reached zero in the meantime
                if (r >= 0) { // r >= 0 means the shared lock was acquired
                    setHeadAndPropagate(node, r); // Make this node the head and propagate the wake-up to shared successors (via doReleaseShared)
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // On the first pass after await() is called, the acquire normally fails (r < 0),
                // so the code goes on to decide whether the thread should park.
                // The first call to shouldParkAfterFailedAcquire changes the predecessor's waitStatus from 0 to SIGNAL and returns false;
                // on the second pass it returns true, and the thread then blocks in parkAndCheckInterrupt.
            }
            if (shouldParkAfterFailedAcquire(p, node) && // Decide from the predecessor's status whether to park
                parkAndCheckInterrupt()) // Block here until woken up; returns true if the thread was interrupted while parked
                throw new InterruptedException(); // Interrupted while waiting
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
In terms of the counter, doAcquireSharedInterruptibly mainly does one thing: it waits, parked, until the counter reaches zero and the thread is woken up.
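The "check, park, re-check" pattern at the heart of this method can be shown in isolation with LockSupport. The sketch below is a deliberate simplification: it has no wait queue and ignores interruption, and ParkLoopDemo and waiterWakesAtZero are names invented for it. It only illustrates why the condition is re-tested in a loop after every wake-up.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class ParkLoopDemo {
    /** Returns true if the parked waiter finishes once the counter reaches zero. */
    static boolean waiterWakesAtZero(int initial) {
        AtomicInteger state = new AtomicInteger(initial);

        Thread waiter = new Thread(() -> {
            while (state.get() != 0) {   // re-check after every wake-up
                LockSupport.park();      // block until unparked (may also return spuriously)
            }
        });
        waiter.start();

        for (int i = 0; i < initial; i++) {
            if (state.decrementAndGet() == 0) {
                LockSupport.unpark(waiter);  // counter hit zero: wake the waiter
            }
        }

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive() && state.get() == 0;
    }

    public static void main(String[] args) {
        System.out.println(waiterWakesAtZero(3));   // prints true
    }
}
```

If unpark happens before the waiter parks, the permit is remembered and park returns at once, so the wake-up cannot be lost.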
countDown()
The source code
/**
 * Decrements the count of the latch, releasing all waiting threads if
 * the count reaches zero.
 *
 * <p>If the current count is greater than zero then it is decremented.
 * If the new count is zero then all waiting threads are re-enabled for
 * thread scheduling purposes.
 *
 * <p>If the current count equals zero then nothing happens.
 */
public void countDown() {
    sync.releaseShared(1); // Release one permit, i.e. decrement the counter
}
Each call releases one permit, i.e. decrements the counter by 1; when the counter reaches zero, this method releases every thread in the wait queue. See releaseShared(int arg) for how the release is carried out.
releaseShared(int)
Source:
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // Try to release the shared lock; implemented by the concrete AQS subclass
        doReleaseShared(); // Spin to wake up the successor nodes
        return true; // The release succeeded: the counter reached zero and the waiters were signalled
    }
    return false; // The counter has not reached zero yet (or was already zero)
}
- releaseShared first checks the return value of tryReleaseShared(arg); if the counter has not reached zero after the decrement, it returns false right away.
- Only the call that brings the counter to zero gets true from tryReleaseShared; doReleaseShared is then called to release all threads in the wait queue.
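Since CountDownLatch does not expose the boolean result of releaseShared, the sketch below calls it directly on a stripped-down AQS subclass to make the return values visible. CountSync and releaseResults are names invented for this illustration; tryAcquireShared is left out because nothing in the sketch ever waits.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ReleaseSharedReturnDemo {
    /** Minimal counter-style synchronizer (release side only). */
    static final class CountSync extends AbstractQueuedSynchronizer {
        CountSync(int count) { setState(count); }

        @Override protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;               // already zero
                int next = c - 1;
                if (compareAndSetState(c, next)) return next == 0;
            }
        }
    }

    /** Calls releaseShared(1) the given number of times and records each result. */
    static boolean[] releaseResults(int count, int calls) {
        CountSync sync = new CountSync(count);
        boolean[] results = new boolean[calls];
        for (int i = 0; i < calls; i++) {
            results[i] = sync.releaseShared(1);
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [false, true, false, false]
        System.out.println(Arrays.toString(releaseResults(2, 4)));
    }
}
```

Only the second call, the one that takes the counter from 1 to 0, returns true; calls before and after it return false.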
tryReleaseShared(int)
Source:
// tryReleaseShared in Sync, the static nested class of CountDownLatch
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) { // Spin in an infinite loop
        int c = getState(); // Read the latest counter value
        if (c == 0) // Already zero: another thread completed the countdown, so there is nothing to do
            return false;
        int nextc = c - 1; // Decrement by 1
        if (compareAndSetState(c, nextc)) // The CAS succeeded
            return nextc == 0; // true only if this call brought the counter to zero, in which case all waiting threads must be released
        // The CAS failed: another thread changed state concurrently, so loop and retry with the fresh value
    }
}
Sync, the static nested class of CountDownLatch, overrides this AQS method to define how the latch is released. It returns true only for the call that moves the counter from 1 to 0, which triggers the release of all queued threads; in every other case it returns false and nothing is woken.
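To see why the CAS loop matters under contention, the same decrement logic can be exercised from several threads at once. The sketch below uses an AtomicInteger in place of the AQS state (a substitution made for this example), and counts how many callers observe the transition to zero; the names CasCountdownDemo, tryDecrement and zeroTransitions are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCountdownDemo {
    /** Same shape as tryReleaseShared: true only for the call that reaches zero. */
    static boolean tryDecrement(AtomicInteger state) {
        for (;;) {
            int c = state.get();
            if (c == 0) return false;
            int next = c - 1;
            if (state.compareAndSet(c, next)) return next == 0;
            // CAS lost a race with another thread: retry with a fresh value
        }
    }

    /** Lets several threads race to count down; returns how many saw the 1 -> 0 transition. */
    static int zeroTransitions(int count, int threads) {
        AtomicInteger state = new AtomicInteger(count);
        AtomicInteger transitions = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < count; k++) {
                    if (tryDecrement(state)) transitions.incrementAndGet();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return transitions.get();
    }

    public static void main(String[] args) {
        System.out.println(zeroTransitions(1000, 4));   // prints 1
    }
}
```

However the threads interleave, exactly one call wins the final CAS, which is what guarantees that the waiters are released once and only once.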
doReleaseShared()
Its main purpose is to release the threads in the wait queue. When the counter reaches zero this method is called immediately, and it wakes the queued threads in a spin loop.
/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) { // Spin in an infinite loop
        Node h = head; // Re-read the head of the queue on every pass
        if (h != null && h != tail) { // The queue is not empty and has more than just the head
            int ws = h.waitStatus; // Get the head's waitStatus
            if (ws == Node.SIGNAL) { // SIGNAL means the head's successor needs to be woken up
                // Reset the head's waitStatus to 0 via CAS; on failure loop again, because another thread may be releasing concurrently
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h); // Wake up the head's successor
            }
            // If the head's waitStatus is 0, change it to PROPAGATE; a failed CAS means a concurrent change, so retry
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // If the head has not changed, the work above is complete.
        // If it has changed (for example, a woken thread became the new head), retry so that the wake-up keeps propagating.
        if (h == head) // loop if head changed
            break;
    }
}
Conclusion
1. With the AQS analysis as a foundation, analyzing CountDownLatch goes much faster.
2. A brief summary of how CountDownLatch works:
• It manages a counter whose initial value is normally greater than zero;
• Every countDown() reduces state by 1, until the count reaches zero and the waiting threads are released;
• countDown/await can also be used in combination with CyclicBarrier.
CountDownLatch usage
The CountDownLatch class provides only one constructor:
public CountDownLatch(int count) // The argument count is the initial counter value
The following three methods are the most important in the CountDownLatch class:
public void await() throws InterruptedException // The calling thread is suspended and continues only once count reaches 0
public boolean await(long timeout, TimeUnit unit) throws InterruptedException // Like await(), but gives up after the timeout; returns true if count reached 0, false if the time ran out first
public void countDown() // Decrements count by 1
CountDownLatch, a synchronization aid, allows one or more threads to wait until a set of operations being performed in other threads completes.
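The timed variant is easiest to understand through its return value. A small sketch (TimedAwaitDemo and timedAwaitResults are illustrative names, and the 50 ms timeout is an arbitrary choice):

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitDemo {
    /** Returns the results of a timed await before and after the latch is opened. */
    static boolean[] timedAwaitResults() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // Nobody has counted down yet, so this times out and returns false.
            boolean before = latch.await(50, TimeUnit.MILLISECONDS);
            latch.countDown();
            // The count is now zero, so this returns true immediately.
            boolean after = latch.await(50, TimeUnit.MILLISECONDS);
            return new boolean[] {before, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints [false, true]
        System.out.println(Arrays.toString(timedAwaitResults()));
    }
}
```

A false result does not change the latch; the caller can decide whether to wait again or give up.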
Here’s an example:
package main.java.CountDownLatch;

import java.util.concurrent.CountDownLatch;

public class countDownlatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            new Thread(new readNum(i, countDownLatch)).start();
        }
        countDownLatch.await(); // Block until all five workers have called countDown()
        System.out.println("Thread execution terminated...");
    }

    static class readNum implements Runnable {
        private final int id;
        private final CountDownLatch latch;

        public readNum(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }

        @Override
        public void run() {
            synchronized (this) { // Each thread has its own readNum instance, so this lock is never contended
                System.out.println("id:" + id);
                latch.countDown();
                System.out.println("Thread group task " + id + " is over. Other tasks continue.");
            }
        }
    }
}
Output result (one possible interleaving; the order varies between runs, and the final message can even appear before a worker's last line, because countDown() is called before that line is printed):
id:1
Thread group task 1 is over. Other tasks continue.
id:0
Thread group task 0 is over. Other tasks continue.
id:2
Thread group task 2 is over. Other tasks continue.
id:3
Thread group task 3 is over. Other tasks continue.
id:4
Thread group task 4 is over. Other tasks continue.
Thread execution terminated...
As the output shows, a worker thread continues with its own work after calling countDown().