Preface
Overview of thread pools
Thread pooling is a pattern for managing threads. It manages the relationship between tasks and threads through a central manager, avoiding the cost of frequently creating and destroying threads. It also limits the number of threads and queued tasks, which protects the system from overload caused by memory exhaustion, excessive thread switching, or too many tasks.
Tasks are usually scheduled through a synchronized queue: the main thread puts tasks into the queue, and worker threads take tasks from the queue and execute them. If there are no tasks to execute, the worker thread is suspended.
A thread pool uses a producer-consumer pattern, which is divided into three layers: the synchronous layer, the queue layer, and the asynchronous layer. The main thread in the synchronous layer is responsible for handling work tasks and putting them into the synchronized queue of the queue layer, while the worker threads in the asynchronous layer take work from that queue and process it. If the queue is empty, the worker thread enters the suspended state.
Thread pool limitations
Thread pools are not without their drawbacks. Only by setting thread pool parameters properly can we avoid wasting too many resources while improving performance.
- If you create too many threads, some resources will be wasted, and some threads will be underutilized.
- If you destroy too many threads, you will waste time creating them again.
- If you create too few threads, you may end up with too many tasks piling up and the task processing time increasing.
Architecture
The overall structure of a thread pool is not complicated. The main interfaces are Executor and ExecutorService.
The Executor interface defines a single method, execute, which is the only method used to submit tasks to a thread pool. This method decouples task submission from task execution and hides the details of thread scheduling and task execution, so that users only need to focus on how to submit tasks.
The ExecutorService interface extends the functionality of thread pools: (1) it provides additional methods for submitting tasks that return one or more Future objects, through which results can be retrieved; (2) it provides methods to stop the thread pool.
AbstractExecutorService is an abstract class that provides the basic execution process. Subclasses only need to deal with how a single task is executed, not with the relationship between tasks.
ThreadPoolExecutor is the concrete implementation class and the most complex part. It maintains the thread pool lifecycle, the storage of tasks, the scheduling of tasks, and so on. This is the class we focus on.
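As a rough illustration of the difference between the two interfaces, the sketch below submits one task with execute and one with submit. SubmitDemo and its method name are hypothetical; only the java.util.concurrent calls come from the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
final class SubmitDemo {
    // Submits a Callable, waits for its result, then stops the pool.
    static int submitAndGet() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Executor.execute: fire-and-forget, no return value.
            pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
            // ExecutorService.submit: returns a Future that carries the result.
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // stop accepting tasks; already submitted tasks still run
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet());
    }
}
```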
The core variables
ctl
ctl is a variable of type AtomicInteger that stores both the state of the thread pool and the number of worker threads. The top three bits store the state of the thread pool (runState) and the bottom 29 bits store the number of worker threads (workerCount).
Thread pool life cycle (the binary and decimal values are those of the top three bits):
- RUNNING: the thread pool is running and can accept and process tasks. Binary 111, decimal -1
- SHUTDOWN: the thread pool does not accept new tasks but still processes tasks in the queue. Binary 000, decimal 0
- STOP: the thread pool does not accept new tasks, does not process tasks in the queue, and interrupts the threads that are executing tasks. Binary 001, decimal 1
- TIDYING: all tasks have terminated and the number of worker threads is 0. Binary 010, decimal 2
- TERMINATED: the thread pool has fully terminated. Binary 011, decimal 3
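The packing described above can be sketched as follows. The constant and helper names follow those in OpenJDK's ThreadPoolExecutor as best recalled (newer JDK versions rename some of them), so treat this as an illustration rather than the exact source. CtlDemo is a hypothetical wrapper class.

```java
// Hypothetical demo class illustrating how state and count share one int.
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // mask with the low 29 bits set

    // Run states are stored in the top three bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Combine a run state and a worker count into one value.
    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }

    // Extract the two halves again.
    static int runStateOf(int ctl)    { return ctl & ~CAPACITY; }
    static int workerCountOf(int ctl) { return ctl & CAPACITY; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(ctl) == RUNNING);
        System.out.println(workerCountOf(ctl));
    }
}
```

Because the states are ordered numerically (RUNNING is the smallest), a check such as "at least STOP" reduces to a single integer comparison.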
workQueue
workQueue is a variable of type BlockingQueue that stores tasks that cannot be executed yet. It is a thread-safe queue, so a given task is handed to only one thread. If there are no tasks available in the queue, the fetching thread blocks and is suspended until a task arrives.
workers
workers is a variable of type HashSet that stores all the worker threads currently in the thread pool. Because the container holds a reference to each thread, the thread cannot be garbage collected. When a thread is no longer needed, it is simply removed from the container, and once it has finished it can be collected by the garbage collector.
threadFactory
threadFactory is a variable of type ThreadFactory, the factory used to create threads. The thread pool has a default thread factory, and we can also customize how threads are created by implementing the ThreadFactory interface.
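A minimal sketch of a custom factory, assuming the only goal is to give pool threads recognizable names. NamedThreadFactory and the "order-pool" prefix are hypothetical; ThreadFactory.newThread is the JDK method being implemented.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names threads "<prefix>-<n>".
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // The pool passes in its worker as the Runnable; we only decorate the thread.
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactory("order-pool");
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

Such a factory can be passed to the ThreadPoolExecutor constructor overloads that accept a ThreadFactory.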
handler
handler is a variable of type RejectedExecutionHandler that defines how a rejected task is handled. When the task queue is full and the number of threads has reached the maximum, or the thread pool is not in the RUNNING state, the configured rejection policy is applied to the task. Four rejection policies are implemented in the thread pool. You can also implement the RejectedExecutionHandler interface to define your own.
The four preset policies:
- CallerRunsPolicy: uses the thread that submitted the task to execute the rejected task directly.
- AbortPolicy: throws an exception.
- DiscardPolicy: discards the task.
- DiscardOldestPolicy: discards the oldest task in the queue and adds the rejected task to the task queue.
keepAliveTime
keepAliveTime is a variable of type long that defines how long an idle non-core thread waits for a new task. If no new task arrives within this time, the thread is reclaimed. Threads that are not subject to the timeout (core threads, unless allowCoreThreadTimeOut is set) wait indefinitely for a new task.
allowCoreThreadTimeOut
allowCoreThreadTimeOut is a boolean variable that enables timeout-based reclamation of idle core threads. If true, idle core threads are reclaimed once they have been idle longer than keepAliveTime. If false, core threads are not reclaimed until the thread pool stops.
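The interaction of the two settings can be observed with a small experiment. This is a sketch under assumed values (two core threads, a 50 ms keep-alive); KeepAliveDemo and its method name are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
final class KeepAliveDemo {
    // Returns the pool size observed once idle core threads have timed out.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core threads now obey keepAliveTime
        pool.execute(() -> { });
        pool.execute(() -> { });
        try {
            // Poll for up to about five seconds until the idle workers have exited.
            for (int i = 0; i < 100 && pool.getPoolSize() > 0; i++) {
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the two core threads would stay alive and the pool size would remain 2.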
corePoolSize
corePoolSize is a variable of type int that holds the number of core threads. It must be specified when the thread pool is created.
maximumPoolSize
maximumPoolSize is a variable of type int that specifies the maximum number of threads. corePoolSize is the number of core threads, while maximumPoolSize is the number of core plus non-core threads.
defaultHandler
The default reject policy in the thread pool is AbortPolicy, that is, throw an exception.
Task execution mechanism
Task submission
The submission of tasks in the thread pool is decoupled from the execution of tasks. Users only need to care about submitting tasks; how a specific task is handled is decided by the thread pool itself: it either creates a thread to execute the task directly, puts the task into the blocking queue, or rejects the task.
Task submission is done by the execute method, which follows these steps:
- If the thread pool is not in the RUNNING state, the task is rejected.
- If the thread pool is RUNNING and the number of threads is below corePoolSize, a core thread is created to execute the task.
- If the thread pool is RUNNING but the number of core threads has reached corePoolSize, the task is put into the blocking queue.
- If the blocking queue is full, a non-core thread is created to execute the submitted task.
- If the blocking queue is full and the number of threads has reached maximumPoolSize, the task is rejected.
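The steps above can be reproduced with a deliberately tiny pool. The sketch assumes one core thread, a maximum of two threads, and a queue capacity of one; ExecuteFlowDemo is a hypothetical class and the tasks simply block until released.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
final class ExecuteFlowDemo {
    // Returns {poolSize, queueSize, rejected ? 1 : 0} after four submissions.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)); // no handler given: AbortPolicy is used
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // 1: below corePoolSize, a core thread is created
            pool.execute(blocker); // 2: core thread busy, task goes into the queue
            pool.execute(blocker); // 3: queue full, a non-core thread is created
            pool.execute(blocker); // 4: queue full and pool at maximumPoolSize
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected ? 1 : 0};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("threads=" + r[0] + " queued=" + r[1] + " rejected=" + (r[2] == 1));
    }
}
```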
Task buffering
Task buffering is implemented with a blocking queue. There are two ways for a thread to obtain a task: (1) the first task passed in when the thread is created; (2) fetching a task from the blocking queue.
When a thread retrieves a task from a blocking queue and the queue is empty, the thread is suspended until a new task becomes available. A blocking queue is a thread-safe queue, typically guarded by an exclusive lock, which ensures that each task is handed to only one thread and prevents different threads from obtaining the same task.
Java provides a variety of blocking queues to choose from, each with its own characteristics.
Name | Description |
---|---|
ArrayBlockingQueue | A first-in, first-out bounded blocking queue implemented with an array. Fair and unfair locks are supported. |
LinkedBlockingQueue | A first-in, first-out, optionally bounded blocking queue implemented with a linked list. The default capacity is Integer.MAX_VALUE. Compared with ArrayBlockingQueue it has higher throughput, but it loses array-style random access. |
LinkedBlockingDeque | An optionally bounded blocking queue implemented with a linked list that allows access at both ends. At high concurrency it can reduce lock contention by up to half compared with LinkedBlockingQueue. |
PriorityBlockingQueue | An unbounded blocking queue that orders elements by priority. If no comparator is provided, natural ordering is used. Because it is unbounded, it can cause an OutOfMemoryError if tasks accumulate. |
SynchronousQueue | A synchronous queue that does not store tasks. Each insert must wait for a corresponding remove from another thread. Fair and unfair modes are supported. |
LinkedTransferQueue | An unbounded blocking queue implemented with a linked list. |
DelayQueue | An unbounded delay queue in which each element can only be removed after its delay has expired. Internally, the leader-follower pattern is used to manage waiting threads. |
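Since execute hands tasks to the queue with a non-blocking offer, the choice of queue directly decides when the pool starts extra threads or rejects tasks. The following sketch compares how three of the queues respond to offer when no consumer is waiting; QueueOfferDemo and the chosen capacities are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class.
final class QueueOfferDemo {
    // Counts how many of `attempts` non-blocking offers the queue accepts.
    static int accepted(BlockingQueue<Runnable> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(() -> { })) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Bounded: accepts tasks only up to its capacity.
        System.out.println(accepted(new ArrayBlockingQueue<>(2), 5));
        // Effectively unbounded by default: accepts everything.
        System.out.println(accepted(new LinkedBlockingQueue<>(), 5));
        // No storage: an offer fails unless a consumer is already waiting.
        System.out.println(accepted(new SynchronousQueue<>(), 5));
    }
}
```

With an unbounded queue the pool never grows beyond corePoolSize, because the queue never reports itself as full.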
Task acquisition
There are two ways for a thread to get a task: one is the first task assigned when the thread is created, and the other is to fetch a task from the blocking queue. After the thread has finished executing a task, it goes back to the blocking queue to fetch the next one, using a while loop.
When a thread is created and started, the runWorker method is executed, which retrieves the tasks to execute in a while loop. Once a task is retrieved, its run method is called. When the task is finished, the loop continues until the thread pool is stopped, the thread is blocked, or the thread needs to be reclaimed.
Tasks are requested through the getTask method. When getTask returns null, the thread pool reclaims the calling thread. It returns null in the following cases:
- The thread pool is in the STOP state or later, or it is in the SHUTDOWN state and the queue is empty.
- The number of worker threads exceeds maximumPoolSize.
- The thread is recyclable and did not get a task within keepAliveTime. Non-core threads are recyclable, and if allowCoreThreadTimeOut is set, core threads are recyclable as well. Such a thread is recycled on the next pass through the loop.
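The core of that decision, a timed wait for recyclable threads and an unbounded wait for the others, can be sketched in a few lines. This is a simplified illustration, not the JDK source: it omits the state checks, and GetTaskSketch, fetch, and the parameter names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified version of the waiting logic in getTask.
final class GetTaskSketch {
    // Returns a task, or null when a recyclable worker waited too long.
    static Runnable fetch(BlockingQueue<Runnable> queue, boolean timed, long keepAliveMillis) {
        try {
            return timed
                    ? queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS) // null on timeout
                    : queue.take();                                      // waits indefinitely
        } catch (InterruptedException e) {
            // Simplification: the real method retries instead of giving up.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        System.out.println(fetch(queue, true, 20) == null); // empty queue, timed wait
        queue.add(() -> { });
        System.out.println(fetch(queue, false, 0) != null); // task available
    }
}
```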
Rejection of a task
Rejecting tasks protects the thread pool: if the blocking queue is full and the number of threads has reached the preset maximumPoolSize, the task is rejected outright.
Rejection policies implement the RejectedExecutionHandler interface. You can implement this interface yourself to customize how a task is rejected.
There are also four alternative task rejection policies available within the thread pool. The default task rejection policy is AbortPolicy, which throws an exception when a task is rejected.
Name | Description |
---|---|
CallerRunsPolicy | The thread that called the execute method runs the rejected task itself. If the thread pool has been shut down, the task is discarded. |
AbortPolicy | The default rejection policy. Throws an exception directly. |
DiscardPolicy | Does nothing and silently drops the task, with no record or notification. |
DiscardOldestPolicy | Discards the oldest task in the task queue and resubmits the rejected task. |
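A custom policy is just an implementation of RejectedExecutionHandler. The sketch below counts rejections instead of throwing; CountingRejectionDemo and the pool sizes are assumptions chosen so that rejections are easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class.
final class CountingRejectionDemo {
    // Submits five blocking tasks to a pool that can hold two; returns the rejection count.
    static int rejectedCount() {
        AtomicInteger rejected = new AtomicInteger();
        // Custom policy: record the rejection and drop the task.
        RejectedExecutionHandler handler = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 5; i++) {
            pool.execute(blocker); // one runs, one is queued, the rest are rejected
        }
        release.countDown();
        pool.shutdown();
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedCount());
    }
}
```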
Thread management mechanism
In the thread pool, an inner class named Worker is used to manage and monitor the state of threads.
The Worker class extends AbstractQueuedSynchronizer (AQS) and implements the Runnable interface. By design, an object of this class is both a lock and a task that holds a thread.
Create a thread
The method for creating a thread is addWorker(Runnable firstTask, boolean core), where firstTask is the task the new thread should execute first (it may be null), and core indicates whether the thread being created is a core thread.
The flow of this method:
- When adding a thread, first determine the running state of the thread pool.
- If the pool is not RUNNING, check whether the state is SHUTDOWN or STOP. The difference between these two states is that the former continues to execute the tasks in the task queue, while the latter discards the tasks in the task queue and interrupts the threads that are executing tasks. If the state is SHUTDOWN, no first task is supplied, and tasks remain in the task queue, thread creation is allowed in order to speed up the remaining work. Note that creation is only allowed, not guaranteed to succeed.
- If the pool is RUNNING, creating new threads is also allowed.
- When thread creation is allowed, check whether the current number of threads is within the limits. First, the number of threads must be less than the maximum capacity that ctl can represent.
- For a core thread, the current number of threads must be less than corePoolSize.
- For a non-core thread, the current number of threads must be less than maximumPoolSize.
- When both checks pass, a new thread can be created. The thread is not started immediately after it is created; the state of the thread pool is checked again first.
- If the state is RUNNING, or SHUTDOWN with no first task, the thread is cached and started.
- Otherwise, the thread is removed.
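The limit check in the flow above has to be safe when several threads submit tasks at once, which is why the worker count is reserved with a compare-and-set retry loop before the thread is actually created. The sketch below shows only that reservation step under simplified assumptions; WorkerCountSketch and tryReserve are hypothetical names, and a plain AtomicInteger stands in for the count bits of ctl.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of reserving a worker slot before creating the thread.
final class WorkerCountSketch {
    // Tries to increment the count; fails once `limit` has been reached.
    static boolean tryReserve(AtomicInteger workerCount, int limit) {
        for (;;) {
            int current = workerCount.get();
            if (current >= limit) {
                return false; // at corePoolSize or maximumPoolSize, depending on `core`
            }
            if (workerCount.compareAndSet(current, current + 1)) {
                return true;  // slot reserved; the caller may now create the thread
            }
            // The CAS lost a race with another thread: re-read and retry.
        }
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger();
        System.out.println(tryReserve(count, 2));
        System.out.println(tryReserve(count, 2));
        System.out.println(tryReserve(count, 2));
    }
}
```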
Caching threads
After the thread is created, it needs to be strongly referenced so that the garbage collector cannot reclaim it because threads are reusable and not destroyed once a task has been executed.
The thread pool holds all valid threads in a HashSet container, and if you need to reclaim a thread, you simply remove it from the container. References to threads are removed, and the garbage collector reclaims them whenever it finds that none exist.
Reclaiming threads
Threads in the thread pool are not permanent, so the thread pool needs to reclaim unused threads. We learned from the previous section that a thread can be reclaimed simply by dropping the reference to it, but how does a thread pool determine which threads need to be reclaimed?
When getTask returns null to a thread, that thread needs to be reclaimed.
```java
// runWorker(), simplified excerpt
try {
    while (task != null || (task = getTask()) != null) {
        // ... run the task ...
    }
} finally {
    processWorkerExit(w, completedAbruptly);
}
```
Once the thread is started, the runWorker method is executed, which repeatedly calls getTask() to fetch tasks from the task queue. One of three things happens when fetching a task:
- The task is obtained successfully.
- The thread blocks and is suspended, because no task exists in the task queue or another thread is currently fetching a task.
- null is returned.
When null is returned, the while loop exits and the processWorkerExit method is executed, which removes the reference to the thread so that it can be collected by the garbage collector.
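To see the loop, the null return, and the removal from the worker set working together, here is a deliberately small pool written from scratch. It is a sketch of the idea only: TinyPool and all of its members are hypothetical, it has a fixed number of threads, and it polls with a short timeout instead of using the JDK's state machine.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature pool; not how ThreadPoolExecutor is actually written.
final class TinyPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Set<Thread> workers = new HashSet<>(); // guarded by `this`
    private volatile boolean shutdown;

    TinyPool(int threads) {
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(this::runWorker);
            synchronized (this) { workers.add(t); } // strong reference keeps it alive
            t.start();
        }
    }

    void execute(Runnable task) { queue.add(task); }
    void shutdown() { shutdown = true; }
    synchronized int workerCount() { return workers.size(); }

    // Returns null when the calling worker should exit.
    private Runnable getTask() throws InterruptedException {
        for (;;) {
            if (shutdown && queue.isEmpty()) return null;
            Runnable task = queue.poll(20, TimeUnit.MILLISECONDS);
            if (task != null) return task;
        }
    }

    private void runWorker() {
        try {
            Runnable task;
            while ((task = getTask()) != null) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Counterpart of processWorkerExit: drop the reference to this thread.
            synchronized (this) { workers.remove(Thread.currentThread()); }
        }
    }

    // Runs ten tasks on two workers; returns {tasksCompleted, workersLeft}.
    static int[] demo() {
        TinyPool pool = new TinyPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 10; i++) pool.execute(done::incrementAndGet);
        pool.shutdown();
        try {
            for (int i = 0; i < 200 && pool.workerCount() > 0; i++) Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {done.get(), pool.workerCount()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("completed=" + r[0] + " workersLeft=" + r[1]);
    }
}
```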
Interrupt threads
In Java's threading model, a thread cannot be forcibly interrupted. Instead, an interrupt flag is set, and the code running on the thread decides how to respond to the change of the flag.
When we execute the shutdownNow method, the thread pool state is changed to STOP, all threads are interrupted, both idle and running, and the task queue is drained, with the unexecuted tasks returned to the caller. Finally, an attempt is made to change the thread pool state to TERMINATED.
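The behavior can be checked with a single-thread pool that has one running task and two queued tasks. ShutdownNowDemo is a hypothetical class and the 60-second sleep is only a stand-in for long-running work.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class.
final class ShutdownNowDemo {
    // Returns {unstartedTasks, runningTaskInterrupted ? 1 : 0, terminated ? 1 : 0}.
    static int[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stand-in for long-running work
            } catch (InterruptedException e) {
                interrupted.set(true); // the task decides how to respond to the interrupt
            }
        });
        pool.execute(() -> { }); // stays in the queue
        pool.execute(() -> { }); // stays in the queue

        List<Runnable> pending = Collections.emptyList();
        boolean terminated = false;
        try {
            started.await();
            pending = pool.shutdownNow(); // interrupts workers and drains the queue
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new int[] {pending.size(), interrupted.get() ? 1 : 0, terminated ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("pending=" + r[0] + " interrupted=" + (r[1] == 1)
                + " terminated=" + (r[2] == 1));
    }
}
```

A task that ignores the interrupt flag would keep running, which is why shutdownNow cannot guarantee that the pool stops promptly.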
Dynamic setting parameters
The values of variables in the thread pool are not fixed after initialization. The thread pool provides methods for modifying some of its parameters at run time. Here are just two of the more important ones.
setCorePoolSize
This method dynamically sets the number of core threads. The thread pool adjusts the number of threads in the pool based on the current state of the thread and the value passed in by this method.
If the new number of core threads is greater than the old one, the thread pool does not simply add as many threads as the difference. Instead, the number of core threads to create is determined by both the difference and the number of tasks waiting in the blocking queue. If there are no tasks in the blocking queue, no new threads are created ahead of time. After each thread is created, the pool checks whether there are still tasks in the blocking queue, and if there are none, it stops creating threads.
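The two cases described above, an empty queue and a queue with waiting tasks, can be compared side by side. This is a sketch with assumed sizes (core size raised from 1 to 3, maximum 4); ResizeDemo is a hypothetical class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
final class ResizeDemo {
    // Returns {idlePoolSizeAfterResize, busyPoolSizeAfterResize}.
    static int[] run() {
        // Case 1: the queue is empty, so raising the core size starts no threads.
        ThreadPoolExecutor idle = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        idle.setCorePoolSize(3);
        int idleSize = idle.getPoolSize();
        idle.shutdown();

        // Case 2: two tasks are waiting, so two additional core threads are started.
        ThreadPoolExecutor busy = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 3; i++) {
            busy.execute(blocker); // one task runs, two wait in the queue
        }
        busy.setCorePoolSize(3);
        int busySize = busy.getPoolSize();
        release.countDown();
        busy.shutdown();
        return new int[] {idleSize, busySize};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("idle pool threads=" + r[0] + ", busy pool threads=" + r[1]);
    }
}
```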
setMaximumPoolSize
This method dynamically sets the maximum number of threads in the thread pool.
Conclusion
Understanding how thread pools work is an essential step in mastering concurrent programming, because thread pools are one of the main tools for improving concurrency. To use a thread pool properly, you must understand its parameters, so that they can be set according to the actual needs of each business scenario.
Reference
- Implementation principle of Java thread pool and its practice in Meituan business