Preface

The utility classes for controlling concurrent flow are designed to make it easier for threads to cooperate with one another to satisfy business logic. For example, thread A can wait for thread B to finish executing before it continues.

The main tool classes for controlling concurrent processes are:

Introduction to CountDownLatch

Background

CountDownLatch was introduced in Java 1.5, along with CyclicBarrier, Semaphore, ConcurrentHashMap, and BlockingQueue. All of them live in the java.util.concurrent package.

Concept

CountDownLatch is a synchronization counter that allows one or more threads to wait until another set of threads completes execution. It is built on the shared mode of AQS (AbstractQueuedSynchronizer).

This is done with a counter whose initial value is typically the number of threads to wait for. Each time a thread completes, it decrements the counter by 1. When the counter reaches 0, all threads have finished, and the threads waiting on the latch can resume work.
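The counter behaviour described above can be seen in a minimal sketch. The class name LatchCountDemo and the helper runWorkers are illustrative names, not part of the original article; only CountDownLatch and its methods come from the JDK.

```java
import java.util.concurrent.CountDownLatch;

public class LatchCountDemo {

    // Starts `workers` threads that each call countDown() once, waits for
    // all of them, and returns the count observed afterwards.
    static long runWorkers(int workers) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers); // initial value = number of threads
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start(); // each finished thread decrements by 1
        }
        latch.await();           // blocks until the counter reaches 0
        return latch.getCount(); // 0 once await() has returned
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("count after await: " + runWorkers(3)); // count after await: 0
    }
}
```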


Application scenarios

Typical uses include ZooKeeper distributed locks, JMeter simulation of high concurrency, and so on.

Scenario 1, let multiple threads wait: simulate concurrency by releasing all threads at the same time

To simulate high concurrency, have a group of threads send a flash-sale (seckill) request at a specified moment. Each thread blocks on countDownLatch.await() once it is ready, and when the seckill time arrives they all rush in together. This is also a simple way to test the concurrency behaviour of an interface locally.

In this scenario, CountDownLatch acts as a starting gun. Just as in a track and field race, the athletes get into position at the starting line and wait; when the gun sounds, they all start running. The seckill scenario above works the same way.

The code implementation is as follows:

    package com.niuh.tools;

    import java.util.concurrent.CountDownLatch;

    /**
     * CountDownLatch sample
     * Scenario 1, let multiple threads wait: simulate concurrency and let concurrent threads execute together
     */
    public class CountDownLatchRunner1 {

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    try {
                        // The athletes block here, waiting for the starting gun
                        countDownLatch.await();
                        String parter = "[" + Thread.currentThread().getName() + "] ";
                        System.out.println(parter + "start executing...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }

            Thread.sleep(2000);          // give the threads time to reach await()
            countDownLatch.countDown();  // Starting gun: execute command
        }
    }

Running results:

    [Thread-2] start executing...
    [Thread-4] start executing...
    [Thread-3] start executing...
    [Thread-0] start executing...
    [Thread-1] start executing...

We use countDownLatch.await() to block the participant threads as they start up, then call countDownLatch.countDown() on the main thread to reduce the count from 1 to 0 and release all of them together. The threads then run at nearly the same moment, which simulates concurrent requests. Note that countDown() takes no argument; it always decrements the count by exactly 1.
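The sample relies on a fixed sleep to give the participant threads time to reach await(). A variation, sketched here as an assumption rather than as the article's method, uses a second latch so the main thread fires the gun only after every participant has reported in. The names StartGateDemo, race, ready, start and done are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGateDemo {

    // Returns {runners started before the gun, runners started in total}.
    static int[] race(int runners) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(runners); // runners report in
        CountDownLatch start = new CountDownLatch(1);       // the starting gun
        CountDownLatch done = new CountDownLatch(runners);  // runners report finishing
        AtomicInteger started = new AtomicInteger();

        for (int i = 0; i < runners; i++) {
            new Thread(() -> {
                ready.countDown();             // "I am at the starting line"
                try {
                    start.await();             // wait for the gun
                    started.incrementAndGet(); // only reachable after the gun
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        ready.await();                    // every runner is in position
        int startedEarly = started.get(); // nobody can have passed the gate yet
        start.countDown();                // fire the gun
        done.await();                     // wait for all runners to finish
        return new int[] {startedEarly, started.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = race(5);
        System.out.println("before gun: " + result[0] + ", after: " + result[1]);
    }
}
```

The trade-off is one extra latch in exchange for not having to guess how long thread start-up takes.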

Scenario 2, let a single thread wait: merge the results after multiple threads (tasks) complete

Concurrent tasks often have dependencies on what comes before and after them. For example, a data detail page may need to call several interfaces at the same time and merge the results once all the concurrent requests have returned. Or a data check may be required after several data operations have completed. Both are cases of summarizing and merging after multiple threads (tasks) have finished.

The code implementation is as follows:

    package com.niuh.tools;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ThreadLocalRandom;

    /**
     * CountDownLatch sample
     * Scenario 2, let a single thread wait: after multiple threads (tasks) complete, merge the totals
     */
    public class CountDownLatchRunner2 {

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(5);
            for (int i = 0; i < 5; i++) {
                final int index = i;
                new Thread(() -> {
                    try {
                        Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
                        System.out.println("finish" + index + Thread.currentThread().getName());
                        countDownLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }

            // The main thread blocks here; when the counter reaches 0 it wakes up and proceeds
            countDownLatch.await();
            System.out.println(" main thread: summary of results after all tasks are completed ");
        }
    }

Running results:

    finish3Thread-3
    finish0Thread-0
    finish1Thread-1
    finish4Thread-4
    finish2Thread-2
     main thread: summary of results after all tasks are completed

Call countDownLatch.countDown() as the last step of each worker thread to decrement the counter by 1. Once every thread has done so and the counter drops to 0, the main thread continues and performs the summary task.
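A sketch of the merge step itself is shown here. The fetchPart method is a hypothetical stand-in for a remote interface call, and MergeResultsDemo and fetchAndMerge are illustrative names. Placing countDown() in a finally block is a design choice of this sketch (the sample above calls it inside the try block): it keeps the main thread from waiting forever if a worker fails.

```java
import java.util.concurrent.CountDownLatch;

public class MergeResultsDemo {

    // Hypothetical stand-in for a remote call: returns index * 10.
    static int fetchPart(int index) {
        return index * 10;
    }

    static int fetchAndMerge(int parts) throws InterruptedException {
        int[] results = new int[parts];
        CountDownLatch latch = new CountDownLatch(parts);
        for (int i = 0; i < parts; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    results[index] = fetchPart(index); // each worker fills its own slot
                } finally {
                    latch.countDown();                 // runs even if fetchPart throws
                }
            }).start();
        }
        // Writes made before countDown() are visible after await() returns.
        latch.await();
        int total = 0;
        for (int r : results) {
            total += r;
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fetchAndMerge(5)); // 0 + 10 + 20 + 30 + 40 = 100
    }
}
```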

Source code analysis

This article is based on JDK 1.8.

CountDownLatch class diagram

As you can see from the figure, CountDownLatch is implemented based on the Sync class, which inherits AQS and uses the AQS shared mode.

Its main internal variables and methods are as follows:

When we call await() and countDown() in our code, several key call relationships occur, as shown below:

Its interaction principle with AQS is as follows:

The constructor

Only one constructor is provided in the CountDownLatch class, with count as the initial size of the counter.

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Once the count of a CountDownLatch has been reduced to 0, the latch cannot be reset or used again; you need to create a new latch. This is why the class is not reusable. If you need something reusable, use CyclicBarrier instead.
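The one-shot behaviour and the constructor check can be observed directly. This is a small sketch; OneShotDemo and its two helper methods are illustrative names.

```java
import java.util.concurrent.CountDownLatch;

public class OneShotDemo {

    // Counts down more often than the initial count and reports the final count.
    static long countAfterExtraCountDowns() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // 1 -> 0
        latch.countDown(); // already 0: no effect, the count never goes negative
        latch.await();     // returns immediately because the count is 0
        return latch.getCount();
    }

    // The constructor rejects a negative count.
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countAfterExtraCountDowns()); // 0
        System.out.println(rejectsNegativeCount());      // true
    }
}
```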

Internal synchronizer

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // Try to acquire the shared resource: succeeds (returns 1) only when state is 0
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // Release the shared resource by reducing state by 1 each time through CAS
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
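To see the same pattern run end to end outside the JDK, here is a minimal latch built directly on AbstractQueuedSynchronizer. It is a sketch that mirrors the structure of Sync, not a replacement for CountDownLatch; the class name SimpleLatch is an assumption made for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1; // acquire succeeds only at zero
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {                        // CAS retry loop
                int c = getState();
                if (c == 0) return false;     // already open, nothing to release
                int next = c - 1;
                if (compareAndSetState(c, next)) return next == 0;
            }
        }
    }

    private final Sync sync;

    public SimpleLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public int getCount() {
        return sync.count();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        new Thread(latch::countDown).start();
        new Thread(latch::countDown).start();
        latch.await();
        System.out.println("released, count = " + latch.getCount()); // released, count = 0
    }
}
```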

Main methods

Three methods in the class matter most:

    // The thread calling await() is suspended until the count reaches 0
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Like await(), but stops waiting once the timeout elapses, even if the count has not reached 0
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // Decrements the count by 1
    public void countDown() {
        sync.releaseShared(1);
    }

await() method

    // The thread calling await() is suspended until the count reaches 0
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

This enters the AbstractQueuedSynchronizer#acquireSharedInterruptibly() method.

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        // The wait is interruptible: throw at once if the thread is already interrupted
        if (Thread.interrupted())
            throw new InterruptedException();
        // tryAcquireShared is not implemented in AbstractQueuedSynchronizer;
        // it is implemented in Sync, described above
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

Sync#tryAcquireShared() returns 1 if the AQS state is 0 and -1 otherwise. When it returns -1, doAcquireSharedInterruptibly() is executed:

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        // The main thread joins the queue here. The queue then has two nodes:
        // the first is the virtual (dummy) head node, the second is the main thread's node
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                // There are only two nodes in total, so the predecessor is the head
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // If state is not 0, the main thread is suspended (blocked) here
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

AQS is used in a clever way here. In this example only the main thread is in the wait queue, and as long as the other threads have not finished, state is not 0, so the main thread stays blocked. The question is: who wakes up the main thread? That is the job of the countDown() method.

await(long timeout, TimeUnit unit) method

This method takes a maximum wait time. If the count does not reach 0 within that time, the method returns false, which the main thread can use to decide how to proceed. It returns true if the count reaches 0 in time.

    // Like await(), but stops waiting once the timeout elapses, even if the count has not reached 0
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
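A short sketch of both outcomes of the timed wait follows. TimedAwaitDemo and awaitWithTimeout are illustrative names, and the 100 ms timeout is an arbitrary value chosen for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedAwaitDemo {

    // Creates a latch with `count`, counts down `countDowns` times,
    // then waits at most `timeoutMillis` for the count to reach 0.
    static boolean awaitWithTimeout(int count, int countDowns, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < countDowns; i++) {
            latch.countDown();
        }
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitWithTimeout(2, 2, 100)); // true: count reached 0
        System.out.println(awaitWithTimeout(2, 1, 100)); // false: timed out with count 1
    }
}
```

A false return does not cancel the outstanding work; the caller decides whether to retry, fall back, or report a partial result.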

countDown() method

    public void countDown() {
        sync.releaseShared(1);
    }

This enters the AbstractQueuedSynchronizer#releaseShared() method.

    public final boolean releaseShared(int arg) {
        // tryReleaseShared is also not implemented in AbstractQueuedSynchronizer;
        // it is implemented in Sync
        if (tryReleaseShared(arg)) {
            // Wake up the main thread
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared() was introduced in the analysis of the Sync class. It decreases the AQS state by 1 and returns true only when that decrement brings the state to 0. In that case the waiting main thread is woken up through the AbstractQueuedSynchronizer#doReleaseShared() method:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // SIGNAL = -1
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake up the successor of head, that is, the second node in the queue.
                    // If it woke up and found that state was still not 0, it would block again
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
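The CAS-plus-retry loop that tryReleaseShared() relies on can be sketched in isolation with AtomicInteger. This is an illustration of the technique under the assumption that an AtomicInteger stands in for the AQS state; it is not the AQS code itself, and CasCountdown, release and zeroTransitions are hypothetical names. However many threads race, exactly one of them observes the transition to zero, which is why only one countDown() call triggers the wake-up.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCountdown {

    private final AtomicInteger state;

    CasCountdown(int initial) {
        state = new AtomicInteger(initial);
    }

    // Returns true only for the call that moves the state from 1 to 0.
    boolean release() {
        for (;;) {
            int c = state.get();
            if (c == 0) return false;                           // already at zero
            int next = c - 1;
            if (state.compareAndSet(c, next)) return next == 0; // retry if another thread won
        }
    }

    // Lets `threads` threads race on release() and counts how many saw the zero transition.
    static int zeroTransitions(int threads) throws InterruptedException {
        CasCountdown counter = new CasCountdown(threads);
        AtomicInteger sawZero = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (counter.release()) {
                    sawZero.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return sawZero.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads that saw the transition to zero: " + zeroTransitions(8)); // 1
    }
}
```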

Conclusion

CountDownLatch, like Semaphore, addresses a shared-resource problem. Its source code implements the template methods of AQS and then uses CAS plus loop retry to implement its own behaviour. It can serve as a counter when a single request calls multiple resources, or when an operation depends on other operations completing first.