1. Introduction to thread pools

In Web development, the server needs to accept and process requests, so each request is allocated a thread to process. This is easy to implement if you create a new thread for each request, but there is a problem:

If the number of concurrent requests is very large, but the execution time of each thread is very short, threads will be created and destroyed frequently, which will greatly reduce the efficiency of the system. It may occur that the server spends more time and consumes more system resources creating and destroying new threads for each request than it does processing the actual user request.

Is there a way to finish a task without being destroyed, and then move on to other tasks?

That’s what thread pools are for. Thread pools provide a solution to the overhead and under-resourcing problems of the thread life cycle. By reusing threads for multiple tasks, the overhead of thread creation is spread over multiple tasks.

When to use thread pools?

  • The processing time of a single task is short
  • The number of tasks to handle is large

Benefits of using thread pools:

  1. Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.
  2. Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.
  3. Improve thread manageability. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.

Thread pools in Java are implemented using the ThreadPoolExecutor class. This article combines JDK 1.8 to this class source code to analyze this class internal for the creation of threads, management and scheduling background tasks and other aspects of the execution principle.

Let’s take a look at the thread pool class diagram:

2. Source code analysis

1. Executor Framework interface

The Executor framework is a framework for invoking, scheduling, executing, and controlling asynchronous tasks based on a set of execution policies. The purpose is to provide a mechanism for separating task submission from how the task works.

There are three Executor interfaces in J.U.C:

  • Executor: A simple interface to run new tasks;
  • ExecutorService: Extends the Executor interface. Added some methods to manage the executor lifecycle and task lifecycle;
  • ScheduledExecutorService: extends the ExecutorService. Support for Future and periodic execution of tasks.

1. Executo interface

public interface Executor {
    void execute(Runnable command);
}
Copy the code

The Executor interface has only one execute method, which is used instead of the usual method of creating or starting a thread. For example, the code to create and start a Thread using Thread looks like this:

Thread t = new Thread();
t.start();
Copy the code

The code for starting a thread to execute a task using Executor is as follows:

Thread t = new Thread();
executor.execute(t);
Copy the code

For different Executor implementation, the execute () method might be to create a new thread and immediately start, it is possible that the use of the existing worker threads to run into tasks, can also be based on setting the thread pool capacity or the capacity of the blocking queue to decide whether to put the incoming thread in blocking the queue or refuse to receive the incoming thread.

2. ExecutorService

ExecutorService This interface is inherited from the Executor interface.

Methods are provided to manage terminations and to generate futures for tracking the execution of one or more asynchronous tasks. Add shutDown(), shutDownNow(), invokeAll(), invokeAny(), and submit() methods. If you need to support instant shutdown, the shutDownNow() method, the task needs to handle the interruption correctly. The main methods are as follows:

3. AbstractExecutorService

The abstract method is implemented inExecutorServiceThe method is mainly rewrittenExecutorServiceSome of the methods in theSubmit (), invokeAll ()And so on. At the same time added a few own methods:

4. ThreadPoolExecutor

Next, we’ll focus on the important thread pool implementation class, which inherits AbstractExecutorService.

1. A few important fields
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
Copy the code

Here’s what each field means:

  1. CTL is a field that controls the running state of the thread pool and the number of valid threads in the thread pool. It contains two parts of information: the running state of the thread pool (runState) and the number of valid threads in the thread pool (workerCount)
  2. The Integer type is used for saving, with runState held at the top 3 bits and workerCount held at the bottom 29 bits. COUNT_BITS is 29, and CAPACITY is 1 shifted 29 bits to the left minus 1 (29 1s). This constant represents the upper limit of workerCount, which is about 500 million.
  3. RUNNING: can accept newly submitted tasks and also process tasks in a blocking queue;
  4. SHUTDOWN: A closed state that does not accept newly submitted tasks, but can continue to process the saved tasks in the blocking queue. When the thread pool is in the RUNNING state, calling shutdown() brings it to that state.
  5. STOP: Cannot accept new tasks and does not process tasks in the queue, interrupting the thread that is processing the task. When a thread pool is in the RUNNING or SHUTDOWN state, calling the shutdownNow() method brings the pool to that state.
  6. TIDYINGIf all tasks terminated and workerCount is 0, the thread pool terminated by calling the terminated() method.
  7. TERMINATED: enters the state after the terminated() method is terminated. By default, nothing is done in the terminated() method.

