Preface

Thread pooling is a very important part of Java, and it is also a frequent topic in job interviews. Consider the following questions:

  1. What is the difference between Tomcat’s thread pool implementation and the JDK’s thread pool implementation?
  2. We had a problem with a gateway's Dubbo call thread pool: the interface returned normally during a pressure test, but its RT was very high. Suppose the core thread count is set to 500, the maximum thread count to 800, and the buffer queue to 5000. Can you spot the problems with these settings and tune the parameters?
  3. Are there really core and non-core threads in a thread pool?
  4. Can new threads be created after the thread pool is shutdown?
  5. After a task is submitted to the thread pool, is the submitting thread guaranteed to return immediately?
  6. If a thread in the pool dies from an uncaught exception, is it replaced by a new thread, and how can exceptions thrown by these threads be caught?
  7. How to set the size of a thread pool, and how to adjust thread pool parameters dynamically
  8. What’s the thread pool state machine?
  9. Why doesn’t the Java code specification allow you to quickly create thread pools using Executors?
  10. What problems should be avoided with thread pools? Can you briefly describe best practices for thread pools?
  11. How to gracefully close a thread pool
  12. How can thread pools be monitored

I believe many people are confused by these questions.

In fact, the answers to most of these questions are hidden in the thread pool's source code, so a deep understanding of that source code is very important. In this chapter we will walk through the thread pool source code; after reading it, you should be able to answer most of the questions above, and we will discuss the remaining ones along the way.

This article introduces the principles of thread pools from the following aspects.

  1. Why thread pools
  2. How does a thread pool work
  3. There are two ways for thread pools to submit tasks
  4. ThreadPoolExecutor source code analysis
  5. Answer the opening question
  6. Thread pool best practices
  7. Conclusion

I believe that after reading it, your understanding of thread pools will go a step further. Writing this article took a lot of effort, so please like, bookmark, and share it.

Why thread pools

Creating and using threads directly has three main costs:

1. The thread model in Java is based on the operating system's native thread model: a Java thread is backed by a kernel thread. Creating, destroying and synchronizing threads therefore require system calls, which switch back and forth between user mode and kernel mode and are relatively expensive. A thread's life cycle includes creation, execution and destruction, and both creation and destruction trigger system calls. Each Java Thread needs a kernel thread to support it, so each thread consumes a certain amount of kernel resources (such as the kernel thread's stack space), which limits how many threads can be created. By default, a thread's stack size is 1 MB.

2. As shown in the figure, 19535 KB is reserved for 19 threads (Thread #19) under Java 8. "Reserved" means that if 19 threads are created, the operating system guarantees this much space can be allocated to them (it is not necessarily allocated immediately); "committed" is the space actually committed. Note: this is the footprint of threads under Java 8; in Java 11 threads have been optimized so that creating a thread takes only about 40 KB, reducing the footprint significantly.

3. Too many threads lead to non-negligible context-switching overhead.

Thread creation is therefore expensive, so threads must be managed in the form of a thread pool: by setting a reasonable pool size and managing the threads properly, we maximize the benefit while minimizing the risk. Developers no longer need to care about how threads are created, destroyed or coordinated to complete a task; all they care about is when the submitted task is done, and the fine details of thread tuning, monitoring and so on are left to the thread pool. It is a great relief for developers.

This pooling idea is applied in many other places, such as database connection pools and HTTP connection pools: it avoids repeatedly creating expensive resources, improves performance, and frees developers from the details.

ThreadPoolExecutor design architecture diagram

First let’s look at the Executor framework blueprint

  • Executor: the top-level Executor interface provides only an execute method, which decouples task submission from task execution. This is the core method and the focus of our source code analysis; it is ultimately implemented by ThreadPoolExecutor.
  • ExecutorService extends the Executor interface, adding methods to terminate an executor and to submit tasks individually or in batches
  • AbstractExecutorService implements the ExecutorService interface and implements all methods except execute, leaving the most important execute method to ThreadPoolExecutor.

Although this layered design may look like a lot of layers, each layer has its own clear responsibility, and the overall logic is clean and worth learning from.

How does a thread pool work

First let’s look at how to create a thread pool

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20, 600L,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(4096),
                    new NamedThreadFactory("common-work-thread"));
// Set the rejection policy. The default is AbortPolicy
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

Look at the constructor signature below

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
            // omit some code
}

To understand what these parameters mean, it is important to understand the thread pool's submission and execution process, which works as follows.

1. corePoolSize: if the current number of threads is less than corePoolSize, a new thread is created and the submitted task is assigned to it, regardless of whether existing threads are idle. Threads within corePoolSize are not reclaimed, unless allowCoreThreadTimeOut is set to true, which is rarely done, since frequently creating and destroying threads greatly increases the overhead of system calls.

2. workQueue: if the number of threads has already reached corePoolSize, newly submitted tasks are placed in the blocking queue, and worker threads fetch tasks from the queue themselves.

3. maximumPoolSize: the maximum number of threads the pool may create. If the task queue is full and the number of threads has not yet reached maximumPoolSize, a new thread is created to execute the submitted task; if the queue is full and the number of threads has already reached maximumPoolSize, the pool is beyond its load capacity and the rejection policy is executed. This is understandable: we don't want an endless stream of tasks to overwhelm the thread pool, so keeping the pool itself working comes first.

4. RejectedExecutionHandler: there are four built-in rejection policies

  • AbortPolicy: Discards the task and throws an exception, which is the default policy.
  • CallerRunsPolicy: the task is executed by the caller's thread. This answers the opening question "After a task is submitted to the thread pool, is the submitting thread guaranteed to return immediately?" With CallerRunsPolicy, the submitting thread (say, the main thread) is not guaranteed to return immediately; when the rejection policy is triggered it has to run the task itself.
  • DiscardOldestPolicy: discards the oldest task at the head of the blocking queue and then tries to execute the current task.
  • DiscardPolicy: discards the task without throwing any exception. This policy is suitable only for unimportant tasks.

5. keepAliveTime: the thread keep-alive time; threads beyond corePoolSize that stay idle longer than this are reclaimed.

6. threadFactory: this parameter lets you set the thread names, specify an UncaughtExceptionHandler (very useful, more on this later), and even mark the threads as daemon threads.

Now the question is how to set these parameters properly.

Let’s start with thread size Settings

Java Concurrency in Practice tells us to consider two cases:

  1. For CPU-intensive tasks, on a system with Ncpu processors, a thread pool size of Ncpu + 1 usually achieves optimal utilization. The extra thread ensures that CPU cycles are not wasted when a compute-intensive thread is occasionally suspended because of a page fault or some other reason. Since CPU-intensive threads are busy all the time, sizing the pool at Ncpu + 1 keeps every core busy while avoiding unnecessary context switching, maximizing CPU utilization.
  2. For IO-intensive tasks, it also provides a calculation formula, shown below.
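
For reference, the commonly cited form of the book's formula (written here in plain notation) is: Nthreads = Ncpu × Ucpu × (1 + W/C), where Ucpu is the target CPU utilization (a value between 0 and 1) and W/C is the ratio of wait time to compute time. The more time tasks spend waiting on IO, the larger W/C is and the more threads the machine can support.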

These formulas look good on paper, but they are too theoretical and detached from real business scenarios, so they can serve only as a rough reference. For example, the formula says to set the pool size to Ncpu + 1 for CPU-intensive tasks, but in practice a service rarely has only one thread pool, and in that case blindly applying the formula just leaves you guessing.

Let’s look at the size Settings for the workQueue

As analyzed above, a thread beyond corePoolSize is added if and only if the core threads are all busy and the workQueue is full. If the workQueue is unbounded, once the thread count reaches corePoolSize no new threads are ever added: maximumPoolSize becomes meaningless, the RejectedExecutionHandler can never be triggered, and tasks just keep piling up in the workQueue until OOM.

So the workQueue should be bounded, so that the thread pool can at least keep working when tasks flood in. What counts as a bounded queue, and what counts as an unbounded one?

The two bounded queues we commonly use are the following.

  • LinkedBlockingQueue: a queue backed by a linked list, with elements ordered FIFO. The size must be specified at creation time; otherwise it defaults to Integer.MAX_VALUE, which makes it effectively an unbounded queue.
  • ArrayBlockingQueue: a bounded queue of array implementations that sorts elements in a first-in, first-out (FIFO) order.

PriorityBlockingQueue is a common priority queue: tasks can be given a weight when inserted so that higher-priority tasks run first. This queue is rarely used in thread pools, though, for a simple reason: tasks in a thread pool are usually treated equally. If certain types of tasks need to run first, a better practice is to give each task type its own thread pool.

This also explains one of the opening questions: why doesn't the Java coding specification allow you to quickly create thread pools with Executors? The most common factory methods are the following.

newCachedThreadPool sets the maximum number of threads to Integer.MAX_VALUE, while newSingleThreadExecutor creates its workQueue as a LinkedBlockingQueue without specifying a size, i.e. an unbounded queue. Either an effectively unlimited number of threads or an unbounded queue can lead to OOM if you are not careful.
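
For reference, the relevant factory methods in the JDK's Executors class look roughly like this (abridged from the JDK source):

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}

Note that newCachedThreadPool allows up to Integer.MAX_VALUE threads, and newSingleThreadExecutor's LinkedBlockingQueue has no capacity argument, so its capacity defaults to Integer.MAX_VALUE.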

How to set threadFactory

If a thread pool has a problem, it is important to identify which thread is causing the problem. Therefore, it is necessary to give each thread pool a name

new NamedThreadFactory("demo-work")

Each time the factory creates a thread, an internal counter increments by one, producing names like demo-work-thread-1, demo-work-thread-2, demo-work-thread-3, and so on.
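
A minimal sketch of such a factory is shown below; the class name NamedThreadFactory and the naming convention follow the examples above (Dubbo and other libraries ship similar implementations, so treat this as illustrative rather than the exact class used here):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(0);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Produces names like demo-work-thread-1, demo-work-thread-2, ...
        return new Thread(r, prefix + "-thread-" + counter.incrementAndGet());
    }
}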

In real business scenarios it is often hard to get corePoolSize, the workQueue size and maximumPoolSize right up front; when a problem appears, you usually have to change these parameters and redeploy, which takes time. The article by Meituan (see the references) offers an impressive solution: adjust these parameters dynamically when a problem is detected (via thread pool monitoring and alerts), so the new values take effect in real time and issues can be resolved promptly.

There are two ways for thread pools to submit tasks

Now that the thread pool is created, how do I submit a task to it? There are two methods, execute and submit. Let’s look at the method signatures of these two methods

// Method 1: execute method
public void execute(Runnable command) {}

// Method 2: the three submit overloads from ExecutorService
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

execute returns no value, while submit returns a Future. So what can this Future do?

public interface Future<V> {

    /**
     * Attempts to cancel the task. Returns false if the task has already completed,
     * has already been cancelled, or cannot be cancelled for some other reason.
     * The task is cancelled/interrupted if it has not started, or if it has started
     * but mayInterruptIfRunning is true.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /** Whether the task was cancelled before it completed */
    boolean isCancelled();

    /** Returns true if the task completed normally, threw an exception, or was cancelled */
    boolean isDone();

    /** Blocks until the task completes and returns its result */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Blocks waiting for the result, but only for the given time.
     * Throws a TimeoutException if the task has not completed within the timeout.
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

A Future can be used to cancel a task, determine whether the task has been cancelled/completed, or even block waiting for results.

submit can take a Runnable or Callable task and returns a Future representing the execution result.

Before actually executing the task, submit wraps it into a RunnableFuture via newTaskFor, which returns a FutureTask, as shown below.
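
The relevant code in AbstractExecutorService looks roughly like this (abridged from the JDK source):

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}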

As you can see, FutureTask implements both Runnable and Future interfaces, so you can use the Future interface to cancel tasks, obtain task status, and wait for execution results while submitting tasks.

Besides whether an execution result is returned, there is another important difference between execute and submit: an exception thrown while execute runs a task cannot be caught by the caller; by default it ends up in the ThreadGroup's uncaughtException method.

So if you want to monitor exceptions that occur while execute runs tasks, you need to specify an UncaughtExceptionHandler through the threadFactory (step 1 in the code below); the logic in that handler will then run when a task throws, like this:

//1. Implement your own thread factory
ThreadFactory factory = (Runnable r) -> {
    // Create a thread
    Thread t = new Thread(r);
    // Set an UncaughtExceptionHandler for the created thread
    t.setUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
        // Put your statistics/monitoring logic here
        System.out.println("Thread factory set exceptionHandler " + e.getMessage());
    });
    return t;
};

//2. Create a thread pool that uses your own thread factory
ExecutorService service = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(10), factory);

//3. Submit the task
service.execute(() -> {
    int i = 1 / 0;
});

Running the above code prints "Thread factory set exceptionHandler / by zero". In this way we can hook our monitoring logic in through the UncaughtExceptionHandler.

If we use submit instead, we catch the exception when calling Future.get:

Callable<String> testCallable = xxx;
Future<String> future = executor.submit(testCallable);
try {
    future.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    // Exceptions thrown inside the task surface here, wrapped in ExecutionException
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

So why does the Future only surface the exception on get? Because the exception thrown while the task runs is saved inside the FutureTask and is only re-thrown (wrapped in an ExecutionException) when get is called.
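
To make that concrete, here is a minimal sketch of the deferred-exception idea (a simplified stand-in, not the real FutureTask code):

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

class SimpleFutureTask<V> {
    private volatile V result;
    private volatile Throwable error;
    private final CountDownLatch done = new CountDownLatch(1);

    void run(Callable<V> task) {
        try {
            result = task.call();   // normal outcome is stored
        } catch (Throwable t) {
            error = t;              // the exception is stored, not rethrown in the worker thread
        } finally {
            done.countDown();
        }
    }

    V get() throws InterruptedException, ExecutionException {
        done.await();
        if (error != null)
            throw new ExecutionException(error);  // only rethrown here, when get() is called
        return result;
    }
}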

There is an article that analyzes the execution flow of execute and submit very thoroughly (see the references); rather than repeating it here, I suggest you read it carefully, the payoff will be great.

ThreadPoolExecutor source code analysis

After so much groundwork, we finally arrive at the core: the source code analysis.

For a thread pool, the two things we care about most are its "state" and the "number of worker threads". Normally we would use two separate variables to record them, but Doug Lea used a single variable (ctl) to hold both. The more variables there are, the harder the code is to keep consistent and the more bugs creep in, so merging two variables into one makes the code much more maintainable. How is it designed?

// ThreadPoolExecutor.java
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // result: 111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // result: 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // result: 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    // result: 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // result: 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Get the state of the thread pool
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Get the number of worker threads
    private static int workerCountOf(int c)  { return c & CAPACITY; }
}

As you can see, ctl is an atomic Integer with 32 bits. The lower 29 bits hold the worker count, so the maximum is (2^29) - 1 (about 500 million); if that were ever not enough, ctl could be declared as an AtomicLong. The high 3 bits hold the thread pool state and can encode up to eight states. Since a thread pool has only five states, three bits are sufficient:

  • RUNNING: Receives new tasks and can continue processing tasks in the workQueue
  • SHUTDOWN: No new tasks are received, but tasks in the workQueue continue to be processed
  • STOP: No new tasks are received, no tasks in the workQueue are processed, and the thread that is processing the task is interrupted
  • TIDYING: all tasks have terminated and the worker count is 0; the pool transitions to this state and then runs the hook method terminated()
  • TERMINATED: the state after terminated() has finished executing
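
For completeness, the method that packs the two values into ctl is just a bitwise OR (from the same class):

// pack the run state (high 3 bits) and the worker count (low 29 bits) into one int
private static int ctlOf(int rs, int wc) { return rs | wc; }
// e.g. for ctlOf(RUNNING, 5), runStateOf(...) yields RUNNING and workerCountOf(...) yields 5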

The thread pool state flow and trigger conditions are as follows

With that in mind, let’s analyze the execute source code


public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // If the current number of threads is less than the number of core threads (corePoolSize), regardless of whether the core thread is busy, the thread is created until corePoolSize is reached
    if (workerCountOf(c) < corePoolSize) {
        // create a thread and assign the task to the worker (in this case, the task is the firstTask in the worker)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // If the thread pool is RUNNING and the number of threads has reached corePoolSize, or
    // the number of threads is below corePoolSize but the thread creation above failed, the task is added to the workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // We need to check whether the thread pool is in the RUNNING state again, because the thread pool state may change after the task is queued (for example, the shutdown method is called, etc.). If the thread state changes, we will remove the task and execute the reject policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If the pool is RUNNING and the number of threads is 0, the new thread accelerates the processing of tasks in the workQueue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the number of threads has reached corePoolSize and the task failed to enqueue, try to create a new thread up to maximumPoolSize; if that also fails, the number of threads has reached maximumPoolSize and the rejection policy is executed
    else if (!addWorker(command, false))
        reject(command);
}

This code shows that threads are created by calling addWorker. Before analyzing addWorker, it is worth briefly introducing Worker. The thread pool wraps every thread that executes tasks into a Worker, a fitting name: a thread pool is essentially a producer-consumer model, where producers keep throwing tasks onto the workQueue (like an assembly line) and workers keep taking tasks off it to execute.

The question is: why wrap a thread into a Worker at all? Couldn't the pool simply hand tasks to a plain thread, or let the thread take tasks from the workQueue itself?

The main purpose of encapsulating threads as workers is to better manage thread interrupts

Take a look at the definition of Worker

// The Worker is both a Runnable task and an AQS implementation (AQS is used here as a simple exclusive lock).
// A worker holds this lock while running a task, so methods such as shutdown, setCorePoolSize and setMaximumPoolSize,
// which try to interrupt idle workers, must first acquire the worker's lock. If the lock cannot be acquired, the worker
// is currently running a task; it is left to finish that task before its thread is closed, which is how the pool shuts down gracefully.
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        // The thread that actually executes the task
        final Thread thread;
        // As mentioned above, if the current number of threads is less than the number of core threads, create a thread and send the submitted task to the worker, then firstTask is the submitted task. If the worker fetches the task from the workQueue, firstTask is empty
        Runnable firstTask;
        // Count the completed tasks
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // Initialize state to -1 so that the worker cannot be interrupted until its thread actually runs (i.e., until runWorker is called). interruptIfStarted() checks getState() >= 0
            setState(-1); 
            this.firstTask = firstTask;

            // Create a thread from the threadFactory of the thread pool and pass the worker to the thread (since the worker implements the Runnable interface)
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            // This method is called when thread starts
            runWorker(this);
        }

       
        // 1 means locked, 0 means unlocked
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // Try to get the lock
        protected boolean tryAcquire(int unused) {
            // This is an exclusive lock: once a thread holds it, another CAS on state cannot succeed. It also shows the purpose of setting state to -1 above: while state is -1 the lock cannot be acquired, and an idle worker is only interrupted after its lock has been acquired
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Try to release the lock
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }    

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
            
        // This method is called by shutdownNow. As you can see, shutdownNow does not acquire the worker's lock before interrupting: if the thread is running a task, it gets interrupted anyway. That is why we generally avoid shutdownNow: the thread may be in the middle of executing a task when the interrupt arrives, which affects task execution
        void interruptIfStarted() {
            Thread t;
            // state >= 0 and t != null and the thread is not yet interrupted
            // If state == -1, no interrupt is performed
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

From the above analysis of the Worker class, it is not hard to see that the main purpose of wrapping threads into Workers is to manage thread interruption more precisely.

With the meaning of Worker understood, let’s look at the method of addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();

        // Get the state of the thread pool
        int rs = runStateOf(c);

        // If the thread pool state is >= SHUTDOWN (SHUTDOWN, STOP, TIDYING or TERMINATED), a thread may be created in only one case: the state is SHUTDOWN, firstTask == null and the workQueue is not empty. firstTask == null means the new thread receives no new task (it will fetch tasks from the workQueue before executing); it is created only to speed up processing of the tasks remaining in the workQueue
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // Get the number of threads
            int wc = workerCountOf(c);
            // If the CAPACITY of the thread pool exceeds the maximum CAPACITY of the thread pool (500 million, almost impossible)
            // Or exceeds corePoolSize (core true) or maximumPoolSize (core false)
            // Returns false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Otherwise CAS increases the number of threads, if successfully out of the double loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl

            // If the thread's running state changes, skip to the outer loop to continue execution
            if (runStateOf(c) != rs)
                continue retry;
            // else the CAS to increment the worker count failed; retry the inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a worker and add it to the thread pool to execute tasks
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // workers is a HashSet, which is not thread safe, so we must hold the lock to modify it safely
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Check the state of the thread pool again in case there is an interrupt
                int rs = runStateOf(ctl.get());
                // If the thread pool state is less than SHUTDOWN (RUNNING),
                // If the state is SHUTDOWN but firstTask == null (no task is received, only a thread is created to process tasks in the workQueue), the conditions for adding workers are met
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                                        // If the thread has been started, there is obviously a problem (because the thread has not been started after creating the worker), throw an exception
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();

                    // Record the maximum thread pool size for monitoring purposes
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }

            // If the worker was successfully added to workers, start the thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread failed to start, addWorkerFailed removes the worker from workers, decrements the worker count and tries to terminate the pool
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

This code shows how unpredictable a multithreaded environment is: even after the conditions are met, the thread pool state is checked again to guard against the state changing (for example, being shut down) in the meantime. The lesson: in a multithreaded environment you must account for all kinds of race conditions.

After the addWorker is executed successfully, the thread starts to execute (t.start()). Since the worker itself is passed to the thread when the worker is created, the run method of the worker will be called after the thread is started

public void run() {
    runWorker(this);
}

As you can see, the runWorker method is eventually called, so let’s examine the runWorker method

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Unlock calls tryRelease to set state to 0 to allow interrupts. State >= 0 is allowed
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // If a thread is created when a task is submitted and the task is thrown to the thread, the task is executed first
        // Otherwise get the task from the task queue to execute (getTask() method)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            
            // If the thread pool state is >= STOP (STOP, TIDYING, TERMINATED), the worker thread should be interrupted
            // If the pool state is < STOP, the thread should not carry an interrupt flag, so Thread.interrupted() clears it; the state is then checked again in case shutdownNow() was called while the flag was being cleared. If the pool has indeed been stopped, the worker thread is interrupted
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Subclasses can implement this hook method for statistics purposes before executing the task
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // After executing the task, subclasses can implement this hook method for statistics purposes
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Whether the thread exits because of an exception or exits normally, processWorkerExit cleans up and adjusts the worker count
        processWorkerExit(w, completedAbruptly);
    }
}

Let’s see what the processWorkerExit method looks like

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If the worker exited abruptly (because of an exception), the worker count has not been decremented yet, so CAS-decrement it here
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // Lock the thread to safely remove the worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // The thread pool state may have changed while this worker was exiting (e.g., shutdown was called), so try to terminate the pool
    tryTerminate();

    int c = ctl.get();

    // If the worker exited abnormally, create a new worker to replace it. If it exited normally, ensure at least one thread is alive to execute tasks while the workQueue is not empty
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Next, let's examine the getTask method that the Worker uses to fetch tasks from the workQueue

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // If the thread pool state is at least STOP or
        // Thread pool state == SHUTDOWN and the task queue is empty
        // the worker count is decremented and null is returned; in that case runWorker (analyzed above) will call processWorkerExit so that the worker exits
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // If allowCoreThreadTimeOut is true, any thread idle for keepAliveTime is reclaimed; otherwise only threads beyond corePoolSize are reclaimed after being idle for keepAliveTime
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // There are several conditions in which workers should be reclaimed
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Block waiting for a task. If no task is obtained within keepAliveTime, poll returns null and timedOut becomes true
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

After the source code analysis above, I believe you now understand most of how the thread pool works. Let's briefly look at a few other useful methods. We mentioned thread pool monitoring at the beginning; these are the metrics that can be monitored:

  • int getCorePoolSize(): the core thread count
  • int getLargestPoolSize(): the historical peak number of threads
  • int getMaximumPoolSize(): the maximum number of threads (the pool's thread capacity)
  • int getActiveCount(): the number of threads actively executing tasks
  • int getPoolSize(): the current total number of threads in the pool
  • BlockingQueue<Runnable> getQueue(): the task queue of the current thread pool; the number of backlogged tasks is getQueue().size()

The monitoring approach is simple: start a scheduled thread (a ScheduledThreadPoolExecutor) to collect these thread pool metrics periodically, typically together with open source tools such as Micrometer + Prometheus + Grafana.
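
A minimal sketch of such periodic collection (printing to stdout for illustration; in production you would report these values through Micrometer instead):

import java.util.concurrent.*;

public class ThreadPoolMonitor {
    public static void monitor(ThreadPoolExecutor pool) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Collect and report the pool's metrics every 10 seconds
        scheduler.scheduleAtFixedRate(() -> System.out.printf(
                "poolSize=%d, active=%d, largest=%d, queued=%d, completed=%d%n",
                pool.getPoolSize(),
                pool.getActiveCount(),
                pool.getLargestPoolSize(),
                pool.getQueue().size(),
                pool.getCompletedTaskCount()),
            0, 10, TimeUnit.SECONDS);
    }
}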

How do I warm up the core thread pool

Use the prestartAllCoreThreads() method, which creates all corePoolSize core threads up front instead of waiting for tasks to be submitted, so tasks can be processed as soon as they arrive.
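
For example (pool sizes here are illustrative):

ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 20, 600L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000));
int started = pool.prestartAllCoreThreads(); // returns how many core threads were started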

How to dynamically adjust thread pool parameters

  • setCorePoolSize(int corePoolSize): adjusts the core thread count
  • setMaximumPoolSize(int maximumPoolSize): adjusts the maximum thread count
  • setKeepAliveTime(long time, TimeUnit unit): sets the thread keep-alive time
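
A minimal sketch of applying new values at runtime with these setters, e.g. from a configuration-center change callback (newCore and newMax are illustrative values pushed by your configuration system):

// Raise maximumPoolSize before corePoolSize when growing the pool,
// so that corePoolSize never exceeds maximumPoolSize at any point
threadPool.setMaximumPoolSize(newMax);
threadPool.setCorePoolSize(newCore);
threadPool.setKeepAliveTime(120, TimeUnit.SECONDS);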

Answer the opening question

1. What is the difference between Tomcat's thread pool implementation and the JDK's? Dubbo has something similar called EagerThreadPool; let's look at how it works.

As you can see from its comments, if the core threads are all busy when new requests come in, EagerThreadPool chooses to create a new thread first rather than put the task on the queue, so that those requests get a faster response.

Tomcat's implementation is similar, with slight differences: when Tomcat starts it first creates minSpareThreads threads, and if these are all busy while requests keep arriving, more threads are created (in steps of minSpareThreads). The goal is the same: respond to requests more quickly. The source code is in its ThreadPool implementation, which we will not expand on here.

2. We had a problem with a gateway's Dubbo call thread pool: the interface returned normally during the pressure test, but its RT was very high. The core thread count was 500, the maximum 800, and the buffer queue 5000. These parameters clearly show the problem: the task queue is far too large. Once all core threads are busy, further requests enter the task queue first, and new threads are created only after the queue fills up, so tasks sit waiting in the huge queue and RT climbs. The fix is to shrink the queue, and to call prestartAllCoreThreads() to warm up the core threads instead of creating them lazily as requests arrive.

A few best practices for thread pools

1. Tasks executed by a thread pool should be independent of one another; tasks that depend on each other can cause deadlock, as in the following code

ExecutorService pool = Executors.newSingleThreadExecutor();
pool.submit(() -> {
  try {
    // The inner task can never run: the pool's only thread is occupied by the outer task,
    // which is blocked on get() waiting for the inner task -> deadlock
    String qq = pool.submit(() -> "QQ").get();
    System.out.println(qq);
  } catch (Exception e) {
  }
});

2. Core and non-core tasks are best separated into different thread pools

We once had such a failure in our business: suddenly many users reported that they could not receive SMS messages. The investigation found that SMS sending shared a thread pool with a scheduled script, and the script could generate hundreds or thousands of tasks per minute, so the SMS-sending tasks never got a chance to run. We later solved the problem by splitting SMS sending and script execution into two separate thread pools.
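
A minimal sketch of that separation (pool sizes and names here are illustrative, not the values we actually used):

ExecutorService smsPool = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(500), new NamedThreadFactory("sms-send"));
ExecutorService scriptPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(2000), new NamedThreadFactory("report-script"));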

3. Add thread pool monitoring and dynamically set the thread pool

As stated earlier, thread pool parameters are hard to get right once and for all. Since that is the case, we must make sure problems are discovered and fixed quickly: add monitoring of the thread pool (queue size, thread count, and so on) and set up alerts, for example trigger an alarm when the queue has been full for 3 minutes, so we get early warning. If the online parameters turn out to be set improperly and degrade the service, dynamically adjusting the thread pool (changing the core and maximum thread counts in real time) lets us fix the problem promptly.

Conclusion

This article analyzed in detail how the thread pool works. I believe you now have a deeper understanding of its mechanics and clear answers to the opening questions. The purpose of a thread pool is essentially to squeeze the most performance out of limited resources while minimizing risk, and ultimately to serve users better, so it is not surprising that Tomcat and Dubbo implement their own thread pools.

Finally, if you found this article helpful, please follow the author.

Shoulders of giants

  • Dzone.com/articles/ho…
  • Segmentfault.com/a/119000002…
  • www.cnblogs.com/trust-freed…
  • In-depth understanding of thread pools tinyurl.com/y675j928
  • Some threads it is dead, so it becomes an interview mp.weixin.qq.com/s/wrTVGLDvh…
  • Java Concurrency in Practice
  • Java thread pool implementation principle and its practice in Meituan business: mp.weixin.qq.com/s/baYuX8aCw…
  • Thread pool exception handling details, one to understand! www.cnblogs.com/ncy1/articl…