1, the introduction of

CyclicBarrier, which blocks a group of threads until they both reach a condition. It is similar to, but different from, CountDownLatch in that CountDownLatch requires a call to countDown() to trigger an event, while CyclicBarrier does not. It acts like a fence, and only moves down when a group of threads have reached it.

  • Working principle diagram:

  • CyclicBarrier and CountDownLatch?

    • Both can block a set of threads waiting to be woken up;
    • The former is automatically woken up when the last thread arrives;
    • The latter is done by explicitly calling countDown();
    • The former is implemented by reentrant lock and its conditional lock, while the latter is directly implemented based on AQS.
    • The former has the concept of “generation” and can be used repeatedly, while the latter can only be used once.
    • The former can only realize multiple threads to run together at the fence;
    • The latter not only allows multiple threads to wait for a thread condition to be established, but also allows one thread to wait for multiple thread conditions to be established (see the CountDownLatch chapter on the use case).

2. Introductory cases

Before analyzing the source code, let’s take a look at an introductory example:

  • Use a CyclicBarrier to keep 5 player threads in sync when all 5 threads arrive at the same timecyclicBarrier.await();At, everybody runs down again together.
/**
 * date: 2021/5/10
 *
 * @author csp
 */
public class CyclicBarrierTest01 {
    /** * Case: * Simulated "King of Glory" game start logic */
    public static void main(String[] args) {
        // Step 1: Define 5 players
        String[] heros = {"Angela"."Arthur"."D"."Zhang fei"."Liu bei"};

        // Create a thread pool with a fixed number of threads
        ExecutorService service = Executors.newFixedThreadPool(5);

        // create a barrier for parties set to 5
        CyclicBarrier barrier = new CyclicBarrier(5);

        // Step 4: Open 5 quests through the for loop, simulate the start of the game, pass each quest (hero name and barrier)
        for (int i = 0; i < 5; i++) {
            service.execute(new Player(heros[i], barrier));
        }

        // When all threads are finished, close the thread pool to release resources
        service.shutdown();
    }

    /** * Player thread: */
    static class Player implements Runnable {
        // Hero name
        private String hero;
        // barrier
        private CyclicBarrier barrier;

        public Player(String hero, CyclicBarrier barrier) {
            this.hero = hero;
            this.barrier = barrier;
        }