Entry into TERMINATED conditions is as follows:

  • The thread pool is not RUNNING;
  • The thread pool state is not TIDYING or TERMINATED;
  • If the thread pool state is SHUTDOWN and the workerQueue is empty;
  • WorkerCount 0;
  • Setting the TIDYING state succeeded.

Here is the thread pool state flow:Ctl-related methods There are several other methods for calculating CTLS:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

RunStateOf: Obtains the running status. WorkerCountOf: Gets the number of active threads; CtlOf: Gets the value of the health status and number of active threads.

ThreadPoolExecutor constructor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
Copy the code

The fields in the constructor have the following meanings:

  • CorePoolSize: the number of core threads that perform the following judgments when a new task is submitted in the execute() method:

    1. If fewer threads are running than corePoolSize, a new thread is created to process the task, even if the other threads in the thread pool are idle;
    2. If the number of threads in the thread pool is greater than or equal to corePoolSize and smaller than maximumPoolSize, the task is first queued, and only when the workQueue is full will a new thread be created to process the task.
    3. If corePoolSize is set to the same as maximumPoolSize, the size of the created thread pool is fixed. If a new task is submitted, the request will be added to the workQueue if the workQueue is not full. Wait for an idle thread to fetch a task from the workQueue and process it.
    4. If the number of threads running is greater than or equal to maximumPoolSize, then if the workQueue is full, the task is processed using the policy specified by the handler.

    Therefore, the task submission is determined in the order corePoolSize > workQueue > maximumPoolSize.

  • MaximumPoolSize: specifies the maximum number of threads.

  • WorkQueue: wait queue. When a task is submitted, if the number of threads in the thread pool is greater than or equal to corePoolSize, the task is encapsulated as a Worker object and placed in the wait queue.

  • WorkQueue: a block queue that stores tasks that are waiting to be executed. When a new task is submitted to the thread pool, the thread pool determines how to process the task based on the number of threads running in the current thread pool.

    1. Direct switchover: The common queue for this is SynchronousQueue
    2. Use unbounded queues: Generally use LinkedBlockingQueue, a list-based blocking queue. If you use this approach, the maximum number of threads that can be created in the thread pool is corePoolSize, and maximumPoolSize will not work (as discussed later). When all the core threads in the thread pool are in the RUNNING state, a new task submission is put into the wait queue.
    3. Use bounded queues: ArrayBlockingQueue is generally used. Use this way to limit the maximum number of threads thread pool for maximumPoolSize, so can reduce the consumption of resources, but at the same time, this way also makes the thread pool to thread scheduling becomes more difficult, because the thread pool and the capacity of the queue is a finite value, so if you want to make a thread pool processing throughput to achieve a relatively reasonable range, If you want to make thread scheduling relatively simple and minimize the resource consumption of the thread pool, you need to set these two numbers properly.
    • If you want to reduce the consumption of system resources (including CPU usage, operating system resource consumption, context switching overhead, etc.), you can set a larger queue capacity and a smaller thread pool capacity, but this will also reduce the throughput of thread processing tasks.
    • If submitted tasks frequently block, consider resetting the capacity of the thread pool by calling the setMaximumPoolSize() method.
    • If the capacity of the queue is set to be small, it is usually necessary to set the capacity of the thread pool to be large so that the CPU usage is relatively high. However, if the capacity of the thread pool is set too high, concurrency will increase if the number of submitted tasks is too high, and scheduling between threads is an issue to consider because it can reduce the throughput of processing tasks.
  • KeepAliveTime: The idle time allowed by the thread pool to maintain threads. When the number of threads in the thread pool is greater than corePoolSize, threads outside the core thread will not be destroyed immediately if no new task is submitted. Instead, they will wait until the keepAliveTime is exceeded.

  • ThreadFactory: This is a variable of type threadFactory used to create a new thread. Default Executors. DefaultThreadFactory () to create a thread. When a thread is created using the default ThreadFactory, the new thread is created with the same NORM_PRIORITY priority and is non-daemon, and the thread name is set.

  • Handler: This variable is of type RejectedExecutionHandler and represents the saturation policy of the thread pool. If the blocking queue is full and there are no free threads, then you need to adopt a policy to handle the task if you continue to submit it. Thread pools provide four policies:

    1. AbortPolicy: Throws an exception directly. This is the default policy.
    2. CallerRunsPolicy: Executes the task with the caller’s thread;
    3. DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.
    4. DiscardPolicy: Directly discards the task.
3. execute()

Execute () is used to submit tasks. The source code is as follows:

Public void execute(Runnable command) {if (command == null) throw new NullPointerException(); // CLT record runState and workerCount int c = ctl.get(); /** * The workerCountOf method returns a value 29 bits lower than the current number of active threads. * If the number of active threads is smaller than corePoolSize, a new thread is added to the thread pool; And put the task into the thread pool * If adding fails, */ if (workerCountOf(c) < corePoolSize) {/** * The second parameter in addWorker indicates whether the number of threads added is limited by corePoolSize or maximumPoolSize; * If true, judge by corePoolSize; */ if (addWorker(command, true)) return; c = ctl.get(); } /** * Second case: If the current thread pool is running and the task can be added to the queue * retrieve the CTL value to determine the status, * */ if (isRunning(c) &&workqueue.offer (command)) {int recheck = ctl.get(); /** * If (!) {/** * if (!) {/** * if (!) {/** * if (!) {/** * if (! isRunning(recheck) && remove(command)) reject(command); /** * gets the number of valid threads in the thread pool. If the number is 0, execute the addWorker method. The first argument is null, indicating that a thread is created in the thread pool but not started. * 2. If the second parameter is false, set the maximum number of threads in the pool to maximumPoolSize. * If workerCount is greater than 0, the new command will be executed at some point in the future. */ else if (workerCountOf(recheck) == 0) addWorker(null, false); } /** * The third case: you need to execute the reject policy * If you execute at this point, there are two cases: * 1. The thread pool is no longer RUNNING. WorkerCount >= corePoolSize and workQueue is full. * The addWorker method is called again, but the second argument is passed false, setting the upper limit of the limited number of threads in the thread pool to maximumPoolSize; */ else if (! addWorker(command, false)) reject(command); }Copy the code

Simply put, if the state of execute() is RUNNING, the execution process is as follows:

  1. ifworkerCount < corePoolSize, a thread is created and started to execute the newly submitted task;
  2. ifworkerCount >= corePoolSize, and the blocking queue in the thread pool is not full, then the task is added to the blocking queue;
  3. ifworkerCount >= corePoolSize && workerCount < maximumPoolSize, and the blocking queue in the thread pool is full, a thread is created and started to execute the newly submitted task;
  4. ifworkerCount >= maximumPoolSize, and the blocking queue in the thread pool is full, the task is processed according to the reject policy. The default processing method is to throw an exception directly.

So let’s be careful hereaddWorker(null, false)That is, a thread is created, but no task is passed in because the task has already been added toworkQueueI got it, soworkerWhen executed, it will be directly fromworkQueueTo get the task. So, inworkerCountOf(recheck) == 0When performingaddWorker(null, false); In order to ensure that the thread pool in the RUNNING state must have a thread to perform the task. The execution flow of the execute method is as follows:

4. addWorker()

In the submission thread, you can see that the thread is added to the thread pool through the addWorker() method, so let’s examine this method:

/** * @param firstTask * @param core true if corePoolSize is used @return */ private Boolean addWorker(Runnable firstTask, Boolean core) {retry: // loop for (;;) Int c = ctl.get(); int rs = runStateOf(c); * 1. If rs >= SHUTDOWN, no new task is received. * 2. Return false if one of the following conditions is not met: * 1. The state is SHUTDOWN, which means that the state is closed and no new submitted tasks can be accepted, but the saved tasks in the blocking queue can continue to be processed. The blocking queue is not empty * first consider rs == SHUTDOWN * in which case no new submitted task will be accepted, so return false if firstTask is not empty; * Then, if firstTask is empty and workQueue is empty, return false, * because there are no more tasks in the queue and no more threads need to be added */ if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // for (;;) Int wc = workerCountOf(c); /** * If wc exceeds CAPACITY, which is the maximum of the lower 29 bits of the CTL (binary is 29 ones), the method returns false; * Core is the second argument to the addWorker method, which is true to compare against corePoolSize or false to compare against maximumPoolSize. */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; / / try to increase workerCount, if successful, the jump out of the first for loop if (compareAndIncrementWorkerCount (c)) break retry. C = ctl.get(); c = ctl.get(); // If the current running state is not rs, the state has been changed, return the first for loop and continue with if (runStateOf(c)! = rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; W = new Worker(firstTask); w = new Worker(firstTask); final Thread t = w.thread; /** * If the thread is not empty, try to add the thread to the work queue */ if (t! = null) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Int rs = runStateOf(ctl.get()); /** * rs < SHUTDOWN; * If RS is RUNNING or RS is SHUTDOWN and firstTask is null, add threads to the thread pool. * Since no new tasks are being added during SHUTDOWN, But will carry out the task of workQueue * / if (rs < SHUTDOWN | | (rs = = SHUTDOWN && firstTask = = null)) {if (t.i sAlive ()) throw new IllegalThreadStateException(); Worker.add (w); worker.add (w); worker.add (w); int s = workers.size(); If (s > largestPoolSize) largestPoolSize = s; if (s > largestPoolSize) largestPoolSize = s; // Update status workerAdded = true; } } finally { mainLock.unlock(); } /** * if (workerAdded) {t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }Copy the code

Notice the t.start() statement here. When started, the run() method in the Worker class is called. The Worker itself implements the Runnable interface, so an object of Worker type is also a thread.

5. Worker

Each thread in the ThreadPool is encapsulated into a Worker object. ThreadPool maintains a group of Worker objects. See the definition of Worker:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** * The thread created by ThreadFactory when the constructor is called is the thread used to process the task. */ final Thread thread; /** Save the incoming task */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * When the constructor is called, the task needs to be passed in, in this case through getThreadFactory().newThread(this); To create a newThread, the * newThread method passes in the parameter this. Since the Worker itself inherits the Runnable interface, * is also a thread, so a Worker object will call the run method in the Worker class when it is started. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Public void run() {runWorker(this); } // Lock methods // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() ! = 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) ! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Copy the code

Worker inherits AQS and uses AQS to realize the function of exclusive lock. Why not use ReentrantLock? You can see the tryAcquire method, which does not allow reentrant, whereas ReentrantLock does:

  1. lockOnce the method acquires an exclusive lock, it indicates that the current thread is executing a task.
  2. If a task is being executed, the thread should not be interrupted;
  3. If the thread is not in the exclusive lock state, that is, the idle state, it is not processing the task, then you can interrupt the thread.
  4. Thread pool executingshutdownMethod ortryTerminateMethod is calledinterruptIdleWorkersMethod to interrupt idle threads,interruptIdleWorkersThe tryLock method is used to determine whether a thread in the thread pool is idle.
  5. It is set to non-reentrant because we do not want the task to be calling the imagesetCorePoolSizeSuch a thread pool control method reacquires the lock. If you are usingReentrantLock, it is reentrant so that if such as is called in the tasksetCorePoolSizeSuch thread pool-controlled methods interrupt running threads.

Therefore, Worker inherits from AQS and is used to determine whether threads are free and can be interrupted.

In addition, setState(-1) is executed in the constructor; , set the state variable to -1. Why? Since the default state in AQS is 0, if a Worker object has just been created and the task has not yet been executed, it should not be interrupted.

@param unused * @return */ protected Boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }Copy the code

The tryAcquire method is determined by whether state is 0, so setState(-1); State is set to -1 to disallow thread interruption prior to task execution.

Because of this, the Worker object’s unlock method is first called in the runWorker method to set state to 0.

6. runWorker

The run method in the Worker class calls the runWorker method to perform the task. The runWorker method has the following code:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // Get the first task Runnable task = w.firstTask; w.firstTask = null; // set this thread to allow interrupts w.nlock (); // Exit loop because of exception Boolean completedAbruptly = true; Try {// If the task is empty, get the task by getTask while (task! = null || (task = getTask()) ! = null) { w.lock(); /** * If the thread pool is stopping, make sure the current thread is interrupted; * If not, make sure the current thread is not interrupted; * The shutdownNow method may also be executed during the execution of this if statement, which sets the state to STOP * * STOP interrupts all threads in the thread pool, The purpose of using thread.interrupted () is to ensure that the Thread is in the RUNNING or SHUTDOWN state because thread.interrupted () will restore the interrupted state. */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; // If the state is not interrupted, task.run() is called to execute the task try {task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}Copy the code

To summarize the execution of the runWorker method:

  1. The while loop keeps getting tasks through the getTask() method;
  2. The getTask() method takes the task from the blocking queue;
  3. If the thread pool is stopping, make sure the current thread is interrupted, otherwise make sure the current thread is not interrupted. Call task.run() to execute the task;
  4. If task is null, it breaks out of the loop and executes processWorkerExit();
  5. When the runWorker method completes execution, it also means that the run method in the Worker completes execution and destroys the thread.

The completedAbruptly variable to indicate whether an exception occurred during the execution of the task, the value of which is evaluated in the processWorkerExit method.

7. getTask()

The getTask method is used to get the task from the blocking queue as follows:

/** * fetch task from blocking queue * @return */ private Runnable getTask() {// the value of timeOut variable indicates whether the last time the task was fetched from blocking queue timedOut Boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * if the thread pool status is rs >= SHUTDOWN, then the thread pool status is not RUNNING. * 2. Whether the blocking queue is empty. * If the above conditions are met, the workerCount is reduced by 1 and null is returned. * * Because if the current thread pool state is SHUTDOWN or above, no more tasks are allowed to be added to the blocking queue. */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; Int wc = workerCountOf(c); // The timed variable is used to determine whether timeout control is required. // allowCoreThreadTimeOut is false by default, meaning that core threads are not allowed to timeout; // wc > corePoolSize, indicating that the number of threads in the current thread pool is greater than the number of core threads; For these threads over the core number of threads, the need for overtime control Boolean timed = allowCoreThreadTimeOut | | wc > corePoolSize; /** * wc > maximumPoolSize because setMaximumPoolSize may be executed at the same time; * Timed && timedOut if true, the current operation needs to be timed out and the last time the task was fetched from the blocking queue timed out * If the number of valid threads is greater than 1 or the blocking queue is empty, try to decrement workerCount by 1; * If the reduction fails, retry is returned. * If wc == 1, the current thread is the only thread in the pool. */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } /** * Timed: if timed is true, the poll method of the blocking queue is used for timeout control. If no task is obtained within keepAliveTime, null is returned. If the queue is empty at this point, the take method blocks until the queue is not empty. */ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; // If r == null, timeout has occurred. TimedOut = true; } catch (InterruptedException retry) {// If the current thread is interrupted while retrieving the task, set timedOut to false and return timedOut = false; }}}Copy the code
8. ProcessWorkerExit method
* @param w * @param completedAbruptly */ private void processWorkerExit(Worker w, Boolean completedAbruptly) {// If completedAbruptly is true, an exception has occurred and workerCount needs to be reduced by 1; // If the thread executes without exception, the workerCount is already decrement by 1 in the getTask() method and no further decrement is required. if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// Count the number of completed tasks completedTaskCount += w.completedTasks; Worker. remove(w); worker. remove(w); worker. remove(w); } finally { mainLock.unlock(); } // Decide whether to terminate the thread pool based on the thread pool state. TryTerminate (); int c = ctl.get(); /** * when the thread pool is RUNNING or SHUTDOWN, if the worker is terminated abnormally, addWorker will be added directly; * Keep at least one worker if allowCoreThreadTimeOut=true and wait queue has tasks; * If allowCoreThreadTimeOut=false, workerCount is not less than corePoolSize. */ if (runStateLessThan(c, STOP)) { if (! completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}Copy the code

At this point, the worker thread is destroyed after processWorkerExit has finished executing, and this is the lifetime of the worker thread.

fromexecuteMethod start,WorkeruseThreadFactoryCreate a new worker thread,runWorkerthroughgetTaskGet the task, and then execute the task ifgetTaskreturnnullAnd into theprocessWorkerExitMethod, the entire thread terminates, as shown:

10. TryTerminate method

The tryTerminate method determines whether to terminate the thread pool based on the state of the pool, as follows:

final void tryTerminate() { for (;;) { int c = ctl.get(); /** * If the current thread pool status is one of the following: * 1. * 2. TIDYING or TERMINATED because there are no threads running in the thread pool. * 3. Run the task in the workQueue after SHUTDOWN and the wait queue is not empty. */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // If the number of threads is not zero, interrupt an idle worker thread and return if (workerCountOf(c)! = 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // state terminated is TIDYING. If terminated, call terminated method try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated(); } finally {// set state to TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }Copy the code
11:shutdown()

The shutdown method switches the thread pool to the shutdown state, calls interruptIdleWorkers to request the interruption of all idle workers, and finally calls tryTerminate to try to terminate the pool.

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// checkShutdownAccess(); // Switch state to SHUTDOWN advanceRunState(SHUTDOWN); // interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // Try to terminate the thread pool tryTerminate(); }Copy the code

Consider the following question: in the runWorker method, the Worker object W is locked during the execution of the task. Why lock each Worker thread during the execution of the task?

Here’s a closer look:

  1. In the getTask method, if the thread pool is SHUTDOWN and the workQueue is empty, null is returned to terminate the worker thread, and the SHUTDOWN method is called to bring the pool into the SHUTDOWN state.

  2. The shutdown method calls interruptIdleWorkers to interrupt idle threads. InterruptIdleWorkers hold mainLock and iterate over workers to determine whether worker threads are idle one by one. But there is no mainLock in the getTask method;

  3. In getTask, if the current thread pool state is RUNNING and the blocking queue is empty, workqueue.take () is called to block.

  4. If the current thread pool state is RUNNING and the shutdown method is called, the current worker thread will block after the workqueue.take () call and will not be destroyed. Since no new tasks are allowed to be added to the workQueue during SHUTDOWN, the thread pool can never be closed.

  5. From the above,shutdownThe methods andgetTaskMethod has race conditions (when fetching tasks from a queue);

  6. Solving this problem requires thread interrupts, which is why the interruptIdleWorkers method is used. When workqueue.take () is called, if the thread is interrupted before or during execution, InterruptedException is thrown.

  7. But to interrupt the worker thread, you also need to determine whether the worker thread is free. If the worker thread is working on a task, the interrupt should not occur.

  8. InterruptIdleWorkers use tryLock to determine whether the Worker thread is working on a task. If tryLock returns true, the Worker thread is not currently working on a task. Only then can it be interrupted.

11 interruptIdleWorkers method

InterruptIdleWorkers Iterates through all worker threads in the workers and interrupts the thread if tryLock fails.

Why do I need to hold mainLock? Since workers are of type HashSet, thread-safety is not guaranteed.

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
Copy the code
12 hutdownNow method

The shutdownNow method is similar to the shutdown method except that:

  1. Set the state to STOP.
  2. Interrupt all worker threads, whether idle or not;
  3. Retrieves unexecuted tasks from the blocking queue and returns.

After the shutdownNow method is executed, the tryTerminate method, which was analyzed earlier, is called to state the thread pool at TERMINATED.

public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // interruptWorkers() interrupts all worker threads, whether idle or not; Tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }Copy the code

3. Monitor the thread pool

Monitored by the parameters provided by the thread pool. There are several properties in the thread pool that you can use when monitoring the thread pool

  1. GetTaskCount: Total number of executed and unexecuted tasks in the thread pool;
  2. GetCompletedTaskCount: The number of completed tasks in the thread pool. The value is less than or equal to taskCount.
  3. GetLargestPoolSize: Maximum number of threads ever created by the thread pool. This data tells you if the thread pool is full, i.e., maximumPoolSize;
  4. GetPoolSize: specifies the current number of threads in the thread pool.
  5. GetActiveCount: The number of threads executing tasks in the current thread pool.

The ThreadPoolExecutor class provides several empty methods, such as beforeExecute, afterExecute, and terminated. You can extend these methods to add new operations before or after execution. For example, counting the execution time of tasks in a thread pool can be extended by inheriting from ThreadPoolExecutor.

4. To summarize

This paper analyzes the workflow of thread pool in detail. Generally speaking, it has the following contents:

  1. The thread creation, task submission, state transition and thread pool closing are analyzed.
  2. The thread pool workflow is expanded through the execute method, which uses corePoolSize, maximumPoolSize, and the size of the blocking queue to determine whether the incoming task should be executed immediately, added to the blocking queue, or rejected.
  3. The process of closing the thread pool is introduced, and the race condition between shutdown method and getTask method is analyzed.

When fetching a task, the thread pool state is used to determine whether the worker thread should be terminated or blocked waiting for a new task. Explains why worker threads are interrupted when closing a thread pool and why every worker needs a lock. When submitting a task to a thread pool, there is also a submit method in addition to the execute method, which returns one

The Future object is used to retrieve the return value. Read about Future and Callable for yourself, not here.