Scan the qr code below or search the wechat official account, cainiao Feiyafei, you can follow the wechat official account, read more Spring source code analysis and Java concurrent programming articles.
The last article introduced the principles and usage scenarios of the CountDownLatch utility class (source code analysis and usage scenarios for the concurrent utility class CountDownLatch). Today, WE will introduce CyclicBarrier, another popular concurrent utility class in the JUC package, which translates to a reusable barrier.
Introduction to the
- CyclicBarrier functions very similarly to CountDownLatch, as well
Controls the order in which threads are executed
CyclicBarrier, however, differs from CountDownLatch in that a CyclicBarrier blocks a group of threads at the same barrier (synchronization point) until the last thread reaches the barrier (that is, the barrier’s counter drops to zero), and the threads blocking at the barrier continue to execute. CountDownLatch allows one or more threads to wait for the other threads to complete before the current thread can continue. Another difference, implied by the class name, is that once a CyclicBarrier’s counter is reduced to zero, it can be reset so that it can be used againCyclic
(Loop) you can see that. When the CountDownLatch’s counter is reduced to 0, it is not reset and therefore cannot be reused.
The sample
- CyclicBarrier is also very simple to use. You just need to new a CyclicBarrier to create an instance object, which needs to be passed in its constructor
The int type
Is used to specifyBarrier size
(that is, when the number of threads reached the barrier, the barrier is opened), and then called in the threadawait()
Method to block the thread at the barrier. - In the following Demo example, 10 sprinters are simulated with 10 threads. In real life, in 100-meter race, athletes need to hear the starting gun to start, all athletes start at the same time, not false start. The starting gun is the CyclicBarrier of the program, and when everyone is ready and hears the starting gun sound (reaching the barrier), the race starts.
public class CyclicBarrierDemo {
public static void main(String[] args) {
Random random = new Random();
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
List<Thread> threads = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
threads.add(new Thread(()->{
int time = random.nextInt(5) + 1;
try {
// Use thread sleep to simulate each athlete's preparation time
Thread.sleep(time * 1000);
System.out.println(Thread.currentThread().getName() + "Ready to go");
// When the athlete is ready, he signals the starter that he is ready, calling the await() method
cyclicBarrier.await();
System.out.println("The starting gun went off,"+Thread.currentThread().getName() + "Start");
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e) { e.printStackTrace(); }},"Athlete"+(i+1)));
}
for(Thread thread : threads) { thread.start(); }}}Copy the code
Realize the principle of
CyclicBarrier
andCoutDownLatch
The underlying implementation of CountDownLatch is also a bit differentdirectly
This is done by combining a synchronous component that inherits AQS, CyclicBarrierNot directly
With the help of AQS synchronization components, but throughCombine the ReentrantLock lock
(The underlying implementation of ReentrantLock still uses AQS and, after all, the underlying implementation of CyclicBarrier).- Since CyclicBarrier is implemented using ReentrantLock, it has an attribute of
lock
. A counter is also maintained within CyclicBarrier:count
. Because CyclicBarrier can be reused, that is, when the counter is reset after it has been reduced to 0, there is another variable that holds the initial value of countparties
. CyclicBarrier has an attribute called Generation, which is a CyclicBarrier inner class generation and is used to implement CyclicBarrierawait(long timeout,TimeUnit unit)
The time-out wait functionality of the method (explained later in the source code analysis). When CyclicBarrier is reset, generation is also reset to assign. The CyclicBarrier properties and methods are shown in the table below.
Properties or methods | role |
---|---|
ReentrantLock lock | It is used to ensure thread safety and prevent thread insecurity when multiple threads modify count at the same time |
int count | Counter, which reduces count by one when the await() method is called |
int parties | Record the initial value of the counter |
Generation generation | This property is also reset when the counter is reset. Set the broken property of generation to true when timeout waits occur. |
Condition trip | Waiting queue |
Runnable barrierCommand | CyclicBarrier allows a Runnable task to be executed when the counter is reduced to zero, and then the thread blocking at the barrier is executed |
await() | Let the thread wait while blocking at the barrier and decrease the counter by 1. Timeout wait is not supported |
await(long timeout, TimeUnit unit) | Let the thread wait while blocking at the barrier, wait for the maximum timeout unit and decrease the counter by 1 |
reset() | Reset the barrier |
- CyclicBarrier has two parameter constructors, as follows.
// parties specifies the size of the counter
// barrierAction is a Runnable, and when the counter is reduced to 0, the barrierAction is executed before the barrier is opened
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// parties specifies the size of the counter
public CyclicBarrier(int parties) {
// Call the parameterized constructor with two arguments
this(parties, null);
}
Copy the code
- When you perform
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
This line of code initializes the value of the counter count and parties. The passed argument, 10, indicates that the barrier will not be opened until 10 threads have reached it. - CyclicBarrier. Await () is called directly in the await() method when it is called
dowait()
Methods. The source code for the dowait() method is shown below.
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// If a thread calls await(long timeout,TimeUnit unit) and waits out, then g.broken is true and therefore raises an exception
if (g.broken)
throw new BrokenBarrierException();
// If the thread is interrupted, interrupt the barrier directly (wake up all waiting threads)
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// The counter decrement
int index = --count;
// If the decrement result is 0, all threads have reached the barrier
if (index == 0) { // tripped
boolean ranAction = false;
try {
// Determine if there are any tasks that need to be prioritized and execute them
final Runnable command = barrierCommand;
if(command ! =null)
command.run();
ranAction = true;
NextGeneration () wakes up all threads in the wait queue while the count value of the counter is reset
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// If the counter does not drop to 0, the current thread is queued to wait
for (;;) {
try {
// Timed indicates whether a timed wait is occurred
if(! timed)Call the await() method of condition to enter the wait queue
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.Thread.currentThread().interrupt(); }}if (g.broken)
throw new BrokenBarrierException();
if(g ! = generation)return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw newTimeoutException(); }}}finally{ lock.unlock(); }}Copy the code
- In dowait(), generation.broken is checked to see if it is true. It must be false when first entered (the default is false). This field is only valid if the thread calls await(long timeout,TimeUint unit). True only when a timeout occurs.
- If count is 0, all threads have reached the barrier, and the barrier can be opened to allow the blocking thread to continue executing. However, before opening the barrier, it determines whether barrierCommand is empty, and if not, barrierCommand is executed first. The nextGeneration() method is then called. The main role of nextGeneration() is
Wake up all threads in the wait queue and reset the counter
. The source code is as follows.
private void nextGeneration(a) {
// signal completion of last generation
// Wake up all waiting threads in the wait queue
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
Copy the code
- If count minus one is not zero, there are threads
We can't open the barrier until we reach it
Therefore, it is necessary to make the current thread join the wait queue, that is, calltrip.await()
, let the thread wait. - If a timeout occurs, it is executed in the for loop
The catch block
In the catch block, the breakBarrier() method is calledSet generation's broken property to true
. So when you get toif(g.broken)
It will decide that it is true and throw an exception, thus implementing the timeout wait function.
private void breakBarrier(a) {
generation.broken = true;
count = parties;
trip.signalAll();
}
Copy the code
- If no timeout wait occurs, when the counter drops to zero, all threads in the TRIP wait queue are woken up and put into the Lock synchronization queue, then node by node threads are woken up in the Lock synchronization queue, and then when threads from the
trip.await()
Method to wake up and continue with the following logic. A detailed analysis of ReentrantLock can be found in these two articles:ReentrantLock source code analysisand
Fair locks versus unfair locks - The implementation of the await(long timeout,TimeUnit Unit) method of Cyclicbarrier also ends up calling the dowait() method, so I won’t go into details here. Overall, CyclicBarrier’s source code implementation is relatively simple.
The difference with CountDownLatch
- CyclicBarrier differs from CountDownLacth in several ways. CyclicBarrier allows all threads to reach the barrier before executing the logic together, while CountDownLatch allows a thread or group of threads to wait for the other threads to finish executing before executing again. Second, the CyclicBarrier’s counter can be reset and therefore reused, whereas the CountDownLatch’s counter cannot be reset and reused. Third, a CyclicBarrier can perform a Runnable task before opening a barrier after all threads have reached it, which is useful in special scenarios. Fourth, although both underlying implementations are ultimately based on AQS, CyclicBarrier is implemented indirectly using AQS through ReentrantLock and CountDownLatch is implemented directly using AQS shared locks.
- The CyclicBarrier constructor supports passing in a parameter of type Runnable, which is shown in the Demo at the beginning of this article. In the Demo above, the runners are ready to stand at the starting line, and the starter fires the gun first, and then the runners start to run, that is, the thread starts to execute, so the firing action is before the barrier is opened, so we use Runnable to achieve this. The sample code is as follows.
public class CyclicBarrierDemo {
public static void main(String[] args) {
Random random = new Random();
// In the CyclicBarrier constructor, the second argument is passed a Runnable.
CyclicBarrier cyclicBarrier = new CyclicBarrier(10.new Runnable() {
@Override
public void run(a) {
System.out.println("============== in position!! Get ready!! Bang! = = = = = = = = = = = ="); }}); List<Thread> threads =new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
threads.add(new Thread(()->{
int time = random.nextInt(5) + 1;
try {
Thread.sleep(time * 1000);
System.out.println(Thread.currentThread().getName() + "Ready to go");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "Start");
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e) { e.printStackTrace(); }},"Athlete"+(i+1)));
}
for(Thread thread : threads) { thread.start(); }}}Copy the code
- As can be seen from the print result, when all the athletes are ready, it will be printed first
============== in position!! Get ready!! Bang! = = = = = = = = = = = =
After this line, other threads are allowed to continue.
conclusion
- This paper introduces in detail the functions of CyclicBarrier and how to use it, then analyzes the implementation principle of CyclicBarrier with the combination of source code, and compares the differences between CyclicBarrier and CountDownLatch in terms of function and underlying implementation principle. Finally, it summarizes. In most scenarios, everything CountDownLatch can do can be done using CyclicBarrier.
recommended
- The implementation principle of ReadWriteLock
- Semaphore source code analysis and use scenarios
- Concurrency tool class CountDownLatch source analysis and usage scenarios
- Design principle of queue synchronizer (AQS)
- Queue synchronizer (AQS) source code analysis