Why use thread pools?
- Creating and destroying threads carries system overhead, and doing so too frequently significantly reduces processing efficiency.
- Each thread occupies memory. A large number of threads consumes valuable memory and, if not handled properly, can cause an OutOfMemoryError. Even when it does not, reclaiming a large number of threads puts pressure on the garbage collector and lengthens GC pauses.
Therefore, when threads are used in a production environment, they must be controlled and managed.
What is a thread pool?
To prevent the system from constantly creating and destroying threads, we can reuse the created threads.
Pooling is a common technique. A database connection pool, for example, keeps a set of database connections open so that a connection does not have to be established and torn down for every database operation. When the system needs a connection, it takes an available one from the pool instead of creating a new one. Conversely, when you close a connection, it is not actually closed but returned to the pool.
In a thread pool, there are always several active threads. When you need to use a thread, you can take a free thread from the pool, and when you’re done, you can return it to the pool so that someone else can use it.
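To make the reuse concrete, here is a minimal sketch that runs ten small tasks on a pool of two threads and records which threads actually ran them. The class and method names (ThreadReuseDemo, runTasks) are made up for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that many tasks share a few pooled threads.
final class ThreadReuseDemo {

    /** Runs taskCount small tasks on poolSize threads and returns the names of the threads that ran them. */
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; tasks already queued still run
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 10);
        System.out.println("10 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

However many tasks are submitted, the set never holds more names than the pool has threads.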
JDK support for thread pools
In Java, creating a thread pool is simple: the JDK provides the utility class java.util.concurrent.Executors.
Executors provides factory methods for several types of thread pool, including the following:
// Returns a pool with a fixed number of threads.
// When a task is submitted and a thread is idle, the task runs immediately; otherwise it waits in the task queue (a LinkedBlockingQueue).
public static ExecutorService newFixedThreadPool(int nThreads)
// Returns a pool with exactly one thread.
// If more than one task is submitted, the extra tasks wait in the task queue (a LinkedBlockingQueue).
public static ExecutorService newSingleThreadExecutor()
// Returns a pool whose thread count adjusts dynamically: idle threads are reused when available;
// otherwise a new thread is created to run the task.
public static ExecutorService newCachedThreadPool()
// Returns a ScheduledExecutorService backed by a pool of size 1.
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
// Returns a ScheduledExecutorService whose pool size can be specified.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// Work-stealing pool, available since JDK 1.8.
// Given worker threads A and B, once A has finished its own tasks
// it actively steals tasks from B and runs them.
public static ExecutorService newWorkStealingPool()
Although the JDK offers plenty of factory methods, in production it is safer to create the thread pool by specifying its parameters explicitly, which avoids the risk of resource exhaustion. For this reason Alibaba's Java development guidelines forbid creating thread pools through Executors and require ThreadPoolExecutor to be used directly.
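One way to follow that advice is sketched below: a pool built directly with ThreadPoolExecutor, where both the thread count and the queue length are bounded. The sizes (4, 8, 100), the name prefix, the helper name newBoundedPool, and the choice of CallerRunsPolicy are assumptions picked for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory class; all sizes below are illustrative only.
final class BoundedPoolFactory {

    /** Builds a pool whose thread count and queue length are both bounded. */
    static ThreadPoolExecutor newBoundedPool(String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        // Named threads are easier to find in thread dumps.
        ThreadFactory factory = r -> new Thread(r, namePrefix + "-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                4,                              // corePoolSize
                8,                              // maximumPoolSize
                60L, TimeUnit.SECONDS,          // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(100),  // bounded queue: at most 100 waiting tasks
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, the submitting thread runs the task
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool("demo-worker");
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Because the queue is bounded, a burst of submissions cannot pile up without limit in memory; what happens once both the queue and the pool are full is decided by the rejection handler.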
Thread pool core implementation
The core Java thread pool class is ThreadPoolExecutor, whose UML class diagram is shown below.
The top-level interface that ThreadPoolExecutor implements is Executor. Executor embodies one idea: decouple task submission from task execution. The user does not need to care how threads are created or how they are scheduled to run tasks; the user only supplies a Runnable object containing the task's logic and submits it to the Executor, and the Executor framework takes care of allocating threads and running the task. The ExecutorService interface adds two capabilities: (1) it extends task execution with methods that return a Future for one asynchronous task or a batch of them; (2) it provides methods to control the thread pool, such as stopping it. AbstractExecutorService is the abstract class one level up; it chains the steps of executing a task together so that the implementation below it only has to focus on a single method that executes a task. The lowest-level implementation class, ThreadPoolExecutor, implements the most complex part: it maintains its own lifecycle while managing threads and tasks at the same time, combining the two to execute tasks in parallel.
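The Future-returning methods that ExecutorService adds can be sketched as follows. The example submits a batch of Callable tasks with invokeAll and reads each result back through its Future; the class name FutureDemo and the squaring task are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: batch submission through ExecutorService.invokeAll.
final class FutureDemo {

    /** Submits one Callable per input and sums the results read back through the Futures. */
    static int sumOfSquares(List<Integer> inputs) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            int sum = 0;
            // invokeAll blocks until every task has completed, then returns one Future per task.
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3))); // prints 14
    }
}
```

The caller never touches a Thread: it hands tasks to the executor and collects results, which is the decoupling the Executor interface is meant to provide.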
The constructor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Parameter meanings:
- corePoolSize: the number of core threads kept in the pool.
- maximumPoolSize: the maximum number of threads allowed in the pool.
- keepAliveTime: how long idle threads in excess of corePoolSize are kept alive before they are destroyed.
- unit: the time unit of keepAliveTime.
- workQueue: the queue holding tasks that have been submitted but not yet executed.
- threadFactory: the factory used to create threads.
- handler: the rejection policy, which decides how to turn tasks away when there are too many to handle.
With this many parameters the constructor is powerful, but it must be used with care: a few of the parameters directly affect performance and are particularly important to choose well. It is also worth noting that ThreadPoolExecutor exposes getters for these settings and setters for most of them (setCorePoolSize, setMaximumPoolSize, setKeepAliveTime, setThreadFactory, and setRejectedExecutionHandler; the work queue can be read through getQueue() but not replaced). The configuration can therefore be modified at run time and the pool's state monitored, which makes a dynamic thread pool configuration platform possible.
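A rough sketch of what run-time reconfiguration and monitoring could look like is shown below. The helper names resize and snapshot are hypothetical; the getters and setters they call are part of ThreadPoolExecutor. The order of the two setter calls is chosen so that corePoolSize never exceeds maximumPoolSize at any intermediate step, since newer JDK versions reject such a combination.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for adjusting and inspecting a live pool.
final class DynamicPoolConfig {

    /** Applies new sizes while keeping corePoolSize <= maximumPoolSize at every step. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("need 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the core size first
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Collects the values a monitoring endpoint might report. */
    static String snapshot(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " poolSize=" + pool.getPoolSize()
                + " active=" + pool.getActiveCount()
                + " queued=" + pool.getQueue().size()
                + " completed=" + pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        System.out.println(snapshot(pool));
        resize(pool, 8, 16);
        System.out.println(snapshot(pool));
        pool.shutdown();
    }
}
```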
Thread pool task scheduling logic
The thread pool internally implements a producer-consumer model that decouples threads from tasks, so that tasks can be buffered and threads reused.
The operation of the thread pool has two parts: task management and thread management. Task management acts as the producer: when a task is submitted, the thread pool decides what happens to it next:
- hand it directly to a thread for execution;
- buffer it in the queue until a thread picks it up;
- reject the task.
Thread management acts as the consumer. Threads are maintained centrally inside the pool and allocated in response to task requests. When a thread finishes a task, it goes on to fetch the next one; when it finally fails to obtain a task, the thread is reclaimed.
The lifetime of the thread pool
The running state of the thread pool is not set explicitly by the user; the pool maintains it internally as it runs. A single internal variable holds two values together: the running state (runState) and the number of threads (workerCount). The declaration is:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl is an AtomicInteger field that packs two pieces of information: the running state of the pool (runState) and the number of valid threads in it (workerCount). The high 3 bits hold runState and the low 29 bits hold workerCount. Storing both values in one variable means that decisions depending on both can never see them out of sync, and no lock is needed to keep them consistent. Reading the thread pool source code shows that it frequently has to check the running state and the thread count at the same time. The pool also provides several helper methods that extract the current running state and thread count from ctl. They are all simple bitwise operations, so they are very cheap.
The helpers that compute the lifecycle state and the thread count are as follows:
// Calculate the current running state
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Count the current number of threads
private static int workerCountOf(int c) { return c & CAPACITY; }
// Generate a CTL by state and thread count
private static int ctlOf(int rs, int wc) { return rs | wc; }
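The bit packing is easier to see in a standalone form. The sketch below copies the three helpers into a small class and defines the constants the way I understand the OpenJDK source to define them (COUNT_BITS, CAPACITY and the five state values); the class name CtlDemo is made up, and the constants should be read as illustrative rather than as a quotation of any particular JDK version.

```java
// Hypothetical standalone copy of the ctl arithmetic, for experimentation only.
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    // Run states live in the high 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;       // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // 000
    static final int STOP       =  1 << COUNT_BITS;       // 001
    static final int TIDYING    =  2 << COUNT_BITS;       // 010
    static final int TERMINATED =  3 << COUNT_BITS;       // 011

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep only the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep only the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine state and count

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(ctl));
        System.out.println(runStateOf(ctl) == RUNNING); // true
        System.out.println(workerCountOf(ctl));         // 5
    }
}
```

With these values RUNNING is negative and the states are numerically ordered (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), which is why the source can test conditions such as rs >= SHUTDOWN with a plain integer comparison.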
There are five running states of a ThreadPoolExecutor:
Running state | State description |
---|---|
RUNNING | Can accept new submitted tasks and can also process tasks in blocking queues |
SHUTDOWN | Closed state, no longer accepting new submitted tasks, but can continue processing saved tasks in the blocking queue |
STOP | New tasks cannot be accepted, tasks in the queue are not processed, and threads that are processing tasks are interrupted |
TIDYING | All tasks have been terminated and the workerCount (valid threads) is 0 |
TERMINATED | Enters the state after the terminated() method finishes executing |
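The states themselves are internal, but part of the lifecycle can be observed through the public API. The following sketch keeps a single worker busy, calls shutdown(), and records what isShutdown(), isTerminated() and execute() report along the way. LifecycleDemo, observe and awaitQuietly are names invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes one pool from RUNNING to TERMINATED.
final class LifecycleDemo {

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns a short log of what the public API reports at each stage. */
    static String observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(() -> awaitQuietly(release)); // keeps the only worker busy
            log.append("running:shutdown=").append(pool.isShutdown());

            pool.shutdown(); // no new tasks accepted; the running task is left alone
            log.append(" afterShutdown:shutdown=").append(pool.isShutdown())
               .append(",terminated=").append(pool.isTerminated());
            try {
                pool.execute(() -> { });
                log.append(",accepted");
            } catch (RejectedExecutionException e) {
                log.append(",rejected");
            }

            release.countDown(); // let the running task finish
            boolean done = pool.awaitTermination(10, TimeUnit.SECONDS);
            log.append(" end:terminated=").append(done && pool.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

After shutdown() the pool refuses new work but is not yet terminated, because its worker is still finishing a task; it only reports termination once that worker has exited.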
The lifecycle transformation is shown in the following figure:
Exploring the source code
Fair warning: the source code that follows may make for uncomfortable reading.
Task scheduling: execute(Runnable command)
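int c = ctl.get();
// 1. Fewer than corePoolSize workers: try to start a new thread with this task.
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// 2. The pool is running: try to queue the task.
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // Re-check after queuing: the pool may have been shut down in the meantime.
    if (!isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 3. The task could not be queued: try a thread beyond the core size; reject if that fails too.
else if (!addWorker(command, false))
    reject(command);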
Every task enters through the execute method. It checks the current running state of the pool, the number of running threads, and the execution policy, then decides what happens next: start a thread to run the task directly, buffer the task in the queue, or reject it. The flow is as follows:
- If the thread pool is not in the RUNNING state, the task is rejected. The pool only accepts new tasks while it is RUNNING.
- If workerCount < corePoolSize, a new thread is created and started to run the newly submitted task.
- If workerCount >= corePoolSize and the blocking queue is not full, the task is added to the blocking queue.
- If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue is full, a new thread is created and started to run the newly submitted task.
- If workerCount >= maximumPoolSize and the blocking queue is full, the task is handled by the rejection policy; the default policy throws an exception.
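The flow above can be watched from the outside with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, then submits four tasks that all block until released, so that nothing completes while the submissions are being observed. The class name ExecuteFlowDemo and the method submitFour are invented for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: one submission per branch of the execute() flow.
final class ExecuteFlowDemo {

    /** Submits four blocking tasks and reports what happened to each one. */
    static String[] submitFour() {
        // No handler is passed, so the default rejection policy (which throws) applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] outcome = new String[4];
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    outcome[i] = "threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size();
                } catch (RejectedExecutionException e) {
                    outcome[i] = "rejected";
                }
            }
        } finally {
            release.countDown(); // unblock the tasks so the pool can drain
            pool.shutdown();
        }
        return outcome;
    }

    public static void main(String[] args) {
        for (String line : submitFour()) {
            System.out.println(line);
        }
    }
}
```

The first task starts a core thread, the second waits in the queue, the third finds the queue full and gets a second thread, and the fourth is rejected because both the queue and the pool are at their limits.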
Task application: getTask()
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }
    int wc = workerCountOf(c);
    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }
    try {
        /*
         * The key value here is timed. If allowCoreThreadTimeOut is true or the number of
         * worker threads exceeds corePoolSize, timed is true and poll() is used to fetch
         * a task from the blocking queue; otherwise take() is used.
         *
         * poll(long timeout, TimeUnit unit): retrieves a task from the BlockingQueue. If none
         * is available immediately, waits up to the timeout; returns null if there is still none.
         * take(): retrieves a task from the BlockingQueue. If the queue is empty, blocks
         * until a new task is added.
         */
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}
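The effect of the timed flag can be observed without reading any internals. In the sketch below a pool with one core thread runs a single task and is then left idle; with allowCoreThreadTimeOut(true) the idle worker should give up after keepAliveTime and the pool size should drop to 0, while with the default setting the core thread should stay alive. The 50 ms keep-alive, the 2 s observation window, and the names KeepAliveDemo and poolSizeAfterIdle are assumptions made for this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows timed versus untimed waiting for tasks.
final class KeepAliveDemo {

    /** Runs one task, then waits up to 2 seconds for the idle worker to exit; returns the remaining pool size. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.submit(() -> { }).get(); // wait until the task has actually run
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true));
        System.out.println("core timeout off: " + poolSizeAfterIdle(false));
    }
}
```

In the first case the worker waits with poll() and exits when the wait times out; in the second it waits with take() and blocks until a task arrives or the pool is shut down.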
Inner class: Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;   // the thread held by this Worker
    Runnable firstTask;    // the initial task; may be null
}
Worker implements the Runnable interface and holds a thread, thread, and an initial task, firstTask. thread is created through the ThreadFactory when the constructor is called and is the thread that runs tasks. firstTask holds the first task passed in, which may be null. If it is non-null, the thread runs that task as soon as it starts, as happens when execute() starts a thread for a newly submitted task. If it is null, the thread is created only to run tasks that are already in the workQueue, as happens when execute() calls addWorker(null, false).
The thread pool has to manage the lifecycle of its threads and reclaim those that have been idle for a long time. It keeps references to its workers in a hash-based set and controls their lifetime by adding and removing references. What matters then is how to tell whether a thread is currently running a task.
Worker extends AQS (AbstractQueuedSynchronizer) and uses it to implement an exclusive lock. AQS is used instead of ReentrantLock because the lock must be non-reentrant, so that its state reflects whether the thread is currently executing a task.
1. The lock method acquires the exclusive lock, which indicates that the thread is executing a task.
2. While a task is executing, the thread should not be interrupted.
3. If the lock is not held, the thread is idle and not processing a task, so it can be interrupted.
4. When the thread pool executes shutdown or tryTerminate, it calls interruptIdleWorkers to interrupt idle threads. interruptIdleWorkers uses tryLock to determine whether each thread in the pool is idle; an idle thread can be safely reclaimed.
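To see why non-reentrancy matters, here is a minimal exclusive lock built on AbstractQueuedSynchronizer in the same spirit, compared against ReentrantLock. It is a sketch, not the Worker class itself: SimpleLock and compare are names invented for this example, and the state convention (0 = free, 1 = held) is an assumption of the sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a non-reentrant lock versus ReentrantLock.
final class NonReentrantLockDemo {

    /** Minimal exclusive, non-reentrant lock: state 0 = free (idle), 1 = held (busy). */
    static final class SimpleLock extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // already held, even if the holder is the current thread
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** Returns { second tryLock on SimpleLock, second tryLock on ReentrantLock } for a thread that already holds each lock. */
    static boolean[] compare() {
        SimpleLock simple = new SimpleLock();
        simple.lock();
        boolean simpleAgain = simple.tryLock(); // fails: the lock is already held
        simple.unlock();

        ReentrantLock reentrant = new ReentrantLock();
        reentrant.lock();
        boolean reentrantAgain = reentrant.tryLock(); // succeeds: the holder may re-enter
        if (reentrantAgain) {
            reentrant.unlock();
        }
        reentrant.unlock();

        return new boolean[] { simpleAgain, reentrantAgain };
    }

    public static void main(String[] args) {
        boolean[] result = compare();
        System.out.println("SimpleLock re-acquired:    " + result[0]); // false
        System.out.println("ReentrantLock re-acquired: " + result[1]); // true
    }
}
```

With a reentrant lock, a successful tryLock would not prove that the worker is idle when the caller is the worker's own thread. With the non-reentrant version, tryLock succeeds only when nobody holds the lock, so success can be read as "this worker is not running a task".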
Adding a worker thread: addWorker()
private boolean addWorker(Runnable firstTask, boolean core)
The firstTask parameter specifies the first task the new thread will execute; it may be null. If core is true, the method checks that the number of active threads is less than corePoolSize before adding the thread. If core is false, it checks that the number is less than maximumPoolSize.
// The outer loop checks the state of the thread pool.
// "retry" is a label that marks the outer loop. Inside the nested loop, "break retry" leaves the labeled (outer) loop entirely, and "continue retry" starts the next iteration of the labeled loop.
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        !(rs == SHUTDOWN &&
          firstTask == null &&
          !workQueue.isEmpty()))
        return false;
    // The inner loop is responsible for incrementing the worker count by 1.
    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get(); // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
// The worker count was incremented successfully. The steps that follow (omitted here)
// add the new Worker to the workers set and start its thread.
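The inner loop is an instance of a common lock-free pattern: read the current value, check the limit, try a compare-and-set, and start over if another thread got there first. A stripped-down sketch of that pattern, using a plain AtomicInteger instead of ctl, might look like this. BoundedCounter, tryIncrement and race are names invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: bounded increment with a CAS retry loop.
final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    /** Reserves one slot, or returns false once the limit is reached. Takes no lock. */
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit) {
                return false;
            }
            if (count.compareAndSet(current, current + 1)) {
                return true;
            }
            // The CAS lost a race with another thread: re-read and try again.
        }
    }

    int get() {
        return count.get();
    }

    /** Lets several threads compete for the slots and returns how many reservations succeeded. */
    static int race(int limit, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (counter.tryIncrement()) {
                        successes.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(50, 8, 100)); // prints 50
    }
}
```

Because the limit check and the increment are tied together by the compare-and-set, the counter can never overshoot the limit no matter how many threads compete, which is the property addWorker needs for the worker count.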
Worker thread execution: runWorker()
The run method in the Worker class calls the runWorker method to execute the task. The runWorker method executes as follows:
- The while loop keeps getting the task through the getTask() method.
- The getTask() method gets the task from the blocking queue.
- If the thread pool is stopping, ensure that the current thread is in an interrupted state, otherwise ensure that it is not.
- Perform the task.
- If the getTask result is null, break out of the loop and execute the processWorkerExit() method, destroying the thread.
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    while (task != null || (task = getTask()) != null) {
        w.lock();
        // If the pool is stopping, make sure the thread is interrupted; otherwise make sure it is not.
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
        try {
            // ... omitted
            task.run();
            // ... omitted
        } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    }
    completedAbruptly = false;
} finally {
    // Destroy the thread
    processWorkerExit(w, completedAbruptly);
}
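Stripped of state checks and locking, the loop in runWorker reduces to "run the first task, then keep pulling tasks from the queue until none is available". The toy worker below sketches just that shape. It is not the JDK implementation: MiniWorker, runAll and the fixed idle timeout are assumptions of the sketch, and there is no pool state, no lock, and no replacement of workers that die.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy worker: only the task loop, none of the pool bookkeeping.
final class MiniWorker implements Runnable {
    private final BlockingQueue<Runnable> queue;
    private final long idleMillis;
    private Runnable firstTask;
    final AtomicInteger completedTasks = new AtomicInteger();

    MiniWorker(BlockingQueue<Runnable> queue, Runnable firstTask, long idleMillis) {
        this.queue = queue;
        this.firstTask = firstTask;
        this.idleMillis = idleMillis;
    }

    /** Returns the next task, or null when the worker should exit (idle timeout or interrupt). */
    private Runnable getTask() {
        try {
            return queue.poll(idleMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null;
        // Same loop shape as runWorker: first task, then whatever the queue provides.
        while (task != null || (task = getTask()) != null) {
            try {
                task.run(); // in this toy version an exception here simply ends the worker
            } finally {
                task = null;
                completedTasks.incrementAndGet();
            }
        }
    }

    /** Runs taskCount (at least 1) trivial tasks on one worker thread and returns how many it completed. */
    static int runAll(int taskCount) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger ran = new AtomicInteger();
        for (int i = 1; i < taskCount; i++) {
            queue.add(ran::incrementAndGet);
        }
        MiniWorker worker = new MiniWorker(queue, ran::incrementAndGet, 100);
        Thread thread = new Thread(worker, "mini-worker");
        thread.start();
        try {
            thread.join(); // the worker exits once the queue stays empty for idleMillis
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.completedTasks.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(5)); // prints 5
    }
}
```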