Preface

When concurrency comes up in an interview, many interviewers like to ask questions such as:

When N threads work on a task concurrently, how do you know that they have all finished?

That is one of the topics of this article, the second part of “A Guide to the Pitfalls of the Concurrency Package”; let’s talk about common concurrency tools.

Implementing it ourselves

At the heart of all these questions is how one thread can tell if another thread has finished.

Suppose you have three threads running and need to know their results in the main thread. It can be divided into the following steps:

  • Define a counter with an initial value of 3.
  • After each thread completes its task, the counter is decremented by one.
  • Once the counter drops to zero, the waiting thread is notified.

So it is natural to implement this with the wait/notify mechanism, much like the blocking queue in the previous part of “A Guide to the Pitfalls of the Concurrency Package”.

Following this idea, we write a custom tool, MultipleThreadCountDownKit, whose constructor takes the number of threads to wait for and uses it to initialize the counter.

Since this runs under concurrency, the counter naturally has to be thread-safe, so AtomicInteger is used.

The object therefore has to be constructed with the number of threads at initialization.
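A minimal sketch of what such a constructor might look like is given here. The field names `counter`, `notify` and `notifyListen` follow the snippets in this article; the class name `CountDownKitSketch`, the `Notify` interface name, the `getCount()` helper and the argument check are assumptions made for illustration, not the original source.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: mirrors the fields that countDown() and await() rely on.
class CountDownKitSketch {

    /** Callback invoked once waiting ends (interface name is an assumption). */
    interface Notify {
        void notifyListen();
    }

    /** Thread-safe counter, initialized to the number of threads to wait for. */
    private final AtomicInteger counter;

    /** Monitor object used for wait/notify. */
    private final Object notify;

    /** Optional callback; stays null unless setNotify is called. */
    private Notify notifyListen;

    CountDownKitSketch(int number) {
        if (number < 0) {
            throw new IllegalArgumentException("counter < 0");
        }
        this.counter = new AtomicInteger(number);
        this.notify = new Object();
    }

    void setNotify(Notify notifyListen) {
        this.notifyListen = notifyListen;
    }

    /** Hypothetical helper, exposed only so the sketch can be inspected. */
    int getCount() {
        return counter.get();
    }
}
```

Keeping the monitor object separate from the counter means the counter can be updated lock-free, while the lock is only taken when a thread actually has to wait or be woken up.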

Decrementing the counter

Each time one of the business threads completes, the counter is decremented by one, until it reaches zero.

    /**
     * Decrement the count by one after a thread completes.
     */
    public void countDown() {
        // Already at zero: nothing left to count down
        if (counter.get() <= 0) {
            return;
        }

        int count = this.counter.decrementAndGet();
        if (count < 0) {
            throw new RuntimeException("concurrent error");
        }

        // The last thread to finish wakes up the waiting thread
        if (count == 0) {
            synchronized (notify) {
                notify.notify();
            }
        }
    }

decrementAndGet() keeps the decrement atomic across multiple threads, and when the counter reaches 0 the waiting thread is woken up through the wait/notify mechanism.

Waiting for all threads to complete

Any other thread that needs to know when the business threads are finished has to wait until it is notified, which happens when the counter reaches zero as described above.

    /**
     * Wait for all threads to complete.
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void await() throws InterruptedException {
        synchronized (notify) {
            // Re-check in a loop to guard against spurious wake-ups
            while (counter.get() > 0) {
                notify.wait();
            }

            if (notifyListen != null) {
                notifyListen.notifyListen();
            }
        }
    }

The principle is simple: as long as the counter is still greater than zero, the calling thread waits on the notify object until a business thread wakes it up.

There is also a notification interface that can be implemented to run custom business logic after the wake-up, which will be demonstrated later.

Concurrent test

Those are basically the two functions, so let’s run a demonstration.

  • Initialize the concurrency tool MultipleThreadCountDownKit with a counter of 3.
  • Create three threads that each execute their business logic and then call countDown().
  • Thread 3 sleeps for 2s to simulate a time-consuming task.
  • The main thread calls await() to wait for the three threads to finish.

As expected, the main thread waits for the last thread to finish before exiting.

    MultipleThreadCountDownKit multipleThreadKit = new MultipleThreadCountDownKit(3);
    multipleThreadKit.setNotify(() -> LOGGER.info("Three threads completed the task."));

You can also register a callback at initialization to be notified when the business threads have finished executing.

Of course, this has the same effect as running that logic in the main thread right after await(), because the callback runs in the same thread that called await().
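The demonstration described above can be sketched as one self-contained program. This is a compact restatement under assumptions rather than the original source: the `CountDownKitDemo` and `Kit` names are hypothetical, `Runnable` stands in for the callback interface, `notifyAll()` is used instead of `notify()` so that several waiters would all wake up, and the sleep time is a parameter so the demo can also run quickly.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CountDownKitDemo {

    /** Compact variant of the kit described in this article (sketch only). */
    static class Kit {
        private final AtomicInteger counter;
        private final Object notify = new Object();
        private Runnable notifyListen; // Runnable stands in for the callback interface

        Kit(int number) {
            this.counter = new AtomicInteger(number);
        }

        void setNotify(Runnable callback) {
            this.notifyListen = callback;
        }

        void countDown() {
            if (counter.get() <= 0) {
                return;
            }
            if (counter.decrementAndGet() == 0) {
                synchronized (notify) {
                    notify.notifyAll();
                }
            }
        }

        void await() throws InterruptedException {
            synchronized (notify) {
                while (counter.get() > 0) {
                    notify.wait();
                }
            }
            if (notifyListen != null) {
                notifyListen.run(); // runs in the thread that called await()
            }
        }
    }

    /** Starts three workers (the third sleeps) and returns how many had finished when await() returned. */
    static int runDemo(long sleepMillis) {
        Kit kit = new Kit(3);
        AtomicInteger finished = new AtomicInteger();
        kit.setNotify(() -> System.out.println("Three threads completed the task."));

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    if (id == 3) {
                        Thread.sleep(sleepMillis); // simulate slow business logic
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.incrementAndGet();
                kit.countDown();
            }, "worker-" + i).start();
        }

        try {
            kit.await(); // blocks until all three workers have counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished workers: " + runDemo(2000));
    }
}
```

Because each worker increments `finished` before calling `countDown()`, the value read after `await()` returns reflects all three workers.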

CountDownLatch

Of course, our own implementation has not been validated in a large-scale production environment; its main purpose is to give us a glimpse of how the official implementation works.

So let’s look at how CountDownLatch in JUC (java.util.concurrent) is implemented.

Looking at the constructor, you will find an inner class Sync that extends AbstractQueuedSynchronizer (AQS). AQS is the basic framework underlying the Java concurrency package and deserves an article of its own, so it is not the focus this time; we will cover it in the future.

For now, it is enough to know that it provides a counter and a thread notification tool similar to the ones above.

countDown

Its core logic is not that different from our own implementation.

    public void countDown() {
        sync.releaseShared(1);
    }

    // Defined in AbstractQueuedSynchronizer
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

Through the inner class’s releaseShared method, we can see that it decrements the counter by one.

The decrement logic in tryReleaseShared should look familiar.

Yes, this is how the AtomicInteger decrement is implemented in JDK 1.7 (using CAS to ensure thread safety).
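The CAS-based decrement mentioned here can be illustrated with `AtomicInteger.compareAndSet`. The following is a sketch of the retry-loop pattern, not a copy of the JDK source; the class name `CasDecrementSketch` and the method name `tryDecrement` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDecrementSketch {

    /**
     * Decrements the state by one using a CAS retry loop.
     * Returns true only for the call that moves the state from 1 to 0.
     */
    static boolean tryDecrement(AtomicInteger state) {
        for (;;) {
            int current = state.get();
            if (current == 0) {
                return false; // already at zero, nothing to release
            }
            int next = current - 1;
            // Succeeds only if no other thread changed the value in between
            if (state.compareAndSet(current, next)) {
                return next == 0;
            }
            // CAS failed: another thread won the race, so re-read and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger state = new AtomicInteger(2);
        System.out.println(tryDecrement(state)); // false (2 -> 1)
        System.out.println(tryDecrement(state)); // true  (1 -> 0)
        System.out.println(tryDecrement(state)); // false (already 0)
    }
}
```

Only one caller can ever observe the transition to zero, which is what makes it safe to let that caller alone perform the wake-up.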

In addition, once the counter drops to 0, doReleaseShared is executed to wake up the other threads.

Here we only need to care about the part that wakes up the thread (the rest involves the AQS queue and can be ignored for now): eventually LockSupport.unpark is called to wake up the thread, which is equivalent to calling Object.notify() above.

So it’s essentially the same thing.

await

await() is likewise implemented through methods of the Sync object.

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Block only if the counter has not yet reached zero
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

As long as some threads are unfinished, doAcquireSharedInterruptibly is called and the caller enters the blocked state.

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Again, since this is a method in AQS, we only care about the key part: LockSupport.park is called, which is equivalent to Object.wait().

  • When the last business thread finishes and the counter drops to 0, LockSupport.unpark is called to wake up the waiting thread.
  • While the counter is greater than 0, the waiting thread uses LockSupport.park to wait to be woken up.
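The park/unpark pairing in the two points above can be tried in isolation. This is a minimal sketch under assumptions: the `ParkUnparkSketch` class and the `done` flag are hypothetical and only stand in for the counter reaching zero; it does not reproduce the AQS queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkSketch {

    /** Parks a waiter thread until a flag is set; returns whether the waiter saw the flag. */
    static boolean demo() {
        AtomicBoolean done = new AtomicBoolean(false);
        AtomicBoolean observed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark, so re-check the condition in a loop
            while (!done.get()) {
                LockSupport.park();
            }
            observed.set(true);
        }, "waiter");
        waiter.start();

        done.set(true);             // publish the condition first
        LockSupport.unpark(waiter); // then wake the waiter; harmless if it has not parked yet

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter woke up: " + demo());
    }
}
```

Unlike Object.wait() and Object.notify(), park and unpark do not require holding a monitor, and an unpark issued before the park is not lost because it leaves a permit behind.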

That ties the whole process together; CountDownLatch is used in much the same way as our own tool above.

I won’t go into further detail here.
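For reference, the same three-thread demonstration can be written against `java.util.concurrent.CountDownLatch`. This is a sketch under the same assumptions as the earlier test description (three workers, the third one sleeping); the class name `CountDownLatchDemo` and the `runDemo` helper are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class CountDownLatchDemo {

    /** Starts three workers (the third sleeps) and returns how many had finished when await() returned. */
    static int runDemo(long sleepMillis) {
        CountDownLatch latch = new CountDownLatch(3);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    if (id == 3) {
                        Thread.sleep(sleepMillis); // simulate slow business logic
                    }
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // count down even if the work was interrupted
                }
            }, "worker-" + i).start();
        }

        try {
            latch.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished workers: " + runDemo(2000));
    }
}
```

Calling `countDown()` in a `finally` block keeps the waiting thread from blocking forever when a worker fails.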

A practical case

Again, let’s look at a real case.

The previous article, “A Discussion of Pitfalls in Table Sharding Practice”, mentioned that a full scan across sharded tables needs multiple threads to improve query efficiency.

For example, suppose the data is split into 64 tables and we plan to use 8 threads to process them. The pseudocode is as follows:

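    CountDownLatch count = new CountDownLatch(64);
    // ConcurrentHashMap rejects null values, so store a placeholder instead
    ConcurrentHashMap<List, Boolean> total = new ConcurrentHashMap<>();
    for (int i = 0; i <= 63; i++) {
        final int tableIndex = i; // a captured variable must be effectively final
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    List value = queryTable(tableIndex);
                    total.put(value, Boolean.TRUE);
                } finally {
                    count.countDown(); // always count down, even if the query fails
                }
            }
        });
    }
    count.await();
    System.out.println("Enquiry completed");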

In this way, all the data can be queried and then summarized in one place; the code is simple and easy to understand (you can also use the thread pool API).
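A runnable version of this idea might look like the following sketch. The `queryTable` stub, the class name `ShardQuerySketch`, and the choice of a fixed pool of 8 threads with a `ConcurrentLinkedQueue` for the results are assumptions for illustration; a real query would go to the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ShardQuerySketch {

    static final int TABLES = 64;
    static final int THREADS = 8;

    /** Hypothetical stand-in for a per-table query: returns one fake row per table. */
    static List<String> queryTable(int tableIndex) {
        List<String> rows = new ArrayList<>();
        rows.add("row-from-table-" + tableIndex);
        return rows;
    }

    /** Queries all tables on a pool of 8 threads and returns the total number of rows collected. */
    static int queryAll() {
        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        CountDownLatch latch = new CountDownLatch(TABLES);
        ConcurrentLinkedQueue<String> total = new ConcurrentLinkedQueue<>();
        try {
            for (int i = 0; i < TABLES; i++) {
                final int tableIndex = i;
                executor.execute(() -> {
                    try {
                        total.addAll(queryTable(tableIndex));
                    } finally {
                        latch.countDown(); // one count per table, even on failure
                    }
                });
            }
            latch.await(); // wait until all 64 tables have been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return total.size();
    }

    public static void main(String[] args) {
        System.out.println("rows collected: " + queryAll());
    }
}
```

The latch counts tables rather than threads, so each of the 8 pool threads can work through several tables while the caller still waits for all 64.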

Conclusion

CountDownLatch is a frequently used tool in JUC, and learning and understanding its use will make it easier to write concurrent applications.

The source code involved in the article:

Github.com/crossoverJi…
