What is a CyclicBarrier?

Cyclicbarriers, commonly referred to as barriers, are multithreaded synchronization tools.

It is often compared with CountDownLatch because both are tools for waiting on a group of threads.

Unlike CountDownLatch, which is used to coordinate threads waiting for a group of worker threads to do a notification and resume execution, CyclicBarrier, as the name suggests, is a group of threads waiting for each other to arrive at a specified location before they all resume execution.

CyclicBarrier is further advanced by:

  • Allows reuse and can be reused after execution or manual reset; CountDownLatch is disabled.

  • Allows callback logic to be performed, typically automatically invoked by the last thread to reach the fence. We can use this feature to perform the finishing touches in our business logic.

  • Problems can break the current generation (notifying other threads) and reset the next time. Unlike CountDownLatch, which doesn’t care what happens when it reaches a specific condition, a CyclicBarrier must wait on the fence. If an exception occurs before a worker thread arrives, a human processing reset is required.

    Of course, exceptions and timeouts will automatically destroy the current use, but note that the next use cannot be directly carried out, you must manually reset.

How to use CyclicBarrier?

CountDownLatch: A number of locks on a safe (juejin. Cn) cue to CyclicBarrier said to open the lock, no longer available.

Allow reuse

Of course, the henchman hopes that after he picks up the lock and takes the money and runs away, the boss will come back and check and not find out.

