Why do YOU need concurrency control
When multiple threads execute concurrently, by default the CPU switches threads randomly, out of our programmer’s control. There are times when we want the CPU to execute threads as we want, and that’s when coordinated control between threads is needed.
How to implement concurrency control
- CountDownLatch
- CyclicBarrier
- Condition
- Semaphore
- Pharse
CountDownLatch reverses the counter
CountDownLatch is a very useful multithreaded control tool class, usually used to control thread waits so that a thread waits for the count to end before starting execution.
The main method
- The constructor CountDownLatch(int count) constructs a CountDownLatch initialized with the given count
- The thread calling the method await() will be suspended until count drops to 0, unless the thread is interrupted.
- Await (long timeout, TimeUnit unit) causes the current thread to wait until the latch counts back to zero, unless the thread is interrupted or the specified wait time is exceeded.
- CountDown () : Every time countDown() is called, the count value is reduced by one until it reaches zero, and the thread calling the await is woken up
Usage scenarios
- Usage one: a thread waits for multiple threads to complete before continuing to execute.
For example, we use it to simulate the process of making orders. If 5 users are required to purchase an item in order to generate an order, CountDownLatch is used to achieve this.
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++){
final int userId = i+1;
service.submit(() -> {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println(String.format("[user %s] joined a single order...", userId));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{ countDownLatch.countDown(); }}); } System.out.println("Waiting for bill.....");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Complete the spell....."); }}Copy the code
Waiting for bill..... [user 3] joined the spell order... [User 2] joined the spell order... [User 1] joined the spell order... [user 4] joined the spell order... [user 5] joined the spell order... Complete the order.....Copy the code
- Multiple threads wait for a signal from a thread to start execution at the same time
public class CountDownLatchDemo2 {
public static void main(String[] args) {
CountDownLatch begin = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++){
final int userId = i+1;
service.submit(() -> {
try {
System.out.println(String.format("[user %s] joined the seckill...", userId));
begin.await();
System.out.println(String.format("[user %s] start SEC kill...", userId));
} catch(InterruptedException e) { e.printStackTrace(); }}); } service.shutdown();try {
TimeUnit.SECONDS.sleep(5);
System.out.println("SEC kill!!");
begin.countDown();
} catch(InterruptedException e) { e.printStackTrace(); }}}Copy the code
[User 5] joined the seckill... [user 4] joined the seckill... [user 3] joined the seckill... [User 1] joined the seckill... [User 2] joined the seckill... SEC kill!! [user 3] Start SEC kill... [user 5] Start SEC kill... [user 4] Start SEC kill... [user 1] Start SEC kill... [user 2] Start SEC kill...Copy the code
This kind of wait one scenario can be used for stress testing.
Summary of use: When creating CountDownLatch, count counts are passed in. If the thread is waiting, the await() method calling CountDownLatch is suspended. If the thread counts down, count counts down CountDownLatch’s countDown() is called. When all three methods are combined, when countDown reaches 0, the thread that called await will be triggered to execute. To control concurrency.
- CountDownLatch cannot be rolled back to reset
CyclicBarrier
A CyclicBarrier, which translates as a circular barrier, allows a group of threads to wait for each other until a common barrier point is reached. CyclicBarrier and CountDownLatch are similar in that they block a set of threads.
The main method
-
CyclicBarrier(int parties, Runnable barrierAction) creates a new CyclicBarrier that starts when a given number of participants (threads) are in a waiting state, And performs the given barrier action when the barrier is started, which is performed by the last thread to enter the barrier.
-
Await () will wait until all participants have called the await method on the barrier.
Code sample
CyclicBarrier was used to simulate the collection of five blessings
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, ()->{
System.out.println(Thread.currentThread().getName());
System.out.println("Gather the five blessings and start synthesizing!");
});
String[] blessings = {Patriotic bliss."Friendly Blessing"."Professional Happiness".Harmony and Happiness."Prosperity and prosperity"};
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(()->{
System.out.println(
String.format(Send you "% s % s",
Thread.currentThread().getName(),
blessings[index]));
try {
Thread.sleep(index * 100);
// Each thread arrives and waits for other threads to arrive. Once all five have arrived, the thread in the constructor is executed by the last thread to arrive and then continues
cyclicBarrier.await();
System.out.println("Five blessings! Compositing!!"+blessings[index]+"Disappear");
// Verify that counters are resettable
cyclicBarrier.await();
} catch(Exception e) { e.printStackTrace(); } }).start(); }}}Copy the code
== Notice the console output, where five threads arrive at the cyclicBarrier. Await () barrier point, waiting for each other… Five blessings are collected, i.e., await() is called five times, and then the barrier operation specified in the constructor is performed by the last Thread, Thread-4, not the main Thread. Then continue to perform the respective operations of the thread. = =
== and the second time I called the await method. We found that our cyclicBarrier(5, runnable) still works. = =
Thread-1 send you friendly thread-0 send you patriotic thread-3 send you harmony THread-4 five blessings set together, begin to synthesis! Wufu to! Compositing!! Prosperity and prosperity disappear to five blessings! Compositing!! Professional blessing disappear to five blessing! Compositing!! Patriotic blessing disappear to five blessing! Compositing!! The blessing of friendship is gone! Compositing!! Harmony blessedness is gone Thread-3 Five blessedness is collected and the synthesis begins!Copy the code
CountDownLatch is different from CountDownLatch. CountDownLatch is a subtraction that wakes the await thread until it counts down to 0 and cannot be used repeatedly. Cyclicbarriers are additive and can be recycled.
Semaphore Semaphore
Semaphore, translated as Semaphore, is used to control the number of simultaneous operations that access a particular resource, or the number of simultaneous operations that are performed.
The main method
- Initialize Semaphore and specify the number of licenses
- Get a permit. Call acquire()/acquireUninterruptibly() before a method that requires restricted access
- License release()
- TryAcquire (), if any are available. If not, it won’t block, so you can do something else, and we’ll see if there’s anything available later. RtyAcquire (timeout) is the same as tryAcquire(), but with an extra timeout.
Simple to use
public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { service.submit(() ->{ try { semaphore.acquire(); System.out.println(thread.currentThread ().getName())+ ); // Suppose this is the way we need to limit the execution of thread.sleep (1000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); System.out.println(thread.currentThread ().getName()+ ); }}); } service.shutdown(); }}Copy the code
Pay attention to the point
- Permits Semaphore acquire(int permits) release(int permits) Semaphore acquire(int permits) release(int permits) Semaphore acquire(int permits) release(int permits)
Condition interfaces
Condition is an interface that replaces the traditional synchronized methods of calling Object’s wait() and notify() to implement thread collaboration. The Condition instance essentially needs to be bound to a Lock, and we need to call lock.newcondition () to get it. The Condition signal() and signalAll() must be used between lock.lock() and lock.unlock.
The main method
- Await () calls to the condition.await() method cause the thread currently obtaining the lock to enter the wait queue. It moves from the wait queue to the synchronization queue until it is woken up by signal/signalAll.
- Signal () because the wait queue is FIFO, only the thread that has waited the longest is invoked.
- SignalAll () invokes all waiting threads.
Condition is used to implement the producer-consumer pattern
public class ConditionDemo {
private static int QUEUE_MAX_SIZE = 10;
private Queue<Integer> queue = new ArrayDeque<Integer>(QUEUE_MAX_SIZE);
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo demo = new ConditionDemo();
new Thread(demo.new Consumer(a)).start(a);
new Thread(demo.new Producer(a)).start(a);
}
class Consumer implements Runnable {
@Override
public void run(a) {
while (true) {
try {
lock.lock();
while(queue.size()==0){
System.out.println(Queue empty, waiting for producer production data);
empty.await();
}
queue.poll();
full.signal();
System.out.println(String.format("The consumer consumes one element, and there are %s elements left.",
queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{ lock.unlock(); }}}}private class Producer implements Runnable {
@Override
public void run(a) {
while (true) {
try{
lock.lock();
while(queue.size() == QUEUE_MAX_SIZE){
System.out.println("The queue is full, waiting for consumers to spend.");
full.await();
}
queue.offer(1);
empty.signal();
System.out.println(String.format("The producer added one element, remaining %s elements",
queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
}
}
Copy the code
Phaser Phaser
Phaser (rarely used in my work), a new addition to JDK1.7, is similar to CountDownLatch and CyclicBarrier, but is more flexible and powerful to use. It can realize the situation problem of ==== controlling multithreading to complete tasks together by stages ====.
The main method
- Build a Phaser/create a Phaser with a specified number of barriers, the same as CountDownLatch, passing in the number of synchronized threads.
- Register()/bulkRegister(int parties) Register a new party or register multiple parties in batches. This is more powerful than CountDownLatch for dynamic registration.
- Arrive () reaches the barrier point of this phaser, completing the phase without waiting for another thread.
- ArriveAndAwaitAdvance () reaches the barrier point of this phaser, completes the phase, and blocks until other threads reach the barrier point.
- ArriveAndDeregister () reaches phaser’s barrier point, incrementing the number of phaser threads arriving by one, unregistering a barrier point without blocking waiting for other threads.
- Refer to the JDK documentation for other methods
Take a chestnut
In the scenario of shopping spree, we assume that the process of shopping spree is a thread. This thread is divided into two stages, step1: join the order, step2: pay. Assume that five users participate in a shopping spree, that is, five threads wait for each other to complete step1 before they can complete step2.
public class PhaserDemo {
public static void main(String[] args) {
PhaserDemo demo = new PhaserDemo();
Phaser phaser = new Phaser();
phaser.bulkRegister(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int userId = i + 1;
service.submit(demo.new BuyTogeter(userId, phaser));
}
service.shutdown();
}
class BuyTogeter implements Runnable {
private Phaser phaser;
private int userId;
public BuyTogeter(int userId, Phaser phaser) {
this.userId = userId;
this.phaser = phaser;
}
@Override
public void run(a) {
try {
// Stage 0 - barrier - Wait for 5 users to add the order, only 5 users to complete the order, can pay
Thread.sleep((long) (Math.random() * 1000));
System.out.println(String.format("Phase %s: [user %s] joins a spell...",phaser.getPhase(), userId));
phaser.arriveAndAwaitAdvance();
// Stage 1 - Wait for 5 users to pay
Thread.sleep((long) (Math.random() * 1000));
System.out.println(String.format("Phase %s: [user %s] completes payment...",phaser.getPhase(), userId));
phaser.arriveAndDeregister();
} catch(InterruptedException e) { e.printStackTrace(); }}}}Copy the code
We see the console output
Phase 0: [user 2] joins the bill... Phase 0: [user 1] joins the bill... Phase 0: [user 3] joins the bill... Phase 0: [User 5] joins the bill... Phase 0: [user 4] joins the bill... Stage 1: [User 2] completes payment... Stage 1: [User 5] completes payment... Stage 1: [User 3] completes payment... Stage 1: [User 1] completes payment... Stage 1: [User 4] completes payment...Copy the code
conclusion
CountDownLatch/CyclicBarrier/Semaphore/Phaser their usage the same, we pay attention to their difference in use, when we need to control the thread to wait, wait for each other, current limiting, awaiting execution in stages, flexible use of these tools.
The above thread concurrency control tool class to do some simple introduction, and did not analyze their implementation principle from the source layer, will go to do source code analysis, welcome to pay attention to.
Please scan the QR code to follow the official accountCopy the code