        @Override
        public void run(a) {
            try {
                // Each player's loading progress is different, using random numbers to simulate!
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                System.out.println(hero + ": Loading progress 100%, waiting for other players to complete loading...");
                // Only when all 5 player threads are loaded will the fence be released!
                barrier.await();
                System.out.println(hero + ": Found all heroes loaded, let's fight!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch(BrokenBarrierException e) { e.printStackTrace(); }}}}Copy the code

The running results are as follows:

Zhang Fei: Loading progress100%, waiting for other players to finish loading... Arthur: Loading progress100%, waiting for other players to finish loading... Ma Chao: Loading progress100%, waiting for other players to finish loading... Angela: Loading progress100%, waiting for other players to finish loading... Liu Bei: Loading progress100%, waiting for other players to finish loading... Liu2 bei2: find all hero load complete, start battle! Zhang Fei: All heroes have been loaded, let's start the battle! Angela: Seeing all heroes loaded, let's fight! Ma Chao: all heroes have been loaded, let's start the battle! Arthur: All heroes have been loaded. Let's fight!Copy the code

\

3. Source code analysis

Member attribute

// Reentrant locks: Because barrier implementations depend on Condition queues, Condition queues must depend on Locks to be used.
private final ReentrantLock lock = new ReentrantLock();

// A conditional lock is called trip, which means that a thread will trip when it arrives and wake up when it reaches a certain number:
// The thread suspends the condition queue used by the implementation.
// Condition: the thread in the condition queue will wake up only when all the threads in the current generation are in place.
private final Condition trip = lock.newCondition();

// // number of threads to wait: Barrier Number of threads to participate
private final int parties;

// Command executed when wakened: the event that the last thread of the current generation needs to execute
private final Runnable barrierCommand;

// generation: indicates the current generation of the barrier object.
private Generation generation = new Generation();

// The number of threads that need to wait in the current generation:
// Indicates how many threads are not available in the current "generation". The initial value is parties.
private int count;
Copy the code

\

The inner class

Generation is used to control the use of cyclicbarriers.

For example, if the five threads in the above example complete and go to the next generation, waiting for five threads to reach the fence before executing together, CountDownLatch cannot do this. CountDownLatch is one-time and cannot be reset.

/** * indicates "generation" */
private static class Generation {
    // Indicates whether the current "generation" is broken. If the generation is broken, subsequent threads will throw a BrokenException exception
    // And in this generation all pending threads are awakened and a BrokerException is thrown.
    boolean broken = false;
}
Copy the code

\

A constructor

The constructor needs to pass in a parties variable, which is the number of threads to wait.

BarrierAction: the event (which can be null) that needs to be executed by the last thread in the current generation */
public CyclicBarrier(int parties, Runnable barrierAction) {
    // Because a barrier less than or equal to 0 has no meaning... No thread can participate
    if (parties <= 0) throw new IllegalArgumentException();
    // Initialize the parties
    this.parties = parties;
	// initialize count equal to parties, after the current generation of each thread, count--
    this.count = parties;
    // Initialize all the commands executed at the fence
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
Copy the code

\

Members of the method

1. The nextGeneration method

NextGeneration () is called when all threads of this generation are in place (assuming barrierCommand is not empty and the last thread is required to execute the event). * /
private void nextGeneration(a) {
    // Wake up all threads suspended in the trip condition queue
    trip.signalAll();

    // Reset count to parties
    count = parties;

    // Start a new generation.. Use a new generation object to represent the new generation, which has nothing to do with the previous generation.
    generation = new Generation();
}
Copy the code

2. BreakBarrier method

/** * All threads within the barrier will throw exceptions... * /
private void breakBarrier(a) {
    // Set broken to true, indicating that this generation is broken, and throw an exception directly on the thread of this generation.
    generation.broken = true;
    // Reset count to parties
    count = parties;
    // Wake up all threads suspended in the trip condition queue, which checks if the current generation is broken,
    // If it is broken, the following logic is not the same as that used to start the next generation of awakening.
    trip.signalAll();
}
Copy the code

3. Await () method

  • Each thread that needs to wait at the fence needs to be explicitly calledawait()Method waits for another thread to arrive.
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
     
	// Call the dowait method without timeout
    return dowait(true, unit.toNanos(timeout));
}
Copy the code

4. Dowait Method (emphasis)

  • dowait()The entire logic in the method is divided into two parts:

(1) the last thread steps up to the above logic, when count is reduced to 0, breaks the barrier, it calls nextGeneration() method to notify the waiting thread in the conditional queue to move to AQS queue to be woken up and enter the nextGeneration.

(2) The non-last thread goes through the following for loop logic. These threads will block at the await() method of the condition, they will join the condition queue, wait to be notified, and return when they wake up having updated the “generation”.

/** * timed: indicates whether the thread currently calling the await method has specified the timeout duration. If true, the thread is the * nanos of the response timeout. If timed == false ===> nanos == 0 */
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    // Obtain the barrier global lock object
    final ReentrantLock lock = this.lock;
    / / lock
    // Why do you want to lock it?
    // Because barrier's suspend and wake up components are both condition.
    lock.lock();
    try {
        // Current generation: gets the current generation of the barrier
        final Generation g = generation;

        // Check: If the current generation is already Broken, the thread currently calling the await method throws a Broken exception
        if (g.broken)
            throw new BrokenBarrierException();

        // Interrupt check: if the interrupt flag bit of the current thread is true, the current generation is broken, and the current thread throws an interrupt exception
        if (Thread.interrupted()) {
            // 1. Set the current generation to the broken state
            // 2. Wake up the thread in the trip condition queue
            breakBarrier();
            throw new InterruptedException();
        }

        // The current thread is broken (false). The current thread is broken (false).
        // Normal logic...
		
        // Count is reduced by 1
        // if the parties give 5, then the corresponding index is 4,3,2,1,0
        int index = --count;
        // If the number is reduced to 0, use this logic (the last thread goes here) :
        // If the current thread is the last one to reach the barrier, what do we need to do?
        if (index == 0) {  // tripped
            // flag: true indicates that no exception was thrown when the last thread executed CMD. False: the last thread to execute CMD threw an exception.
            // CMD is the second Runnable interface implementation specified when creating the Barrier object, which can be null
            boolean ranAction = false;
            try {
				// If a command is passed during initialization, execute it here
                final Runnable command = barrierCommand;
                // The barrier object is created with a Runnable interface, which is required by the last thread to arrive
                if(command ! =null)
                    command.run();

                // command-run () does not throw an exception, then the thread will execute here.
                ranAction = true;

                // Call the next generation method: start a new generation
                // 1. Wake up the pending thread in the trip condition queue. The awakened thread gets the lock and exits the await method in turn.
                // 2. Reset count to parties
                // 3. Create a new generation object, representing the new generation
                nextGeneration();
                // Returns 0, since the current thread is the last thread to arrive in this generation, so Index == 0
                return 0;
            } finally {
                if(! ranAction)// If command-run () throws an exception, it will enter here.breakBarrier(); }}// The current thread is not the last thread to reach the Barrier.. You have to go into a spin.

		// This loop can only be reached by the non-last thread
        // Spin until the condition is met, the current generation is broken, the thread is interrupted, and the wait times out
        for (;;) {
            try {
                // The current thread does not specify a timeout period
                if(! timed)// Call the await() method of condition:
                    // The current thread releases the lock, enters the end of the TRIP condition queue, and suspends itself, waiting to be awakened.
                    trip.await();
                else if (nanos > 0L)
					// Timeout wait method:
                    // the timeout is specified when the current thread calls the await method!
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // Throw an interrupt exception that will come in here.
                // When InterruptedException is thrown?
                // Node will throw an interrupt exception when it receives an interrupt signal while in the conditional queue!


                // if g == generation, the current generation does not change.
                // Condition 2:! G.broken If the current generation is not broken, then the current thread breaks and throws an exception.
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // How many times can I execute else?
                    // 1. The generation has changed, so there is no need to throw an interrupt exception, because the generation has been updated. Just set the interrupt flag.
                    // 2. The generation has not changed, but is broken. Instead of returning an interrupt exception, a brokenBarrier exception is thrown. Interrupt flag bits are also recorded.Thread.currentThread().interrupt(); }}// After wake up, execute here, how many cases?
            // 1. Normally, the current barrier opens a new generation (trip.signalAll())
            // 2. When the current Generation is broken, all threads suspended on Trip will be woken up
            // 3. The current thread trip waits for a timeout, then actively moves to the blocking queue and gets a lock wake.

            / / check:
            // Condition true: the current generation has been broken
            if (g.broken)
                // The thread wakes up and then throws a BrokenBarrier exception.
                throw new BrokenBarrierException();

            // After wake up, execute here, how many cases?
            // 1. Normally, the current barrier opens a new generation (trip.signalAll()).
            // 3. The current thread trip waits for a timeout, then actively moves to the blocking queue and gets a lock wake.
			// It is not the same here
			// The generation reference has changed since nextGeneration() was called when the fence was broken
           
            // Condition true: Indicates that during the current thread suspension, the last thread is in place, which triggers a new generation of logic that wakes up the thread in the TRIP condition queue.
            if(g ! = generation)// Returns the current thread's index.
                return index;

            // After wake up, execute here, how many cases?
            // 3. The current thread trip waits for a timeout, then actively moves to the blocking queue and gets a lock wake.
            // Timeout check
            if (timed && nanos <= 0L) {
                / / break the barrier
                breakBarrier();
                // Throw a timeout exception.
                throw newTimeoutException(); }}}finally{ lock.unlock(); }}Copy the code

\

4, summarize

  • CyclicBarrier causes a group of threads to block at await() and wake up (only to move from the conditional queue to the AQS queue) the preceding threads when the last one arrives and continue down the queue;
  • CyclicBarrier is not a synchronizer implemented directly using AQS;
  • CyclicBarrier realizes the synchronization logic based on ReentrantLock and its Condition.