Questions
(1) What is CyclicBarrier?
(2) What are the characteristics of CyclicBarrier?
(3) How does CyclicBarrier compare with CountDownLatch?
Introduction
CyclicBarrier is a reusable barrier: it blocks a group of threads until all of them have reached a common point. It is similar to CountDownLatch but not the same. CountDownLatch needs an explicit countDown() call to trigger the release, while CyclicBarrier does not. It behaves like a fence that opens only once every thread in the group has arrived at it.
Usage
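import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        // Three parties must call await() before any of them can continue
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println("before");
                try {
                    // Block here until all three threads have arrived
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after");
            }).start();
        }
    }
}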
Usage is simple: a single CyclicBarrier keeps three threads in step. Each thread blocks at cyclicBarrier.await(), and only when all three have reached that call do they all continue.
Source code analysis
Main inner class
private static class Generation {
    boolean broken = false;
}
Generation is what makes a CyclicBarrier reusable: every time the barrier trips, a new generation begins.
For example, once the three threads in the example above have passed the barrier, the barrier moves on to the next generation and can again wait for three threads to arrive before releasing them together. CountDownLatch cannot do this: it is one-shot and cannot be reset.
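Reuse across generations can be illustrated with a small sketch. The class name BarrierReuseDemo and the helper runRounds are made up for this example; only CyclicBarrier and its await() come from the JDK. Each worker calls await() on the same barrier once per round, and a barrier action counts how many times the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseDemo {
    // Hypothetical helper: runs `rounds` rounds with `parties` threads on ONE
    // barrier and returns how many times that barrier tripped.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per generation, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        // Three threads, two rounds: the barrier trips once per round.
        System.out.println("barrier tripped " + runRounds(3, 2) + " times");
    }
}
```

A CountDownLatch would need a fresh instance for every round, because its count cannot be raised again after it reaches zero.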
Main properties
// Reentrant lock guarding the barrier's state
private final ReentrantLock lock = new ReentrantLock();
// Condition named trip: arriving threads wait on it and are woken once enough threads have arrived
private final Condition trip = lock.newCondition();
// The number of threads (parties) that must arrive
private final int parties;
// The command to run when the barrier trips
private final Runnable barrierCommand;
// The current generation
private Generation generation = new Generation();
// The number of parties still to arrive in the current generation
private int count;
As the fields show, CyclicBarrier is built on a ReentrantLock and its Condition. Can you picture how it works from that alone?
Brother Tong's guess: suppose count = parties = 3 initially. When the first thread reaches the barrier, count is decremented and the thread joins the Condition's wait queue; the second thread does the same. When the third thread arrives, count drops to 0 and it calls the Condition's signalAll(), which moves the other two threads from the condition queue to the AQS queue. Once the third thread has finished and calls lock.unlock(), one thread in the AQS queue is woken up and continues; when that thread releases the lock, the next one follows. In other words, the three threads reach the barrier one by one and then leave it one after another.
That is purely Brother Tong's guess. Is it right? Let's read on.
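The guess above can be written down as a toy barrier. This is only a sketch under simplifying assumptions, not the JDK implementation: the class name ToyBarrier and the helper sumOfIndices are invented here, and the sketch ignores interruption handling, timeouts, the broken state and the barrier command.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ToyBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;
    private Object generation = new Object(); // replaced on every trip

    ToyBarrier(int parties) {
        this.parties = parties;
        this.count = parties;
    }

    // Returns the arrival index: parties - 1 for the first arrival, 0 for the last.
    int await() throws InterruptedException {
        lock.lock();
        try {
            Object g = generation;
            int index = --count;
            if (index == 0) {
                trip.signalAll();          // move all waiters to the lock queue
                count = parties;           // reset for the next generation
                generation = new Object(); // start the next generation
                return 0;
            }
            while (g == generation) {      // loop guards against spurious wakeups
                trip.await();              // releases the lock while waiting
            }
            return index;
        } finally {
            lock.unlock();                 // lets the next woken thread proceed
        }
    }

    // Hypothetical demo helper: starts `parties` threads and sums their indices.
    static int sumOfIndices(int parties) {
        ToyBarrier barrier = new ToyBarrier(parties);
        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    sum.addAndGet(barrier.await());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumOfIndices(3)); // indices 2 + 1 + 0
    }
}
```

Because every waiter must reacquire the lock before returning from trip.await(), the woken threads leave the barrier one after another rather than at the same instant.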
Constructors
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // Initialize parties
    this.parties = parties;
    // Initialize count to equal parties
    this.count = parties;
    // Initialize the command to run when all parties have reached the barrier
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
The constructor takes parties, the number of threads that must arrive before the barrier trips, and optionally a barrierAction to run each time it does.
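The two-argument constructor can be tried out with a short sketch. The class name BarrierActionDemo and the helper run are assumptions made for this illustration. It records events in a list to show that the action runs after every party has arrived and before any of them continues.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {
    // Hypothetical helper: returns the ordered list of events for `parties` threads.
    static List<String> run(int parties) {
        List<String> events = new CopyOnWriteArrayList<>();
        // The action is executed by the last thread to arrive, before the others resume.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> events.add("action"));
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                events.add("before");
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
                events.add("after");
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // All "before" entries, then "action", then all "after" entries
        System.out.println(run(3));
    }
}
```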
The await() method
Every thread that must wait at the barrier calls await() explicitly and blocks there until the other threads arrive.
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // Call dowait without a timeout
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // Acquire the lock
    lock.lock();
    try {
        // The current generation
        final Generation g = generation;

        // Check whether the barrier is already broken
        if (g.broken)
            throw new BrokenBarrierException();

        // Check for interruption
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // Decrement count
        int index = --count;
        // If count reaches 0, take this branch (only the last thread gets here)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // If a command was passed to the constructor, run it here
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // Start the next generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // Only threads other than the last one reach this loop
        for (;;) {
            try {
                if (!timed)
                    // Wait on the condition
                    trip.await();
                else if (nanos > 0L)
                    // Wait on the condition with a timeout
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // Check whether the barrier was broken while waiting
            if (g.broken)
                throw new BrokenBarrierException();

            // Normally the two differ here: the last thread called
            // nextGeneration() when the barrier tripped, which replaced
            // the generation reference
            if (g != generation)
                return index;

            // Timeout check
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void nextGeneration() {
    // Call the condition's signalAll() to transfer every waiter in its queue to the AQS queue
    trip.signalAll();
    // Reset count
    count = parties;
    // Enter the next generation
    generation = new Generation();
}
The logic of dowait() falls into two parts:
(1) The last thread takes the first branch: when count drops to 0 the barrier trips, and the thread calls nextGeneration(), which moves the waiting threads from the condition queue to the AQS queue, where they wait to be woken, and starts the next generation.
(2) Every other thread goes into the for loop. These threads block in the condition's await(), join the condition queue, and wait to be signalled. When they wake up, the generation has already been replaced, so they return.
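Two observable consequences of dowait() can be checked with a small sketch: the value returned by await() is the arrival index (0 for the last thread), and a timed await() that expires breaks the barrier. The class name ArrivalIndexDemo and both helper methods are made up for this example; the timed overload await(long, TimeUnit) and isBroken() are part of the JDK API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ArrivalIndexDemo {
    // Hypothetical helper: collects the values returned by await(), sorted ascending.
    static List<Integer> arrivalIndices(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Integer> indices = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    indices.add(barrier.await()); // index = --count at arrival time
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Integer> sorted = new ArrayList<>(indices);
        Collections.sort(sorted);
        return sorted;
    }

    // Hypothetical helper: a lone thread times out on a two-party barrier.
    static boolean timeoutBreaksBarrier() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
            return false; // not expected: nobody else ever arrives
        } catch (TimeoutException e) {
            return barrier.isBroken(); // dowait() called breakBarrier() before throwing
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndices(3));     // [0, 1, 2]
        System.out.println(timeoutBreaksBarrier()); // true
    }
}
```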
Illustration
If you have studied the previous chapters, this diagram is easy to read. If it is not clear, take a good look at the recommended reading at the end of this article.
Conclusion
(1) CyclicBarrier makes a group of threads block in await(). When the last thread arrives, it wakes the threads that arrived before it (strictly speaking, it only moves them from the condition queue to the AQS queue), and they all continue;
(2) CyclicBarrier is not a synchronizer implemented directly on AQS;
(3) CyclicBarrier implements its synchronization logic with ReentrantLock and its Condition;
Easter egg
What are the similarities and differences between CyclicBarrier and CountDownLatch?
(1) Both can block a group of threads until they are woken up;
(2) With the former, the waiting threads are woken automatically when the last thread arrives;
(3) With the latter, the release is triggered by explicit calls to countDown();
(4) The former is implemented with a reentrant lock and its Condition, while the latter is implemented directly on AQS;
(5) The former has the concept of a "generation" and can be reused, while the latter can be used only once;
(6) The former can only make multiple threads wait for each other at the barrier and then proceed together;
(7) The latter can make multiple threads wait for a condition set by one thread, and can also make one thread wait for conditions set by multiple threads (see the use cases in the CountDownLatch chapter);
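Point (7) can be contrasted with the barrier examples through a short CountDownLatch sketch, in which one thread waits for several workers. The class name LatchContrastDemo and the helper waitForWorkers are assumptions for illustration only. Note that the workers do not block at countDown(); only the caller of await() waits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchContrastDemo {
    // Hypothetical helper: the calling thread waits until `workers` threads have
    // each called countDown(), then returns how many workers finished.
    static int waitForWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for real work
                done.countDown();           // explicit trigger, does not block the worker
            }).start();
        }
        try {
            done.await(); // one thread waiting for many
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(waitForWorkers(3)); // 3
    }
}
```

Once the count has reached zero the latch stays open, so a second round would need a new CountDownLatch, whereas a CyclicBarrier resets itself for the next generation.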
Recommended reading
1. Java Synchronization series: opening
2. Java magic classes: Unsafe analysis
3. Java Synchronization series: JMM (Java Memory Model)
4. Java Synchronization series: volatile analysis
5. Java Synchronization series: synchronized analysis
6. Java Synchronization series: writing a Lock yourself
7. Java Synchronization series: AQS, opening
8. Java Synchronization series: ReentrantLock (1), fair and non-fair locks
9. Java Synchronization series: ReentrantLock (2), condition locks
10. Java Synchronization series: ReentrantLock vs. synchronized
11. Java Synchronization series: ReentrantReadWriteLock source code analysis
12. Java Synchronization series: Semaphore source code analysis
13. Java Synchronization series: CountDownLatch source code analysis
14. Java Synchronization series: AQS, finale
15. Java Synchronization series: StampedLock source code analysis
Follow my WeChat official account "Tong Ge Reads Source Code" for more articles in the source code series, and explore the ocean of source code with Brother Tong.