Overview

The previous chapters gave a general understanding of the “exclusive lock” and the “shared lock”. This chapter covers CountDownLatch. Like ReentrantReadWriteLock.ReadLock, CountDownLatch is essentially a “shared lock.”

Introduction to CountDownLatch

CountDownLatch is a synchronization helper class that allows one or more threads to wait until a set of operations being performed in other threads has completed.

CountDownLatch versus CyclicBarrier: CountDownLatch allows 1 or N threads to wait for other threads to complete, while CyclicBarrier allows N threads to wait for each other. The counter of a CountDownLatch cannot be reset. The counter of a CyclicBarrier can be reset and reused, which is why it is called a cyclic barrier. The principle of CyclicBarrier will be studied in a later chapter.
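The difference in reuse can be seen in a small sketch. The class and method names below (ResetContrastDemo, latchCountAfterOpening, barrierTrips) are made up for illustration; only the java.util.concurrent calls come from the JDK. A single-party CyclicBarrier is used so that the example runs on one thread.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: contrasts a one-shot latch with a reusable barrier.
class ResetContrastDemo {

    // Counts a latch down to zero; there is no method to set it back to 2.
    static long latchCountAfterOpening() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown();
        return latch.getCount();
    }

    // A barrier with one party trips on every await() and then resets itself.
    static int barrierTrips() {
        CyclicBarrier barrier = new CyclicBarrier(1);
        int trips = 0;
        try {
            for (int i = 0; i < 3; i++) {
                barrier.await();
                trips++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // Not expected here: nothing interrupts or resets the barrier.
        }
        return trips;
    }

    public static void main(String[] args) {
        System.out.println("latch count: " + latchCountAfterOpening());
        System.out.println("barrier trips: " + barrierTrips());
    }
}
```

Once the latch count reaches 0 it stays there, so a new CountDownLatch is needed for each round of waiting, whereas the same CyclicBarrier instance can be awaited again.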

CountDownLatch function list

// Constructs a CountDownLatch initialized with the given count.
CountDownLatch(int count)

// Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted.
void await()
// Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted or the specified waiting time elapses.
boolean await(long timeout, TimeUnit unit)
// Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
void countDown()
// Returns the current count.
long getCount()
// Returns a string identifying this latch and its state.
String toString()
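The methods listed above can be exercised on a single thread. The following is a minimal sketch; the class name LatchApiDemo, the method describe() and the 50 ms timeout are assumptions chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: walks through the public methods of CountDownLatch.
class LatchApiDemo {

    // Records what getCount() and the timed await() return before and after counting down.
    static String describe() {
        CountDownLatch latch = new CountDownLatch(2);
        StringBuilder sb = new StringBuilder();
        try {
            sb.append(latch.getCount());
            // The count is still 2, so the timed await() gives up and returns false.
            sb.append(' ').append(latch.await(50, TimeUnit.MILLISECONDS));
            latch.countDown();
            latch.countDown();
            sb.append(' ').append(latch.getCount());
            // The count is now 0, so await() returns true without waiting.
            sb.append(' ').append(latch.await(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        // toString() identifies the latch and includes its current count.
        System.out.println(new CountDownLatch(1));
    }
}
```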

 

CountDownLatch data structure

The UML class diagram for CountDownLatch looks like this:

The data structure of CountDownLatch is simple: it is implemented through a “shared lock”. It contains a sync field of type Sync. Sync is a nested class that extends AQS (AbstractQueuedSynchronizer).

CountDownLatch source analysis (based on JDK 1.7.0_40)

1. CountDownLatch(int count)

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Description: This constructor creates a Sync object. Sync extends the AQS class. The Sync constructor looks like this:

Sync(int count) {
    setState(count);
}

setState() is implemented in AQS. Its source is as follows:

protected final void setState(int newState) {
    state = newState;
}

Note: In AQS, state is a private volatile int. For CountDownLatch, state represents the “lock counter.” getCount() in CountDownLatch ultimately calls getState() in AQS, which returns state, that is, the lock counter.

2. await()

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

Note: This function actually calls acquireSharedInterruptibly(1) in AQS.

The source of acquireSharedInterruptibly() in AQS is as follows:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Description: acquireSharedInterruptibly() acquires the shared lock. If the current thread has been interrupted, it throws InterruptedException. Otherwise, it calls tryAcquireShared(arg) to try to acquire the shared lock. If the attempt succeeds, it returns; otherwise, it calls doAcquireSharedInterruptibly(), which makes the current thread wait until it acquires the shared lock (or is interrupted) before returning.
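The interrupt check at the start of acquireSharedInterruptibly() can be observed from the caller's side. In this sketch the class and method names are made up for illustration; the thread sets its own interrupt flag before calling await().

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: await() on a thread whose interrupt flag is already set.
class InterruptedAwaitDemo {

    // Returns true if await() threw InterruptedException and the interrupt flag was cleared.
    static boolean awaitThrowsWhenAlreadyInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt(); // set the flag before calling await()
        try {
            latch.await();
            return false; // not reached: the interrupt check runs before any waiting
        } catch (InterruptedException e) {
            // Thread.interrupted() clears the flag before the exception is thrown.
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitThrowsWhenAlreadyInterrupted());
    }
}
```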

tryAcquireShared() is overridden in CountDownLatch.java. Its source is as follows:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

tryAcquireShared() attempts to acquire the shared lock. If the “lock counter” is 0, the lock is available and 1 is returned; otherwise, the lock is unavailable and -1 is returned.

doAcquireSharedInterruptibly() is implemented in AQS. Its source is as follows:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a node for the current thread, recording the lock type as "shared lock",
    // and add the node to the end of the CLH queue.
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // Get the previous node.
            // If the previous node is the head of the CLH queue, try to acquire the shared lock.
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // Otherwise the current thread waits until it can acquire the shared lock.
            // If the thread is interrupted while waiting, InterruptedException is thrown.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Notes:
(01) addWaiter(Node.SHARED) creates a node for the current thread, with the lock type recorded as Node.SHARED, and adds the node to the end of the CLH queue.
(02) node.predecessor() returns the previous node. If the previous node is the head of the CLH queue, the thread tries to acquire the shared lock.
(03) shouldParkAfterFailedAcquire() does what its name suggests: it returns true if the thread should wait after failing to acquire the lock, and false otherwise.
(04) When shouldParkAfterFailedAcquire() returns true, parkAndCheckInterrupt() is called and the current thread enters the waiting state. It resumes only when it is woken up to retry acquiring the shared lock, or when it is interrupted.
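The waiting state described in (04) is visible from outside the thread. The sketch below uses made-up names (ParkedWaiterDemo, stateWhileAwaiting) and polls the waiter's Thread.State for up to two seconds, an arbitrary limit chosen for illustration, before opening the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes the state of a thread blocked in await().
class ParkedWaiterDemo {

    // Returns the last state observed for the waiting thread before the latch is opened.
    static Thread.State stateWhileAwaiting() {
        final CountDownLatch latch = new CountDownLatch(1);
        Thread waiter = new Thread(new Runnable() {
            public void run() {
                try {
                    latch.await(); // parks until the count reaches zero
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();

        // Poll until the waiter reports WAITING, or give up after two seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        Thread.State observed = waiter.getState();
        while (observed != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.yield();
            observed = waiter.getState();
        }

        latch.countDown(); // open the latch so the waiter can finish
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(stateWhileAwaiting());
    }
}
```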

3. countDown()

public void countDown() {
    sync.releaseShared(1);
}

Note: This function actually calls releaseShared(1) to release the shared lock.

releaseShared() is implemented in AQS. Its source is as follows:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared() makes the current thread release the shared lock. It first calls tryReleaseShared() to try to release the shared lock. If that call returns true (for CountDownLatch, the “lock counter” has just reached 0), doReleaseShared() is called to wake up the waiting threads, and releaseShared() returns true. Otherwise, it returns false.
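Because the lock is shared, a single countDown() that brings the count to zero lets every waiting thread proceed. A minimal sketch, with hypothetical names (ReleaseAllWaitersDemo, releasedWaiters) and an arbitrary number of waiters:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one countDown() lets every thread past await().
class ReleaseAllWaitersDemo {

    // Starts the given number of threads that await the same latch and counts how many get through.
    static int releasedWaiters(int waiters) {
        final CountDownLatch gate = new CountDownLatch(1);
        final AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        gate.await();
                        released.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            threads[i].start();
        }

        // A single countDown() moves the count from 1 to 0. Threads already waiting are
        // woken up; threads that call await() afterwards pass straight through.
        gate.countDown();

        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println(releasedWaiters(4));
    }
}
```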

tryReleaseShared() is overridden in CountDownLatch.java. Its source is as follows:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // Get the current value of the "lock counter"
        int c = getState();
        if (c == 0)
            return false;
        // Decrement the "lock counter" by 1
        int nextc = c-1;
        // Assign the new value via CAS
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

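tryReleaseShared() releases the shared lock by decrementing the “lock counter” by 1. It returns true only when the counter changes from 1 to 0; if the counter is already 0, it returns false without changing anything.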
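The same retry loop can be tried outside AQS. The sketch below is not the JDK implementation: it swaps the AQS state for an AtomicInteger, and the names CasDecrementSketch and tryRelease are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the CAS decrement loop, with an AtomicInteger standing in for the AQS state.
class CasDecrementSketch {
    private final AtomicInteger state;

    CasDecrementSketch(int count) {
        state = new AtomicInteger(count);
    }

    // Returns true only for the call that moves the counter from 1 to 0.
    boolean tryRelease() {
        for (;;) {
            int c = state.get();
            if (c == 0)
                return false;            // already at zero: nothing to release
            int nextc = c - 1;
            // Retry if another thread changed the counter between get() and the CAS.
            if (state.compareAndSet(c, nextc))
                return nextc == 0;
        }
    }

    int current() {
        return state.get();
    }

    public static void main(String[] args) {
        CasDecrementSketch counter = new CasDecrementSketch(2);
        System.out.println(counter.tryRelease()); // 2 -> 1
        System.out.println(counter.tryRelease()); // 1 -> 0
        System.out.println(counter.tryRelease()); // stays at 0
    }
}
```

The loop never takes a lock: a thread that loses the race simply reads the counter again and retries, and the counter never drops below zero.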

Summary: CountDownLatch is implemented through a “shared lock”. When a CountDownLatch is created, an int parameter count is passed in. This parameter is the initial value of the “lock counter”, that is, the number of countDown() calls required before the “shared lock” becomes available. When a thread calls await() on the CountDownLatch object, it waits until the “shared lock” is available, then acquires it and continues running. The “shared lock” is available only when the “lock counter” is 0. The initial value of the “lock counter” is count, and each call to countDown() decrements it by 1. So countDown() must be called count times before the “lock counter” reaches 0 and the waiting threads can continue.
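To see how these pieces fit together, here is a minimal latch built directly on AbstractQueuedSynchronizer. It follows the structure described in this chapter but is only a sketch: the names MiniLatch and demo() are made up, and the timed await() and toString() are left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical sketch: a stripped-down latch that follows the structure described above.
class MiniLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);                     // the AQS state is the "lock counter"
        }

        int getCount() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;   // available only at zero
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;           // true wakes up the waiting threads
            }
        }
    }

    private final Sync sync;

    MiniLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.getCount();
    }

    // Three threads count down; the caller waits and then reads the counter.
    static int demo() {
        final MiniLatch latch = new MiniLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                public void run() {
                    latch.countDown();
                }
            }).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```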

That’s how CountDownLatch works.

An example use of CountDownLatch

The following example uses CountDownLatch so that the “main thread” waits until all five child threads have completed their work (sleeping for 1000 ms) before continuing.

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest1 {

    private static int LATCH_SIZE = 5;
    private static CountDownLatch doneSignal;

    public static void main(String[] args) {
        try {
            doneSignal = new CountDownLatch(LATCH_SIZE);

            // Create and start 5 tasks
            for (int i = 0; i < LATCH_SIZE; i++)
                new InnerThread().start();

            System.out.println("main await begin.");
            // The "main thread" waits for the 5 tasks to complete
            doneSignal.await();

            System.out.println("main await finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                // Decrement the count of the CountDownLatch by 1
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Running results:

main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.

The result shows that the main thread calls doneSignal.await() and waits for the other threads to reduce doneSignal to 0. Each of the five InnerThread threads decrements doneSignal by 1 via doneSignal.countDown(). When doneSignal reaches 0, main is woken up and resumes execution. The order in which the five threads print may differ between runs.
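One detail worth noting in the example: countDown() sits inside the try block, so a worker that is interrupted during sleep() never counts down and main would wait indefinitely. A common way to guard against this is to call countDown() in a finally block and to use the timed await(). The sketch below uses made-up names (FinallyCountDownDemo, runWorkers), a simulated failure and an arbitrary five-second timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: workers count down in finally, so a failed worker cannot block the waiter.
class FinallyCountDownDemo {

    // Returns true if all workers counted down before the timeout elapsed.
    static boolean runWorkers(int workers) {
        final CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(new Runnable() {
                public void run() {
                    try {
                        if (id == 0) {
                            // Simulated failure in one worker.
                            throw new IllegalStateException("simulated failure");
                        }
                    } catch (RuntimeException e) {
                        // A real task would log or record the failure here.
                    } finally {
                        done.countDown(); // runs whether or not the work succeeded
                    }
                }
            }).start();
        }
        try {
            // The timed await() puts an upper bound on how long the caller waits.
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5));
    }
}
```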