preface
This article will analyze the source code for cyclicBarriers, and at the end of the analysis, it will use an example to show cyclicBarriers and compare cyclicBarriers with CountDownLatch.
1. Introduction to CyclicBarrier
A CyclicBarrier allows a group of threads to wait for each other until a certain condition is reached before continuing to execute before triggering a barrier; Cyclicbarriers can be reused after these threads are released, so they are also called cyclic barriers.
2. Analyze the source code
2.1. Construction method
// Define the number of threads that must be invoked before the barrier can be triggered
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
// Define the action to be performed when the last thread reaches the barrier
this.barrierCommand = barrierAction;
}
Copy the code
2.2, await() method
public int await(a) throws InterruptedException, BrokenBarrierException {
try {
// Call the dowait method without defining a timeout
return dowait(false.0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen}}private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// Get an exclusive lock
lock.lock();
try {
// Get "current generation"
final Generation g = generation;
// If current generation is damaged, an exception is thrown
if (g.broken)
throw new BrokenBarrierException();
// If the current thread is interrupted
if (Thread.interrupted()) {
// Terminates the CyclicBarrier and wakes up all waiting threads in the CyclicBarrier
breakBarrier();
throw new InterruptedException();
}
/ / count
int index = --count;
// The barrier is reached (the last thread is reached)
// The last thread to arrive does not execute the following for loop
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if(command ! =null)
// Initialize the incoming command
command.run();
ranAction = true;
// Call the next generation method
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// All non-last threads to arrive execute this statement, blocking on the trip.await() method
for (;;) {
try {
// If not "timeout waiting"
if(! timed)Call the await() method of condition
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// If the thread is interrupted while waiting, execute the following function
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.Thread.currentThread().interrupt(); }}if (g.broken)
throw new BrokenBarrierException();
// If "generation has replaced", return index.
if(g ! = generation)return index;
// Check timeout
if (timed && nanos <= 0L) {
// If time out, terminate the CyclicBarrier and wake up all waiting threads in the CyclicBarrier
breakBarrier();
throw newTimeoutException(); }}}finally {
// Release the "exclusive lock" and wake up the next waiting thread in AQSlock.unlock(); }}private void nextGeneration(a) {
// Call condition's signalAll() to transfer all waiters in its queue to AQS
trip.signalAll();
/ / reset the count
count = parties;
// Enter the next generation
generation = new Generation();
}
Copy the code
3. Differences between CyclicBarrier and CountDownLatch
- Both can block a set of threads and wait to wake up;
- The former is the last thread to arrive and wakes directly, the latter is a call to the countDown() method;
- The former is implemented by ReentrantLock’s “exclusive lock” and Conditon, while the latter is implemented by AQS’s “shared lock”.
- The former can be used repeatedly, while the latter can only be used once;
- The former can only achieve multiple threads to run together after arrival (multiple conditions can be run together);
- The latter not only allows one thread to wait for multiple threads (multiple conditions are true before they run together), but also allows multiple threads to wait for one thread (multiple conditions are true and wait for a special signal before they run together).
4, sample
public class CyclicBarrierTest {
public static final CyclicBarrier WORK_THREAD = new CyclicBarrier(3);
public static void main(String[] args) throws Exception{
// Main thread logic
Thread.sleep(2000);
for (int i=0; i<3; i++){
String condition_name = "Conditions"+i;
newThread(() -> { method(); },condition_name).start(); }}public static void method(a){
System.out.println("The conditions of waiting are:" + Thread.currentThread().getName());
try {
WORK_THREAD.await();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Upon completion of execution:"+ Thread.currentThread().getName()); }}Copy the code
Output result:
The conditions for waiting are: conditions0The conditions for waiting are: conditions1The conditions for waiting are: conditions2The conditions for completion are: conditions1The conditions for completion are: conditions2The conditions for completion are: conditions0
Copy the code
conclusion
This article introduces CyclicBarrier, analyzes the source code, uses a code to demonstrate how to use CyclicBarrier, and compares the difference between CyclicBarrier and CountDownLatch.
This concludes the JUC locking chapter, and the next chapter will start analyzing JUC collections.
If you found this post helpful, please give it a thumbs up and a follow.