CountDownLatch is a multithreading control tool, variously described as a gate, a counter, or a latch. It is typically used to coordinate threads or to let them signal one another, rather than to enforce mutual exclusion. Let's get to know CountDownLatch.
Getting to know CountDownLatch
CountDownLatch lets one or more threads wait until other threads have finished their work before continuing. Its counter is initialized to the number of tasks to wait for (often the number of worker threads). Each time a task completes, the counter is decremented by one. When the counter reaches 0, all tasks have finished and the waiting threads are released.
The use of CountDownLatch
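CountDownLatch provides a single constructor that takes the initial count. Its countDown method decrements the counter. When the counter reaches zero, the threads waiting in await on the CountDownLatch are woken up and continue with their other work. You can also bound the wait by passing a timeout to await.
The main methods are as follows
- await(): blocks the calling thread until the counter reaches zero, unless the thread is interrupted.
- await(long timeout, TimeUnit unit): the same, but gives up and returns false once the waiting time has elapsed.
- countDown(): decrements the counter and releases all waiting threads when it reaches zero.
- getCount(): returns the current count.
CountDownLatch is typically used in the following scenarios.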
CountDownLatch application scenario
A typical scenario is service startup: many components and services are loaded concurrently while the main thread waits for them. Once all components and services are loaded, the main thread continues and works with the other threads to complete a task.
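The startup scenario can be sketched roughly as follows. This is a minimal illustration rather than a real service framework: the class StartupDemo, the method startAll, and the component names are hypothetical, and adding a name to a list stands in for the real loading work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical startup sketch: component names are made up for illustration.
class StartupDemo {
    static List<String> startAll(List<String> components) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(components.size());
        List<String> loaded = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(components.size());
        try {
            for (String name : components) {
                pool.execute(() -> {
                    try {
                        loaded.add(name); // stand-in for real loading work
                    } finally {
                        ready.countDown(); // count down even if loading fails
                    }
                });
            }
            ready.await(); // block until every component has reported in
        } finally {
            pool.shutdown();
        }
        return loaded;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> loaded = startAll(Arrays.asList("cache", "database", "messaging"));
        System.out.println("Loaded " + loaded.size() + " components");
    }
}
```

Calling countDown in a finally block is a deliberate choice here: if a loader throws, the main thread is still released instead of waiting forever.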
CountDownLatch can also model a race between students. The CountDownLatch is initialized with the number of students, one thread per student. When the starting gun fires, each student thread runs its own task and counts down when it finishes. The count reaches 0 only when all the students have finished, and the race results are then announced together.
You can extend this pattern to many other task scenarios.
CountDownLatch usage
Let’s demonstrate the use of CountDownLatch using a simple counter
import java.util.concurrent.CountDownLatch;

public class TCountDownLatch {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(5);
        Increment increment = new Increment(latch);
        Decrement decrement = new Decrement(latch);
        new Thread(increment).start();
        new Thread(decrement).start();
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// Counts the latch down to zero, one step per second.
class Decrement implements Runnable {
    CountDownLatch countDownLatch;

    public Decrement(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            for (long i = countDownLatch.getCount(); i > 0; i--) {
                Thread.sleep(1000);
                System.out.println("countdown");
                this.countDownLatch.countDown();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// Despite its name, this task only waits for the latch to reach zero.
class Increment implements Runnable {
    CountDownLatch countDownLatch;

    public Increment(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            System.out.println("await");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Waiter Released");
    }
}
In the main method we create a CountDownLatch with a count of 5. The Decrement class sleeps for one second and then calls countDown to reduce the count by one, five times in total, while the Increment class waits in await. The run method of the Increment class is woken up and resumes execution once the Decrement thread has counted the latch down to zero.
Using CountDownLatch for a race between students
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StudentRunRace {
    // Starting signal: counted down once by the referee.
    CountDownLatch stopLatch = new CountDownLatch(1);
    // Finish line: counted down once by each of the 10 runners.
    CountDownLatch runLatch = new CountDownLatch(10);

    public void waitSignal() throws Exception {
        System.out.println("Player " + Thread.currentThread().getName() + " is waiting for the referee's signal.");
        stopLatch.await();
        System.out.println("Player " + Thread.currentThread().getName() + " received the referee's signal.");
        Thread.sleep((long) (Math.random() * 10000));
        System.out.println("Player " + Thread.currentThread().getName() + " reached the finish line.");
        runLatch.countDown();
    }

    public void waitStop() throws Exception {
        Thread.sleep((long) (Math.random() * 10000));
        System.out.println("Referee " + Thread.currentThread().getName() + " is about to give the signal.");
        stopLatch.countDown();
        System.out.println("Referee " + Thread.currentThread().getName() + " gave the signal and is waiting for all runners to finish.");
        runLatch.await();
        System.out.println("All runners have reached the finish line.");
        System.out.println("Referee " + Thread.currentThread().getName() + " is compiling the rankings.");
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        StudentRunRace studentRunRace = new StudentRunRace();
        for (int i = 0; i < 10; i++) {
            Runnable runnable = () -> {
                try {
                    studentRunRace.waitSignal();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
            service.execute(runnable);
        }
        try {
            studentRunRace.waitStop();
        } catch (Exception e) {
            e.printStackTrace();
        }
        service.shutdown();
    }
}
Let’s take a look at CountDownLatch’s source code
CountDownLatch source code analysis
CountDownLatch is simple to use and very useful, so it deserves a place in your toolbox. Let's take a closer look at how it works.
CountDownLatch is built on top of AbstractQueuedSynchronizer (AQS). At the core of AQS are two kinds of queues: the synchronization queue (sync queue) and the condition queue.
Sync inner class
Internally, CountDownLatch has a Sync class that extends the AQS abstract class.
private static final class Sync extends AbstractQueuedSynchronizer { ... }
CountDownLatch actually has only one field, sync, and it is final
private final Sync sync;
CountDownLatch has only one constructor with arguments
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
That is, the count must be specified at construction time, and a negative count throws an IllegalArgumentException.
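As a small check of the constructor's behavior, the sketch below tries a negative count and a zero count. The class name LatchConstruction and its helper method are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical helper class used only to illustrate constructor validation.
class LatchConstruction {
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false; // not reached: the constructor throws
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Zero is allowed: the latch starts out already open.
        CountDownLatch open = new CountDownLatch(0);
        System.out.println("count = " + open.getCount());
        System.out.println("negative rejected = " + rejectsNegativeCount());
    }
}
```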
The constructor then passes count to Sync, whose own constructor is
Sync(int count) {
    setState(count);
}
Notice the call to setState(count). What does it mean? It simply sets the AQS state, which here holds the latch's remaining count, that is, the number of countDown calls still needed before the waiting threads are released. We'll come back to this when we talk about the countDown method.
The getCount() method returns the value of getState(), a method of AbstractQueuedSynchronizer. It returns the current count and has the memory semantics of a volatile read.
// ---- CountDownLatch ----
int getCount() {
    return getState();
}
// ---- AbstractQueuedSynchronizer ----
protected final int getState() {
    return state;
}
The tryAcquireShared() method attempts to acquire in shared mode by checking whether the state is 0. If the state is 0, it returns 1, meaning the acquire succeeds; otherwise it returns -1, meaning the acquire fails.
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
// ---- getState() is the same as above ----
Shared mode is an AQS concept. AQS supports two modes: exclusive mode and shared mode.
- tryAcquire: exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
- tryAcquireShared: shared mode. Attempts to acquire the resource. A negative value indicates failure; 0 indicates success, but with nothing left for subsequent acquires; a positive value indicates success with resources still available.
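To make the shared-mode contract concrete, here is a minimal sketch of a custom synchronizer built on AQS. It is modeled on the one-shot latch idea and is not taken from CountDownLatch itself; the names OneShotGate, open, and isOpen are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate: state 0 means closed, state 1 means open.
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive: success, and later shared acquires may succeed too.
            // Negative: failure, so the caller is queued and parked.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the gate
            return true; // tell AQS to wake the waiting threads
        }

        boolean isOpen() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void open() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    boolean isOpen() {
        return sync.isOpen();
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotGate gate = new OneShotGate();
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                System.out.println("gate opened, waiter continues");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        waiter.join();
    }
}
```

Because the acquire is shared, any number of threads can pass once the gate is open, which is the same property CountDownLatch relies on.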
The tryReleaseShared() method is used to release in shared mode
protected boolean tryReleaseShared(int releases) {
    // Reduce the number and notify when it becomes 0.
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
This method is an infinite loop. Each iteration reads the current state. If the state is 0, the latch has already been released and there is nothing left to do, so the method returns false. Otherwise it computes the next state, c - 1, and calls compareAndSetState, a CAS operation that compares the value in memory with the expected value c and, if they match, updates it to nextc. If the CAS succeeds, the method returns whether nextc is 0, so only the call that moves the count from 1 to 0 returns true. If the CAS fails because another thread changed the state in the meantime, the loop tries again.
If the use of CAS is not clear, readers can refer to this article to tell you the big secret of AtomicInteger!
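The same retry loop can be written against AtomicInteger, which may make the CAS pattern easier to experiment with. This is only a sketch that mirrors the shape of tryReleaseShared; the class CasCountdown and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter that mirrors the CAS loop of tryReleaseShared.
class CasCountdown {
    private final AtomicInteger state;

    CasCountdown(int count) {
        state = new AtomicInteger(count);
    }

    /** Returns true only for the call that moves the counter from 1 to 0. */
    boolean release() {
        for (;;) {
            int c = state.get();
            if (c == 0)
                return false; // already at zero, nothing to do
            int next = c - 1;
            if (state.compareAndSet(c, next))
                return next == 0; // CAS won: report whether we hit zero
            // CAS lost to another thread: re-read and retry
        }
    }

    int get() {
        return state.get();
    }

    public static void main(String[] args) {
        CasCountdown counter = new CasCountdown(2);
        System.out.println(counter.release()); // 2 -> 1
        System.out.println(counter.release()); // 1 -> 0
        System.out.println(counter.release()); // stays at 0
    }
}
```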
await method
The await() method is one of the most important methods of CountDownLatch; basically, countDown and await are the essence of CountDownLatch. It causes the current thread to wait until the count reaches zero, unless the thread is interrupted.
CountDownLatch has two await methods: await() with no arguments, and await(long timeout, TimeUnit unit) with a waiting time. Let's look at the await() method first.
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
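Since await() declares InterruptedException, a waiting thread can be released early by interrupting it. The sketch below illustrates this; the class InterruptibleAwait and the helper latch named started are hypothetical and only make the demonstration deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: a thread blocked in await() is released by an interrupt.
class InterruptibleAwait {
    static boolean awaitThrowsWhenInterrupted() throws InterruptedException {
        CountDownLatch never = new CountDownLatch(1);   // never counted down in this sketch
        CountDownLatch started = new CountDownLatch(1); // signals that the waiter is running
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            started.countDown();
            try {
                never.await();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        started.await();    // wait until the waiter thread is running
        waiter.interrupt(); // await() throws whether the interrupt lands before or during the wait
        waiter.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted while waiting: " + awaitThrowsWhenInterrupted());
    }
}
```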
The await method calls acquireSharedInterruptibly, the AQS method that acquires in shared mode and responds to interruption.
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
As you can see, acquireSharedInterruptibly first checks whether the thread has been interrupted; if so, it throws InterruptedException immediately. Otherwise it tries to acquire in shared mode. If the shared acquire fails, it calls doAcquireSharedInterruptibly.
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
This method is a little long, so let's break it down
- First, a shared-mode Node is constructed and enqueued
- Then, in an infinite loop, it checks the predecessor of the newly created node. If the predecessor is the head node, it tries to acquire again with tryAcquireShared. If that succeeds, it calls setHeadAndPropagate, whose source code is as follows
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
It first sets the new head node and then, after a series of checks, gets the node's successor. If that successor is null or is waiting in shared mode, it calls doReleaseShared. Let's look at the doReleaseShared method
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}
This method runs an infinite loop. If the queue is non-empty (head is not null and differs from tail), it checks the head's waitStatus. If the status is SIGNAL, it uses compareAndSetWaitStatus to reset it to 0; if that CAS fails it loops again, otherwise it calls unparkSuccessor to wake the successor node. If the status is 0, it uses compareAndSetWaitStatus to set it to PROPAGATE, looping again if that CAS fails. Finally, if the head has not changed during the iteration, the loop exits; otherwise it goes around again.
In other words, setHeadAndPropagate sets the head node and then releases the successor nodes.
- Next, look at the following if check
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
If the predecessor p obtained from node.predecessor() is not the head node, or the acquire attempt fails, the thread prepares to park (suspend). shouldParkAfterFailedAcquire decides whether it is safe to park now; its logic is as follows
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
This method examines the waitStatus of the node's predecessor. A node's waitStatus can take five values
- CANCELLED(1): indicates that the node has been cancelled. A timeout or an interrupt (when interrupts are responded to) moves the node into this state, and once there the node never changes state again.
- SIGNAL(-1): indicates that the successor node is waiting for the current node to wake it up. When a successor joins the queue and needs to park, it sets its predecessor's status to SIGNAL.
- CONDITION(-2): indicates that the node is waiting on a Condition. When another thread calls signal() on the Condition, the node is transferred from the condition queue to the synchronization queue, where it waits to acquire the lock.
- PROPAGATE(-3): in shared mode, indicates that a release should be propagated, so that not only the immediate successor is woken but possibly the nodes after it as well.
- 0: the default state of a new node when it joins the queue.
The method returns true if the predecessor's status is SIGNAL. If ws is greater than zero, the predecessor is CANCELLED, since CANCELLED is the only status greater than zero. The method then walks backwards, skipping cancelled nodes, until it finds a predecessor whose waitStatus is not greater than 0, and links the node behind it. Otherwise (ws is 0 or PROPAGATE), CAS is used to set the predecessor's ws to SIGNAL.
In these last two cases the method returns false, and the caller loops and tries again. Only when it returns true does the caller go on to call parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
This method uses locksupport. park to disconnect and then returns a flag indicating whether the thread is interrupted.
cancelAcquire()
This method removes a node from the queue when the resource was not acquired successfully (for example on timeout, or on interruption in the interruptible case).
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
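So the await call on CountDownLatch roughly follows this chain: await() calls acquireSharedInterruptibly(), which calls tryAcquireShared(); if the count is not yet zero, doAcquireSharedInterruptibly() enqueues the thread and parks it until a countDown brings the count to zero or the thread is interrupted.
await has an overload, await(long timeout, TimeUnit unit). The main difference is that it waits at most the given time: it returns true if the count reached zero and false if the waiting time elapsed first.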
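A short sketch of the timed overload follows. The class TimedAwaitDemo and the 100 ms timeout are arbitrary choices for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of await(long timeout, TimeUnit unit).
class TimedAwaitDemo {
    /** Waits at most 100 ms; returns true if the count reached zero in time. */
    static boolean awaitBriefly(CountDownLatch latch) throws InterruptedException {
        return latch.await(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Nobody counts down yet, so the wait times out.
        System.out.println("before countDown: " + awaitBriefly(latch));
        latch.countDown();
        // The count is now zero, so the call returns without waiting.
        System.out.println("after countDown: " + awaitBriefly(latch));
    }
}
```

Checking the boolean result matters: after a timeout the thread continues running even though the count has not reached zero.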
countDown method
countDown is as important as await. It decrements the counter and, if the count reaches zero, releases all waiting threads.
public void countDown() {
    sync.releaseShared(1);
}
This method calls releaseShared, which performs a release in shared mode. Whether a release actually takes place is decided by the tryReleaseShared method of CountDownLatch's inner class Sync
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// ---- CountDownLatch ----
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
tryReleaseShared loops, reading the state value and retrying the CAS update until it succeeds.
If it returns true, that is, the count has just dropped to zero, the doReleaseShared method is called
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}
As you can see, doReleaseShared is also an infinite loop that keeps retrying its CAS updates until they succeed.
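The observable effect of this release path is that a single transition to zero wakes every waiting thread, and further countDown calls do nothing. A minimal sketch under those assumptions, with the hypothetical class ReleaseAllDemo:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: one countDown to zero releases all waiters.
class ReleaseAllDemo {
    static int releaseWaiters(int waiters) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.countDown(); // 1 -> 0: the waiting threads are released
        gate.countDown(); // already 0: this call has no effect
        for (Thread t : threads) {
            t.join();
        }
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("released waiters: " + releaseWaiters(3));
    }
}
```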
Conclusion
This article covered the basic use and source code of CountDownLatch. CountDownLatch is a counter built on AQS, and its internal methods all revolve around the AQS framework. AQS has other implementations as well, such as ReentrantLock and Semaphore, so studying concurrency inevitably means studying AQS. The source code of CountDownLatch looks small and simple, but its internal call chains, such as that of the await method, are long and worth taking the time to explore.
I am Cxuan, a programmer who writes about technology. If you found this article useful, please like, read, and share it!