CountDownLatch
CountDownLatch is suitable for multithreaded scenarios that require waiting for all child threads to complete.
For example, in the morning department meeting, someone is going to the bathroom, and the meeting can’t start until everyone comes back from the bathroom.
public class CountDownLatchTest {
private static int num = 3;
private static CountDownLatch countDownLatch = new CountDownLatch(num);
private static ExecutorService executorService = Executors.newFixedThreadPool(num);
public static void main(String[] args) throws Exception{
executorService.submit(() -> {
System.out.println("A is on the toilet.");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
System.out.println("A is done."); }}); executorService.submit(()->{ System.out.println("B is on the toilet.");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
System.out.println("B is done."); }}); executorService.submit(()->{ System.out.println("C is on the toilet.");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
System.out.println("C is done."); }}); System.out.println("Waiting for everyone to return from the toilet for the meeting...");
countDownLatch.await();
System.out.println("Everyone's ready, let's start the meeting..."); executorService.shutdown(); }}Copy the code
Code execution result:
A is on the toilet B is waiting for everyone to return from the toilet for A meeting... C is going to the bathroom, B is going to the bathroom, C is going to the bathroom, A is going to the bathroom.Copy the code
Initialize an instance of CountDownLatch and pass parameter 3, because we have three child threads that call countDown() to counter -1 every time the child thread finishes. The main thread will block after calling await() until the counter goes to 0 and await() returns. The difference between CountDownLatch and the join() method is that the join blocks the child thread until the end of execution, whereas CountDownLatch can return await() at any time, and the join cannot be used with ExecutorService, making CountDownLatch more flexible.
CountDownLatch is implemented based on AQS. The state of the volatile variable remains reciprocal and the shared variables of multiple threads are visible.
- CountDownLatch actually assigns a value to the state variable of AQS via the constructor initialization argument, maintaining the CountDownLatch countdown state
- When the main thread calls the await() method, the current thread blocks and waits in the AQS blocking queue if state is not 0.
- CountDown () is called by other threads and the state value decreases atomically, and when state is zero, all threads that block calling the await() method are woken up
CyclicBarrier
A CyclicBarrier is called a loop barrier, which allows a group of threads to execute at the same time after all the threads have reached a state.
public class CyclicBarrierTest {
private static int num = 3;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
System.out.println("Everyone's ready, let's start the meeting...");
System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -");
});
private static ExecutorService executorService = Executors.newFixedThreadPool(num);
public static void main(String[] args) throws Exception{
executorService.submit(() -> {
System.out.println("A is on the toilet.");
try {
Thread.sleep(4000);
System.out.println("A is done.");
cyclicBarrier.await();
System.out.println("Meeting over." "A is out.");
} catch (Exception e) {
e.printStackTrace();
}finally{}}); executorService.submit(()->{ System.out.println("B is on the toilet.");
try {
Thread.sleep(2000);
System.out.println("B is done.");
cyclicBarrier.await();
System.out.println("Meeting over." B is out.");
} catch (Exception e) {
e.printStackTrace();
}finally{}}); executorService.submit(()->{ System.out.println("C is on the toilet.");
try {
Thread.sleep(3000);
System.out.println("C is done.");
cyclicBarrier.await();
System.out.println("Meeting closed. C is out.");
} catch (Exception e) {
e.printStackTrace();
}finally{}}); executorService.shutdown(); }}Copy the code
The output is:
A on the potty B on the potty C on the potty B on the potty C on the potty A on the potty Everyone is ready, let's start the meeting... ------------------- The meeting ends, user A exits the meeting, user B exits the meeting, and user C exits the meetingCopy the code
The result is very similar to CountDownLatch, which initializes 3 threads and a task, blocks by calling await(), counters -1, and when the counter is 0, executes the task of the CyclicBarrier constructor, and wakes up all the blocked threads when the task is finished. This validates the CyclicBarrier effect of having a group of threads all reach a state and then all execute simultaneously.
Here’s another example to demonstrate the reusable effects of CyclicBarrier.
public class CyclicBarrierTest2 {
private static int num = 3;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -");
});
private static ExecutorService executorService = Executors.newFixedThreadPool(num);
public static void main(String[] args) throws Exception {
executorService.submit(() -> {
System.out.println("A is on the toilet.");
try {
Thread.sleep(4000);
System.out.println("A is done.");
cyclicBarrier.await();
System.out.println("Meeting over," A "exits and starts coding.");
cyclicBarrier.await();
System.out.println("C work over, go home.");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
} finally{}}); executorService.submit(() -> { System.out.println("B is on the toilet.");
try {
Thread.sleep(2000);
System.out.println("B is done.");
cyclicBarrier.await();
System.out.println("Meeting over," "B" quits and starts fishing.");
cyclicBarrier.await();
System.out.println("B. Fishing is over, work is over.");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
} finally{}}); executorService.submit(() -> { System.out.println("C is on the toilet.");
try {
Thread.sleep(3000);
System.out.println("C is done.");
cyclicBarrier.await();
System.out.println("Meeting over, C quits and begins to fish.");
cyclicBarrier.await();
System.out.println("C. Fishing is over, work is over, go home.");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
} finally{}}); executorService.shutdown(); }}Copy the code
Output result:
A on the toilet B on the toilet C on the toilet B on the toilet C on the toilet A on the end of ------------------- meeting, A exit, began to masturbate code meeting end, B exit, began to touch the fish meeting end, C exit, Begin to touch the fish -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- C fishing in the end, C end of the work, I come home from work I come home from work B fishing in the end, I come home from work -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --Copy the code
As a result, each child thread calls the await() counter to zero before continuing to execute together, after the meeting ends, goes into the await state together, and leaves work together at the end of the last day, which is reusable.
CyclicBarrier is also implemented based on AQS, internally maintaining the parties to keep track of the total number of threads, count is used to count, initially count=parties, after calling await() count atom decrement, when count is 0, assign the parties to count again, That’s how reuse works.
- When the child thread calls the await() method, it acquires the exclusive lock, decrement count, enters the blocking queue, and releases the lock
- When the first thread blocks and releases the lock, the other child threads compete to acquire the lock, doing the same as 1
- Until the last count is 0, the CyclicBarrier constructor executes the task, after which the child thread continues down
Semaphore
Semaphore is called a Semaphore, and unlike the first two, its counter is incremented.
public class SemaphoreTest {
private static int num = 3;
private static int initNum = 0;
private static Semaphore semaphore = new Semaphore(initNum);
private static ExecutorService executorService = Executors.newFixedThreadPool(num);
public static void main(String[] args) throws Exception{
executorService.submit(() -> {
System.out.println("A is on the toilet.");
try {
Thread.sleep(4000);
semaphore.release();
System.out.println("A is done.");
} catch (Exception e) {
e.printStackTrace();
}finally{}}); executorService.submit(()->{ System.out.println("B is on the toilet.");
try {
Thread.sleep(2000);
semaphore.release();
System.out.println("B is done.");
} catch (Exception e) {
e.printStackTrace();
}finally{}}); executorService.submit(()->{ System.out.println("C is on the toilet.");
try {
Thread.sleep(3000);
semaphore.release();
System.out.println("C is done.");
} catch (Exception e) {
e.printStackTrace();
}finally{}}); System.out.println("Waiting for everyone to return from the toilet for the meeting...");
semaphore.acquire(num);
System.out.println("Everyone's ready, let's start the meeting..."); executorService.shutdown(); }}Copy the code
The output is:
A is on the toilet B is waiting for everyone to return from the toilet for A meeting... C is going to the bathroom, B is going to the bathroom, C is going to the bathroom, A is going to the bathroom.Copy the code
The constructor passes an initial value of 0, increments the counter when the child calls Release (), and the main acquire() argument passes 3, which means that the main thread blocks until the counter reaches 3.
Semaphore is still based on AQS, and there are two strategies for obtaining Semaphore simultaneously: fair and unfair
- When the main thread calls acquire(), it uses the current semaphore value – the value to be obtained. If it is less than 0, it enters the synchronous blocking queue; if it is greater than 0, it sets the current semaphore to the residual value through CAS and returns the residual value
- The child thread calls Release () to give the current semaphoto counter +1(the number of increments is determined by the pass), and keeps trying to enter the blocked thread by calling acquire()
conclusion
CountDownLatch provides a multithreaded control mode that is more flexible than Join through counters. CyclicBarrier can also achieve the effect of CountDownLatch and has the characteristics of reuse. Semaphore adopts the mode of increasing semaphores. The number of threads that need to be synchronized is not a concern at first, and fair and unfair strategies are provided for obtaining signals.