My inquisitive colleague recently threw another series of questions at me, including one about the ThreadPoolExecutor thread pool: how does ThreadPoolExecutor reclaim threads? In the spirit of “Talk is cheap, show me the code,” let’s go straight to the source code of ThreadPoolExecutor.

Foreword

I have also attached a note I made earlier on ThreadPoolExecutor (it’s a mind map, I hope it’s helpful): JUC: How ThreadPoolExecutor works

Ok, without further ado, let’s get to the point

ThreadPoolExecutor overview

Let’s start with some necessary background; otherwise the later sections may be confusing.

  • ThreadPoolExecutor uses the ctl variable to store the thread pool state: the high 3 bits hold the run state runState, and the low 29 bits hold the number of worker threads workerCount.

  • The run state runState takes the following values:
    • RUNNING: Accepts newly submitted tasks and processes tasks in the task buffer queue.
    • SHUTDOWN: Does not accept newly submitted tasks, but still processes tasks in the task buffer queue.
    • STOP: Does not accept newly submitted tasks, does not process tasks in the task buffer queue, and interrupts tasks that are currently executing.
    • TIDYING: All tasks have terminated and workerCount is 0; the thread that moves the pool into the TIDYING state calls the terminated() hook method.
    • TERMINATED: The terminated() method has finished executing.
  • The transitions between runState values are shown below:

  • ThreadPoolExecutor decouples task submission from the worker threads and builds a producer-consumer model: the code that submits tasks is the producer, and the worker threads are the consumers responsible for executing them. The overall flow of ThreadPoolExecutor is shown below (from “Implementation principle of Java thread pool and its practice in Meituan business”):
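The ctl layout from the first bullet can be tried out in isolation. The following is a minimal sketch, not the JDK class itself: the constants and helper names mirror the private members of the JDK 8 ThreadPoolExecutor, while the class name CtlSketch is made up for this illustration.

```java
final class CtlSketch {
    // A 32-bit int: the high 3 bits are runState, the low 29 bits are workerCount.
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 2^29 - 1, mask for workerCount

    // Run states live in the high 3 bits and are numerically ordered.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack both into one int

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // prints true
        System.out.println(workerCountOf(c));         // prints 5
        // Because the states are ordered, checks such as "rs >= SHUTDOWN" work.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // prints true
    }
}
```

Packing both fields into one int is what lets the pool update the state and the worker count together with a single CAS on ctl.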

How are tasks submitted: the execute() method of ThreadPoolExecutor

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl packs runState and workerCount
        int c = ctl.get();
        // Fewer than corePoolSize workers: try to start a new core worker for this task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Otherwise try to put the task into the task buffer queue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Recheck the state of the thread pool and, if necessary, roll back
            // the task that was just enqueued, i.e. remove it
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // Create a new worker thread
                addWorker(null, false);
        }
        // The number of worker threads has reached corePoolSize and the workQueue is full:
        // create a new worker thread, up to the maximum of maximumPoolSize
        else if (!addWorker(command, false))
            reject(command);
    }
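The order of decisions in execute() (core worker, then queue, then non-core worker, then rejection) can be observed from the outside with a deliberately tiny pool. This is a sketch under assumed settings: corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, and the default AbortPolicy. The class and method names are made up for the example.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteOrderDemo {
    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no worker becomes idle during the test.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);              // workerCount < corePoolSize: new core worker
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);              // core is full: the task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);              // queue is full: new non-core worker
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);          // queue full and maximumPoolSize reached
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();                // let the blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [1, 1, 2, 1]
    }
}
```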

You can read the execute() source above alongside the flow chart below. One important method appears in execute(): addWorker(). Let’s step into it and see what it does:

    /**
     * @param firstTask the submitted task; may be null, meaning that a new worker thread
     *                  should be created to process the tasks in the buffer queue
     * @param core      true to use corePoolSize as the bound, false to use maximumPoolSize
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // In SHUTDOWN the pool accepts no new tasks but must still finish the tasks
            // in the task buffer queue. So a worker may be added only when firstTask is
            // null and the task buffer queue is not empty; in every other case with
            // rs >= SHUTDOWN, refuse.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // CAPACITY is the theoretical maximum number of worker threads (2^29 - 1)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Non-blocking CAS on the worker count maintained by ThreadPoolExecutor;
                // if the update succeeds, jump out of both loops
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // The run state has not changed, so the CAS failed only because
                // workerCount changed; retry the inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // As mentioned earlier, when addWorker() is called the pool may be
                    // RUNNING, or SHUTDOWN with a null firstTask
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Add the new worker to the globally maintained set (workers is a HashSet)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // Track the largest pool size reached
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Start the thread; this is where the Worker's run() method gets executed
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

The addWorker() method looks long, but it doesn’t really do much, just two things:

  • Check runState, then update workerCount with CAS.
  • Create a Worker; then, under the exclusive protection of the mainLock ReentrantLock, check runState again, safely add the new Worker to workers (a HashSet), and maintain the largestPoolSize variable to track the largest pool size reached. Finally, start the Worker’s thread.
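The first step, a bounded increment retried with CAS, is a general pattern that can be shown on its own. The sketch below is not the JDK code: it uses a plain AtomicInteger in place of ctl, and CasBoundedCounter, tryIncrement and race are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CasBoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int bound;

    CasBoundedCounter(int bound) {
        this.bound = bound;
    }

    /** Reserves one slot without locking; returns false once the bound is reached. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= bound)
                return false;                    // like wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                     // like "break retry" in addWorker()
            // CAS lost a race with another thread: re-read the value and try again
        }
    }

    /** Lets several threads compete for the slots and returns how many succeeded. */
    static int race(int threads, int bound) {
        CasBoundedCounter counter = new CasBoundedCounter(bound);
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (counter.tryIncrement())
                    successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8, 3)); // prints 3
    }
}
```

However the eight threads interleave, exactly three reservations succeed, which is the guarantee addWorker() needs before it creates a Worker.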

You can also read the addWorker() source above alongside the flow chart below:

How are submitted tasks executed: the runWorker() method of ThreadPoolExecutor

Worker is a private inner class of ThreadPoolExecutor, defined as follows (some code is omitted):

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** The thread this Worker runs in */
        final Thread thread;
        /** The first task to execute; may be null */
        Runnable firstTask;
        /** Total number of tasks completed by this Worker */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates the main run loop to the outer runWorker() */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        // Worker defines an exclusive lock by extending the AQS class and
        // overriding the related methods (omitted here)
    }

Looking back at addWorker(): once the newly created Worker has been added to the workers set, start() is called on its thread, which invokes the run() method of the Worker object, and run() in turn calls runWorker(). So let’s focus on that method:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // If firstTask was not null when the Worker was created, execute it first;
            // otherwise call getTask() to fetch a task from the buffer queue
            while (task != null || (task = getTask()) != null) {
                // Acquire the Worker's own exclusive lock so that the Worker is not
                // interrupted as "idle" while it is executing a task
                w.lock();
                // If the pool has entered the STOP state, check whether the current
                // thread is interrupted and interrupt it if not; otherwise make sure
                // the interrupt flag is cleared
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Everything is ready: execute the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // A Worker that leaves the while loop no longer fetches tasks from the
            // task buffer queue, so it is reclaimed here
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker() executes as follows:

  1. The while loop keeps calling getTask() to obtain a task.
  2. getTask() fetches the task from the blocking queue.
  3. If the thread pool is already in the STOP state, make sure the current thread is interrupted; otherwise make sure it is not interrupted.
  4. Execute the task.
  5. If getTask() returns null, break out of the loop and call processWorkerExit() to destroy the thread.
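The steps above can be watched from outside the pool through the protected hook methods that runWorker() and the termination path call. This is a small sketch with a made-up class name, HookOrderDemo; beforeExecute(), afterExecute() and terminated() are the real ThreadPoolExecutor hooks.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HookOrderDemo {
    /** Runs one task on a single-thread pool and records the order of the callbacks. */
    static List<String> trace() {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");      // called by runWorker() before task.run()
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                events.add("after");       // called by runWorker() in the finally block
            }

            @Override
            protected void terminated() {
                events.add("terminated");  // called once, when the pool reaches TIDYING
            }
        };
        pool.execute(() -> events.add("run"));
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [before, run, after, terminated]
    }
}
```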

You can read the runWorker() source above alongside the flow chart below:

How ThreadPoolExecutor threads are reclaimed: processWorkerExit()

We already know from the previous section that the processWorkerExit() method is executed in the runWorker() method, so let’s look at what it does:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // Remove the reference to this Worker from the workers set
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

The thread pool’s job is to hold references to a certain number of worker threads, according to its current state, so that they stay alive and are not reclaimed by the JVM. When the pool decides that a thread should be reclaimed, it only needs to drop the reference: processWorkerExit() removes the Worker from workers, and the thread itself ends once runWorker() returns.

When does the ThreadPoolExecutor thread pool reclaim a thread?

As shown above, a Worker running runWorker() breaks out of the loop when getTask() returns null, that is, when it cannot obtain a task from the buffer queue, and then calls processWorkerExit() to reclaim the current Worker thread. So ThreadPoolExecutor reclaims a thread exactly when getTask() returns null. Let’s look at what happens inside getTask():

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check whether the thread pool is terminating. If runState is STOP or later,
            // or it is SHUTDOWN and the task buffer queue is empty, decrement the worker
            // count and return null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // timed is true when core threads are allowed to time out or the number of
            // worker threads exceeds corePoolSize; otherwise the worker never times out
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // Wait at most keepAliveTime when timed, otherwise block until a task arrives
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                // Reset timedOut and retry the loop
                timedOut = false;
            }
        }
    }

The main logic of getTask() is an endless for loop that fetches a task from the workQueue blocking queue. If a task is available, it is returned; otherwise the loop continues until a task is fetched successfully. The method leaves the loop and returns null instead in three situations: the thread pool has been stopped (or shut down with an empty workQueue), the number of worker threads exceeds maximumPoolSize, or the fetch timed out for a worker that is allowed to time out.

You can read the getTask() source above alongside the flow chart below:

Getting back to our main concern, when does getTask() return null? In fact, the flow chart has been given, mainly in the following two places:

  1. The first is the check for whether the thread pool has been stopped, which corresponds to this condition in the source: if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))

When the shutdown() method of ThreadPoolExecutor is called and the thread pool enters the SHUTDOWN state, there are two cases:

  • When shutdown() is called, all tasks have already been executed and getTask() is blocked in workQueue.take()

Take shutdown() as an example: its call chain reaches interruptIdleWorkers(false). interruptIdleWorkers() walks the workers set and, for each Worker whose thread has not been interrupted yet and whose exclusive lock it can acquire with tryLock(), interrupts that Worker’s thread.

A Worker blocked in getTask() does not hold its own exclusive lock, so tryLock() succeeds and the Worker is interrupted. workQueue.take() responds to the interrupt by throwing InterruptedException (the blocking queue of the thread pool is interruptible); getTask() catches it and proceeds to the next for iteration as normal. The program then reaches the first “has the thread pool stopped” check, the condition is now satisfied, and getTask() returns null.
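The wake-up described here can be reproduced with a plain thread and a LinkedBlockingQueue. The loop below only imitates the shape of getTask(); the shutdown flag stands in for the SHUTDOWN run state, and all names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

final class InterruptWakesTakeSketch {
    /** Returns true if the worker was woken from take() by the interrupt and then exited. */
    static boolean workerExitsAfterInterrupt() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicBoolean shutdown = new AtomicBoolean(false);    // stand-in for runState >= SHUTDOWN
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            for (;;) {
                // Imitates the first check in getTask(): shut down and nothing left to do
                if (shutdown.get() && queue.isEmpty())
                    return;
                try {
                    queue.take().run();          // blocks while the queue is empty
                } catch (InterruptedException retry) {
                    sawInterrupt.set(true);      // woken up: go around the loop again
                }
            }
        });
        worker.start();

        // Wait (with a deadline) until the worker is parked inside take().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (worker.getState() != Thread.State.WAITING && System.nanoTime() < deadline)
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));

        shutdown.set(true);     // like advancing the run state to SHUTDOWN
        worker.interrupt();     // like interruptIdleWorkers() interrupting an idle Worker
        try {
            worker.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(workerExitsAfterInterrupt());
    }
}
```

Without the interrupt, the worker would stay parked in take() forever even though the flag has changed, which is why shutdown() has to interrupt idle Workers.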

  • When shutdown() is called, the task buffer queue still holds tasks that have not been executed, and getTask() is looping normally

Suppose getTask() has reached workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) and is fetching a task from the task buffer queue. If shutdown() is called at this point and the Worker thread is interrupted, workQueue.poll() responds to the interrupt by throwing InterruptedException immediately; getTask() catches it and proceeds to the next for iteration. It then reaches the first “has the thread pool stopped” check again, which is not satisfied because the workQueue is not empty. So getTask() keeps running normally until the remaining tasks have been processed and the workQueue is empty, and only then does it break out of the loop and return null.
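That queued tasks survive shutdown() is easy to confirm. In this sketch (class and method names are invented for the example) a single worker is held on its first task while two more wait in the queue, and shutdown() is called before any of them can finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDrainsQueueDemo {
    /** Submits 3 tasks to a single worker, shuts down early, and returns how many tasks ran. */
    static int completedAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    gate.await();               // hold the worker until shutdown() has been called
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown();                        // one task is with the worker, two are still queued
        gate.countDown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(completedAfterShutdown()); // prints 3
    }
}
```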

There is one special situation within the second case that deserves separate discussion: when shutdown() is called, only 2 tasks remain in the task buffer queue, but there are still 4 Worker threads. From the discussion above it is easy to imagine that two of the four Worker threads exit normally, returning null from the first “has the thread pool stopped” check in getTask(), while the other two are left blocked in workQueue.take(). shutdown() sends an interrupt signal to each eligible Worker only once, so how do these two Workers blocked in workQueue.take() leave the loop and get reclaimed?

The answer lies in the processWorkerExit() method we looked at earlier. Besides reclaiming the Worker thread, it calls tryTerminate(), and tryTerminate() calls interruptIdleWorkers(ONLY_ONE) when the pool is eligible to terminate but still has workers. ONLY_ONE is true, which means that at most one of the idle Workers currently held by the thread pool is interrupted. So one of the two blocked Workers is woken up by the interrupt signal and exits the loop normally, and while it is being reclaimed (in processWorkerExit()) it sends a new interrupt signal to the other blocked Worker. Like dominoes, each blocked Worker thread that is reclaimed wakes up another blocked Worker thread, until all Worker threads have been reclaimed.
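The outcome of this scenario, though not the exact interleaving of interrupts, can be checked with 4 busy workers and 2 queued tasks. The sketch below uses invented names and only observes the pool through its public getters, so it shows that every worker is eventually reclaimed rather than which worker woke which.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ShutdownReclaimsAllWorkersDemo {
    /** Returns {pool size before shutdown, pool size after termination, 1 if terminated, completed tasks}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable task = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 6; i++)
            pool.execute(task);                 // 4 tasks occupy the workers, 2 wait in the queue
        int before = pool.getPoolSize();

        pool.shutdown();                        // the workers are busy, so none counts as idle yet
        gate.countDown();                       // 4 workers now compete for the 2 queued tasks

        boolean terminated;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {before, pool.getPoolSize(), terminated ? 1 : 0,
                (int) pool.getCompletedTaskCount()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [4, 0, 1, 6]
    }
}
```

If a worker could remain blocked in workQueue.take() after the queue was drained, awaitTermination() would time out here instead of returning true.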

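  2. The second is the check for whether there are too many worker threads, which corresponds to this condition in the source: if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()))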

The corresponding scenario: shutdown() has not been called, the thread pool is in the RUNNING state, and all tasks in the workQueue have been fetched and executed. In this scenario the thread pool shrinks the number of Worker threads down to corePoolSize (assuming allowCoreThreadTimeOut=false). For a Worker beyond corePoolSize, timed is true, and once workQueue.poll() times out, timedOut becomes true as well. Since the workQueue is empty, the second “are there too many worker threads” condition evaluates to true, and the Worker exits the loop and returns null.
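This shrinking can be observed on a RUNNING pool. The sketch assumes corePoolSize 1, maximumPoolSize 3, a 100 ms keepAliveTime and a SynchronousQueue (so that every extra task forces a new worker); the names are made up, and because the result depends on timers the wait is bounded by a generous deadline rather than an exact sleep.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

final class KeepAliveReclaimDemo {
    /** Returns {pool size at its peak, pool size after the surplus workers timed out}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getPoolSize();          // 1 core worker + 2 non-core workers
        gate.countDown();

        // While wc > corePoolSize, idle workers use poll(keepAliveTime) and exit when it times out.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() < deadline)
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        int settled = pool.getPoolSize();       // the last worker blocks in take() and stays

        pool.shutdown();                        // release the remaining core worker
        return new int[] {peak, settled};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

On a typical run this reports a peak of 3 and a settled size of 1. Calling allowCoreThreadTimeOut(true) on the pool would make timed true for the last worker as well, so it too would be reclaimed once it had been idle for keepAliveTime.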

At this point, the question of when threads are reclaimed from the ThreadPoolExecutor thread pool is clarified.

Afterword

With the analysis finished, I sent it over to my studious colleague, who replied: “Wow, nice one” 🐶

Finally, the references, as usual:

  • Implementation principle of Java thread pool and its practice in Meituan business
  • JDK8 ThreadPoolExecutor source
  • How are redundant threads in the thread pool reclaimed
  • Java Concurrent Programming