private static void normal(a) throws InterruptedException {
    CyclicBarrier barrier = new CyclicBarrier(3);
    if(! barrier.isBroken()) { System.out.println("Safe deposit Box: Secured.");
    }
    System.out.println("Cronies: Success or death!!!!!");
    for (int i = 0; i < 3; i++) {
        Thread thread = new Thread(new MyFollower(barrier));
        thread.start();
    }
    Thread.sleep(1000);
    if(! barrier.isBroken()) { System.out.println("Safe deposit Box: Secured.");
        System.out.println("Boss: Good, this is good."); }}private static class MyFollower implements Runnable {
    private CyclicBarrier barrier;

    public MyFollower(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run(a) {
        System.out.println("Confidant: Enter the password ing");
        try {
            barrier.await();
        } catch (InterruptedException e) {
            System.out.println("On his deathbed: Swearing out of the game.");
        } catch (BrokenBarrierException e) {
            System.out.println("OS: Stupid, this is all typed wrong.");
        }
        System.out.println("Cronies: become!!!!!"); }} safe: security protection in the cronies: not successful and become kernel!!!!! Dear friend: Input password ING dear friend: input password ING dear friend: input password ING dear friend: become!!!!! Confidant: become!!!!! Confidant: become!!!!! Safe: Under security Boss: Good, this is a good oneCopy the code
Dear friend is killed (interrupt), input wrong (active destruction), input slow (timeout), close my security lock what matter?

Since internal thread interrupts, or active sabotage verifies a Boolean, there is no way for other threads to know what caused the sabotage in this case.

Simple implementation code (the real world certainly needs better coordination and handling)

private static void normal(a) throws InterruptedException {
    CyclicBarrier barrier = new CyclicBarrier(3);
    if(! barrier.isBroken()) { System.out.println("Safe deposit Box: Secured.");
    }
    System.out.println("Cronies: Success or death!!!!!");
    for (int i = 0; i < 3; i++) {
        Thread thread = new Thread(new MyFollower(barrier, i + 1));
        thread.start();
        if (i == 0) {
            thread.interrupt();
        }
    }
    Thread.sleep(2000);
    if (barrier.isBroken()) {
        System.out.println("Cronies: A Reset.");
        barrier.reset();
    }

    if(! barrier.isBroken()) { System.out.println("Safe deposit Box: Secured.");
        System.out.println("Boss: Good, this is good."); }}private static class MyFollower implements Runnable {
    private CyclicBarrier barrier;
    private int no;

    public MyFollower(CyclicBarrier barrier, int no) {
        this.barrier = barrier;
        this.no = no;
    }

    @Override
    public void run(a) {
        System.out.println("Confidant: Enter the password ing");
        if (no == 3) {
            System.out.println("Confidant: Wrong type... Reset it.");
            barrier.reset();
            return;
        }
        if (no == 2) {
            System.out.println("Cronies: Slow to lose...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return;
        }
        try {
            barrier.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.out.println("On his deathbed: Swearing out of the game.");
            return;
        } catch (BrokenBarrierException e) {
            System.out.println("OS: What fool is wrong?");
            return;
        } catch (TimeoutException e) {
            System.out.println("Cronies: Who is typing slowly?");
            return;
        }
        System.out.println("Cronies: become!!!!!");
    }
Copy the code

Manufacturer custom unlock result (callback function)

Safety lock manufacturers allow consumers to customize the celebration, notification and so on after unlocking. So how do we do that?

private static void normal(a) throws InterruptedException {
    CyclicBarrier barrier = new CyclicBarrier(3, () -> {
        System.out.println("Safe deposit box: Congratulations host, got money again!!");
    });
    if(! barrier.isBroken()) { System.out.println("Safe deposit Box: Secured.");
    }
    System.out.println("Cronies: Success or death!!!!!");
    for (int i = 0; i < 3; i++) {
        Thread thread = new Thread(new MyFollower(barrier));
        thread.start();
    }
    Thread.sleep(1000);
    if(! barrier.isBroken()) { System.out.println("Safe deposit Box: Secured.");
        System.out.println("Boss: Good, this is good."); }} safe: security protection in the cronies: not successful and become kernel!!!!! Dear friend: Input password ING dear friend: input password ING dear friend: input password ING dear friend: become!!!!! Confidant: become!!!!! Confidant: become!!!!! Safe deposit Box: congratulations master, got money again!! Safe: Under security Boss: Good, this is a good oneCopy the code

Note: the callback function does just about anything, depending on how it is used. But this is done by the last worker thread to reach the fence.

CyclicBarrier source

CyclicBarrier is also an AQS system, but instead of extending AQS directly internally like CountDownLatch, it uses a ReentrantLock that also extends AQS internally.

About AQS and ReenrtantLock can be viewed:

  • The cornerstone of AbstractQueuedSynchronizer (AQS) : concurrent tool (juejin. Cn)
  • Reentrantlock source code parsing (juejin. Cn)

Based on this, we can speculate the internal structure of CyclicBarrier:

  • Locks are required in order to ensure the concurrency of multiple threads
  • Be conditional. Wait at the fence for the group
  • Have a counter that counts if all arrived
  • There should be a number of participants to reset the counter
  • There must be a marked break state
  • There should be generational measures to ensure that the reset counter and state does not affect the last one.
  • To save the callback function.

Properties and inner classes

private static class Generation {
    Generation() {}                 // prevent access constructor creation
    // indicate that the fence is broken and the thread will no longer wait
    // This is not the case.
    // 1. Wait timeout
    // 2. Thread interrupt
    // 3. Manually reset
    // 4. The callback execution is abnormal
    boolean broken;                 // initially false
}
// Concurrency control for state and quantity operations
private final ReentrantLock lock = new ReentrantLock();
// The thread waits for the condition variable at the fence
private final Condition trip = lock.newCondition();
// The number of threads participating
private final int parties;
// The callback action on completion is executed by the last thread to reach the fence
private final Runnable barrierCommand;
// Allow generation under reuse
private Generation generation = new Generation();

// The number of threads waiting for the fence is not reached
// The initial value is parties;
// parts-count = number of threads waiting
private int count;
Copy the code

Comparison conjecture:

  • To have a lockReentrantLock lockIn order to ensure the concurrency of multiple threads
  • Must have the conditionsCondition tripFor the group to wait at the fence
  • Have a countercountAnd calculate whether all arrived
  • There should be a number of participantspartiesTo reset the counter
  • There must be a marked break stategeneration.broken
  • There should be generational measuresgenerationTo ensure that the reset counter does not affect the previous one.
  • To save the callback functionbarrierCommand.

However, Generation Generation processing is relatively simple, will not retain the last Generation of other states, directly reset; Only the broken state is reserved to limit the exit of threads that are still executing in the previous generation.

So the reset and notification methods are as follows:

Note: To avoid confusion, the general internal invocation of these few methods is always required only for locks

Into the next generationnextGeneration

A successful entry into the next generation means that the fence has been reached and the callback has been executed successfully.

// Go to the next generation
private void nextGeneration(a) {
    // signal completion of last generation
    // Wake up the waiting thread in the previous generation
    trip.signalAll();
    // Reset the count count
    count = parties;
    generation = new Generation();
}
Copy the code

Mark damage

The common cases are interrupts, timeouts, execution failures, and so on.

private void breakBarrier(a) {
    generation.broken = true;
    count = parties;
    // Wake up the waiting thread in the current generation
    trip.signalAll();
}
Copy the code

We can see that the two notification threads must be accompanied by a reset of count. The difference is that generation and broken are handled respectively.

Note When the thread waits for the fence, it determines two values according to different conditions.

The constructor

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    // Specify a callback action
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
Copy the code

Core method

await

public int await(a) throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false.0L);
    } catch (TimeoutException toe) {
        // There is no response timeout, so you need to handle the timeout situation
        throw new Error(toe); // cannot happen}}public int await(long timeout, TimeUnit unit)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
Copy the code

As you can see, wait methods are all called doWait inside. From the exception list of the method, it is a large and complete handling method, which can run out of interrupts, breaks, and timeout exceptions.

dowait

/ * * *@paramTimed Specifies whether timeout * is allowed@paramNanos timed = true is only meaningful *@returnIndex of thread to fence: first :parties - 1; The last one :0 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 1. The core logic must be locked
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            // 2. If it is damaged, it must be reset manually
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            // 3. Check ahead of time that the current thread of execution has been interrupted, then finish: break the fence and wake up all waiting threads
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // 4. The current thread is the last one
            boolean ranAction = false;
            try {The last thread of execution is responsible for executing the callback
                final Runnable command = barrierCommand;
                if(command ! =null)
                    command.run();
                ranAction = true;
                // All reached, executed successfully: enter the next generation
                nextGeneration();
                return 0;
            } finally {
                if(! ranAction)// Execution failure is also a wrap upbreakBarrier(); }}// loop until tripped, broken, interrupted, or timed out
        // 5. Loop and wait until any interruption, destruction, or timeout occurs
        for (;;) {
            try {
                // Determine whether to wait for timeout or not
                if(! timed) trip.await();else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 6. Determine whether there is an external or internal interruption
                if (g == generation && ! g.broken) {
                    // 6.1 Indicates that an external thread interrupts
                    // Throw an exception while doing the finishing
                    breakBarrier();
                    throw ie;
                } else {
                    // 6.2 If it goes to this point, it must belong to the fence broken, or all reach the fence
                    // Restore the interrupted state directly without handling exceptionsThread.currentThread().interrupt(); }}if (g.broken)
                / / 7. Check if it belongs to the broken wake up
                throw new BrokenBarrierException();

            if(g ! = generation)/ / 8. Whether it is over is equivalent to returning the serial number
                return index;

            // 9
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
            // I also do not clear what situation will continue to loop.
            // If there is a loop, the interrupt state is carried over to step 3 of the next round to check the interrupt, thus throwing an interrupt exception}}finally{ lock.unlock(); }}Copy the code

Notice that the first step is to acquire the lock, because any counter operation, nextGeneration, breakBarrier, or blocking is guaranteed by the lock

In step 6, the wait may be interrupted, depending on whether it is external or internally induced: nextGeneration, breakBarrier

6.1 If it is an external interrupt, break the fence and throw an exception.

6.2 If it is an internal interruption, the system only recovers the interruption status and goes to the next step. Because interrupts are typically at the end of a wait, no exception is thrown.

6.2 Subsequent judgment of timeout, destruction, and next generation (end or reset)

reset

Resetting breaks the wait for the current generation, so breakBarrier is called.

Enter nextGeneration: nextGeneration.

If resetting is only for the next generation, you can call the latter directly; But you have to interrupt the current generation.

So breakBarrier and nextGeneration combine two uniquely different operations:

  • generation.broken = true;
  • generation = new Generation();
public void reset(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Wake up the waiting thread, the flag is broken
        breakBarrier();   // break the current generation
        // Enter the next generation
        nextGeneration(); // start a new generation
    } finally{ lock.unlock(); }}Copy the code

Statistical methods

The main purpose is to determine whether to destroy and obtain the number of waiting threads.

Combined with getParties, you can see how many threads are left unarrived.

public boolean isBroken(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally{ lock.unlock(); }}public int getNumberWaiting(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally{ lock.unlock(); }}public int getParties(a) {
    return parties;
}
Copy the code

Pay attention to the callawaitMethod with the number of threads initially setpartiesDon’t agree

  1. Less thanpartiesFail to makecountReducing to 0 will eventually result in all executionsawaitAll threads are blocked.
  2. More thanparties, just rightpartiesWhen all arrived, they entered the next generation. In the next generation the number of threads is more thanpartiesCause this part of the thread to block.

It is recommended that the number of threads be equal to parties and try to call await methods with the timeout specified.

conclusion

The examples of CountDownLatch and CyclicBarrier aren’t very good, but they barely make sense.

To sum up, the three main culprits of CyclicBarrier’s dead CountDownLatch are:

  • Allows reuse and can be reused after execution or manual reset; CountDownLatch is disabled.

  • Allows callback logic to be performed, typically automatically invoked by the last thread to reach the fence. We can use this feature to perform the finishing touches in our business logic.

  • Problems can break the current generation (notifying other threads) and reset the next time. Unlike CountDownLatch, which doesn’t care what happens when it reaches a specific condition, a CyclicBarrier must wait on the fence. If an exception occurs before a worker thread arrives, a human processing reset is required.

    Of course, exceptions and timeouts will automatically destroy the current use, but note that the next use cannot be directly carried out, you must manually reset.