The thread pool ThreadPoolExecutor is the source of this article
Common thread pool types
Java Creates four thread pools by using the Executors static method.
- NewSingleThreadExecutor creates a singleton thread pool to ensure that tasks are executed in order. Out-of-thread tasks will wait in the task and all tasks will be executed in FIFO queue order.
- NewFixedThreadPool creates a fixed size thread group, specifies the number of worker threads, and queues tasks for execution when they exceed the specified number of workers.
- NewCachedThreadPool Creates a pool of cacheable threads. This pool has 0 active threads and a maximum of integer.max. When new tasks are added to the pool, new threads can be created immediately to execute the task. When idle threads exceed 60 seconds, the system reclaims them.
- NewScheduleThreadPool creates a thread pool of fixed length and supports timed and periodic task execution, similar to timers.
- NewWorkStealingPool will create a thread pool containing enough threads to maintain the corresponding level of parallelism. It will make the multi-core CPU not idle by the way of work stealing, and there will always be a live thread for the CPU to run.
NewSingleThreadExecutor, newFixedThreadPool, and newCachedThreadPool all internally encapsulate ThreadPoolExecutor to generate thread pools. Let’s look at the ThreadPoolExecutor class in detail.
ThreadPoolExecutor constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Copy the code
- CorePoolSize Specifies the number of core threads that will not be collected.
- MaximumPoolSize Maximum number of threads that can be applied to a thread pool
- WorkQueue Synchronizes tasks in a queue
- KeepAliveTime When the number of threads is greater than the core, this is the maximum time that extra idle threads can wait for a new task before terminating.
- ThreadFactory specifies the threadFactory
- Handler When the number of tasks exceeds the queue capacity, this situation needs to be handled
- AbortPolicy: Throws an exception directly. This is the default policy.
- CallerRunsPolicy: Executes the task using the caller’s thread;
- DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.
- DiscardPolicy: Directly discards the task.
Thread pool query
We already know the basic core construction parameters for creating a thread pool, but there are a lot of questions we don’t understand. How do you know if each thread in the thread pool is working or idle? Is there a special thread to mark idle thread activity? How threads share threads. Read the code with these questions in mind.
State of a thread in a thread pool
The following are comments from the ThreadPoolExecutor code. The thread state in the thread pool is maintained by an AtomicInteger CTL, which is an AtomicInteger that wraps two domain meanings.
-
WorkerCount Specifies the number of valid threads. The total number of threads is 2 ^ 29-1. The number of threads started does not include the number of threads stopped, which may be temporarily different from the actual number of active threads. For example, when ThreadFactory fails to create a thread and the thread is exiting, the count of threads still includes the exiting thread.
-
RunState Thread status
RUNNING
Receiving new tasks and processing tasks in the queueSHUTDOWN
Don’t take on new tasks, but can handle themSTOP
Cannot accept new tasks, cannot process tasks in the queue, but can interrupt ongoing tasks.TIDYING
All tasks terminate, workerCount is 0, and the thread is transitioned to TIDYING state and is about to run terminated() hook methodTERMINATED
Terminated () hook method is complete
Each of these states has a transition order
RUNNING -> SHUTDOWN
Performed shutdown ()(RUNNING or SHUTDOWN) -> STOP
Perform shutdownNow ()SHUTDOWN -> TIDYING
When both the task queue and thread pool are emptySTOP -> TIDYING
Thread pools are emptyTIDYING -> TERMINATED
When the terminated() hook method is finished executing these state-specific code implementations
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Copy the code
Execute method parsing
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Handle 3 steps * 1. If the number of running threads is smaller than the number of core threads, directly create a new thread to perform the task * call the addWorker method to automatically check the thread status and number to avoid error alerts when adding threads when they cannot be added * * 2. If the task can be successfully enqueued, we still need to double check whether a thread is added * because there was a thread death last time we checked or the thread pool was closing when we entered the method * so we recheck the state and roll back the queue if stopped, or start a new thread if not. * * 3. Failed to add task, try to create a thread, if failed, If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // The number of current threads is smaller than the number of core threads
if (addWorker(command, true)) // Create a thread
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // The thread pool is RUNNING and the task was added successfully
int recheck = ctl.get(); // Second check
if (! isRunning(recheck) && remove(command)) // Determine the thread pool status
reject(command);
else if (workerCountOf(recheck) == 0) // The number of thread pools is 0
addWorker(null.false);
}
else if(! addWorker(command,false)) // The thread pool state is not RUNNING or the queue is full. It is used to start non-core thread pull tasks
reject(command);
}
Copy the code
Next we go to the core method of addWorker to create threads
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // Retry flag, first seen at 😓
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN) / / SHUTDOWN at least
&& (runStateAtLeast(c, STOP) // At least STOP is illegal|| firstTask ! =null
|| workQueue.isEmpty()))
return false;
for (;;) { // The status is valid
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // There is no need to create threads that are larger than the core thread or the maximum thread, and the mask prevents the maximum thread count from exceeding 2 ^ 29-1 details
return false;
if (compareAndIncrementWorkerCount(c)) // CTL auto-increment succeeds, and jumps out of the entire loop
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN)) // State is at least SHUTDOWN to re-enter the loop
continue retry;
// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // Create a thread
final Thread t = w.thread;
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Re-check thread pool status during lock
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // The thread that was just created has already started executing tasks, this is a problem
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; }}finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // Start the task
workerStarted = true; }}}finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Copy the code
The main process of addWorker() is to check whether the thread pool status is valid, create a new thread, join workers, and call start() to execute the task. Let’s look at the Worker class
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run(a) {
runWorker(this); }}Copy the code
Worker is actually a Runnable wrapper class, but it adds the function of task interruption. Its main task is to maintain the interrupted state. Inherits AQS, it can simplify the acquisition and release of the lock around each task execution, and prevent the interruption aimed at waking up the Worker thread waiting for the task. Learn how workers perform tasks by entering runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // Retrieve the task
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while(task ! =null|| (task = getTask()) ! =null) { // If the current worker has no tasks, get the tasks from the queue until the queue is empty
w.lock();
// Handle thread interrupt mechanisms
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
beforeExecute(wt, task); // Preprocessing, similar to interceptor mechanism, requires subclasses to implement
try {
task.run(); // Call the task method
afterExecute(task, null); // post-processing
} catch (Throwable ex) {
afterExecute(task, ex); // Exception handling
throwex; }}finally {
task = null;
w.completedTasks++; // Number of tasks performed + 1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // When the thread life cycle is complete, the collection is performed}}Copy the code
In combination with the Worker constructor, the Worker locks itself during initialization, preventing the thread from being interrupted before the task has even started. The starting thread executes the runWorker method, fetches the task, releases the lock, and if the task in the Worker is empty, pulls the task from the queue. Processing thread interrupt, mainly based on the state of the first line has at least STOP state, and then clear the interrupt state, in the judgment of the thread has no interrupt signal, and then send interrupt signal. Threads should interrupt when the thread pool is already in the process of stopping, but must double-check to prevent competing relay signals during the closing process. Call the run method to perform the task. The reason why a task needs to be locked is mainly in the process of executing the task. The thread can be interrupted only when the task starts to close because the Worker does not support reentrant lock. This is where we finally see thread sharing, where threads continually fetch tasks from the queue and then call the run method to execute the task. When the thread exits the fetch queue loop, the thread life cycle ends.
geTask()
private Runnable getTask(a) {
boolean timedOut = false; // Whether the last pull timed out
for (;;) {
int c = ctl.get();
// Check that the thread pool state is SHUTDOWN and no new tasks are accepted
// The task queue is empty
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount(); // Number of core threads workercount-1
return null;
}
int wc = workerCountOf(c);
AllowCoreThreadTimeOut Whether to reclaim the number of core threads when idle The default is false
// The number of current threads is greater than the number of core threads
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// If wc is greater than the maximum number of threads, process the number of threads first
// If a thread does not acquire a task during its lifetime, it needs to reclaim the number of threads in the previous loop
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) { //wc should not be 0 and the task queue is empty
if (compareAndDecrementWorkerCount(c)) // thread-1 succeeded, no other threads contested, no new tasks added
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // Return null after timeout
workQueue.take();
if(r ! =null)
return r;
timedOut = true;
} catch (InterruptedException retry) { // Abort the task execution
timedOut = false; }}}Copy the code
Poll () + timeout: poll() + timeout: poll() + timeout: poll() + timeout: poll() + timeout: poll() + timeout: poll()
Enter processWorkerExit ()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If the task is not executed, the core thread is -1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // Remove the current worker and the thread will be reclaimed
} finally {
mainLock.unlock();
}
tryTerminate(); // Check the state of the thread pool, whether the thread pool is closed signal
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // The thread pool can still execute or accept tasks when it is in the RUNNABLE or SHUTDOWN state
if(! completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0&&! workQueue.isEmpty())// Threads in the thread pool have been reclaimed and the task has not been completed
min = 1;
if (workerCountOf(c) >= min) // There are more threads in the thread pool than in the core thread pool
return; // replacement not needed
}
addWorker(null.false); // Create a new thread to process the task}}Copy the code
Enter tryTerminate ()
When the thread pool is SHUTDOWN and the task queue is empty, or the core queue is empty in the STOP state, the thread pool transitions to TIDYING and transmits the closed pool signal.
final void tryTerminate(a) {
for (;;) {
int c = ctl.get();
if (isRunning(c) || // The RUNNING state does not need to be handled
runStateAtLeast(c, TIDYING) || // It has entered TIDYING, and does not deal with it
(runStateLessThan(c, STOP) && ! workQueue.isEmpty())) // The task queue is not empty and does not meet the condition
return;
if(workerCountOf(c) ! =0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE); Try to interrupt a workerreturn;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // Lock modifies the thread pool state
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // Enter the TIDYING state
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // the state of terminated() is terminated
termination.signalAll();
}
return; }}finally {
mainLock.unlock();
}
// else retry on failed CAS}}Copy the code
shutdown()
Let’s look at thread pool termination methods
public void shutdown(a) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // Change the state of the thread pool to SHUTDOWN
interruptIdleWorkers(); // Interrupt the thread
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
Copy the code
How does interruptIdleWorkers() interrupt the thread
private void interruptIdleWorkers(a) {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // Workers are an unsafe set
try {
for (Worker w : workers) {
Thread t = w.thread;
if(! t.isInterrupted() && w.tryLock()) {// If there is no interrupt and the lock can be acquired, the thread pool is not executing the task and the Worker does not support reentry
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally{ w.unlock(); }}if (onlyOne)
break; }}finally{ mainLock.unlock(); }}Copy the code
The solution is simple: change the thread pool state to not accept new tasks, and pull out idle threads from Works to signal an interrupt.
shutdownNow
public List<Runnable> shutdownNow(a) {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue(); // Delete the task from the queue and return to Tasks
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Copy the code
ShutdownNow removes all tasks from the queue that have not yet been processed, terminating the thread pool life cycle by calling tryTerminate() directly.
conclusion
Now we know how thread creation, thread sharing, idle collection, and thread pool life cycle work inside a thread pool. Execute () is called to submit the task. If the current thread pool is smaller than the number of core threads, addWorker() is called to create a new thread pool to execute the task, otherwise it is directly queued. In addWorker(), a thread is started to continually pull tasks from the queue and is not recycled until the queue is empty or no work has been executed for a lifetime. When setting the thread pool, pay attention to some details. The number of core threads is set according to the task situation. In most cases, it is the number of core threads that processes tasks. So when setting the maximum number of threads, be careful to set the queue size. If Integer.MAX, the number of threads will never exceed the number of core threads. Only when the task exceeds the queue capacity + the maximum number of threads, the saturation policy will be triggered. Select an appropriate processing method according to the task requirements.