The previous article covered the principles and usage scenarios of the CountDownLatch utility class (Source code analysis and usage scenarios of the concurrency utility class CountDownLatch). This article introduces CyclicBarrier, another commonly used concurrency utility class in the JUC package, whose name can be read as "reusable barrier".

Introduction

  • CyclicBarrier is similar to CountDownLatch in that it also controls the order in which threads execute. The difference is that a CyclicBarrier blocks a group of threads at the same barrier (a synchronization point) until the last thread arrives, that is, until the barrier's counter drops to zero; only then do the threads blocked at the barrier continue. A CountDownLatch, by contrast, lets one or more threads wait until other threads have completed their work before continuing. Another difference is suggested by the word Cyclic in the class name: once a CyclicBarrier's counter reaches zero it is reset, so the barrier can be used again. A CountDownLatch's counter is not reset once it reaches zero, so the latch cannot be reused.
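The reuse described above can be sketched as follows. This is a minimal illustration, not code from the original article: the class name BarrierReuseSketch and its run method are hypothetical, and the worker count and round count are arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: the same barrier instance is used for several rounds.
class BarrierReuseSketch {

    // Starts `parties` workers that each meet at the barrier `rounds` times
    // and returns the arrival log in the order the entries were recorded.
    static List<String> run(int parties, int rounds) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            int id = i;
            Thread t = new Thread(() -> {
                try {
                    for (int round = 1; round <= rounds; round++) {
                        log.add("worker-" + id + " arrived in round " + round);
                        // Blocks until all parties have arrived; afterwards the
                        // counter is reset and the barrier is ready for the next round.
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    log.add("worker-" + id + " failed: " + e);
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(3, 2).forEach(System.out::println);
    }
}
```

Because no worker can get past the barrier before every worker has logged its arrival, all round 1 entries appear in the log before any round 2 entry, even though the order within a round varies from run to run.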

Example

  • CyclicBarrier is also very simple to use. Create an instance with new CyclicBarrier(...), passing the constructor an int that specifies the barrier size (the number of threads that must reach the barrier before it opens), and then call the await() method in each thread to block that thread at the barrier.
  • The following demo simulates 10 sprinters with 10 threads. In a real 100-meter race, the athletes may only start once they hear the starting gun, and they all start at the same time with no false starts. The starting gun plays the role of the CyclicBarrier in the program: when every athlete is ready and the gun goes off (the barrier is reached), the race starts.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        Random random = new Random();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        List<Thread> threads = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(() -> {
                int time = random.nextInt(5) + 1;
                try {
                    // Use thread sleep to simulate each athlete's preparation time
                    Thread.sleep(time * 1000L);
                    System.out.println(Thread.currentThread().getName() + " is ready");
                    // Once ready, the athlete signals the starter by calling await()
                    cyclicBarrier.await();
                    System.out.println("The starting gun went off, " + Thread.currentThread().getName() + " starts");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "Athlete" + (i + 1)));
        }

        for (Thread thread : threads) {
            thread.start();
        }
    }
}

Implementation principle

  • CyclicBarrier and CountDownLatch also differ in their underlying implementation. CountDownLatch is implemented directly by composing a synchronization component that extends AQS. CyclicBarrier does not use an AQS synchronization component directly; it composes a ReentrantLock instead (ReentrantLock is itself built on AQS, so CyclicBarrier ultimately relies on AQS as well).
  • Since CyclicBarrier is implemented with ReentrantLock, it has a field named lock. It also maintains a counter, count. Because a CyclicBarrier can be reused, the counter has to be reset after it reaches 0, so another field, parties, holds the initial value of count. CyclicBarrier also has a field named generation, whose type is the inner class Generation. It identifies the current use of the barrier, and its broken flag records whether the barrier has been broken, which is how a timeout in await(long timeout, TimeUnit unit) is reported to the other waiting threads (explained later in the source code analysis). When the CyclicBarrier is reset, generation is replaced with a new instance. The fields and methods of CyclicBarrier are shown in the table below.
| Field or method | Role |
| --- | --- |
| ReentrantLock lock | Guards the barrier's state so that concurrent modifications of count by multiple threads are thread-safe |
| int count | Counter; decremented by one each time a thread calls await() |
| int parties | Records the initial value of the counter |
| Generation generation | Replaced with a new instance when the counter is reset. Its broken flag is set to true when the barrier is broken, for example when a timed wait times out |
| Condition trip | Wait queue for the threads blocked at the barrier |
| Runnable barrierCommand | Optional Runnable that is executed when the counter reaches zero, before the threads blocked at the barrier are released |
| await() | Decrements the counter by 1 and blocks the thread at the barrier; no timeout |
| await(long timeout, TimeUnit unit) | Decrements the counter by 1 and blocks the thread at the barrier for at most the given timeout |
| reset() | Resets the barrier |
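To make the roles of these fields concrete, here is a deliberately simplified sketch of a barrier built from a ReentrantLock and a Condition. MiniBarrier is a hypothetical class written for illustration, not the JDK source: it keeps only lock, trip, parties, count and a generation marker, and it omits timeouts, the broken flag, and the barrier action. It also assumes uninterruptible waiting (Condition.awaitUninterruptibly()) so that the sketch does not need any barrier-breaking logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified barrier; NOT the JDK implementation.
class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition(); // wait queue
    private final int parties;                          // initial counter value
    private int count;                                  // threads still expected
    private Object generation = new Object();           // marker for the current round

    MiniBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    // Returns the arrival index: parties - 1 for the first arrival, 0 for the last.
    int await() {
        lock.lock();
        try {
            Object g = generation;
            int index = --count;
            if (index == 0) {
                // Last arrival: wake everyone up and start a new round.
                trip.signalAll();
                count = parties;
                generation = new Object();
                return 0;
            }
            // Not the last arrival: wait until the round changes.
            while (g == generation) {
                trip.awaitUninterruptibly();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Runs `parties` threads through one round and returns their sorted arrival indices.
    static List<Integer> demo(int parties) {
        MiniBarrier barrier = new MiniBarrier(parties);
        List<Integer> indices = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            Thread t = new Thread(() -> indices.add(barrier.await()));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<Integer> sorted = new ArrayList<>(indices);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

Comparing the generation marker in the wait loop, rather than checking count, is what lets a waiting thread tell "my round has finished" apart from a spurious wakeup, since count has already been reset by the time the waiters run again.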
  • CyclicBarrier has two constructors, as follows.
// parties specifies the size of the counter
// barrierAction is a Runnable, and when the counter is reduced to 0, the barrierAction is executed before the barrier is opened
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

// parties specifies the size of the counter
public CyclicBarrier(int parties) {
    // Call the parameterized constructor with two arguments
    this(parties, null);
}
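The effect of the constructors can be checked through the public accessors getParties() and getNumberWaiting(). The following is a small sketch; the class name BarrierConstructorSketch and the helper rejects are hypothetical names chosen for this example.

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class for the constructor behavior.
class BarrierConstructorSketch {

    // Returns true if the constructor rejects the given party count
    // with an IllegalArgumentException (parties <= 0).
    static boolean rejects(int parties) {
        try {
            new CyclicBarrier(parties);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(10);
        System.out.println(barrier.getParties());       // the value passed to the constructor
        System.out.println(barrier.getNumberWaiting()); // no thread has called await() yet
        System.out.println(rejects(0));                 // zero parties is not allowed
    }
}
```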
  • Executing CyclicBarrier cyclicBarrier = new CyclicBarrier(10); initializes both count and parties. The argument 10 means that the barrier will not open until 10 threads have reached it.
  • When CyclicBarrier.await() is called, it delegates directly to the dowait() method. The source code of dowait() is shown below.
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // If the barrier is already broken (for example, another thread timed out
        // or was interrupted while waiting), throw an exception
        if (g.broken)
            throw new BrokenBarrierException();

        // If the current thread has been interrupted, break the barrier (waking up all waiting threads)
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // The counter decrement
        int index = --count;
        // If the decrement result is 0, all threads have reached the barrier
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // If a barrier action was supplied, run it before opening the barrier
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // nextGeneration() wakes up all threads in the wait queue and resets count
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // If the counter does not drop to 0, the current thread is queued to wait
        for (;;) {
            try {
                // timed indicates whether this is a timed wait; if it is not,
                // call the Condition's await() method to enter the wait queue
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
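The value returned by dowait() is the index computed by --count, and the public await() passes it through: the first thread to arrive receives parties - 1 and the last receives 0. The sketch below makes the arrival order deterministic by polling getNumberWaiting(); the class name ArrivalIndexSketch and its helper methods are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: shows the arrival index returned by await().
class ArrivalIndexSketch {

    // Arrives at a 3-party barrier strictly one thread at a time and returns the
    // values await() gave to the first, second, and last arrival, in that order.
    static int[] run() {
        CyclicBarrier barrier = new CyclicBarrier(3);
        int[] result = new int[3];
        try {
            Thread first = arrive(barrier, result, 0);
            waitUntilWaiting(barrier, 1);
            Thread second = arrive(barrier, result, 1);
            waitUntilWaiting(barrier, 2);
            result[2] = barrier.await(); // the calling thread is the last arrival
            first.join();
            second.join();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    // Starts a thread that waits at the barrier and stores its arrival index.
    private static Thread arrive(CyclicBarrier barrier, int[] result, int slot) {
        Thread t = new Thread(() -> {
            try {
                result[slot] = barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                result[slot] = -1; // not expected in this sketch
            }
        });
        t.start();
        return t;
    }

    // Polls until at least n threads are waiting at the barrier.
    private static void waitUntilWaiting(CyclicBarrier barrier, int n) throws InterruptedException {
        while (barrier.getNumberWaiting() < n) {
            Thread.sleep(5);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 1, 0]
    }
}
```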
  • At the start of dowait(), generation.broken is checked. It is false on first entry (the default). It becomes true only when breakBarrier() is called, which dowait() does when a waiting thread is interrupted, when a timed wait via await(long timeout, TimeUnit unit) times out, or when the barrier action throws.
  • If count reaches 0 after the decrement, all threads have reached the barrier and it can be opened so that the blocked threads continue. Before opening the barrier, dowait() checks whether barrierCommand is null and, if it is not, runs barrierCommand first. It then calls nextGeneration(), whose main job is to wake up all threads in the wait queue and reset the counter. The source code is as follows.
private void nextGeneration() {
    // signal completion of last generation
    // Wake up all waiting threads in the wait queue
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}
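  • If count is not zero after the decrement, some threads have not yet reached the barrier, so the barrier cannot be opened. The current thread therefore joins the wait queue by calling trip.await() and waits.
  • If a timed wait times out, trip.awaitNanos() returns a value less than or equal to zero inside the for loop, so the check if (timed && nanos <= 0L) succeeds: breakBarrier() is called and a TimeoutException is thrown. breakBarrier() sets generation's broken field to true and wakes up all waiting threads; when those threads reach if (g.broken), the condition is true and they throw BrokenBarrierException. This is how the timeout wait is implemented. The catch block in the loop handles interruption in a similar way, by calling breakBarrier() and rethrowing the InterruptedException.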
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
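The effect of breakBarrier() on a timeout can be observed from the outside. In the sketch below, which is an illustration with a hypothetical class name (BarrierTimeoutSketch) and an arbitrary 100 ms timeout, a three-party barrier only ever receives two threads: one waits without a timeout, the other with one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a timed wait that expires breaks the barrier.
class BarrierTimeoutSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] waiterOutcome = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                barrier.await(); // untimed wait
                waiterOutcome[0] = "passed";
            } catch (BrokenBarrierException e) {
                waiterOutcome[0] = "BrokenBarrierException";
            } catch (InterruptedException e) {
                waiterOutcome[0] = "InterruptedException";
            }
        });
        waiter.start();
        try {
            // Make sure the untimed waiter is already at the barrier.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(5);
            }
            barrier.await(100, TimeUnit.MILLISECONDS); // timed wait
            events.add("timed wait: passed");
        } catch (TimeoutException e) {
            events.add("timed wait: TimeoutException");
        } catch (InterruptedException | BrokenBarrierException e) {
            events.add("timed wait: " + e.getClass().getSimpleName());
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        events.add("untimed wait: " + waiterOutcome[0]);
        events.add("isBroken: " + barrier.isBroken());
        barrier.reset(); // starts a fresh generation
        events.add("isBroken after reset: " + barrier.isBroken());
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The thread whose wait times out receives TimeoutException, the other waiter is woken by signalAll() and receives BrokenBarrierException, and the barrier stays broken until reset() is called.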
  • If no timeout occurs, then when the counter drops to zero all threads in the trip condition queue are woken up and transferred to the lock's synchronization queue. They are then woken node by node from that queue, reacquire the lock, return from trip.await(), and continue with the logic that follows. For a detailed analysis of ReentrantLock, see these two articles: ReentrantLock source code analysis, and Fair locks versus unfair locks.
  • CyclicBarrier's await(long timeout, TimeUnit unit) method also ends up calling dowait(), so it is not covered in detail here. Overall, the source code of CyclicBarrier is relatively simple.
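The interruption branch of dowait() can be observed in the same way as a timeout. The following sketch uses a hypothetical class name (BarrierInterruptSketch): a two-party barrier receives only one thread, which is then interrupted while it waits.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: interrupting a waiting thread breaks the barrier.
class BarrierInterruptSketch {

    // Returns { outcome of the interrupted waiter, barrier.isBroken() afterwards }.
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        String[] outcome = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcome[0] = "passed";
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException";
            } catch (BrokenBarrierException e) {
                outcome[0] = "BrokenBarrierException";
            }
        });
        waiter.start();
        try {
            // Wait until the thread has registered at the barrier, then interrupt it.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(5);
            }
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { outcome[0], String.valueOf(barrier.isBroken()) };
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```

The interrupted thread sees InterruptedException itself, and because it called breakBarrier() on the way out, isBroken() reports true afterwards.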

Differences from CountDownLatch

  • CyclicBarrier differs from CountDownLatch in several ways. First, CyclicBarrier makes all threads wait until every one of them has reached the barrier and then lets them proceed together, while CountDownLatch lets one thread or a group of threads wait until other threads have finished their work. Second, a CyclicBarrier's counter can be reset and therefore reused, whereas a CountDownLatch's counter cannot be reset or reused. Third, a CyclicBarrier can run a Runnable task after all threads have arrived and before the barrier opens, which is useful in some scenarios. Fourth, although both implementations are ultimately based on AQS, CyclicBarrier uses AQS indirectly through ReentrantLock, while CountDownLatch uses the AQS shared mode directly.
  • The CyclicBarrier constructor also accepts a Runnable, which the demo at the beginning of this article did not use. In that demo, once the runners are ready at the starting line, the starter fires the gun first and only then do the runners start running, that is, the threads continue executing. The gunshot happens before the barrier opens, so it can be implemented with the Runnable. The sample code is as follows.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        Random random = new Random();
        // The second constructor argument is a Runnable that runs once all threads have arrived
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
            @Override
            public void run() {
                System.out.println("============== in position!! Get ready!! Bang! ==============");
            }
        });
        List<Thread> threads = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(() -> {
                int time = random.nextInt(5) + 1;
                try {
                    Thread.sleep(time * 1000L);
                    System.out.println(Thread.currentThread().getName() + " is ready");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + " starts");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "Athlete" + (i + 1)));
        }

        for (Thread thread : threads) {
            thread.start();
        }
    }
}
  • The printed output shows that once all the athletes are ready, the barrier action's "in position!! Get ready!! Bang!" line is printed first, and only after that line are the other threads allowed to continue.
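For contrast with the barrier demos in this section, the following sketch shows the CountDownLatch side of the comparison: the workers count down without blocking, only the waiting thread blocks, and the count stays at zero afterwards. The class name LatchContrastSketch is hypothetical and the sketch is meant only as an illustration of the differences listed above.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: one thread waits for several workers to finish.
class LatchContrastSketch {

    // Waits for `workers` threads to count down and returns the latch count
    // observed after one extra countDown() call.
    static long run(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            // countDown() does not block, unlike CyclicBarrier.await()
            new Thread(done::countDown).start();
        }
        try {
            done.await(); // only the calling thread waits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        done.countDown(); // no effect: the count is already 0 and is never reset
        return done.getCount();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 0
    }
}
```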

Conclusion

  • This article described what CyclicBarrier does and how to use it, analyzed its implementation with reference to the source code, and compared CyclicBarrier and CountDownLatch in terms of functionality and underlying implementation. In many scenarios, what CountDownLatch can do can also be done with CyclicBarrier.

Recommended reading

  • Implementation principle of ReadWriteLock
  • Semaphore source code analysis and usage scenarios
  • Source code analysis and usage scenarios of the concurrency utility class CountDownLatch
  • Design principle of queue synchronizer (AQS)
  • Queue synchronizer (AQS) source code analysis