Blog links: www.ideabuffer.cn/2017/04/04/…
Introduction to thread pools
In Web development, the server needs to accept and process requests, so each request is allocated a thread to process. This is easy to implement if you create a new thread for each request, but there is a problem:
If the number of concurrent requests is very large, but the execution time of each thread is very short, threads will be created and destroyed frequently, which will greatly reduce the efficiency of the system. It may occur that the server spends more time and consumes more system resources creating and destroying new threads for each request than it does processing the actual user request.
Is there a way to finish a task without being destroyed, and then move on to other tasks?
That’s what thread pools are for. Thread pools provide a solution to the overhead and under-resourcing problems of the thread life cycle. By reusing threads for multiple tasks, the overhead of thread creation is spread over multiple tasks.
When to use thread pools?
- The processing time of a single task is short
- The number of tasks to handle is large
The benefits of using thread pools
Citing the ifeve.com/java-thread… Introduction:
- Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.
- Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.
- Improve thread manageability. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.
Thread pools in Java are implemented using the ThreadPoolExecutor class. This article combines JDK 1.8 to this class source code to analyze this class internal for the creation of threads, management and scheduling background tasks and other aspects of the execution principle.
Let’s take a look at the thread pool class diagram:
Executor Framework interface
The Executor framework is a framework for invoking, scheduling, executing, and controlling asynchronous tasks based on a set of execution policies. The purpose is to provide a mechanism for separating task submission from how the task works.
There are three Executor interfaces in J.U.C:
- Executor: A simple interface to run new tasks;
- ExecutorService: Extends the Executor interface. Added some methods to manage the executor lifecycle and task lifecycle;
- ScheduledExecutorService: extends the ExecutorService. Support for Future and periodic execution of tasks.
The Executor interface
public interface Executor {
void execute(Runnable command);
}Copy the code
The Executor interface has only one execute method, which is used instead of the usual method of creating or starting a thread. For example, the code to create and start a Thread using Thread looks like this:
Thread t = new Thread();
t.start();Copy the code
The code for starting a thread to execute a task using Executor is as follows:
Thread t = new Thread();
executor.execute(t);Copy the code
For different Executor implementation, the execute () method might be to create a new thread and immediately start, it is possible that the use of the existing worker threads to run into tasks, can also be based on setting the thread pool capacity or the capacity of the blocking queue to decide whether to put the incoming thread in blocking the queue or refuse to receive the incoming thread.
The ExecutorService interface
The ExecutorService interface, which inherits from the Executor interface, provides a way to manage terminations, and a way to generate futures for tracking the execution of one or more asynchronous tasks. Add shutDown(), shutDownNow(), invokeAll(), invokeAny(), and submit() methods. If you need to support instant shutdown, the shutDownNow() method, the task needs to handle the interruption correctly.
ScheduledExecutorService interface
ScheduledExecutorService extends the ExecutorService interface and adds the Schedule method. The schedule method is called to perform a Runnable or Callable task after a specified delay. The ScheduledExecutorService interface also defines the scheduleAtFixedRate() and scheduleWithFixedDelay() methods for executing tasks at specified intervals.
ThreadPoolExecutor analysis
ThreadPoolExecutor is derived from AbstractExecutorService and implements the ExecutorService interface.
A few important fields
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;Copy the code
CTL is a field that controls the running state of the thread pool and the number of valid threads in the thread pool. It contains two parts of information: The running state of the thread pool (runState) and the number of valid threads in the thread pool (workerCount), as you can see here, are stored using the Integer type, with the runState stored in the upper 3 bits and the workerCount stored in the lower 29 bits. COUNT_BITS is 29, and CAPACITY is 1 shifted 29 bits to the left minus 1 (29 1s). This constant represents the upper limit of workerCount, which is about 500 million.
Let’s take a look at the running state of the thread pool. There are five states in a thread pool:
- RUNNING: Can accept newly submitted tasks and also process tasks in a blocking queue.
- SHUTDOWN: The state is closed. New submitted tasks are no longer accepted, but saved tasks in the blocking queue can continue to be processed. When the thread pool is in the RUNNING state, calling shutdown() brings it to that state. (The finalize() method will also call shutdown() method to enter this state during execution);
- STOP: new tasks cannot be accepted and tasks in the queue are not processed. The processing thread is interrupted. When a thread pool is in the RUNNING or SHUTDOWN state, calling the shutdownNow() method brings the pool to that state.
- TIDYING: If all tasks terminated and workerCount is 0, the thread pool terminated by calling the terminated() method.
- TERMINATED: enters the state after the terminated() method is terminated. By default, nothing is done in the terminated() method.
Entry into TERMINATED conditions is as follows:- The thread pool is not RUNNING;
- The thread pool state is not TIDYING or TERMINATED;
- If the thread pool state is SHUTDOWN and the workerQueue is empty;
- WorkerCount 0;
- Setting the TIDYING state succeeded.
The following is the thread pool state transition process:
CTL correlation method
Here are a few more ways to evaluate CTLS:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }Copy the code
- RunStateOf: Obtains the running status.
- WorkerCountOf: Gets the number of active threads;
- CtlOf: Gets the value of the health status and number of active threads.
ThreadPoolExecutor constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}Copy the code
The fields in the constructor have the following meanings:
-
CorePoolSize: the number of core threads that perform the following judgments when a new task is submitted in the execute() method:
- If fewer threads are running than corePoolSize, a new thread is created to process the task, even if the other threads in the thread pool are idle;
- If the number of threads in the thread pool is greater than or equal to corePoolSize and smaller than maximumPoolSize, new threads are created to process tasks only when the workQueue is full.
- If corePoolSize is set to the same as maximumPoolSize, the size of the created thread pool is fixed. If a new task is submitted, the request will be added to the workQueue if the workQueue is not full. Wait for an idle thread to fetch a task from the workQueue and process it.
- If the number of threads running is greater than or equal to maximumPoolSize, then if the workQueue is full, the task is processed using the policy specified by the handler.
Therefore, the task submission is determined in the order corePoolSize > workQueue > maximumPoolSize.
- MaximumPoolSize: specifies the maximum number of threads.
- WorkQueue: wait queue. When a task is submitted, if the number of threads in the thread pool is greater than or equal to corePoolSize, the task is encapsulated as a Worker object and placed in the wait queue.
- workQueueWhen a new task is submitted to the thread pool, the thread pool will determine the processing mode of the task according to the number of threads running in the current thread pool. There are mainly the following processing modes:
- Direct switch: The common queue for this is SynchronousQueue, which has not been studied yet and cannot be described here;
- Use unbounded queues: Generally use LinkedBlockingQueue, a list-based blocking queue. If you use this approach, the maximum number of threads that can be created in the thread pool is corePoolSize, and maximumPoolSize will not work (as discussed later). When all the core threads in the thread pool are in the RUNNING state, a new task submission is put into the wait queue.
- Use bounded queuesArrayBlockingQueue is generally used. Use this way to limit the maximum number of threads thread pool for maximumPoolSize, so can reduce the consumption of resources, but at the same time, this way also makes the thread pool to thread scheduling becomes more difficult, because the thread pool and the capacity of the queue is a finite value, so if you want to make a thread pool processing throughput to achieve a relatively reasonable range, If you want to make thread scheduling relatively simple and minimize the resource consumption of the thread pool, you need to set these two numbers properly.
- If you want to reduce the consumption of system resources (including CPU usage, operating system resource consumption, context switching overhead, etc.), you can set a larger queue capacity and a smaller thread pool capacity, but this will also reduce the throughput of thread processing tasks.
- If submitted tasks frequently block, consider resetting the capacity of the thread pool by calling the setMaximumPoolSize() method.
- If the capacity of the queue is set to be small, it is usually necessary to set the capacity of the thread pool to be large so that the CPU usage is relatively high. However, if the capacity of the thread pool is set too high, concurrency will increase if the number of submitted tasks is too high, and scheduling between threads is an issue to consider because it can reduce the throughput of processing tasks.
- KeepAliveTime: The idle time allowed by the thread pool to maintain threads. When the number of threads in the thread pool is greater than corePoolSize, threads outside the core thread will not be destroyed immediately if no new task is submitted. Instead, they will wait until the keepAliveTime is exceeded.
- ThreadFactory: This is a variable of type threadFactory used to create a new thread. Default Executors. DefaultThreadFactory () to create a thread. When a thread is created using the default ThreadFactory, the new thread is created with the same NORM_PRIORITY priority and is non-daemon, and the thread name is set.
- handler: a variable of type RejectedExecutionHandler that represents the saturation policy of the thread pool. If the blocking queue is full and there are no free threads, then you need to adopt a policy to handle the task if you continue to submit it. Thread pools provide four policies:
- AbortPolicy: Throws an exception directly. This is the default policy.
- CallerRunsPolicy: Executes the task with the caller’s thread;
- DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.
- DiscardPolicy: Directly discards the task.
The execute method
The execute() method is used to submit the task as follows:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/* * workerCountOf (); /* * workerCountOf (); * If the number of active threads is smaller than corePoolSize, a new thread is added to the thread pool; * and add the task to the thread. * /
if (workerCountOf(c) < corePoolSize) {
/* * The second parameter in addWorker indicates whether the number of threads added is limited by corePoolSize or maximumPoolSize; * If true, judge by corePoolSize; */ if false, use maximumPoolSize to determine */
if (addWorker(command, true))
return;
/* * If the add fails, get the CTL value */ again
c = ctl.get();
}
/* * If the current thread pool is running and the task was successfully added to the queue */
if (isRunning(c) && workQueue.offer(command)) {
// Get the CTL value again
int recheck = ctl.get();
// Check whether the thread pool is running again. If it is not running, since command has already been added to the workQueue,
// The command needs to be removed
// After execution, the handler uses the reject policy to process the task, and the whole method returns
if (! isRunning(recheck) && remove(command))
reject(command);
/* * Gets the number of valid threads in the thread pool. If the number is 0, execute the addWorker method. The first argument is null, indicating that a thread is created in the thread pool but not started. * 2. If the second parameter is false, set the maximum number of threads in the pool to maximumPoolSize. * If workerCount is greater than 0, the new command will be executed at some point in the future. * /
else if (workerCountOf(recheck) == 0)
addWorker(null.false);
}
* 1. The thread pool is not in the RUNNING state; WorkerCount >= corePoolSize and workQueue is full. * At this point, the addWorker method is called again, but the second argument is passed false, setting the upper limit of the pool's finite number of threads to maximumPoolSize; * If it fails, the task is rejected */
else if(! addWorker(command,false))
reject(command);
}Copy the code
Simply put, if the state of execute() is RUNNING, the execution process is as follows:
- if
workerCount < corePoolSize
, a thread is created and started to execute the newly submitted task; - if
workerCount >= corePoolSize
, and the blocking queue in the thread pool is not full, then the task is added to the blocking queue; - if
workerCount >= corePoolSize && workerCount < maximumPoolSize
, and the blocking queue in the thread pool is full, a thread is created and started to execute the newly submitted task; - if
workerCount >= maximumPoolSize
, and the blocking queue in the thread pool is full, the task is processed according to the reject policy. The default processing method is to throw an exception directly.
Note here that addWorker(null, false); In other words, a thread is created but no task is passed in because the task has already been added to the workQueue, so the worker will fetch the task directly from the workQueue when executing it. So addWorker(null, false) is executed when workerCountOf(recheck) == 0; In order to ensure that the thread pool in the RUNNING state must have a thread to perform the task.
The execution flow of the execute method is as follows:
AddWorker method
The main job of addWorker is to create a new thread in the thread pool and execute it. The firstTask parameter specifies the firstTask to be executed by the new thread. The core parameter is true to determine whether the number of active threads is less than corePoolSize. False indicates whether the number of active threads is less than maximumPoolSize before adding a new thread.
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// Get the running status
int rs = runStateOf(c);
/* * if rs >= SHUTDOWN, no new task will be received; * 1. Rs == SHUTDOWN, which indicates that the state is SHUTDOWN, but can continue to process the existing tasks in the blocking queue * 2. FirsTask empty * 3. The blocking queue is not empty * * First consider rs == SHUTDOWN * in which case no new submitted task will be accepted, so return false if firstTask is not empty; * Then, if firstTask is empty and workQueue is also empty, return false, * because there are no more tasks in the queue and no more threads need to be added */
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))return false;
for (;;) {
// Get the number of threads
int wc = workerCountOf(c);
// If wc exceeds CAPACITY, which is the maximum of the lower 29 bits of the CTL (binary is 29 1s), return false;
// core is the second argument to the addWorker method.
// If false, compare by maximumPoolSize.
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// Try increasing workerCount, and if successful, break out of the first for loop
if (compareAndIncrementWorkerCount(c))
break retry;
// If the workerCount increment fails, retrieve the CTL value again
c = ctl.get(); // Re-read ctl
// If the current running state is not rs, the state has been changed, and the first for loop is returned to continue
if(runStateOf(c) ! = rs)continue retry;
// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Create the Worker object according to firstTask
w = new Worker(firstTask);
// Each Worker object creates a thread
final Thread t = w.thread;
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN;
// If rs is RUNNING or rs is SHUTDOWN and firstTask is null, add threads to the thread pool.
// Since no new tasks are added during SHUTDOWN, the tasks in the workQueue are still executed
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
Workers is a HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize records the maximum number of threads in the thread pool
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; }}finally {
mainLock.unlock();
}
if (workerAdded) {
// Start the thread
t.start();
workerStarted = true; }}}finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}Copy the code
Notice the t.start() statement here. When started, the run method in the Worker class is called. The Worker itself implements the Runnable interface, so an object of Worker type is also a thread.
The Worker class
Each thread in the ThreadPool is encapsulated into a Worker object. ThreadPool maintains a group of Worker objects. See the definition of Worker:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run(a) {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively(a) {
returngetState() ! =0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0.1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock(a) { acquire(1); }
public boolean tryLock(a) { return tryAcquire(1); }
public void unlock(a) { release(1); }
public boolean isLocked(a) { return isHeldExclusively(); }
void interruptIfStarted(a) {
Thread t;
if (getState() >= 0&& (t = thread) ! =null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}Copy the code
The Worker class inherits AQS and implements the Runnable interface. Note firstTask and Thread attributes: firstTask saves incoming tasks; A thread is a thread created from a ThreadFactory when the constructor is called, and is used to process tasks.
When the constructor is called, the task is passed in, in this case via getThreadFactory().newThread(this); To create a newThread, the newThread method passes in the parameter this. Since the Worker itself inherits the Runnable interface, that is, a thread, a Worker object will call the run method in the Worker class when it is started.
Worker inherits AQS and uses AQS to realize the function of exclusive lock. Why not use ReentrantLock? You can see the tryAcquire method, which does not allow reentrant, whereas ReentrantLock does:
- Once the lock method obtains an exclusive lock, it indicates that the current thread is executing a task.
- If a task is being executed, the thread should not be interrupted;
- If the thread is not in the exclusive lock state, that is, the idle state, it is not processing the task, then you can interrupt the thread.
- The thread pool calls the interruptIdleWorkers method to interrupt idle threads when the shutdown or tryTerminate methods are executed. The interruptIdleWorkers method uses the tryLock method to determine whether threads in the pool are idle.
- It is set to non-reentrant because we do not want the task to regain the lock when it calls a thread pool control method like setCorePoolSize. If you use ReentrantLock, it is reentrant, so that if a thread pool-controlled method such as setCorePoolSize is called in the task, the running thread is interrupted.
Therefore, Worker inherits from AQS and is used to determine whether threads are free and can be interrupted.
In addition, setState(-1) is executed in the constructor; , set the state variable to -1. Why? Since the default state in AQS is 0, if a Worker object has just been created and the task has not yet been executed, it should not be interrupted.
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0.1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}Copy the code
The tryAcquire method is determined by whether state is 0, so setState(-1); State is set to -1 to disallow thread interruption prior to task execution.
Because of this, the Worker object’s unlock method is first called in the runWorker method to set state to 0.
RunWorker method
The run method in the Worker class calls the runWorker method to perform the task. The runWorker method has the following code:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// Get the first task
Runnable task = w.firstTask;
w.firstTask = null;
// Allow interrupts
w.unlock(); // allow interrupts
// Whether the loop exits because of an exception
boolean completedAbruptly = true;
try {
// If task is empty, getTask is used to get the task
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally{ afterExecute(task, thrown); }}finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
Here is an illustration of the first if judgment, which aims to:
- If the thread pool is stopping, make sure the current thread is interrupted.
- If not, ensure that the current thread is not interrupted;
Consider that the shutdownNow method may also be executed during the execution of the if statement, which sets the state to STOP. Recall the STOP state:
New tasks cannot be accepted and tasks in the queue are not processed, which interrupts the thread that is processing the task. When a thread pool is in the RUNNING or SHUTDOWN state, calling the shutdownNow() method brings the pool to that state.
The STOP state interrupts all threads in the Thread pool, but using thread.interrupted () ensures that the Thread is uninterrupted in the RUNNING or SHUTDOWN state because thread.interrupted () will restore the interrupted state.
To summarize the execution of the runWorker method:
- The while loop keeps getting tasks through the getTask() method;
- The getTask() method takes the task from the blocking queue;
- If the thread pool is stopping, make sure the current thread is interrupted, otherwise make sure the current thread is not interrupted.
- call
task.run()
Perform tasks; - If task is null, it breaks out of the loop and executes processWorkerExit();
- When the runWorker method completes execution, it also means that the run method in the Worker completes execution and destroys the thread.
The beforeExecute and afterExecute methods here are left empty in the ThreadPoolExecutor class and left to subclasses to implement.
The completedAbruptly variable to indicate whether an exception occurred during the execution of the task, the value of which is evaluated in the processWorkerExit method.
GetTask method
The getTask method is used to get the task from the blocking queue as follows:
private Runnable getTask(a) {
// The value of the timeOut variable indicates whether the last time the task was fetched from the blocking queue timed out
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/* * if the thread pool status is rs >= SHUTDOWN, then the thread pool status is not RUNNING. * 2. Whether the blocking queue is empty. * If the above conditions are met, the workerCount is reduced by 1 and null is returned. * Because no more tasks are allowed to be added to the blocking queue if the current thread pool state is SHUTDOWN or above. * /
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// The timed variable is used to determine whether timeout control is required.
// allowCoreThreadTimeOut is false by default, meaning that core threads are not allowed to timeout;
// wc > corePoolSize, indicating that the number of threads in the current thread pool is greater than the number of core threads;
// Timeout control is required for those threads that exceed the number of core threads
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/* * wc > maximumPoolSize because setMaximumPoolSize may be executed at the same time; * Timed && timedOut if true, the current operation needs to be timed out and the last time the task was fetched from the blocking queue timed out * If the number of valid threads is greater than 1 or the blocking queue is empty, try to decrement workerCount by 1; * If the reduction fails, retry is returned. * If wc == 1, the current thread is the only thread in the pool. * /
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/* * Timed: if timed is true, the poll method of the blocking queue is used for timeout control. If no task is obtained within keepAliveTime, null is returned. If the queue is empty at this point, the take method blocks until the queue is not empty. * * /
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if(r ! =null)
return r;
// If r == null, timeout has occurred and timedOut is set to true
timedOut = true;
} catch (InterruptedException retry) {
// If the current thread interrupts while fetching the task, set timedOut to false and return to loop retry
timedOut = false; }}}Copy the code
The important thing here is the second if judgment, which controls the effective number of threads in the thread pool. If the number of threads in the current thread pool exceeds corePoolSize and is smaller than maximumPoolSize, and the workQueue is full, you can add worker threads. If the number of threads in the current thread pool exceeds corePoolSize and is smaller than maximumPoolSize, you can add worker threads. If timedOut is true, the workQueue is empty, which means that there are no more threads in the current thread pool to perform the task. You can destroy the number of threads that are larger than corePoolSize, leaving the number of threads at corePoolSize.
When will it be destroyed? When the runWorker method is finished executing, the JVM automatically retrieves it.
When the getTask method returns NULL, the while loop is broken out of the runWorker method, followed by the processWorkerExit method.
ProcessWorkerExit method
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If completedAbruptly is true, an exception occurred during thread execution and the workerCount must be reduced by 1.
// If the thread executes without exception, the workerCount is already decrement by 1 in the getTask() method and no further decrement is required.
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Count the completed tasks
completedTaskCount += w.completedTasks;
// Remove a worker thread from the pool
workers.remove(w);
} finally {
mainLock.unlock();
}
// Decide whether to terminate the thread pool based on the thread pool state
tryTerminate();
int c = ctl.get();
/* * when the thread pool is RUNNING or SHUTDOWN, if the worker is terminated abnormally, addWorker will be added directly; * Keep at least one worker if allowCoreThreadTimeOut=true and wait queue has tasks; * If allowCoreThreadTimeOut=false, workerCount is not less than corePoolSize. * /
if (runStateLessThan(c, STOP)) {
if(! completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0&&! workQueue.isEmpty()) min =1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null.false); }}Copy the code
At this point, the Worker thread is destroyed after processWorkerExit is executed. This is the entire Worker thread lifecycle. Starting with the execute method, the Worker uses ThreadFactory to create a new Worker thread, and the runWorker uses getTask to get the task. The task is then executed, and if getTask returns NULL, the processWorkerExit method is entered, and the entire thread terminates, as shown:
TryTerminate method
The tryTerminate method determines whether to terminate the thread pool based on the state of the pool, as follows:
final void tryTerminate(a) {
for (;;) {
int c = ctl.get();
/* * If the current thread pool status is one of the following: * 1. * 2. TIDYING or TERMINATED because there are no threads running in the thread pool. * 3. Run the task in the workQueue after SHUTDOWN and the wait queue is not empty. * /
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// If the number of threads is not zero, an idle worker thread is interrupted and returned
if(workerCountOf(c) ! =0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Try to set the state to TIDYING. If it succeeds, the terminated method is called
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// The terminated method defaults to doing nothing and leaves it to subclasses
terminated();
} finally {
// Set the state to TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return; }}finally {
mainLock.unlock();
}
// else retry on failed CAS}}Copy the code
Shutdown method
The shutdown method switches the thread pool to the shutdown state, calls interruptIdleWorkers to request the interruption of all idle workers, and finally calls tryTerminate to try to terminate the pool.
public void shutdown(a) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Determine the security policy
checkShutdownAccess();
// The switch state is SHUTDOWN
advanceRunState(SHUTDOWN);
// Interrupt the idle thread
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// Try to terminate the thread pool
tryTerminate();
}Copy the code
Consider the following question: in the runWorker method, the Worker object W is locked during the execution of the task. Why lock each Worker thread during the execution of the task?
Here’s a closer look:
- In the getTask method, if the thread pool is SHUTDOWN and the workQueue is empty, null is returned to terminate the worker thread, and the SHUTDOWN method is called to bring the pool into the SHUTDOWN state.
- The shutdown method calls interruptIdleWorkers to interrupt idle threads. InterruptIdleWorkers hold mainLock and iterate over workers to determine whether worker threads are idle one by one. But there is no mainLock in the getTask method;
- In getTask, is called if the current thread pool state is judged to be RUNNING and the blocking queue is empty
workQueue.take()
To block; - If the current thread pool state is RUNNING and the shutdown method is used to change the state to shutdown, then the current worker thread is calling if there is no interrupt
workQueue.take()
The workQueue is blocked and not destroyed because no new tasks are allowed to be added to the workQueue during SHUTDOWN, so the thread pool can never be closed. - It can be seen from the above that there are race conditions between shutdown method and getTask method (when the task is obtained from the queue).
- Solving this problem requires thread interrupts, which is why the interruptIdleWorkers method is used. In the call
workQueue.take()
If the thread is interrupted before or during execution, InterruptedException is thrown. - But to interrupt the worker thread, you also need to determine whether the worker thread is free. If the worker thread is working on a task, the interrupt should not occur.
- InterruptIdleWorkers use tryLock to determine whether the Worker thread is working on a task. If tryLock returns true, the Worker thread is not currently working on a task. Only then can it be interrupted.
Let’s examine the interruptIdleWorkers method.
InterruptIdleWorkers method
private void interruptIdleWorkers(a) {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if(! t.isInterrupted() && w.tryLock()) {try {
t.interrupt();
} catch (SecurityException ignore) {
} finally{ w.unlock(); }}if (onlyOne)
break; }}finally{ mainLock.unlock(); }}Copy the code
InterruptIdleWorkers Iterates through all worker threads in the workers and interrupts the thread if tryLock fails.
Why do I need to hold mainLock? Since workers are of type HashSet, thread-safety is not guaranteed.
ShutdownNow method
public List<Runnable> shutdownNow(a) {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// Interrupts all worker threads, whether idle or not
interruptWorkers();
// Retrieve tasks that are not executed in the queue
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}Copy the code
The shutdownNow method is similar to the shutdown method except that:
- Set the state to STOP.
- Interrupt all worker threads, whether idle or not;
- Retrieves unexecuted tasks from the blocking queue and returns.
After the shutdownNow method is executed, the tryTerminate method, which was analyzed earlier, is called to state the thread pool at TERMINATED.
Thread pool monitoring
Monitored by the parameters provided by the thread pool. There are several properties in the thread pool that you can use when monitoring the thread pool
- GetTaskCount: Total number of executed and unexecuted tasks in the thread pool;
- GetCompletedTaskCount: The number of completed tasks in the thread pool. The value is less than or equal to taskCount.
- GetLargestPoolSize: Maximum number of threads ever created by the thread pool. This data tells you if the thread pool is full, i.e., maximumPoolSize;
- GetPoolSize: specifies the current number of threads in the thread pool.
- GetActiveCount: The number of threads executing tasks in the current thread pool.
The ThreadPoolExecutor class provides several empty methods, such as beforeExecute, afterExecute, and terminated. You can extend these methods to add new operations before or after execution. For example, counting the execution time of tasks in a thread pool can be extended by inheriting from ThreadPoolExecutor.
conclusion
This paper analyzes the workflow of thread pool in detail. Generally speaking, it has the following contents:
- The thread creation, task submission, state transition and thread pool closing are analyzed.
- The thread pool workflow is expanded through the execute method, which uses corePoolSize, maximumPoolSize, and the size of the blocking queue to determine whether the incoming task should be executed immediately, added to the blocking queue, or rejected.
- The process of closing the thread pool is introduced, and the race condition between shutdown method and getTask method is analyzed.
- When acquiring tasks, the state of the thread pool is used to determine whether the worker thread should be terminated or blocked waiting for new tasks, which also explains why the worker thread should be interrupted when closing the thread pool and why every worker needs a lock.
In addition to the execute method, there is also a Submit method, which returns a Future object to retrieve the return value. Future and Callable are not described in this article.