Thread creation requires allocating thread-private memory, such as the virtual machine stack, native method stack, and program counter, and these resources must be reclaimed when the thread is destroyed. Frequently creating and destroying threads therefore wastes a lot of resources. Thread pools make it possible to manage and coordinate threads' work more effectively.
Benefits of thread pools
- Reduces resource consumption: reusing existing threads avoids the cost of repeated thread creation and destruction
- Improves response speed: an arriving task does not have to wait for a thread to be created
- Manages and reuses threads and limits the maximum number of concurrent tasks
- Supports scheduled and periodic task execution (ScheduledThreadPoolExecutor)
- Isolates thread environments so that different services do not interfere with each other, preventing service avalanches (Hystrix in Spring Cloud does this as well: different service invocations use different thread pools)
Use of thread pools
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // ...
}
```
- `corePoolSize`: the number of core threads kept resident in the pool. If it is 0, all threads are destroyed once no tasks remain; if it is greater than 0, up to that many idle threads are retained.
- `maximumPoolSize`: the maximum number of threads the pool may run concurrently; must be at least 1. If it equals corePoolSize, the pool has a fixed size. When the queue is full and the thread count has reached this limit, new tasks are handled by the handler.
- `keepAliveTime`: how long an idle thread may wait before being destroyed, until only corePoolSize threads remain. By default this applies only to threads beyond corePoolSize, but if ThreadPoolExecutor's allowCoreThreadTimeOut is set to true, idle core threads are also destroyed when they time out.
- `unit`: the time unit of keepAliveTime.
- `workQueue`: the task queue. When all core threads are busy, newly submitted tasks enter this BlockingQueue.
- `threadFactory`: the factory that creates the pool's threads. Giving the factory a group-name prefix makes it easy to tell which thread pool a thread belongs to when analyzing a thread dump.
- `handler`: the rejection policy object. When the workQueue is full and the number of active threads has reached maximumPoolSize, the pool handles new tasks through this policy.
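Putting the seven parameters together, here is a minimal sketch of constructing a pool directly (the name prefix "demo-pool", the sizes, and the queue capacity are illustrative, not prescribed by the text):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
    // A simple named ThreadFactory: each thread gets a prefix plus a sequence number,
    // which makes the owning pool recognizable in thread dumps.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    public static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                60L, TimeUnit.SECONDS,                  // keepAliveTime + unit
                new ArrayBlockingQueue<>(100),          // bounded workQueue
                namedFactory("demo-pool"),              // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // rejection handler (the default)
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newPool();
        Future<String> name = pool.submit(() -> Thread.currentThread().getName());
        System.out.println(name.get());  // e.g. demo-pool-1
        pool.shutdown();
    }
}
```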
2) Rejection policies: ThreadPoolExecutor provides four RejectedExecutionHandler implementations.
- AbortPolicy (default): discards the task and throws a RejectedExecutionException.
- DiscardPolicy: silently discards the current task.
- DiscardOldestPolicy: discards the task that has waited longest in the queue and adds the current task to the queue.
- CallerRunsPolicy: calls the task's run() method directly in the submitting thread, bypassing the thread pool.
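The effect of CallerRunsPolicy can be demonstrated with a deliberately tiny pool: one worker and a one-slot queue, so a third task is rejected and runs on the submitting thread itself (a sketch; the sizes are chosen only to force rejection):

```java
import java.util.concurrent.*;

public class RejectDemo {
    // Returns true if the rejected task ran on the submitting (caller) thread.
    public static boolean callerRan() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        // Task 1 occupies the only worker thread until the gate opens
        pool.execute(() -> { try { gate.await(); } catch (InterruptedException ignored) { } });
        // Task 2 fills the one-slot queue
        pool.execute(() -> { });
        String caller = Thread.currentThread().getName();
        final String[] ranOn = new String[1];
        // Task 3 cannot be queued or given a new thread -> CallerRunsPolicy runs it here
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return caller.equals(ranOn[0]);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callerRan()); // true: the submitting thread executed the task
    }
}
```

Note that CallerRunsPolicy provides natural back-pressure: while the caller is busy running the rejected task, it cannot submit more work.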
3) other way to create a thread pool (not recommended) : Executors the thread pool static factory can create three packaging object: thread pool ForkJoinPool, ThreadPoolExecutor, ScheduledThreadPoolExecutor. The core methods for ThreadPoolExecutor in Executors are as follows:
```java
// SynchronousQueue is a blocking queue that holds no elements; maximumPoolSize is
// Integer.MAX_VALUE, i.e. effectively unbounded. If the main thread submits tasks
// faster than the pool can process them, new threads keep being created --
// in extreme cases, OOM.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// keepAliveTime of 0 means excess threads are terminated immediately.
// LinkedBlockingQueue defaults to a capacity of Integer.MAX_VALUE, i.e. unbounded;
// in extreme cases the queue grows until OOM.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

// The same unbounded LinkedBlockingQueue, so the same OOM risk.
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
```
Looking at these methods, we can see that the thread pools constructed by all three have OOM risk. In addition, they do not let you flexibly configure the thread factory or the rejection policy. Therefore, do not use Executors to create a thread pool; construct ThreadPoolExecutor directly.
4) Submitting tasks to the thread pool: there are two methods, execute() and submit(). The execute() method submits tasks that need no return value, so you cannot determine whether the task was executed successfully by the pool. The submit() method submits a task (a Callable) with a return value; the pool returns a Future, and the result is retrieved through the Future's get() method, which notably blocks the current thread until the task is complete.
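A minimal sketch of the difference between the two submission methods (the computed value is purely illustrative):

```java
import java.util.concurrent.*;

public class SubmitDemo {
    public static int compute() throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // execute(): fire-and-forget, no result, no way to observe success
        pool.execute(() -> System.out.println("logging task done"));
        // submit(): takes a Callable and returns a Future
        Future<Integer> future = pool.submit(() -> 21 * 2);
        int result = future.get();  // blocks the current thread until the task completes
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute()); // 42
    }
}
```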
5) Shutting down the thread pool: there are two methods, shutdown() and shutdownNow(). Both work by iterating over the pool's worker threads and interrupting them one by one via the thread's interrupt() method (threads that do not respond to interruption cannot be terminated). The difference is that shutdownNow() first sets the pool state to STOP and then tries to stop all threads, including those executing tasks; shutdown() sets the pool state to SHUTDOWN and only interrupts threads that are not executing tasks.
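A common graceful-shutdown pattern combines both methods with awaitTermination() (a sketch; the task durations and timeout are illustrative):

```java
import java.util.concurrent.*;

public class ShutdownDemo {
    public static boolean gracefulStop() throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try { Thread.sleep(50); }                       // simulated work
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }
        pool.shutdown();                      // SHUTDOWN: no new tasks; queued tasks still run
        boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
        if (!done) {
            pool.shutdownNow();               // STOP: interrupt running tasks, drain the queue
        }
        return done;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(gracefulStop()); // true: all queued tasks finished in time
    }
}
```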
Thread pool principle analysis
When the thread pool receives a task, the process is shown below:
- Check whether the number of current worker threads has reached corePoolSize; if not, create a new thread to execute the task; if so, proceed to the next check.
- Check whether the work queue is full; if not, add the task to the work queue; otherwise, proceed to the next check.
- Check whether the thread pool is full (maximumPoolSize); if not, create a new thread to execute the task; otherwise, handle the task according to the saturation policy.
ThreadPoolExecutor execution diagram:
Here is the core code for the execute() method in ThreadPoolExecutor:
```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl is an integer that packs the worker count and the pool state together
    int c = ctl.get();
    // 1. If the number of worker threads is smaller than corePoolSize,
    //    create a new worker with this command as its first task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. If the pool is in the RUNNING state, try to enqueue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Recheck: if the pool was shut down meanwhile, roll back the enqueue and reject
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no workers are left, start one to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. The core pool and the queue are full: try to create a non-core thread;
    //    if that also fails, reject the task
    else if (!addWorker(command, false))
        reject(command);
}
```
addWorker() essentially creates worker threads, wrapping tasks in the Worker class. A ReentrantLock is held during thread creation in steps 1 and 3 to prevent concurrent interference. Once the current number of worker threads is greater than or equal to corePoolSize, almost every execute() call takes step 2. After a Worker finishes its first task, it loops, fetching further tasks from the work queue: `while (task != null || (task = getTask()) != null)`. getTask() takes a task from the blocking queue using either poll() or take(); the timed poll() is used when core threads may time out or the current thread count is greater than corePoolSize, so that a thread whose poll() times out can be destroyed:
```java
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
```
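The effect of the timed poll() can be observed from outside the pool by enabling allowCoreThreadTimeOut (a sketch; the keepAliveTime and sleep values are chosen only to make the thread reclamation visible quickly):

```java
import java.util.concurrent.*;

public class TimeoutDemo {
    public static int poolSizeAfterIdle() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);  // core threads also use the timed poll()
        pool.execute(() -> { });            // spin up two core threads
        pool.execute(() -> { });
        Thread.sleep(1000);                 // wait well past keepAliveTime
        int size = pool.getPoolSize();      // idle core threads have been reclaimed
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(poolSizeAfterIdle()); // 0
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the two core threads would block forever in take() and the pool size would stay at 2.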
The rest of the worker-thread code is not analyzed here. The following diagram shows how a thread pool runs.
Configure thread pools properly
To configure a thread pool properly, you first need to analyze the characteristics of the tasks: CPU-intensive, IO-intensive, or hybrid.
- CPU-intensive tasks: use as small a thread pool as possible, typically the number of CPU cores + 1. CPU-intensive tasks keep CPU utilization high; running more threads only increases context switching and thus incurs extra overhead.
- IO-intensive tasks: use a slightly larger thread pool, typically 2 × the number of CPU cores. IO-intensive tasks have low CPU usage, so the CPU can process other tasks while waiting for IO.
- Hybrid tasks: split them into IO-intensive and CPU-intensive parts and process each with its own thread pool. As long as the execution times of the two parts after splitting do not differ significantly, this is more efficient than serial execution. If they do differ significantly, the total time is still determined by the slower part, plus the cost of splitting and merging the tasks.
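These sizing heuristics can be sketched as follows (the helper names are hypothetical, and the formulas are starting points that should be tuned with measurements):

```java
public class SizingDemo {
    // CPU-bound: cores + 1, so one extra thread can use the CPU during page faults etc.
    public static int cpuBoundSize(int cores) { return cores + 1; }

    // IO-bound: roughly 2 * cores, since threads spend much of their time waiting on IO
    public static int ioBoundSize(int cores)  { return cores * 2; }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBoundSize(cores));
        System.out.println("IO-bound pool size:  " + ioBoundSize(cores));
    }
}
```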
There is also a lot of knowledge involved in thread pool implementation, such as BlockingQueue, ReentrantLock, Condition, etc., which I won’t cover here, but will cover later.