The problem

(1) What is CyclicBarrier?

(2) What are the properties of CyclicBarrier?

(3) The comparison between CyclicBarrier and CountDownLatch?

Introduction to the

CyclicBarrier, which blocks a group of threads until they both reach a condition. It is similar to, but different from, CountDownLatch in that CountDownLatch requires a call to countDown() to trigger an event, while CyclicBarrier does not. It acts like a fence, and only moves down when a group of threads have reached it.

Method of use

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                System.out.println("before");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after"); }).start(); }}}Copy the code

This method is simple: use a CyclicBarrier to keep three threads in sync when all three threads reach CyclicBarrier. Await (); Everybody runs down again.

Source code analysis

Main inner class

private static class Generation {
    boolean broken = false;
}
Copy the code

Generation controls the use of cyclicbarriers.

For example, if the three threads in the above example complete and go to the next generation, waiting for the three threads to reach the fence before executing together, CountDownLatch cannot do this. CountDownLatch is one-time and cannot be reset.

The main properties

/ / reentrant lock
private final ReentrantLock lock = new ReentrantLock();
// A conditional lock is called trip, which means a thread will trip when it arrives and wake up when it reaches a certain number
private final Condition trip = lock.newCondition();
// The number of threads to wait
private final int parties;
// The command to execute when awakened
private final Runnable barrierCommand;
/ / generation
private Generation generation = new Generation();
// The number of threads that need to wait in the current generation
private int count;
Copy the code

As you can see from the properties, cyclicBarriers are internally conditioned by reentrant locks, so can you imagine this scenario?

Tong Brother imagine this: If the initial count = parties = 3, when the first thread reaches the barrier, count is reduced by 1 and the Condition is added to the queue, so is the second thread that reaches the barrier, and the third thread that reaches the barrier, count is reduced to 0, Call Condition’s signalAll() to notify the other two threads, then queue them to AQS, wait for the current thread to finish, and call Lock.unlock () to wake up one thread from the AQS queue to continue running. This means that the three lines actually arrive at the fence one by one, and then go down.

The above is purely tong Elder brother’s imagination. Is it true? Let’s look ahead.

A constructor

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // Initialize the parties
    this.parties = parties;
    // initialize count to equal parties
    this.count = parties;
    // Initialize all the commands executed at the fence
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
Copy the code

The constructor needs to pass in a parties variable, which is the number of threads to wait.

Await () method

Each thread that needs to wait at the fence needs to explicitly call await() to wait for another thread to arrive.

public int await(a) throws InterruptedException, BrokenBarrierException {
    try {
        // Call the dowait method without timeout
        return dowait(false.0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen}}private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    / / lock
    lock.lock();
    try {
        / / the current generation
        final Generation g = generation;
        
        / / check
        if (g.broken)
            throw new BrokenBarrierException();

        // Interrupt the check
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        
        // Count is reduced by 1
        int index = --count;
        // If the number is reduced to 0, go to this logic (the last thread goes here)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // If a command is passed during initialization, execute it here
                final Runnable command = barrierCommand;
                if(command ! =null)
                    command.run();
                ranAction = true;
                // Call the next generation method
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // This loop can only be reached by the non-last thread
        for (;;) {
            try {
                if(! timed)Call the await() method of condition
                    trip.await();
                else if (nanos > 0L)
                    // Timeout wait method
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.Thread.currentThread().interrupt(); }}/ / check
            if (g.broken)
                throw new BrokenBarrierException();

            // It is not the same here
            // The generation reference has changed since nextGeneration() was called when the fence was broken
            if(g ! = generation)return index;
            
            // Timeout check
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw newTimeoutException(); }}}finally{ lock.unlock(); }}private void nextGeneration(a) {
    // Call condition's signalAll() to transfer all waiters in its queue to AQS
    trip.signalAll();
    / / reset the count
    count = parties;
    // Enter the next generation
    generation = new Generation();
}
Copy the code

The entire logic in the dowait() method is divided into two parts:

(1) the last thread steps up to the above logic, when count is reduced to 0, breaks the barrier, it calls nextGeneration() method to notify the waiting thread in the conditional queue to move to AQS queue to be woken up and enter the nextGeneration.

(2) The non-last thread goes through the following for loop logic. These threads will block at the await() method of the condition, they will join the condition queue, wait to be notified, and return when they wake up having updated the “generation”.

The illustration

After studying the previous chapters, this diagram is very easy to read. If you don’t understand it, you should also take a good look at the recommended content

conclusion

(1) CyclicBarrier causes a group of threads to block at await() and wake up (only to move from the conditional queue to the AQS queue) the threads in front of it when the last thread arrives and continue to go down;

(2) CyclicBarrier is not a synchronizer implemented directly using AQS;

(3) CyclicBarrier realizes the synchronization logic based on ReentrantLock and its Condition;

eggs

CyclicBarrier and CountDownLatch?

(1) Both can block a set of threads waiting to be woken up;

(2) The former is automatically woken up when the last thread arrives;

(3) The latter is done by explicitly calling countDown();

(4) The former is realized through reentrant lock and its conditional lock, while the latter is directly realized based on AQS;

(5) The former has the concept of “generation” and can be used repeatedly, while the latter can only be used once;

(6) The former can only realize multiple threads to run together at the fence;

(7) The latter can not only implement multiple threads waiting for one thread condition to be established, but also implement one thread waiting for multiple thread condition to be established (see CountDownLatch chapter use case);

Recommended reading

The beginning of the Java Synchronization series

2, Unbroadening Java magic class parsing

JMM (Java Memory Model)

Volatile parsing of the Java Synchronization series

Synchronized parsing of Java series

6, Deadknock Java synchronization series write a Lock Lock yourself

7. AQS of The Java Synchronization series

ReentrantLock (a) — fair lock, unfair lock

ReentrantLock – Conditional lock

ReentrantLock VS Synchronized Java series

ReentrantReadWriteLock source code parsing

Semaphore source code analysis of Semaphore Java synchronization series

CountDownLatch source code parsing

The AQS finale of the Java Sync series

Java synchronization series StampedLock source code parsing


Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.