Introduction
This article assumes you already understand how AQS (AbstractQueuedSynchronizer) works; its internals are not explained here.
CountDownLatch is a synchronization aid that lets one or more threads wait until a set of operations running in other threads has completed. Before it existed, this kind of waiting was usually done with Thread.join. CountDownLatch is built on AQS in shared mode. AQS keeps an int state field that controls acquire and release, and CountDownLatch uses that field as its counter. For example, if the latch is initialized with a count of 3 and three threads are started, each thread decrements state by 1 when it finishes its work. When state reaches 0, all three threads have finished and any thread waiting on the latch may continue.
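The counter behaviour described above can be checked with a small sketch. The class name LatchCountDemo and the helper runWorkers are made up for this illustration; only CountDownLatch and its await, countDown and getCount methods come from the JDK.

```java
import java.util.concurrent.CountDownLatch;

public class LatchCountDemo {
    // Hypothetical helper: starts `workers` threads that each count down once,
    // waits for all of them, and returns the count left on the latch.
    static long runWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await(); // blocks until the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        // prints "remaining count: 0"
        System.out.println("remaining count: " + runWorkers(3));
    }
}
```

Unlike Thread.join, the waiting thread needs no reference to the worker threads, and a worker can signal completion before it exits.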
Source code analysis
We start the analysis with an example, and the rest of the article walks through the source code with it in mind. The example starts three threads, and the main thread must wait until all three have finished before it continues with its own work. The code is as follows:
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // The count is 3.
        CountDownLatch countDownLatch = new CountDownLatch(3);
        for (int i = 0; i < 3; ++i) {
            new Thread(new Worker(countDownLatch, i)).start();
        }
        // Wait for all three threads to complete.
        countDownLatch.await();
        System.out.println("All three threads have executed.");
    }

    // Worker task: each worker plays the role of one porter.
    static class Worker implements Runnable {
        private final CountDownLatch countDown;
        private final Integer id;

        Worker(CountDownLatch countDown, Integer id) {
            this.countDown = countDown;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(500);
                doWork();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Report completion: decrements the latch count by 1.
            countDown.countDown();
            System.out.println("Thread " + id + " has finished its work.");
        }

        void doWork() {
            System.out.println("Thread " + id + " starts working.");
        }
    }
}
This example shows how CountDownLatch is used. Three threads are started and each performs its own task, while the main thread waits on the latch until all three have completed.
In a sample run, the main thread first waits for the three threads. Thread 1 starts working, then thread 0 starts working, thread 0 finishes, thread 1 finishes, and the remaining thread finishes last. The interleaving of the workers varies from run to run.
Think of the three threads as porters loading goods onto a truck. Each porter must finish carrying his assigned share, and the truck driver may set off only after all three are done. The driver keeps a small notebook in which he records how many porters have not yet finished. Before any porter has started, the notebook reads 3.
Once work begins, each porter gets on with his own task. When porter 1 finishes, he reports to the driver that his task is done. The driver updates the notebook: porter 1 is finished, and two porters still have work to do.
Each porter reports to the driver as he finishes. When the last one has reported, the notebook shows that nobody is still working, and the driver can set off because all three porters have finished loading.
This example gives a rough picture of how CountDownLatch works. How does the driver's notebook (state) keep track of how many workers are still unfinished? CountDownLatch implements the counter with the AQS state field. The source code below shows the details:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class CountDownLatch {
    /**
     * Synchronization control for CountDownLatch.
     * Uses the AQS state to represent the count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        // Initialize state with the number of countDown() calls to wait for.
        Sync(int count) {
            setState(count);
        }

        // Return the current state value.
        int getCount() {
            return getState();
        }

        // Try to acquire in shared mode.
        protected int tryAcquireShared(int acquires) {
            // This is the key point: the acquire succeeds only when state == 0.
            // Otherwise the current thread is queued and blocked.
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement state; return true only when it drops to 0,
            // which lets the blocked threads proceed.
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    // The synchronizer object.
    private final Sync sync;

    /** Constructs a latch initialized with the given count. */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is interrupted. If the current count is zero,
     * this method returns immediately. If the current count is greater than
     * zero, the current thread is disabled for thread scheduling and lies
     * dormant until one of two things happens:
     * 1. The count reaches zero.
     * 2. The current thread is interrupted.
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is interrupted or the specified waiting time
     * elapses.
     */
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /** Returns the current count (the state value). */
    public long getCount() {
        return sync.getCount();
    }
}
The CountDownLatch source code is very short, and it shows that the implementation is built entirely on AQS. The inner class Sync extends AbstractQueuedSynchronizer and implements tryAcquireShared and tryReleaseShared. The constructor creates the AQS synchronizer object and initializes its state value. If the count passed in is less than 0, an IllegalArgumentException is thrown.
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // Initialize the state value of AQS.
    this.sync = new Sync(count);
}
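A quick way to see the constructor check in action is the sketch below. LatchConstructorDemo and isRejected are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;

public class LatchConstructorDemo {
    // Hypothetical helper: returns true when the constructor rejects the count.
    static boolean isRejected(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(isRejected(-1)); // prints "true"
        System.out.println(isRejected(0));  // prints "false": a zero count is allowed
        System.out.println(new CountDownLatch(3).getCount()); // prints "3"
    }
}
```

A latch created with a count of 0 is valid; it is simply already open, so await on it returns at once.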
For the example above, the AQS looks like this right after initialization: state is 3 and the wait queue is empty.
The await method
Calling await actually calls the AQS acquireSharedInterruptibly method, which in turn calls tryAcquireShared in Sync. In the example above, state was initialized to the count, so the expression (getState() == 0) ? 1 : -1 evaluates to -1 as long as state is not 0. A negative return value means the current thread is placed in the AQS wait queue and suspended. It stays there until it is woken up after state drops to 0, at which point await returns, or until it is interrupted, in which case InterruptedException is thrown.
/**
 * Causes the current thread to wait until the latch has counted down to
 * zero, unless the thread is interrupted. If the current count is zero,
 * this method returns immediately. If the current count is greater than
 * zero, the current thread is disabled for thread scheduling and lies
 * dormant until one of two things happens:
 * 1. The count reaches zero.
 * 2. The current thread is interrupted.
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
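The two outcomes of tryAcquireShared can be observed from the outside with the timed variant of await. This is a minimal sketch; AwaitDemo and timedAwait are hypothetical names, and nobody ever calls countDown on the latch here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitDemo {
    // Hypothetical helper: waits up to `millis` on a fresh latch with the
    // given count. No thread counts down, so the count never changes.
    static boolean timedAwait(int count, long millis) {
        CountDownLatch latch = new CountDownLatch(count);
        try {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(timedAwait(0, 50)); // prints "true": state is 0, no blocking
        System.out.println(timedAwait(1, 50)); // prints "false": timed out with state 1
    }
}
```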
Since await delegates to acquireSharedInterruptibly, let's look at that method inside AQS:
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Respond to interruption.
    if (Thread.interrupted())
        throw new InterruptedException();
    // Call the tryAcquireShared method.
    if (tryAcquireShared(arg) < 0)
        // Block the thread: add it to the wait queue until another thread wakes it.
        doAcquireSharedInterruptibly(arg);
}

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
acquireSharedInterruptibly calls tryAcquireShared, which is implemented by Sync, the inner class of CountDownLatch. tryAcquireShared checks whether state has reached 0, that is, whether the counter has been counted all the way down. If it has not, the current thread is added to the AQS wait queue and suspended there until another thread wakes it up.
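To watch the wait queue at work, the sketch below rebuilds a tiny latch on top of AQS and inspects the queue with hasQueuedThreads, a public method of AbstractQueuedSynchronizer. MiniLatch, hasWaiters and observeQueue are hypothetical names for this illustration; this is not the JDK class, only the same idea in miniature.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MiniLatch {
    // Same shape as CountDownLatch.Sync: state holds the remaining count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        @Override
        protected int tryAcquireShared(int unused) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int unused) {
            for (;;) {
                int c = getState();
                if (c == 0) return false; // already open, nothing to do
                if (compareAndSetState(c, c - 1)) return c == 1; // true on 1 -> 0
            }
        }
    }

    private final Sync sync;

    public MiniLatch(int count) { this.sync = new Sync(count); }
    public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    public void countDown() { sync.releaseShared(1); }
    public boolean hasWaiters() { return sync.hasQueuedThreads(); }

    // Returns {waiter queued while count > 0, waiter still queued after count hit 0}.
    static boolean[] observeQueue() {
        MiniLatch latch = new MiniLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        while (!latch.hasWaiters()) Thread.yield(); // wait until the waiter is enqueued
        boolean queuedWhileClosed = latch.hasWaiters();
        latch.countDown();                           // 1 -> 0 releases the waiter
        while (waiter.isAlive()) Thread.yield();     // wait until the waiter has returned
        return new boolean[] { queuedWhileClosed, latch.hasWaiters() };
    }

    public static void main(String[] args) {
        boolean[] r = observeQueue();
        System.out.println(r[0] + " " + r[1]); // prints "true false"
    }
}
```

The waiter sits in the queue for as long as state is above 0 and leaves it once the count reaches 0.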
The countDown method
The countDown method of CountDownLatch calls releaseShared in AQS, and releaseShared calls tryReleaseShared, which is implemented in Sync. If state is already 0, tryReleaseShared returns false and nothing else happens. This check matters because other threads may still call countDown after the count has reached 0, and without it state would become negative. Otherwise state is decremented by 1 with a CAS, and tryReleaseShared returns true only if the new value is 0. AQS wakes the threads in the wait queue only when tryReleaseShared returns true. So in the example, after the first thread calls countDown, state is 2, tryReleaseShared returns false, and the main thread stays suspended in the queue. Even if a waiting thread were woken early, it would call tryAcquireShared again, get -1, and be suspended again.
When the last thread finishes and calls countDown, state drops from 1 to 0 and tryReleaseShared returns true. The waiting thread is woken up, its call to tryAcquireShared returns 1, and the acquire succeeds, so await returns.
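The guard against a negative count is easy to confirm from the public API. In this sketch, CountDownStepsDemo and countsAfterEachCall are hypothetical names; the latch is deliberately counted down more often than its initial count.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class CountDownStepsDemo {
    // Hypothetical helper: calls countDown() `calls` times on a latch that
    // starts at `initial` and records getCount() after every call.
    static long[] countsAfterEachCall(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        long[] seen = new long[calls];
        for (int i = 0; i < calls; i++) {
            latch.countDown();
            seen[i] = latch.getCount();
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints "[1, 0, 0, 0]": extra calls leave the count at 0
        System.out.println(Arrays.toString(countsAfterEachCall(2, 4)));
    }
}
```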
Conclusion
CountDownLatch uses the AQS state field as its counter. The state value is set when the CountDownLatch is constructed. When await is called, it checks whether state is 0. If it is not, the current thread is added to the AQS wait queue and suspended. Each call to countDown decrements state by 1, and the call that brings state to 0 wakes the threads in the wait queue. A woken thread checks state again: if it is 0, await returns, and if not, the thread is suspended again. A waiting thread therefore stays suspended until state reaches 0 or the thread is interrupted.