Introduction

I have always thought of a program as a logical description of the real world, and in the real world many things can only be accomplished through coordination and cooperation. Delivering a platform cannot rely on one person; it takes research, development, testing, product managers, and project managers working together in their different roles to achieve the final delivery. How is this kind of coordination described in the world of programs? In the java.util.concurrent package, Doug Lea expresses task coordination in code through CountDownLatch and CyclicBarrier.

CountDownLatch

I’m sure you all know that one of the marks of good code is naming classes, variables, and so on so that the name says exactly what they are: from the name alone you can get a sense of the business semantics. Take CountDownLatch as an example; its name vividly describes what it does. Count stands for counting, Down for decrementing the counter, and Latch for the action that follows once the counter has been decremented. Put together, “CountDownLatch” means a latch that opens once the counter has counted down to zero.

The class name thus tells us that it controls threads by decrementing a counter.

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 * ...
 */

In short, the comment says that CountDownLatch is a thread synchronizer that lets one or more threads block and wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a count, and the waiting threads block until that count has been decremented to 0 by calls to countDown; at that point all blocked threads are released. In addition, it is a one-shot synchronizer: the count cannot be reset.

Three key features of CountDownLatch can be identified from the JDK’s official description:

1. It is a thread synchronizer used to coordinate the timing of thread execution.

2. It is essentially a counter, acting like a starting gun that decides when the waiting threads may proceed.

3. It is one-shot: once the count reaches zero, the latch cannot be reset or reused.
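To make the one-shot behaviour concrete, here is a small sketch that uses only the public CountDownLatch API; the class name OneShotLatchDemo is made up for illustration. It records getCount() after each step: once the count is 0, further countDown() calls have no effect, and await() returns immediately.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

// Illustrative sketch: a CountDownLatch is one-shot. Once the count reaches zero
// it stays at zero and await() no longer blocks.
class OneShotLatchDemo {

    // Returns the count observed after each step so the behaviour can be checked.
    static long[] run() {
        CountDownLatch latch = new CountDownLatch(2);
        long initial = latch.getCount();     // 2
        latch.countDown();
        long afterOne = latch.getCount();    // 1
        latch.countDown();
        long afterTwo = latch.getCount();    // 0: the latch is now open
        latch.countDown();                   // extra calls do nothing once the count is 0
        long afterExtra = latch.getCount();  // still 0; there is no way to reset it
        try {
            latch.await();                   // returns immediately because the count is 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {initial, afterOne, afterTwo, afterExtra};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [2, 1, 0, 0]
    }
}
```

If the same coordination is needed again, a new CountDownLatch has to be created.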

Now that we know what CountDownLatch is, let’s look at how to use it and which problems in our code it can solve.

Usage scenarios

As described above, CountDownLatch is like the starting gun fired by the referee at a track event: when all the competitors are ready, the gun is fired and they all set off. In a Java multithreaded program, CountDownLatch acts as the thread coordinator: the waiting threads cannot proceed until its counter has been reduced to zero. Consider an alarm monitoring platform. It needs to query alarm information from the alarm service and work order information from the work order service, and then analyze which alarms have not been turned into work orders. The old system did this as in the simplified pseudocode below:

List<Alarm> alarmList = alarmService.getAlarm();
List<WorkOrder> workOrderList = workOrderService.getWorkOrder();
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

Can you see anything in this pseudocode that could be optimized? Let’s analyze it. When the data volume is small, this code may perform fine. However, when there are many alarms and work orders, each query may be slow, and because the queries run one after the other, the analysis task hits a performance bottleneck. So how do we optimize it? From both the business logic and the code, fetching the alarm information and fetching the work order information are independent of each other, yet the code above executes them sequentially. To improve performance, they can be executed in parallel.

The pseudocode modified to run the two queries in parallel is shown below:

Executor executor = Executors.newFixedThreadPool(2);
executor.execute(()-> { alarmList = alarmService.getAlarm(); });
executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); });
 
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

By using a thread pool, we fetch the alarm information and the work order information concurrently, instead of fetching the work orders only after the alarms have been fetched, which is more efficient. This approach has a problem, however: the queries run in pool threads, and the main thread has no way of knowing when they have finished, so it cannot tell when to start the data analysis. This is where CountDownLatch comes in: it lets a thread wait for other threads to finish and then releases the subsequent logic once the condition is met. It is like a company team-building outing where everyone is told to meet at the company gate at 8:30 in the morning: the driver will not start the bus until all the participants have arrived.

The pseudocode after using CountDownLatch looks like this:

Executor executor = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
executor.execute(() -> {
    try {
        alarmList = alarmService.getAlarm();
    } finally {
        latch.countDown(); // count down even if the query fails
    }
});
executor.execute(() -> {
    try {
        workOrderList = workOrderService.getWorkOrder();
    } finally {
        latch.countDown();
    }
});
latch.await(); // blocks until both queries have called countDown()
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
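For readers who want to run the scenario, below is a self-contained sketch of the CountDownLatch version. The services, their data, and the names AlarmAnalysisDemo, getAlarms, and getWorkOrderAlarmIds are hypothetical stand-ins, and alarms and work orders are reduced to plain alarm IDs. Results are passed back through AtomicReference holders because a Java lambda cannot assign to a local variable, and countDown() sits in a finally block so that a failing query cannot leave the main thread blocked forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Runnable sketch of the alarm/work-order scenario. The "services" are hypothetical
// stand-ins returning hard-coded alarm IDs.
class AlarmAnalysisDemo {

    // Hypothetical alarm service: IDs of all current alarms.
    static List<String> getAlarms() {
        return List.of("A1", "A2", "A3");
    }

    // Hypothetical work order service: IDs of alarms that already have a work order.
    static List<String> getWorkOrderAlarmIds() {
        return List.of("A2");
    }

    // Alarms that have not been turned into work orders.
    static List<String> analysis(List<String> alarms, List<String> workOrderAlarmIds) {
        List<String> result = new ArrayList<>(alarms);
        result.removeAll(workOrderAlarmIds);
        return result;
    }

    static List<String> findAlarmsWithoutWorkOrder() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(2);
        AtomicReference<List<String>> alarms = new AtomicReference<>(List.of());
        AtomicReference<List<String>> workOrders = new AtomicReference<>(List.of());
        try {
            executor.execute(() -> {
                try {
                    alarms.set(getAlarms());
                } finally {
                    latch.countDown(); // always count down, even if the query throws
                }
            });
            executor.execute(() -> {
                try {
                    workOrders.set(getWorkOrderAlarmIds());
                } finally {
                    latch.countDown();
                }
            });
            latch.await(); // wait until both queries have finished
            return analysis(alarms.get(), workOrders.get());
        } finally {
            executor.shutdown(); // let the pool threads exit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(findAlarmsWithoutWorkOrder()); // [A1, A3]
    }
}
```

With the hard-coded data only A2 has a work order, so the result is [A1, A3]. If waiting indefinitely is not acceptable, await(long, TimeUnit) gives up after a timeout and returns false.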

Underlying Implementation Principles

Initialization

Before using a CountDownLatch, we must initialize it with a count. Initialization actually does two things: it creates the internal Sync object, an AQS subclass whose synchronization queue will hold the waiting threads, and it sets the AQS state to count. This state is the core variable of AQS (AQS is the foundation on which the java.util.concurrent package is built, and it will be analyzed in the next article).

Sync extends AQS and overrides its shared-mode acquire and release hooks, and CountDownLatch blocks and releases threads by calling AQS methods on its Sync object. In the inner Sync class shown below, tryAcquireShared overrides an AQS template method and is used to acquire the shared lock. CountDownLatch decides whether the acquisition succeeds by checking whether the state is 0: if it is, the lock is acquired and the thread does not block; otherwise the acquisition fails and the thread blocks.

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    // Try to acquire the shared lock: succeeds only when the count is 0.
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // Try to release the shared lock.
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
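To see how little is needed on top of AQS, here is a minimal latch that mirrors the Sync class above. SimpleLatch is a hypothetical name for illustration, not a JDK class, and it leaves out the timed await of the real CountDownLatch. The AQS state holds the remaining count, shared acquisition succeeds only at 0, and each release decrements the state with a CAS loop.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal latch built on AQS the same way CountDownLatch's Sync is.
class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the AQS state is the remaining count
        }

        int getCount() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            // A non-negative result means "acquired": waiters pass only once the count is 0.
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;            // already open: nothing to release
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;        // true only on the transition to 0, which wakes waiters
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    long getCount() {
        return sync.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        Thread worker = new Thread(() -> {
            latch.countDown();
            latch.countDown();
        });
        worker.start();
        latch.await(); // blocks until the worker has counted down twice
        System.out.println("count = " + latch.getCount()); // count = 0
    }
}
```

All of the queuing, parking, and waking is inherited from AQS; the subclass only defines what "acquired" and "released" mean in terms of the state.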

Counter decrement

As in the scenario code above, each thread calls countDown after finishing its own work to signal that it is done. countDown also checks whether count has reached 0; if it has, all waiting threads must be woken up. As the following code shows, it simply calls the releaseShared method of the parent class AQS.

public void countDown() {
    sync.releaseShared(1);
}

releaseShared first calls tryReleaseShared, which decrements count and returns true only when this call brought it to 0. In that case, doReleaseShared is called to wake up all the waiting threads.

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
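Because the latch is acquired in shared mode, the single countDown() that moves the count to 0 releases every waiting thread, not just one. The sketch below (ReleaseAllWaitersDemo is an illustrative name) starts three waiters on a latch with a count of 2. Whether a waiter is already parked in the AQS queue or only calls await() after the count has reached 0, it gets through, so all three are always released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: the countDown() that brings the count to 0 releases all waiters.
class ReleaseAllWaitersDemo {

    static int run(int waiters) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        AtomicInteger released = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                try {
                    latch.await();              // every waiter blocks here
                    released.incrementAndGet(); // reached only after the count hits 0
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        latch.countDown(); // 2 -> 1: tryReleaseShared returns false, nobody is woken
        latch.countDown(); // 1 -> 0: returns true, so doReleaseShared runs and the
                           // wake-up propagates to every queued waiter
        for (Thread t : threads) {
            t.join();
        }
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3) + " waiters released"); // 3 waiters released
    }
}
```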

The general code execution logic can be seen below:

Blocking threads

The effect of await is to block the current thread until count is reduced to 0. It calls sync.acquireSharedInterruptibly, a method that Sync inherits from its parent class AQS. (The timed variant, await(long, TimeUnit), calls tryAcquireSharedNanos instead.)

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

AQS provides this method for acquiring in shared mode while responding to interrupts. It first checks whether the current thread has been interrupted and, if so, throws InterruptedException. It then calls tryAcquireShared, the method Sync overrides to attempt to acquire the shared lock. If that attempt fails (returns a negative value), doAcquireSharedInterruptibly adds the thread to the AQS synchronization queue to wait.

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
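The interrupt handling is easy to observe. In this sketch (InterruptibleAwaitDemo is a made-up name), nobody ever counts down, so the waiter can only leave await() by being interrupted. Because acquireSharedInterruptibly checks the interrupt flag before trying to acquire and also while parked, the outcome is the same whether the interrupt lands before or during the wait, and the latch count is left untouched.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: await() responds to interruption with InterruptedException.
class InterruptibleAwaitDemo {

    static boolean run() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1); // nobody ever counts down
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                interrupted.set(true); // the wait was abandoned; the count is unchanged
            }
        });
        waiter.start();
        waiter.interrupt(); // sets the flag whether or not the waiter is parked yet
        waiter.join();
        return interrupted.get() && latch.getCount() == 1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // true
    }
}
```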

CyclicBarrier

Again, the name says it all: Cyclic means recurring and Barrier means a fence, so a CyclicBarrier is a barrier that can be used over and over. Before digging into CyclicBarrier, let’s look at how the JDK describes it.

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 * ...
 */

As you can see from the JDK’s description, CyclicBarrier is also a thread synchronization coordinator, used to coordinate the execution of a group of threads. Once the specified number of threads have reached the barrier, the barrier opens and the threads leave their blocked state. This looks similar to CountDownLatch, but there are differences: CyclicBarrier is reusable, while CountDownLatch is one-shot. Let’s take a look at the core fields of CyclicBarrier.

// The lock guarding entry to the barrier
private final ReentrantLock lock = new ReentrantLock();
// The condition that threads wait on until the barrier trips
private final Condition trip = lock.newCondition();
// The number of threads the barrier intercepts in each generation
private final int parties;
// The command to run when the barrier trips, before the next generation starts
private final Runnable barrierCommand;
// The current generation of the barrier
private Generation generation = new Generation();
// The number of parties still to arrive in the current generation
private int count;

At the source-code level the two classes differ: CountDownLatch is built on the AQS shared mode, while CyclicBarrier is built on a ReentrantLock and its Condition.

CyclicBarrier internally maintains the parties and count variables. parties is the number of threads that must arrive in each generation, and count is an internal counter initialized to parties. Each call to await decrements count by 1, similar to countDown above; when count reaches 0, the barrier trips and count is reset to parties for the next generation.
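The reset of count is what makes the barrier cyclic. The sketch below, with the illustrative class name ReusableBarrierDemo, lets two threads meet at the same CyclicBarrier for several rounds and counts how often the barrier action runs: once per round, without creating a new barrier.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: one CyclicBarrier instance is tripped once per round and
// resets itself for the next round.
class ReusableBarrierDemo {

    static int run(int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // Barrier action: runs once each time both parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Runnable party = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // wait for the other party, then start the next round
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };
        Thread a = new Thread(party);
        Thread b = new Thread(party);
        a.start();
        b.start();
        a.join();
        b.join();
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3) + " trips"); // 3 trips
    }
}
```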

Usage scenarios

Let’s revisit the business scenario above. We used CountDownLatch to coordinate the threads that query alarm information and work order information, but a new problem arises. Both alarms and work orders are generated continuously, while a CountDownLatch can coordinate threads only once, so it cannot help when alarm and work order information must be queried and analyzed again and again. In other words, when threads repeatedly need to wait for one another to finish before performing subsequent business operations, CyclicBarrier is the right tool.
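Here is a sketch of the repeated analysis with CyclicBarrier. The per-round data and the class name RepeatedAnalysisDemo are hypothetical; a real system would call the alarm and work order services in each round. One thread produces alarms and the other produces work orders, and the barrier action performs the analysis once both results for a round are in. The action completes before either thread is released into the next round, so it never mixes data from two rounds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Sketch of a repeated alarm/work-order check. The per-round data is hypothetical.
class RepeatedAnalysisDemo {

    static final List<List<String>> ALARMS = List.of(List.of("A1", "A2"), List.of("A3", "A4"));
    static final List<List<String>> WORK_ORDERS = List.of(List.of("A1"), List.of("A4"));

    // Results written by the query threads for the current round.
    private volatile List<String> alarms;
    private volatile List<String> workOrders;
    // One analysis result per round, appended by the barrier action.
    private final List<List<String>> results = new ArrayList<>();

    List<List<String>> run() throws InterruptedException {
        // Runs once per round, after both threads arrive and before either is released.
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            List<String> notTransferred = new ArrayList<>(alarms);
            notTransferred.removeAll(workOrders);
            results.add(notTransferred);
        });
        Thread alarmThread = new Thread(() -> {
            for (List<String> round : ALARMS) {
                alarms = round;          // stands in for alarmService.getAlarm()
                awaitQuietly(barrier);
            }
        });
        Thread workOrderThread = new Thread(() -> {
            for (List<String> round : WORK_ORDERS) {
                workOrders = round;      // stands in for workOrderService.getWorkOrder()
                awaitQuietly(barrier);
            }
        });
        alarmThread.start();
        workOrderThread.start();
        alarmThread.join();
        workOrderThread.join();
        return results;
    }

    private static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new RepeatedAnalysisDemo().run()); // [[A2], [A3]]
    }
}
```

Each round yields the alarms without a matching work order. Note that the barrier action runs in whichever thread arrives last, so it should be kept short.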

Underlying Implementation Principles

Initialization

CyclicBarrier has two constructors. One takes the number of threads to coordinate in each round together with the task (the barrier action) to run when the barrier trips; the other takes only the number of threads and sets no barrier action.

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
 
 public CyclicBarrier(int parties) {
        this(parties, null);
    }
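A short sketch of both constructors, using only the public CyclicBarrier API (BarrierConstructorDemo and the thread name first-arrival are illustrative). It shows that a party count below 1 is rejected and that the barrier action is run by the last thread to arrive. The helper thread is made to arrive first by polling getNumberWaiting(), so the calling thread trips the barrier and executes the action itself.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch of the two CyclicBarrier constructors.
class BarrierConstructorDemo {

    // Returns the name of the thread that executed the barrier action.
    static String whoRunsTheAction() throws InterruptedException, BrokenBarrierException {
        AtomicReference<String> runner = new AtomicReference<>();
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> runner.set(Thread.currentThread().getName()));
        Thread first = new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        }, "first-arrival");
        first.start();
        // Wait until the helper thread is waiting at the barrier, so this thread arrives last.
        while (barrier.getNumberWaiting() < 1) {
            Thread.onSpinWait();
        }
        barrier.await(); // count reaches 0 here, so this thread runs the action
        first.join();
        return runner.get();
    }

    public static void main(String[] args) throws Exception {
        CyclicBarrier plain = new CyclicBarrier(3); // no barrier action
        System.out.println(plain.getParties());     // 3
        try {
            new CyclicBarrier(0);
        } catch (IllegalArgumentException e) {
            System.out.println("parties must be at least 1");
        }
        boolean ranInCaller = whoRunsTheAction().equals(Thread.currentThread().getName());
        System.out.println("action ran in caller: " + ranInCaller); // action ran in caller: true
    }
}
```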

Blocking and waiting

For CyclicBarrier, the core of the await methods is the dowait method; its code is as follows:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        // If count reaches 0, run the barrier command (if any) and start the next
        // generation, which wakes all waiting threads and resets the counter
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // The count is not 0: loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

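The dowait method decrements count. When count reaches 0 and a barrier action was supplied at construction time, the arriving thread (the last one) runs that action. After the action completes, nextGeneration is called to start the next coordination cycle: it wakes up all waiting threads and resets the counter. Threads that arrive while count is still above 0 wait on the trip condition until the generation changes, or until the barrier is broken by an interrupt, a timeout, or a failed barrier action.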
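The heart of dowait can be reduced to a lock, a condition, a counter, and a generation object. The hypothetical SimpleBarrier below keeps just that core: the last arrival runs the command, signals all waiters, resets the count, and installs a new Generation, while earlier arrivals wait until the generation they joined has been replaced. It deliberately omits the broken-barrier, interrupt, and timeout handling of the real CyclicBarrier, so an interrupted waiter or a failing command would leave it in an inconsistent state.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, stripped-down barrier mirroring the core of CyclicBarrier.dowait.
// It has no broken-barrier, interrupt-recovery, or timeout handling.
class SimpleBarrier {

    private static final class Generation { }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable barrierCommand;
    private int count;
    private Generation generation = new Generation();

    SimpleBarrier(int parties, Runnable barrierCommand) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierCommand;
    }

    // Returns the arrival index like CyclicBarrier.await():
    // parties - 1 for the first thread to arrive, 0 for the last one.
    int await() throws InterruptedException {
        lock.lock();
        try {
            final Generation g = generation;
            int index = --count;
            if (index == 0) {
                // Last arrival: run the command, then start a new generation.
                if (barrierCommand != null) {
                    barrierCommand.run();
                }
                trip.signalAll();        // wake every thread waiting in this generation
                count = parties;         // reset the counter for the next round
                generation = new Generation();
                return 0;
            }
            // Leave only once the generation this thread joined has been replaced.
            while (g == generation) {
                trip.await();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] trips = {0};
        SimpleBarrier barrier = new SimpleBarrier(3, () -> trips[0]++);
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await(); // round 1
                    barrier.await(); // round 2: the same barrier is reused
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(trips[0] + " trips"); // 2 trips
    }
}
```

The generation check in the while loop is what makes the wait robust: a thread that wakes spuriously keeps waiting, and a thread that wakes after the barrier has tripped leaves even if later arrivals have already started decrementing the new count.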

Conclusion

This article introduced the thread synchronization coordinators CountDownLatch and CyclicBarrier from the perspectives of usage scenarios and underlying implementation. Although both coordinate threads, they differ in practice. CountDownLatch is better suited to one or more threads waiting for a set of other threads to finish, while CyclicBarrier is better suited to a group of threads waiting for each other. In addition, CountDownLatch is one-shot, while the CyclicBarrier counter resets automatically and can be reused.