preface
The directory is as follows:
When discussing concurrency during an interview, many interviewers like to ask questions like:
When N threads complete a task at the same time, how do you know that they are all finished?
This is also one of the topics of this discussion, so this is the second part of “And send the package into the pit refers to the north”; Let’s talk about common concurrency tools.
Their implementation
At the heart of all these questions is how one thread can tell if another thread has finished.
Suppose you have three threads running and need to know their results in the main thread. It can be divided into the following steps:
- Define a counter of 3.
- After each thread completes the task, the count is reduced by one.
- Once the counter drops to zero, the waiting thread is notified.
So it is easy to think of waiting notification mechanism to implement, and the above “and send packets into the pit refers to the north” blocking queue similar.
According to the thinking custom a MultipleThreadCountDownKit tool, the constructor is as follows:
Given the premise of concurrency, this counter naturally needed to be thread-safe, so AtomicInteger was used.
So you need to build objects based on the number of threads during initialization.
Counter minus one
When one of the business threads completes, this counter needs to be reduced by one until it reaches zero.
/** * Count after thread completes -1 */
public void countDown(a){
if (counter.get() <= 0) {return;
}
int count = this.counter.decrementAndGet();
if (count < 0) {throw new RuntimeException("concurrent error"); }if (count == 0) {synchronized(notify){ notify.notify(); }}}Copy the code
DecrementAndGet () ensures atomicity of multiple threads, and notifies other threads by waiting when decrementing to 0.
Wait for all threads to complete
Other threads that need to know that the business thread is finished need to wait until completion, until notified when the counter goes to zero as mentioned above.
/** * wait for all threads to complete *@throws InterruptedException
*/
public void await(a) throws InterruptedException {
synchronized (notify){
while (counter.get() > 0){
notify.wait();
}
if(notifyListen ! =null){ notifyListen.notifyListen(); }}}Copy the code
The principle is simple: once the counter still exists, the notify object is used to wait until it is woken up by the business thread.
There is also a notification interface that can be customized to implement some of the business logic after the wake up, which will be demonstrated later.
Concurrent test
So basically these two functions, so let’s do a demonstration.
- A concurrency tool that initializes three counters
MultipleThreadCountDownKit
- Three threads are created to execute the business logic separately and then execute it
countDown()
. - Thread 3 sleeps for 2s to simulate business time.
- Main thread execution
await()
Wait for the three threads to finish executing.
The main thread will wait for the last thread to finish before exiting. The main thread waits for the rest of the threads.
MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
multipleThreadKit.setNotify(() -> LOGGER.info("Three threads completed the task."));
Copy the code
You can also specify a callback interface during initialization to receive notification when the business thread completes execution.
Of course it has the same effect as executing the logic in the main thread (in the same thread as executing the await() method).
CountDownLatch
Of course the code we implemented ourselves had not been validated in a mass production environment, so the main purpose was to try and get a glimpse of the official implementation.
So let’s look at how CountDownLatch is implemented under JUC.
Through the constructor will be found to have an inner class Sync, he is inheriting from AbstractQueuedSynchronizer; This is Java and the basic framework of the package, can be taken separately, so this time the focus is not it, we will focus on the future.
It provides a counter and thread notification tool similar to the one above.
countDown
His core logic is not that different from our own implementation.
public void countDown(a) {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Copy the code
With the inner class’s releaseShared method, we can understand that he wants to decrease the counter by one.
See if there’s any deja vu going on here.
Yes, this is how AtomicInteger decrement is implemented in JDK1.7 (using CAS to ensure thread-safety).
However, once the counter drops to 0, doReleaseShared is executed to wake up the other threads.
Here we only need to care about the red box part (the rest is not concerned for the moment, because queue correlation is involved in AQS), and eventually locksupport. unpark will be called to wake up the thread; This is equivalent to calling Object.notify () above.
So it’s essentially the same thing.
await
The await() is also implemented using Sync object methods.
public void await(a) throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Check whether the counter is not complete
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0)?1 : -1;
}
Copy the code
Once there is unfinished thread, will call doAcquireSharedInterruptibly enter the blocking state.
private final boolean parkAndCheckInterrupt(a) {
LockSupport.park(this);
return Thread.interrupted();
}
Copy the code
And again since this is the method in AQS, we only care about the red box; The locksupport. park method is called, which is equivalent to object.wait().
- All business threads are called when the counter is reduced to 0 after execution
LockSupport.unpark
To wake up the thread. - The wait thread takes advantage once the counter > 0
LockSupport.park
To wait for awakening.
The whole process is then strung together and used in a similar way as above.
I won’t talk too much about it.
The actual case
Again, let’s look at a real case.
In the last article “A discussion on the practice of a table trampling” mentioned that in the case of full table scanning, multi-threading is needed to improve the query efficiency.
For example, we divide 64 tables here and plan to use 8 threads to process the data of these tables respectively. The pseudocode is as follows:
CountDownLatch count = new CountDownLatch(64);
ConcurrentHashMap total = new ConcurrentHashMap();
for(Integer i=0; i<=63; i++){ executor.execute(new Runnable(){
@Override
public void run(a){ List value = queryTable(i); total.put(value,NULL); count.countDown(); }}); } count.await(); System.out.println("Enquiry completed");
Copy the code
In this way, all data can be queried and then unified summary; The code is simple and easy to understand (you can also use the thread pool API).
conclusion
CountDownLatch is a frequently used tool in JUC, and learning and understanding its use will make it easier to write concurrent applications.
The source code involved in the article:
Github.com/crossoverJi…
Your likes and shares are the biggest support for me