1. CyclicBarrier
- Similar to CountDownLatch
- Each thread that calls await() blocks until the specified number of threads have reached the barrier; once that number is reached, all of them are released and continue to the next step
- Suited to scenarios where threads must wait for one another before proceeding
- A CyclicBarrier acts as a rendezvous point: a thread that has finished its work waits there until all threads have arrived, then the barrier is lifted and every thread carries on with the rest of its task
Usage 1: wait for everyone to arrive at the designated place, then set off together
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            // The barrier action runs once, after all 5 threads have called await()
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
                @Override
                public void run() {
                    System.out.println("Everyone is here");
                }
            });
            for (int i = 0; i < 5; i++) {
                new Thread(new Task(cyclicBarrier)).start();
            }
        }

        static class Task implements Runnable {
            private final CyclicBarrier cyclicBarrier;

            public Task(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }

            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName());
                    // Simulate a random travel time of up to 10 seconds
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + " is in place, waiting for the others to arrive");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
Usage 2: 👉 CyclicBarrier simple use example
2. CountDownLatch
- Like CyclicBarrier, it releases the waiting threads when a count drops to zero
- Unlike CyclicBarrier, it is not reusable
Main methods:
- CountDownLatch(int count): the only constructor; the parameter count is the value to count down from
- await(): the thread that calls await() is suspended and resumes only when count reaches 0
- countDown(): decrements count by 1; when it reaches 0, the waiting threads are woken up
A thread waits for multiple threads to complete before continuing its work.
    import java.util.concurrent.*;

    public class Test {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(5);
            ExecutorService service = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                final int no = i + 1;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("Num:" + no);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            // Count down in finally so the latch opens even if the task fails
                            latch.countDown();
                        }
                    }
                };
                service.submit(runnable);
            }
            System.out.println("Waiting for 5 threads to complete....");
            latch.await();
            System.out.println("All threads have finished executing, ready to proceed to the next step.");
            // Shut the pool down so the JVM can exit
            service.shutdown();
        }
    }
Multiple threads wait for a signal from one thread and then start executing at the same time.
    import java.util.concurrent.*;

    /**
     * Simulates a race: 5 runners wait for the referee's signal,
     * then all start running at the same time.
     */
    public class Test {
        public static void main(String[] args) throws InterruptedException {
            // 1 referee
            CountDownLatch latch = new CountDownLatch(1);
            // 5 runners
            ExecutorService service = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; i++) {
                final int no = i + 1;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("No." + no + " is waiting for the start");
                            latch.await();
                            System.out.println("No." + no + " starts running");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                service.submit(runnable);
            }
            System.out.println("Referee checks the starting gun....");
            Thread.sleep(2000);
            System.out.println("Referee check finished, the race begins....");
            latch.countDown();
            // Shut the pool down so the JVM can exit once the runners finish
            service.shutdown();
        }
    }
CyclicBarrier vs. CountDownLatch
- A CyclicBarrier waits for a fixed number of threads to reach the barrier before any of them can continue, whereas a CountDownLatch only waits for its count to reach zero. In other words, CountDownLatch counts events, while CyclicBarrier counts threads
- Reusability differs: once a CountDownLatch has counted down to zero and opened, it cannot be used again, so a new instance must be created; a CyclicBarrier can be reused
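The reusability point can be illustrated with a minimal sketch. The class name BarrierReuseDemo and the helper runRounds are hypothetical names chosen for this illustration; the sketch assumes every thread passes the same barrier once per round and counts how often the barrier action fires.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article
public class BarrierReuseDemo {

    /** Runs `rounds` rounds with `parties` threads and returns how often the barrier tripped. */
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // same barrier instance, reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "barrier tripped 2 times"
        System.out.println("barrier tripped " + runRounds(3, 2) + " times");
    }
}
```

A CountDownLatch could not be used this way: after its count reaches zero, every later await() returns immediately, so each round would need a fresh latch.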
3. Semaphore
- A Semaphore coordinates threads by controlling the number of available “permits”
- A thread can continue only once it holds a permit, which makes Semaphore more flexible than the other synchronizers
Main methods:
- new Semaphore(int permits, boolean fair): sets the number of permits; if fair is true, waiting threads are queued in FIFO order, so a newly released permit goes to the thread that has waited the longest
- acquire(): acquires a permit; if none is available, the thread blocks until one is released (or it is interrupted)
- acquireUninterruptibly(): same as above, but does not respond to interrupts
- tryAcquire(): checks whether a permit is free; if so, takes it, and if not, returns immediately so the thread can do something else instead of blocking and check again later
- tryAcquire(long timeout, TimeUnit unit): same as tryAcquire(), but with a timeout, as in “if no permit is obtained within 3 seconds, do something else”
- release(): returns the permit once the work is done
- acquire and release can take a number of permits; the Semaphore does not enforce it, but the number released should match the number acquired
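The timed and multi-permit variants listed above can be sketched as follows. The class SemaphoreTryAcquireDemo, the helper tryWork, and the 100 ms timeout are assumptions made for illustration; only the Semaphore calls themselves are standard API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article
public class SemaphoreTryAcquireDemo {

    /** Tries to take `wanted` permits within 100 ms; returns true if the work ran. */
    static boolean tryWork(Semaphore semaphore, int wanted) throws InterruptedException {
        if (!semaphore.tryAcquire(wanted, 100, TimeUnit.MILLISECONDS)) {
            return false; // no permits in time: the caller can do something else
        }
        try {
            // ... the protected work would go here ...
            return true;
        } finally {
            semaphore.release(wanted); // hand back exactly as many permits as were acquired
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(3, true);
        System.out.println(tryWork(semaphore, 2));        // prints "true"
        System.out.println(tryWork(semaphore, 4));        // prints "false": only 3 permits exist
        System.out.println(semaphore.availablePermits()); // prints "3"
    }
}
```

Releasing in a finally block, and only after a successful acquire, keeps the permit count balanced even if the work throws.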
Usage:
    import java.util.concurrent.*;

    public class Test {
        // 3 permits, handed out in FIFO order
        static Semaphore semaphore = new Semaphore(3, true);

        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(50);
            for (int i = 0; i < 100; i++) {
                service.submit(new Task());
            }
            service.shutdown();
        }

        static class Task implements Runnable {
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return; // no permit was acquired, so none may be released
                }
                try {
                    System.out.println(Thread.currentThread().getName() + " acquired a permit");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + " releases its permit");
                    semaphore.release();
                }
            }
        }
    }
4. Condition
- Controls the “wait” and “wake up” of threads; a more flexible counterpart to Object.wait() and Object.notify() that is bound to a Lock, and one Lock can have several Conditions
Producer-consumer implementation
    import java.util.PriorityQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class ConditionDemo {
        private final int queueSize = 10;
        private final PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        public static void main(String[] args) {
            ConditionDemo demo = new ConditionDemo();
            Produce produce = demo.new Produce();
            Consume consume = demo.new Consume();
            produce.start();
            consume.start();
        }

        /** Consumer */
        class Consume extends Thread {
            @Override
            public void run() {
                try {
                    consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            private void consume() throws InterruptedException {
                while (true) {
                    lock.lock();
                    try {
                        // Re-check the condition in a loop to guard against spurious wakeups
                        while (queue.size() == 0) {
                            System.out.println("Queue is empty, waiting for data");
                            notEmpty.await();
                        }
                        // Take an item from the queue
                        queue.poll();
                        // and wake up the producer
                        notFull.signal();
                        System.out.println("Took an item from the queue, queue size: " + queue.size());
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        /** Producer */
        class Produce extends Thread {
            @Override
            public void run() {
                try {
                    produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            private void produce() throws InterruptedException {
                while (true) {
                    lock.lock();
                    try {
                        while (queue.size() == queueSize) {
                            System.out.println("Queue is full, waiting for the consumer to consume");
                            notFull.await();
                        }
                        // Add an item to the queue
                        queue.offer(1);
                        // and wake up the consumer
                        notEmpty.signal();
                        System.out.println("Added an item to the queue, queue size: " + queue.size());
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    }
5. Phaser
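- Like CyclicBarrier, but the number of participating threads is not fixed: parties can register and deregister at any time
Example: perform a synchronization step each time four worker threads have completed a phase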
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;

    public class PhaserTest {
        Phaser phaser = new Phaser();
        ExecutorService executorService = Executors.newCachedThreadPool();

        class Worker implements Runnable {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(500);
                        System.out.println("working:" + phaser.getPhase());
                        phaser.arriveAndAwaitAdvance();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void run() throws InterruptedException {
            phaser.register(); // the main thread is a party too
            for (int i = 0; i < 4; i++) {
                // Register each worker before it starts, so the main thread
                // cannot advance a phase before the worker has joined
                phaser.register();
                executorService.execute(new Worker());
            }
            while (true) {
                phaser.arriveAndAwaitAdvance();
                System.out.println("Sync..." + phaser.getPhase());
            }
        }

        public static void main(String[] args) throws InterruptedException {
            var test = new PhaserTest();
            test.run();
        }
    }
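The example above loops forever. As a variation, here is a minimal bounded sketch in which every party leaves the Phaser with arriveAndDeregister() after a fixed number of phases. The class PhaserBoundedDemo and the helper run are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class, not part of the original article
public class PhaserBoundedDemo {

    /** Runs `workers` threads through `phases` phases; returns the phase number reached. */
    static int run(int workers, int phases) {
        Phaser phaser = new Phaser(1); // 1 party: the coordinating thread
        for (int i = 0; i < workers; i++) {
            phaser.register(); // register before starting so no phase is missed
            new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    phaser.arriveAndAwaitAdvance(); // wait for all parties
                }
                phaser.arriveAndDeregister(); // leave the phaser when done
            }).start();
        }
        for (int p = 0; p < phases; p++) {
            phaser.arriveAndAwaitAdvance();
        }
        // The phase cannot advance further until this thread arrives again
        int phase = phaser.getPhase();
        phaser.arriveAndDeregister();
        return phase;
    }

    public static void main(String[] args) {
        System.out.println("phases completed: " + run(4, 3)); // prints "phases completed: 3"
    }
}
```

Once the last party deregisters, the Phaser terminates, which is one way to end a phased computation cleanly.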
6. Exchanger
- Lets two threads exchange data at a rendezvous point: each thread calls exchange() and blocks until the other one arrives
- Application scenario: two threads work on different instances of the same class (for example, two buffers) and swap them when ready
- Problem it solves: efficient hand-off of data between two threads
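A minimal sketch of such an exchange follows. The class ExchangerDemo, the helper swap, and the string payloads are hypothetical and serve only as an illustration; a real use would typically swap buffers or other working objects.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class, not part of the original article
public class ExchangerDemo {

    /** Swaps two strings between the calling thread and a helper thread. */
    static String[] swap(String fromMain, String fromHelper) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread helper = new Thread(() -> {
            try {
                // Blocks until the other thread also calls exchange()
                received[1] = exchanger.exchange(fromHelper);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        helper.start();

        received[0] = exchanger.exchange(fromMain);
        helper.join(); // makes the helper's write to received[1] visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] result = swap("buffer-A", "buffer-B");
        // prints "main got buffer-B, helper got buffer-A"
        System.out.println("main got " + result[0] + ", helper got " + result[1]);
    }
}
```

An Exchanger pairs exactly two threads per exchange; with more threads, which two get paired is not specified.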