CyclicBarrier, a synchronization helper class, is described in the API as follows:

It allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be reused after the waiting threads are released.

In plain English: each thread in a group blocks when it reaches the barrier. When the last thread arrives, the barrier opens and all the blocked threads continue running.
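The "cyclic" part can be illustrated with a small sketch in which the same barrier is reused for several rounds. The class name BarrierReuseDemo and the method runRounds are made up for this illustration; only CyclicBarrier and its await() method come from the JDK.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows one barrier instance being reused across rounds.
class BarrierReuseDemo {

    /** Runs `rounds` rounds with `parties` threads and returns how often the barrier tripped. */
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once every time the barrier trips.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // Per-round work would go here.
                        barrier.await(); // block until all parties have arrived
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Give up on the remaining rounds in this sketch.
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

With 3 threads and 4 rounds the barrier trips 4 times, once per round, without ever being recreated.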

Implementation analysis

CyclicBarrier provides two constructors:

  • CyclicBarrier(int parties) : Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it. It does not perform a predefined action when the barrier trips.
  • CyclicBarrier(int parties, Runnable barrierAction) : Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it, and that runs the given barrier action when the barrier trips. The action is run by the last thread to enter the barrier.

parties is the number of threads that must call await() before the barrier trips.

barrierAction is the Runnable command passed to the CyclicBarrier. When the barrier trips, barrierAction runs first, before the waiting threads are released, which is useful for more complex business scenarios.

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
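A small sketch can make the ordering visible: the barrier action is logged before any thread returns from await(), and await() returns the arrival index, where 0 means "last to arrive". BarrierActionDemo and its run method are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: records when the barrier action runs relative to the releases.
class BarrierActionDemo {

    /** Returns the event log; the "action" entry precedes every "released" entry. */
    static List<String> run(int parties) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> log.add("action"));

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // Arrival index: parties - 1 for the first arrival, 0 for the last.
                    int index = barrier.await();
                    log.add("released:" + index);
                } catch (Exception e) {
                    log.add("failed:" + e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

The order of the "released" entries varies from run to run, but "action" is always the first entry because the last-arriving thread runs it before waking the others.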

The most important method of CyclicBarrier is await(), which blocks until all parties have called await() on the barrier:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

await() delegates to dowait(boolean timed, long nanos):

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Current generation
        final Generation g = generation;

        // The current generation is broken: a thread that tries to wait
        // on a broken barrier gets a BrokenBarrierException
        if (g.broken)
            throw new BrokenBarrierException();

        // If the thread has been interrupted, break the barrier
        // and throw InterruptedException
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // Decrement count
        int index = --count;
        // index == 0: the last thread has arrived, so the barrier trips
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // Run the barrier action
                if (command != null)
                    command.run();
                ranAction = true;
                // Wake up all waiting threads and start the next generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // Loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                // Untimed wait: call Condition.await()
                if (!timed)
                    trip.await();
                // Timed wait: call Condition.awaitNanos()
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            // The generation has been replaced: return index
            if (g != generation)
                return index;

            // Timed out: break the barrier and throw TimeoutException
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // Release the lock
        lock.unlock();
    }
}

The logic of await() is fairly simple: if the current thread is not the last one to arrive, it waits until one of the following happens:

  1. The last thread arrives (index == 0)
  2. The specified waiting time elapses (timed wait)
  3. Some other thread interrupts the current thread
  4. Some other thread interrupts one of the other waiting threads
  5. Some other thread times out while waiting for the barrier
  6. Some other thread calls reset() on this barrier. The reset() method resets the barrier to its initial state.
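The reset() case can be sketched as follows. BarrierResetDemo is a hypothetical class written for this illustration; it polls getNumberWaiting() so that reset() is only called once the other thread is actually parked at the barrier.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows what a waiting thread sees when reset() is called.
class BarrierResetDemo {

    static String run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        AtomicReference<String> outcome = new AtomicReference<>("none");

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcome.set("tripped");
            } catch (BrokenBarrierException e) {
                outcome.set("BrokenBarrierException");
            } catch (InterruptedException e) {
                outcome.set("InterruptedException");
            }
        });
        waiter.start();

        // Wait until the thread is really waiting at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(1);
        }

        barrier.reset(); // releases the waiter and starts a fresh generation
        waiter.join();
        return outcome.get() + ", isBroken=" + barrier.isBroken();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "BrokenBarrierException, isBroken=false"
    }
}
```

The waiting thread fails with BrokenBarrierException, yet the barrier itself reports isBroken() == false afterwards, because reset() installs a new, unbroken generation.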

Two things in the source above deserve attention: the Generation object and the BrokenBarrierException that is thrown in several places. When is the exception thrown? A thread gets a BrokenBarrierException if another thread calls reset() while it is waiting, or if the barrier is already broken when it calls await(). In addition, if any thread is interrupted or times out while waiting, the barrier is put into the broken state and all other waiting threads throw BrokenBarrierException.
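The interrupt case can be reproduced with a short sketch. BarrierInterruptDemo is a hypothetical class for this illustration: two threads wait on a three-party barrier, and one of them is interrupted.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: one interrupted waiter breaks the barrier for the other.
class BarrierInterruptDemo {

    static String run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] outcomes = new String[2];
        Thread[] waiters = new Thread[2];

        for (int i = 0; i < 2; i++) {
            final int id = i;
            waiters[i] = new Thread(() -> {
                try {
                    barrier.await();
                    outcomes[id] = "tripped";
                } catch (InterruptedException e) {
                    outcomes[id] = "InterruptedException";
                } catch (BrokenBarrierException e) {
                    outcomes[id] = "BrokenBarrierException";
                }
            });
            waiters[i].start();
        }

        // Make sure both threads are waiting before interrupting one of them.
        while (barrier.getNumberWaiting() < 2) {
            Thread.sleep(1);
        }
        waiters[0].interrupt();

        for (Thread t : waiters) {
            t.join();
        }
        return outcomes[0] + ", " + outcomes[1] + ", isBroken=" + barrier.isBroken();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "InterruptedException, BrokenBarrierException, isBroken=true"
        System.out.println(run());
    }
}
```

The interrupted thread sees InterruptedException, the other waiter sees BrokenBarrierException, and the barrier stays broken until reset() is called.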

Generation represents one "generation" of the CyclicBarrier, that is, one use of the barrier. All threads that wait on the barrier during the same use belong to the same generation. When parties threads have reached the barrier, the generation is replaced. The broken field indicates whether the barrier is broken in the current generation.

private static class Generation {
    boolean broken = false;
}

By default, barriers are unbroken.

When the barrier is broken, for example because a waiting thread is interrupted or times out, breakBarrier() is called to release all waiting threads:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

Besides setting broken to true, breakBarrier() resets count to parties and calls signalAll() to wake up all threads waiting on the CyclicBarrier.

When all threads have reached the barrier (index == 0), nextGeneration() starts a new generation. It does three things: wakes up all waiting threads, resets count, and replaces generation.

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}
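To see how count, the condition, and the generation object work together, here is a deliberately stripped-down sketch. TinyBarrier is a hypothetical class, not the JDK implementation: it assumes no barrier action, no timeouts, and no broken state, and it waits uninterruptibly to keep the code short.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified barrier: only the count/generation mechanics are modeled.
class TinyBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;                        // parties still expected in this generation
    private Object generation = new Object(); // identity marks the current generation

    TinyBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    /** Blocks until `parties` threads have called await(); returns the arrival index. */
    int await() {
        lock.lock();
        try {
            final Object g = generation;
            int index = --count;
            if (index == 0) {
                // Last arrival: wake everyone, reset count, start the next generation.
                trip.signalAll();
                count = parties;
                generation = new Object();
                return 0;
            }
            // Wait until the generation changes; the loop guards against spurious wakeups.
            while (g == generation) {
                trip.awaitUninterruptibly();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TinyBarrier barrier = new TinyBarrier(3);
        Thread[] threads = new Thread[3];
        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(() -> System.out.println(
                    Thread.currentThread().getName() + " arrival index " + barrier.await()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}
```

Comparing the captured generation with the current one is what lets a slow thread from the previous round return correctly even if a fast thread has already re-entered the barrier for the next round.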

CyclicBarrier also provides await(long timeout, TimeUnit unit) for waiting with a timeout; internally it calls dowait(true, unit.toNanos(timeout)).
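A minimal sketch of the timed variant, using a hypothetical BarrierTimeoutDemo class in which the second party never arrives:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a timed await() that is guaranteed to time out.
class BarrierTimeoutDemo {

    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2); // nobody else will call await()
        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
            return "tripped";
        } catch (TimeoutException e) {
            // The timeout also breaks the barrier for everyone else.
            return "TimeoutException, isBroken=" + barrier.isBroken();
        } catch (InterruptedException | BrokenBarrierException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "TimeoutException, isBroken=true"
    }
}
```

The thread that times out gets TimeoutException, and because dowait() calls breakBarrier() first, any other thread waiting on the same barrier would get BrokenBarrierException.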

Application scenarios

CyclicBarrier is well suited to merging the results of multiple threads: several threads each compute part of the data, and the partial results are then combined. Suppose we need to total the figures in several Excel sheets. We can process each sheet in its own thread, have each thread store its result when it finishes, and finally use barrierAction to add up the per-thread results and obtain the total across all sheets.
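This scenario can be sketched as follows, with plain int arrays standing in for the Excel sheets. SheetTotalDemo and its total method are hypothetical names, and the sketch assumes at least one sheet is passed in.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class: each worker sums one "sheet", the barrier action merges the sums.
class SheetTotalDemo {

    /** Sums every sheet in its own thread; assumes sheets.length > 0. */
    static long total(int[][] sheets) throws InterruptedException {
        long[] partial = new long[sheets.length];
        AtomicLong grandTotal = new AtomicLong();

        // Runs once, in the last-arriving thread, after all partial sums are written.
        CyclicBarrier barrier = new CyclicBarrier(sheets.length, () -> {
            long sum = 0;
            for (long p : partial) {
                sum += p;
            }
            grandTotal.set(sum);
        });

        Thread[] workers = new Thread[sheets.length];
        for (int i = 0; i < sheets.length; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                long sum = 0;
                for (int value : sheets[id]) {
                    sum += value;
                }
                partial[id] = sum;
                try {
                    barrier.await(); // signal "my sheet is done"
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return grandTotal.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int[][] sheets = { {1, 2, 3}, {10, 20}, {100} };
        System.out.println("total = " + total(sheets)); // prints "total = 136"
    }
}
```

Writes made by a thread before it calls await() are visible to the barrier action, so the action can read the partial sums without extra synchronization.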

Sample application

For example, a meeting does not start until every attendee has arrived.

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    private static CyclicBarrier cyclicBarrier;

    static class CyclicBarrierThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " here");
            // Wait at the barrier until everyone has arrived
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // The barrier action runs once all 5 threads have arrived
        cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("...");
            }
        });

        for (int i = 0; i < 5; i++) {
            new CyclicBarrierThread().start();
        }
    }
}

Running results: