CyclicBarrier is also called a “barrier”, and as the name suggests, it can be recycled. CyclicBarrier is used when a group of threads all reach a state, and then all execute at the same time.

CyclicBarrier. Await () is called when N threads are created and started to test concurrent requests to the interface. CyclicBarrier. Await () is called.

CyclicBarrier is not directly dependent on AQS; it is implemented based on ReentrantLock’s Condition. Internally, it maintains a variable, count, to record the number of remaining threads waiting to arrive. As long as the count is greater than zero, the thread calling await() is added to the Condition queue and the Park is suspended. CyclicBarrier calls condition.signalAll() to wake up all waiting threads when all threads have arrived and count equals zero.

Introduction to the source code

CyclicBarrier code is not much, and the main logic is in ReentrantLock and AQS. It is highly recommended to read the source code of ReentrantLock and AQS first: ReentrantLock Source Guide, AQS Source Guide.

attribute

CyclicBarrier internally uses ReentrantLock to ensure synchronization and Condition to block and wake up threads. Parties is used to record the number of threads needed to open the barrier. After the barrier is opened, it will be reset using Parties. The barrierCommand is used to save the priority task when the barrier is enabled. It can be left empty. If the task execution is abnormal, the barrier will be marked as broken. Count is used to record the number of remaining threads required to open the barrier, which will be reset using Parties after opening. CyclicBarrier is reusable and will be updated each time the barrier is opened. Generation represents the current generation.

// Synchronization depends on ReentrantLock
private final ReentrantLock lock = new ReentrantLock();
//
private final Condition trip = lock.newCondition();
// The number of threads needed to reach to open the barrier
private final int parties;
// The priority task to execute when the barrier is open
private final Runnable barrierCommand;

/** * The number of threads to wait for before the barrier is opened. * The barrier is turned on every time I await() --count, count==0 */
private int count;

CyclicBarrier is reusable for the current cycle. NextGeneration () will be called to reset the barrier
private Generation generation = new Generation();

CyclicBarrier is reusable and nextGeneration() is called to reset */
private static class Generation {
    /* Indicates whether the barrier is broken: 1. The wait times out; 2. The thread is interrupted; 3. The manual reset () * /
    boolean broken = false;
}
Copy the code

The constructor

CyclicBarrier provides two constructors that specify the number of threads required for the barrier to be opened and the task to execute first when the barrier is opened.

/* parties: number of threads required to enable the barrier */* /public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

/* Specifies only the number of threads */
public CyclicBarrier(int parties) {
    this(parties, null);
}
Copy the code

Core method

await

After the thread calls the await() method, the count decrement by 1, and if the count is still greater than 0, the thread is queued and blocked. CyclicBarrier provides two await() methods, both in response to interrupts, one that supports timeouts and one that does not:

  • await()

Wait without a timeout until the barrier is opened or the thread is interrupted.

  • await(long timeout, TimeUnit unit)

Timeout waits until the barrier opens, times out, or the thread is interrupted.

// Non-timeout wait
public int await(a) throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false.0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // This cannot happen because it is a non-timeout wait}}Copy the code

Both methods actually call dowait(), which supports timeouts.

  1. dowait()The first step is to lock and determine if the barrier is broken. If it is broken, the thread is not blocked, but an exception is thrown.
  2. The CyclicBarrier then determines whether the thread has been interrupted because the CyclicBarrier responds to an interrupt. If an interrupt occurs, it does not block, but throws an exception, marks the barrier as broken, and wakes up all waiting threads.
  3. If none of the above happens, thencountDecrement by 1 ifcountIf it is greater than 0, it determines whether it is a timeout wait, and if it is a timeout wait, it is calledtrip.awaitNanos()Suspends the thread for a specified time, otherwisetrip.await()The thread is suspended indefinitely.
  4. ifcountEqual to 0, it means that all threads have arrived, the barrier is open, callnextGeneration()Wake up all threads and update them.
/** wait for the barrier to open *@paramTimed Whether timed out *@paramNanos timeout */
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();// Lock to ensure synchronization
    try {
        final Generation g = generation;

        if (g.broken)
            // If the barrier is broken, throw an exception
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            // If the thread is interrupted, the barrier is broken and count is reset, waking up all threads
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // All threads arrive and the barrier opens
            boolean ranAction = false;// barrierCommand has no flags to execute normally
            try {
                final Runnable command = barrierCommand;
                if(command ! =null)
                    // If the constructor specifies barrierCommand, then execution takes precedence when the barrier is opened
                    command.run();
                ranAction = true;
                // Reset the state to wake up all threads and open the next round of barrier
                nextGeneration();
                return 0;
            } finally {
                if(! ranAction)// barrierCommand execution exception, barrier is brokenbreakBarrier(); }}/* The thread is blocked until: 1. The barrier is opened 2. The thread is interrupted 3. Wait timeout */
        for (;;) {
            try {
                if(! timed)// If timeout is not enabled, it blocks indefinitely
                    trip.await();
                else if (nanos > 0L)// If the timeout period is greater than 0, locksupport.parknanos ()
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.Thread.currentThread().interrupt(); }}if (g.broken)
                // The barrier is broken
                throw new BrokenBarrierException();

            if(g ! = generation)// Generation has been reset
                return index;

            if (timed && nanos <= 0L) {
                // The wait times out, the barrier breaks, and all threads are awakened
                breakBarrier();
                throw newTimeoutException(); }}}finally{ lock.unlock(); }}Copy the code

reset

To reset the barrier to its original state, it calls breakBarrier() to mark the barrier as broken and wakes up all waiting threads. Any thread coming in will throw a BrokenBarrierException. NextGeneration () is then called to update the barrier.

/** * Reset the barrier */
public void reset(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // The current barrier is broken, and all waiting threads are awakened
        breakBarrier();
        // Open a new round of barriers
        nextGeneration();
    } finally{ lock.unlock(); }}Copy the code

getNumberWaiting

Get the number of threads currently waiting for the barrier to open:

Return the number of waiting threads for the current barrier */
public int getNumberWaiting(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    	// Total - number of waits left
        return parties - count;
    } finally{ lock.unlock(); }}Copy the code

Other methods

CyclicBarrier has few core methods, mainly await(), and the rest of the code is quite simple:

GetParties () gets the number of threads needed to open the barrier:

/** * Number of threads required to return the barrier to open */
public int getParties(a) {
    return parties;
}
Copy the code

IsBroken () whether the current barrier isBroken:

/** Whether the current barrier is broken */
public boolean isBroken(a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally{ lock.unlock(); }}Copy the code

conclusion

CyclicBarrier code is easy to read and relies mainly on ReentrantLock, which in turn relies on AQS, so it is recommended to start from the bottom when reading the source code. Its main function is when a group of threads all arrive at a state, and then all at the same time, generally used to do multi-threaded concurrent testing more.


Articles you may be interested in:

  • AQS source guide
  • It’s a showdown. I’m gonna hand write an RPC
  • The Java lock bloat process and the effect of consistency hashing on lock bloat
  • ThreadLocal source parsing
  • CMS and three-color labeling algorithm
  • Accessibility analysis algorithm for vernacular understanding