Part of this article is excerpted from The Art of Concurrent Programming in Java
CountDownLatch
CountDownLatch allows one or more threads to wait until other threads have completed their operations. Suppose we need to parse the data in multiple sheets of an Excel workbook. We can use multiple threads, with each thread parsing one sheet, and once all sheets are parsed the program must report that parsing is complete. The simplest way to make the main thread wait for all the parsing threads to finish is the join() method:
public class JoinCountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("parser1 finish");
            }
        });
        Thread parser2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("parser2 finish");
            }
        });
        parser1.start();
        parser2.start();
        // join() blocks the main thread until the target thread has terminated
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }
}
CountDownLatch, available in the java.util.concurrent package since JDK 5, can do what join() does and more:
import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    // The CountDownLatch constructor takes an int that serves as the counter.
    // To wait for N points to complete, pass in N.
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                // Each call to countDown() reduces N by one
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        // await() blocks the current thread until N reaches zero
        c.await();
        System.out.println(3);
    }
}
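The sheet-parsing requirement from the start of this section can also be sketched with a CountDownLatch instead of join(). This is only an illustration under stated assumptions: the class name LatchSheetParserSketch, the method parseAll and the five-second timeout are hypothetical, and no real Excel parsing happens; each worker simply records that it finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one task per sheet, the caller waits on a latch.
class LatchSheetParserSketch {

    // Returns the number of sheets parsed, or -1 if the wait timed out
    // or the calling thread was interrupted.
    static int parseAll(int sheetCount) {
        CountDownLatch done = new CountDownLatch(sheetCount);
        AtomicInteger parsed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(sheetCount);
        try {
            for (int i = 0; i < sheetCount; i++) {
                pool.execute(() -> {
                    try {
                        parsed.incrementAndGet(); // stand-in for real parsing work
                    } finally {
                        done.countDown();         // count down even if parsing fails
                    }
                });
            }
            // Timed await: a stuck worker cannot block the caller forever.
            boolean finished = done.await(5, TimeUnit.SECONDS);
            return finished ? parsed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sheets parsed: " + parseAll(3));
    }
}
```

Pool threads do not terminate after each task, so join() cannot be used to wait for them. Waiting on the latch works in this situation, which is one case where CountDownLatch does more than join().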
CyclicBarrier
CyclicBarrier makes a group of threads block when they reach a barrier (a common synchronization point). The barrier opens only when the last thread arrives, at which point all the threads blocked at the barrier continue running.
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    // The argument is the number of threads (parties) the barrier waits for
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // await() reports that this thread has arrived,
                    // then blocks until all parties have arrived
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}
If you change new CyclicBarrier(2) to new CyclicBarrier(3), the main thread and the child thread wait forever, because no third thread ever calls await().
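One way to guard against the endless wait described above is the timed overload await(long timeout, TimeUnit unit). The sketch below is a hypothetical illustration (the class name, method names and the 200 ms timeout are assumptions): two threads wait at a three-party barrier, the wait times out, and the barrier ends up in the broken state.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a barrier that expects three parties but only gets two.
class BarrierTimeoutSketch {

    // Returns true if the barrier ended up broken.
    static boolean runWithMissingParty() {
        CyclicBarrier barrier = new CyclicBarrier(3);
        Thread child = new Thread(() -> awaitBriefly(barrier, "child"));
        child.start();
        awaitBriefly(barrier, "main");
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return barrier.isBroken();
    }

    private static void awaitBriefly(CyclicBarrier barrier, String name) {
        try {
            // Gives up after 200 ms instead of waiting forever.
            barrier.await(200, TimeUnit.MILLISECONDS);
            System.out.println(name + " passed the barrier");
        } catch (TimeoutException e) {
            // The thread that times out breaks the barrier for everyone.
            System.out.println(name + " timed out");
        } catch (BrokenBarrierException e) {
            // Another waiting thread timed out or was interrupted.
            System.out.println(name + " saw a broken barrier");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("barrier broken: " + runWithMissingParty());
    }
}
```

Which thread reports the timeout and which reports the broken barrier depends on scheduling, so the printed order is not fixed.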
CyclicBarrier also provides a more advanced constructor, CyclicBarrier(int parties, Runnable barrierAction). When the last thread reaches the barrier, barrierAction runs first, before any of the waiting threads are released:
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest2 {
    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }

    // The barrier action: runs once all parties have arrived
    static class A implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}
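Because barrierAction runs before the waiting threads are released, the output always starts with 3. The order of the 1 and 2 that follow depends on thread scheduling.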
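A common use of barrierAction is to combine partial results once every worker has arrived. The following is a minimal sketch of that idea, not a definitive implementation; the class name BarrierActionSumSketch, the method sumPartials and the choice of "worker i contributes i + 1" are all hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch: each worker stores a partial result, the barrier action sums them.
class BarrierActionSumSketch {

    // Returns the sum computed by the barrier action, or -1 if it never ran.
    static int sumPartials(int workers) {
        AtomicIntegerArray partials = new AtomicIntegerArray(workers);
        AtomicInteger total = new AtomicInteger(-1);

        // The action runs once, after the last worker arrives and
        // before any waiting worker is released.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            int sum = 0;
            for (int i = 0; i < workers; i++) {
                sum += partials.get(i);
            }
            total.set(sum);
        });

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                partials.set(id, id + 1); // stand-in for a real partial computation
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("total: " + sumPartials(4));
    }
}
```

With four workers the partial results are 1, 2, 3 and 4, so the barrier action stores 10.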
Semaphore
Semaphore controls how many threads may access a particular resource at the same time. It coordinates the threads so that a shared resource is used properly.
1. Application scenarios
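Semaphore can be used for flow control, especially when a shared resource is limited, such as database connections. In the example below, 30 threads want to save data, but only 10 of them may do so at the same time.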
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    // 10 permits: at most 10 threads may hold one at the same time
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        try {
                            System.out.println("save data");
                        } finally {
                            // Return the permit even if the work above fails
                            s.release();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}
Although 30 threads are running in this code, at most 10 of them can hold a permit at once. The constructor Semaphore(int permits) takes an integer giving the number of permits available. Using a Semaphore is simple: a thread calls acquire() to obtain a permit and calls release() to return it when done. A thread can also call tryAcquire(), which attempts to obtain a permit without blocking and reports whether it succeeded.
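To make the limit visible, the sketch below records the highest number of tasks that were inside the guarded section at the same moment, and separately shows that tryAcquire() returns false rather than blocking when no permit is left. The class name, method names, the 10 ms sleep and the 10 second shutdown timeout are hypothetical choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: measure how many tasks hold a permit at once.
class SemaphoreLimitSketch {

    // Runs `tasks` jobs on `tasks` threads, letting at most `permits` run at once.
    // Returns the highest number of jobs observed inside the guarded section.
    static int peakConcurrency(int tasks, int permits) {
        Semaphore s = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was taken, so there is nothing to release
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10); // stand-in for "save data"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    s.release(); // always hand the permit back
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    // tryAcquire() never blocks: it reports whether a permit was available.
    static boolean tryWhenExhausted() {
        Semaphore s = new Semaphore(1);
        s.tryAcquire();        // takes the only permit
        return s.tryAcquire(); // no permit left, so this returns false
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(30, 10));
        System.out.println("second tryAcquire succeeded: " + tryWhenExhausted());
    }
}
```

The observed peak can be lower than the number of permits on a lightly loaded run, but it can never exceed it.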
Exchanger
Exchanger is a utility class for cooperation between threads, used to exchange data between two threads. It provides a synchronization point at which the two threads can swap data by calling the exchange() method. If the first thread calls exchange() first, it waits until the second thread also calls exchange(). When both threads have reached the synchronization point, each hands its data to the other.
Suppose we need to enter paper bank statements into an electronic system by hand. To avoid mistakes, two clerks, A and B, each enter the same statements. When they are done, the system compares the two sets of entries to check that they are consistent.
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
    private static final Exchanger<String> exchanger = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Bank statement data entered by A
                    String A = "Bank statement A";
                    exchanger.exchange(A);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Bank statement data entered by B
                    String B = "Bank statement B";
                    // exchange() hands over B and returns what A entered
                    String A = exchanger.exchange(B);
                    System.out.println("Are data A and B consistent? " + A.equals(B)
                            + ", A entered: " + A + ", B entered: " + B);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.shutdown();
    }
}
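In the example above only the second thread uses the value returned by exchange(). The sketch below, with hypothetical names (ExchangerSwapSketch, swap, exchangeOrNull) and an assumed one-second timeout, shows that both sides receive the other's data. It uses the timed overload exchange(V x, long timeout, TimeUnit unit), so a thread whose partner never arrives does not wait forever.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: both threads keep what they receive from the exchange.
class ExchangerSwapSketch {

    // Returns { what thread A received, what thread B received }.
    static String[] swap(String fromA, String fromB) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];
        Thread a = new Thread(() -> received[0] = exchangeOrNull(exchanger, fromA));
        Thread b = new Thread(() -> received[1] = exchangeOrNull(exchanger, fromB));
        a.start();
        b.start();
        try {
            // join() makes the writes to `received` visible to the caller
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    // Exchanges `mine` for the partner's value, or returns null on timeout or interrupt.
    private static String exchangeOrNull(Exchanger<String> exchanger, String mine) {
        try {
            return exchanger.exchange(mine, 1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return null; // the partner never reached the synchronization point
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        String[] r = swap("Bank statement A", "Bank statement B");
        System.out.println("A received: " + r[0] + ", B received: " + r[1]);
    }
}
```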