The thread pool
Reference: "Java Thread Pool Implementation Principle" by the Meituan technical team, an analysis of the Java thread pool implementation and its source code.
Foreword
1. What is a thread pool
A thread pool is a tool for managing threads based on the pooling idea, and it often appears in multi-threaded servers. Too many threads bring extra costs, including the cost of creating and destroying threads and the cost of scheduling them, and they also reduce the overall performance of the machine. A thread pool maintains multiple threads that wait for a supervisor to assign tasks that can be executed concurrently. This approach, on the one hand, avoids the cost of creating and destroying threads while processing tasks; on the other hand, it avoids the excessive scheduling overhead caused by an ever-growing number of threads and ensures that CPU cores are fully utilized.
2. Advantages of thread pools
- Reduced resource consumption: Reuse of created threads through pooling techniques to reduce wastage from thread creation and destruction
- Improved response time: Tasks can be executed immediately when they arrive without waiting for threads to be created.
- Improve manageability of threads: Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also cause resource scheduling imbalance due to unreasonable distribution of threads, which reduces system stability. Thread pools allow for uniform allocation, tuning, and monitoring.
- Better extensibility: Thread pools are extensible, allowing developers to add more functionality to them. For example, the delayed/periodic thread pool ScheduledThreadPoolExecutor allows tasks to be executed after a delay or on a recurring schedule.
The core and overall implementation of thread pools
The overall design
The core implementation class of Java thread pool is ThreadPoolExecutor. This chapter analyzes the core design and implementation of Java thread pool based on JDK 1.8 source code. Let’s start by looking at the UML class diagram for ThreadPoolExecutor.
- Executor: the top-level interface implemented by ThreadPoolExecutor. Executor provides one core idea: decoupling task submission from task execution. You do not need to worry about how to create a thread or how to schedule it to run a task; you only provide a Runnable object, submit the task's execution logic to the Executor, and the Executor framework takes care of allocating threads and running the task.
- ExecutorService: (1) extends the ability to execute tasks, adding methods that can produce a Future for one or a group of asynchronous tasks; (2) provides methods to manage the thread pool, such as shutting it down.
- AbstractExecutorService: A high-level abstract class that strings together the process of performing a task, ensuring that the underlying implementation only needs to focus on a single method for performing the task
- ThreadPoolExecutor: Implementation class, the most complex part of the business. ThreadPoolExecutor will maintain its own life cycle on the one hand and manage threads and tasks on the other, allowing the two to work together nicely to execute parallel tasks.
Internally, a thread pool builds a producer-consumer model that decouples threads and tasks from each other, which makes it possible to buffer tasks and reuse threads. The operation of a thread pool is mainly divided into two parts: task management and thread management. The task management part acts as the producer. After a task is submitted, the thread pool decides the subsequent flow of the task: (1) directly request a thread to execute it; (2) buffer it in a queue to wait for a thread; (3) reject it. The thread management part is the consumer: threads are maintained uniformly inside the pool and allocated according to task demand, and when a thread finishes a task it goes on to fetch a new task to execute.
Execution flow chart
- After submitting a task, the thread pool determines whether the number of core threads has been reached. If the number of core threads has not been reached, it creates a core thread to process the task; otherwise, it performs the next step.
- The thread pool then determines whether the task queue (blocking queue) is full. If not, add the task to the task queue; otherwise, go to the next step.
- The thread pool determines whether the maximum number of threads has been reached. If not, non-core threads are created to handle tasks. Otherwise, the saturation (rejection) policy is executed; by default this throws an exception.
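For example (assuming corePoolSize = 2, maximumPoolSize = 4, a bounded queue of capacity 2, and that earlier tasks are still running): tasks 1-2 get core threads, tasks 3-4 go into the queue, tasks 5-6 get non-core threads, and task 7 is rejected.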
The constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
- corePoolSize: the number of core threads. By default the thread pool is empty and a thread is created only when a task is submitted. If the number of currently running threads is less than corePoolSize, a new thread is created to process the task; if it is greater than or equal to corePoolSize, no new core thread is created. You can pre-create and start all core threads by calling prestartAllCoreThreads() so that they wait for tasks.
- maximumPoolSize: the maximum number of threads the pool is allowed to create. If the task queue is full and the current thread count is less than maximumPoolSize, the thread pool will still create a new thread to process the task.
- keepAliveTime: the idle timeout of a non-core thread. keepAliveTime also applies to core threads if allowCoreThreadTimeOut is set to true.
- unit: the time unit of keepAliveTime.
- workQueue: the task queue. When the current number of threads is greater than or equal to the number of core threads, tasks are added to this queue. The queue is of type BlockingQueue.
- threadFactory: the thread factory, used to create threads, set their names, and so on.
- handler: the saturation (rejection) policy, applied when both the task queue and the thread count are full. The default is AbortPolicy, which cannot handle the new task and throws RejectedExecutionException. There are three other built-in policies: CallerRunsPolicy runs the rejected task in the thread that called execute; DiscardOldestPolicy discards the task at the head of the queue and retries execute (unless the executor is shut down, in which case the task is discarded); DiscardPolicy silently discards the rejected task. A construction sketch follows this list.
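A minimal construction sketch tying these parameters together; the pool sizes, queue capacity, thread-name prefix, and sleep duration below are illustrative assumptions, not values from the source:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConstructionDemo {
    public static void main(String[] args) {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory namedFactory = r -> new Thread(r, "demo-pool-" + seq.incrementAndGet());

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                60L, TimeUnit.SECONDS,                // keepAliveTime and its unit
                new ArrayBlockingQueue<>(2),          // workQueue: a bounded blocking queue
                namedFactory,                         // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler: the default rejection policy
        );

        for (int i = 0; i < 7; i++) {
            final int id = i;
            try {
                executor.execute(() -> {
                    try {
                        Thread.sleep(500); // keep workers busy so queueing and rejection are visible
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(Thread.currentThread().getName() + " finished task " + id);
                });
            } catch (RejectedExecutionException e) {
                System.out.println("task " + id + " was rejected");
            }
        }
        executor.shutdown();
    }
}
With these assumed settings, seven quick submissions typically exercise the whole flow described above: two core threads, two queued tasks, two non-core threads, and one rejection.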
Life cycle management
Running state of the thread pool. ctl is an AtomicInteger field that controls both the running state of the pool (runState) and the number of valid threads in it (workerCount): runState is stored in the high 3 bits and workerCount in the low 29 bits, so the two values do not interfere with each other.
// Use the lower 29 bits to indicate the number of threads in the thread pool and the higher 3 bits to indicate the running status of the thread pool
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // ctl.get() initially returns 0b11100000000000000000000000000000 (RUNNING, 0 workers)
The thread pool also defines several helper methods to derive the current running state and the thread count from ctl:
private static int runStateOf(int c) { return c & ~CAPACITY; } // Calculate the current running status
private static int workerCountOf(int c) { return c & CAPACITY; } // Count the number of current threads
private static int ctlOf(int rs, int wc) { return rs | wc; } // Generate CTLS from state and thread count
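For reference, these helpers rely on two constants defined next to ctl in the JDK 8 source:
private static final int COUNT_BITS = Integer.SIZE - 3;      // 29: bits used for the worker count
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set: the maximum worker count and the mask used above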
Five states of a thread pool
// runState is stored in the high-order bits
// The thread pool in this state receives new tasks and processes tasks in the blocking queue;
private static final int RUNNING = -1 << COUNT_BITS;//0b11100000000000000000000000000000
// The thread pool in this state will not receive new tasks, but will process tasks in the blocking queue;
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0
// Threads in this state do not receive new tasks, do not process tasks in the blocking queue, and interrupt running tasks;
private static final int STOP = 1 << COUNT_BITS;//0b00100000000000000000000000000000
// All tasks have been terminated
private static final int TIDYING = 2 << COUNT_BITS;//0b01000000000000000000000000000000
// The terminated() method has completed
private static final int TERMINATED = 3 << COUNT_BITS;//0b01100000000000000000000000000000
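The state transitions are: RUNNING -> SHUTDOWN when shutdown() is called; RUNNING or SHUTDOWN -> STOP when shutdownNow() is called; SHUTDOWN -> TIDYING when both the queue and the pool are empty; STOP -> TIDYING when the pool is empty; and TIDYING -> TERMINATED once the terminated() hook completes.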
Task execution mechanism
Submit a task
The entry point for task scheduling is the execute() method. Execution process:
- First, check the RUNNING status of the thread pool. If it is not RUNNING, reject it directly. Ensure that the thread pool executes tasks in the RUNNING state.
- If workerCount < corePoolSize, a thread is created and started to execute the newly submitted task.
- If workerCount >= corePoolSize and the blocking queue in the thread pool is not full, the task is added to the blocking queue.
- If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue in the thread pool is full, a thread is created and started to execute the newly submitted task.
- If workerCount >= maximumPoolSize and the blocking queue in the thread pool is full, the task is processed according to the reject policy. The default is to throw an exception directly.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // workerCountOf gets the current number of threads in the pool; if it is < corePoolSize, addWorker creates a new thread to execute the command task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // step 2
    // The thread pool is in the RUNNING state and the submitted task is successfully placed in the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        // double check
        int recheck = ctl.get();
        // If the pool is no longer RUNNING and the task is successfully removed from the blocking queue, call reject to handle the task
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If the thread pool is in the running state but has no threads, create one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // step 3
    // Failed to add a new thread to the pool, so reject the task
    else if (!addWorker(command, false))
        reject(command);
}
Task buffer
The task buffer module is the core of the thread pool's ability to manage tasks. The essence of a thread pool is managing tasks and threads, and the key idea is to decouple tasks from threads so that the subsequent allocation work can be done. The thread pool implements the producer-consumer pattern through a blocking queue: the blocking queue caches tasks, and worker threads retrieve tasks from it. A BlockingQueue is a queue that supports two additional operations: when the queue is empty, a thread trying to take an element waits for the queue to become non-empty; when the queue is full, a thread trying to put an element waits for space to become available. Blocking queues are often used in producer-consumer scenarios, where producers are the threads that add elements to the queue and consumers are the threads that take elements from it; the blocking queue is the container that holds the elements. Commonly used blocking queues include ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue, and DelayQueue.
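A minimal producer-consumer sketch using a BlockingQueue; the queue capacity, names, and task bodies are illustrative assumptions, not part of the thread pool source:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);

        // Consumer: plays the role of a worker thread, blocking on take() until a task arrives
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = queue.take(); // blocks while the queue is empty
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit when interrupted
            }
        }, "worker");
        worker.start();

        // Producer: plays the role of the submitter, blocking on put() while the queue is full
        for (int i = 0; i < 5; i++) {
            final int id = i;
            queue.put(() -> System.out.println("running task " + id));
        }

        Thread.sleep(100);
        worker.interrupt();
    }
}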
Fetching a task: getTask()
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // Has the thread pool stopped?
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Blocking fetch or timed fetch
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
Here’s a bit of code that makes keepAliveTime work:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
allowCoreThreadTimeOut defaults to false, meaning core threads are not destroyed even when idle; if it is true, an idle core thread is destroyed once it has been idle for keepAliveTime. If a thread is allowed to wait idle indefinitely (timed == false), it fetches a task with workQueue.take(): if the blocking queue is empty, the current thread is suspended and waits; when a task is added to the queue, the thread is woken up and take() returns the task for execution. If a thread is not allowed to idle indefinitely (timed == true), it fetches a task with workQueue.poll(keepAliveTime, ...): if no task appears in the blocking queue within keepAliveTime, poll returns null and the worker becomes eligible for recycling.
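A small sketch of how this switch might be flipped on an existing pool; the helper class, timeout value, and the idea of bundling the two calls are illustrative assumptions:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class CoreTimeoutConfig {
    // Lets idle core threads be reclaimed as well; assumes the pool was built with a positive keepAliveTime
    public static void enableCoreThreadTimeout(ThreadPoolExecutor executor) {
        executor.setKeepAliveTime(30, TimeUnit.SECONDS); // must be > 0 before enabling core thread timeout
        executor.allowCoreThreadTimeOut(true);
    }
}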
Task rejection strategy
The task rejection module is the protective part of the thread pool. The thread pool has a maximum capacity: when the task cache queue is full and the number of threads has reached maximumPoolSize, new tasks must be rejected, and the task rejection policy is what protects the thread pool.
Policy interface:
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
There are four built-in rejection policies: AbortPolicy (the default, which throws RejectedExecutionException), CallerRunsPolicy (runs the task in the caller's thread), DiscardPolicy (silently discards the task), and DiscardOldestPolicy (discards the oldest task in the queue and retries execute).
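A custom policy only needs to implement this interface. Below is a minimal sketch (the class name and logging behavior are illustrative assumptions) that records the rejection and falls back to running the task in the caller's thread:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class LogAndCallerRunsPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task rejected, pool size = " + executor.getPoolSize()
                + ", queue size = " + executor.getQueue().size());
        // Fall back to caller-runs behavior unless the executor is shut down
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}
An instance of such a class can be passed as the handler argument of the constructor shown earlier.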
Worker thread management
Worker
The Worker class implements the Runnable interface and holds a thread (thread) and an initial task (firstTask). The thread is created from the ThreadFactory when the constructor is called and is used to execute tasks. firstTask holds the first task passed in and may be null or non-null: if it is non-null, the thread executes this task immediately after starting, which is the case when a core thread is created; if it is null, the thread starts by fetching and executing tasks from the workQueue, which is the case when a non-core thread is created.
Thread pools need to manage the life cycle of threads and need to be recycled when threads are not running for a long time. Thread pools use a Hash table to hold references to threads, which can control the thread’s life cycle by adding and removing references. What matters is how to tell if the thread is running.
Worker inherits AQS and uses AQS to realize the function of exclusive lock. Instead of ReentrantLock, AQS is used to reflect the thread’s current execution state.
1. Once the lock method obtains the exclusive lock, it indicates that the current thread is executing a task.
2. If a task is being executed, the thread should not be interrupted.
3. If the thread does not hold the exclusive lock, it is idle and not processing a task, and in that case it can be interrupted.
4. The thread pool calls interruptIdleWorkers to interrupt idle threads when the shutdown or tryTerminate methods are executed; interruptIdleWorkers uses tryLock to determine whether a thread in the pool is idle, and idle threads can be safely reclaimed.
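For reference, an abridged sketch of the Worker inner class as it appears in the JDK 8 source (serialVersionUID and interrupt bookkeeping are omitted here):
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

    final Thread thread;          // the thread this worker runs in, created by the ThreadFactory
    Runnable firstTask;           // the initial task, possibly null
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // Delegates the main run loop to the outer runWorker
    public void run() {
        runWorker(this);
    }

    // Lock methods: state 0 means unlocked, 1 means locked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}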
Add thread addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS-update the worker count
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // The thread pool's reentrant lock
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker thread to the workers HashSet
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread to execute the task
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker thread recycling
The thread pool’s job is to maintain a certain number of thread references based on the current state of the thread pool and prevent these threads from being reclaimed by the JVM. When the thread pool decides which threads need to be reclaimed, it simply removes the references. After Worker is created, it will poll continuously and then acquire tasks for execution. Core threads can wait indefinitely to acquire tasks, while non-core threads have to acquire tasks within a limited time. When the Worker fails to obtain the task, that is, the acquired task is empty, the loop will end and the Worker will actively eliminate its own reference in the thread pool.
try {
    while (task != null || (task = getTask()) != null) {
        // Execute the task
    }
} finally {
    processWorkerExit(w, completedAbruptly); // If no task can be fetched, the worker actively reclaims itself
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
Execute task runWorker
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
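runWorker wraps every task in the beforeExecute and afterExecute hooks, which are protected no-op extension points of ThreadPoolExecutor. Below is a minimal sketch of overriding them to time tasks; the subclass name, pool sizes, and timing logic are illustrative assumptions:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    public TimingThreadPool() {
        super(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs in the worker thread, right before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsedMicros = (System.nanoTime() - startNanos.get()) / 1_000;
        startNanos.remove();
        System.out.println(Thread.currentThread().getName() + " took " + elapsedMicros + " µs"
                + (t == null ? "" : ", failed: " + t));
        super.afterExecute(r, t);
    }
}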
Extra knowledge
Thread group
Thread groups were created to make thread management easier. You can set properties on a thread group, such as setDaemon, the handling of uncaught exceptions, and a common security policy, and thread groups also make it convenient to obtain information about their threads. Each ThreadGroup can contain a set of child threads and a set of child thread groups, so the groups form a tree within a process. The root thread group is usually the system thread group; under it is the main thread group, and by default an application's own thread groups are created under the main thread group.
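A minimal sketch of working with thread groups; the group and thread names are illustrative assumptions:
public class ThreadGroupDemo {
    public static void main(String[] args) {
        ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
        System.out.println("current group: " + mainGroup.getName());              // usually "main"
        System.out.println("parent group : " + mainGroup.getParent().getName());  // usually "system"

        // Create a child group under the current group and start a thread inside it
        ThreadGroup workers = new ThreadGroup(mainGroup, "workers");
        workers.setDaemon(true); // the group is destroyed automatically once its last thread exits

        Thread t = new Thread(workers, () ->
                System.out.println("running in group: "
                        + Thread.currentThread().getThreadGroup().getName()), "worker-1");
        t.start();
    }
}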