1. Why do we need thread pools

  1. Problems caused by creating threads directly
  • Creating a thread has a cost to the system: it takes time to create and destroy threads, which hurts the stability of the system.
  • Too many threads occupy memory resources and cause frequent context switching, resulting in system instability.

2. To solve the above problems, the thread pool was introduced, built around thread reuse

  • The core idea of the thread pool is to decouple tasks from threads. A fixed or elastic number of threads is reused inside the pool, which simply accepts tasks and executes them.
  • Threads in the pool keep running and execute tasks repeatedly; at the same time, threads are created only as needed, reducing the memory footprint.

1.1. Benefits brought by thread pools

  • First, thread pools eliminate the overhead of the thread life cycle while also speeding up response times. Because threads in the pool are reusable, a handful of threads can execute a large number of tasks, which greatly reduces life-cycle overhead. Threads are usually not created ad hoc; they already exist and are ready to execute tasks, which eliminates thread-creation latency, improves response times, and enhances the user experience.
  • Second, thread pools coordinate memory and CPU usage to avoid resource misuse. The pool flexibly controls the number of threads based on its configuration and the number of tasks, creating threads when there are too few and reclaiming them when there are too many. This avoids memory overflow from too many threads and wasted CPU from too few, striking a good balance.
  • Third, thread pools centralize resource management. For example, a pool manages its task queue and threads in one place and can start or stop tasks uniformly, which is far more convenient than having individual threads process tasks one by one. It also makes statistics easy; for example, we can readily count the number of tasks that have been executed.

2. How to use thread pools

Internal structure of the thread pool

The thread pool consists of four parts:

1. The thread pool manager, responsible for creating and destroying threads and adding tasks.
2. Worker threads, which repeatedly take tasks and execute them.
3. The task queue, a buffer that holds tasks that have not yet been processed; under concurrency, it must be thread-safe.
4. Tasks, which must implement a unified interface so that worker threads can process them.
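
To make these four parts concrete, here is a deliberately minimal sketch of a hand-rolled pool (MiniThreadPool is a hypothetical illustration, not the JDK implementation): the constructor plays the manager, the started threads are the workers, LinkedBlockingQueue is the thread-safe task queue, and Runnable is the unified task interface.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniThreadPool {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    public MiniThreadPool(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        taskQueue.take().run(); // block until a task arrives, then run it
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit on interrupt
                }
            });
            worker.start();
        }
    }

    public void execute(Runnable task) {
        taskQueue.offer(task); // the manager only enqueues; workers pick tasks up
    }
}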

2.1 Parameters of the thread pool

parameter         meaning
corePoolSize      Number of core threads
maximumPoolSize   Maximum number of threads
keepAliveTime     How long an idle non-core thread is kept alive
TimeUnit          Time unit for keepAliveTime
workQueue         The task queue
threadFactory     Factory used to create threads
handler           The rejection policy

The execution flow of the thread pool is shown in the following flowchart.

graph TD
    A[Submit task] --> B{Core threads full?}
    B -->|No| D[Create a core thread to run the task]
    B -->|Yes| E{Queue full?}
    E -->|No| F[Add the task to the queue]
    E -->|Yes| G{Thread pool full?}
    G -->|No| H[Create a non-core thread to run the task]
    G -->|Yes| I[Apply the rejection policy]

As shown above, when a task is submitted, the thread pool first checks the current number of threads. If it is below the core thread count (for example, the initial count is 0), a new thread is created to execute the task. As tasks keep arriving, the number of threads gradually grows to the core thread count. If tasks are still being submitted at that point, they are placed in the workQueue and wait for a core thread to finish its current task and fetch the next pending one from the queue.

If tasks arrive so fast that the workQueue reaches its maximum capacity, the thread pool calls in its backup force: maximumPoolSize. The pool continues creating threads beyond the corePoolSize core count to execute tasks. If tasks keep being submitted, threads keep being created until the count reaches maximumPoolSize. If tasks are submitted even then, the pool's maximum capacity has been exceeded, and the pool rejects them. In short, the pool checks corePoolSize, workQueue, and maximumPoolSize one by one, and rejects the task if none of them can absorb it.

corePoolSize and maximumPoolSize

corePoolSize is the number of core threads. When the thread pool is initialized, the default number of threads is 0; when a new task is submitted, a new thread is created to execute it. After that, the number of threads usually does not drop below corePoolSize, because core threads are not destroyed even if there happen to be no further tasks to execute. As the number of tasks increases and the task queue fills up, the pool creates further threads, up to maximumPoolSize, to handle the load. If threads become idle later, the threads beyond corePoolSize are reclaimed as appropriate. So under normal conditions, the thread count stays in the closed range between corePoolSize and maximumPoolSize.
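
The flow above can be observed directly. The following sketch (parameter values are illustrative) uses 2 core threads, 4 maximum threads, and a queue of capacity 3; submitting 8 long-running tasks should exercise every branch: core threads, queue, non-core threads, and finally rejection.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolFlowDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        for (int i = 1; i <= 8; i++) {
            try {
                pool.execute(() -> {
                    try {
                        Thread.sleep(1000); // keep workers busy so each branch is visible
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                // Tasks 1-2 create core threads, 3-5 wait in the queue, 6-7 create non-core threads.
                System.out.println("task " + i + " accepted, pool size = " + pool.getPoolSize());
            } catch (RejectedExecutionException e) {
                System.out.println("task " + i + " rejected"); // task 8 lands here (default AbortPolicy)
            }
        }
        pool.shutdown();
    }
}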

The third parameter is keepAliveTime. When the number of threads exceeds the core count and some threads have no work to do, the pool checks each idle thread's keepAliveTime; threads idle longer than the specified time are destroyed to reduce memory usage and resource consumption. If more tasks arrive later, the pool creates threads again according to the same rules, so this is an elastic, fairly flexible process. We can also change the value dynamically with the setKeepAliveTime method (see the sketch after the next paragraph).

The fourth parameter is the ThreadFactory, which is essentially a factory that produces the threads used to execute tasks. We can use the default thread factory, which creates threads in the same thread group, with the same priority, and as non-daemon threads. We can also supply a custom thread factory, for example to give threads custom names; different thread pools usually customize their threads according to the specific business.
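
A small sketch of the keepAliveTime adjustments mentioned above; the pool configuration values are illustrative:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

// Reclaim idle non-core threads after 10 seconds instead of 60.
pool.setKeepAliveTime(10L, TimeUnit.SECONDS);

// Optionally allow even core threads to time out when idle.
pool.allowCoreThreadTimeOut(true);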

2.2 workQueue and Handler

2.2.1 Four rejection policies for thread pools

Rejection policy: when creating a thread pool, we specify the maximum thread count, the queue, and the rejection policy. So when does the rejection policy trigger? 1. After we call shutdown, newly submitted tasks are rejected, even if there are still tasks executing inside the pool. 2. When the thread pool no longer has the capacity to execute or store newly submitted tasks (the maximum thread count is reached and the queue is full).

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        2, 10, 10L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(3),
        new ThreadPoolExecutor.AbortPolicy());


  • The four rejection policies

  • The first rejection policy is AbortPolicy. When it rejects a task, it directly throws a RejectedExecutionException (a RuntimeException), letting you perceive that the task was rejected; depending on your business logic, you can then retry or abandon the submission.

  • The second rejection policy is DiscardPolicy. As its name suggests, when a new task is submitted it is silently discarded, without any notification. This carries some risk, because we do not know at submission time that the task will be discarded, which may cause data loss.

  • The third rejection policy is DiscardOldestPolicy. If the thread pool is not closed, it discards the task that has been in the queue the longest, freeing space for the newly submitted task. There is also a risk of data loss here.

  • The fourth rejection policy is CallerRunsPolicy, which is relatively well-rounded. When a new task is submitted and the pool is not closed but cannot execute it, the task is handed back to the thread that submitted it: whoever submits the task is responsible for running it. This has two main advantages.

  • First, newly submitted tasks are not discarded, so there is no business loss.

The second advantage is that because the submitting thread executes the task itself, and executing the task takes time, that thread is occupied for the duration and cannot submit new tasks. This slows down the rate of submission and acts as negative feedback. Meanwhile, the threads in the pool can use this time to work through existing tasks and free up some space, effectively giving the pool a buffer period. The sketch below shows this behavior.
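
A minimal sketch of this negative feedback (pool parameters are illustrative): with one worker thread and a one-slot queue, overflow tasks run on the submitting thread itself.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Overflow tasks print "main": the submitter runs them itself,
                // which naturally throttles how fast new tasks can be submitted.
                System.out.println("running on " + Thread.currentThread().getName());
            });
        }
        pool.shutdown();
    }
}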

2.2.2 Thread pool buffer queues

Common thread pools and their corresponding queues

The thread pool                  The queue
FixedThreadPool                  LinkedBlockingQueue
SingleThreadExecutor             LinkedBlockingQueue
CachedThreadPool                 SynchronousQueue
ScheduledThreadPool              DelayedWorkQueue
SingleThreadScheduledExecutor    DelayedWorkQueue
  • LinkedBlockingQueue     

For FixedThreadPool and SingleThreadExecutor, the blocking queue is a LinkedBlockingQueue with capacity Integer.MAX_VALUE, which can be considered unbounded. Since FixedThreadPool has a fixed number of threads, it has no way to add extra threads to absorb a burst of tasks, so it needs a queue with no practical capacity limit to hold them. Note that because this task queue is never full, the pool only ever creates the core number of threads; the maximum thread count is meaningless here, as it will never trigger the creation of threads beyond the core count.

  • SynchronousQueue      

The second blocking queue is SynchronousQueue, and the corresponding thread pool is CachedThreadPool. The maximum number of threads in CachedThreadPool is Integer.MAX_VALUE. CachedThreadPool is the opposite of FixedThreadPool: FixedThreadPool has a queue of effectively infinite capacity, whereas CachedThreadPool can expand its number of threads almost without limit. So CachedThreadPool does not need a task queue to store tasks: once a task is submitted, it is either handed directly to an existing thread or a new thread is created to execute it, without being stored separately.

  • DelayedWorkQueue       

The third blocking queue is DelayedWorkQueue, and its corresponding thread pools are ScheduledThreadPool and SingleThreadScheduledExecutor. The defining feature of these two pools is the ability to delay tasks, such as executing a task after a certain period or at regular intervals. DelayedWorkQueue is special in that its elements are ordered not by insertion time but by the length of their delay, using a heap data structure internally. ScheduledThreadPool and SingleThreadScheduledExecutor choose DelayedWorkQueue precisely because they execute tasks based on time, and a delay queue that sorts tasks by time makes execution convenient.

3. Common thread pools

There are several common thread pools in Java; let's look at their important parameters and structure.

3.1. FixedThreadPool

The maximum thread count equals the core thread count, so this can be regarded as a pool with a fixed number of threads. The count grows from zero; once it reaches the maximum, it stops increasing, and subsequently submitted tasks are placed directly into the LinkedBlockingQueue, which acts as an unbounded queue.
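
A minimal usage sketch (the pool size of 4 and the task count are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // core = max = 4
        for (int i = 0; i < 10; i++) {
            final int id = i;
            // At most 4 tasks run concurrently; the rest wait in the unbounded queue.
            pool.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}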

3.2 CachedThreadPool

The cached thread pool is characterized by a core thread count of 0 and a maximum thread count of Integer.MAX_VALUE: threads can be added almost without limit and are reclaimed when idle. Tasks are first handed to the SynchronousQueue, which does not store them but acts as a conduit, handing submitted tasks directly to a pool thread for execution.

3.3 ScheduledThreadPool

A thread pool with a timer, characterized by its delay queue. It supports simple scheduled task execution on top of a thread pool, mainly through the following three methods:

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

  • service.schedule(new Task(), 10, TimeUnit.SECONDS);

The preceding call executes the task once, 10 seconds later.

  • service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);

scheduleAtFixedRate executes tasks at a fixed rate. The second parameter, initialDelay, is the delay before the first execution, and the third parameter, period, is the interval between subsequent executions. The preceding call executes the task for the first time after 10 seconds, and then every 10 seconds.

  • service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

scheduleWithFixedDelay is similar to the second method in that it also executes tasks periodically; the difference lies in how the period is defined. scheduleAtFixedRate measures the period from the start of one task: when the period elapses, the next task begins, regardless of how long the task takes to execute. scheduleWithFixedDelay measures the delay from the end of one task to the start of the next. The preceding call executes the first task after 10 seconds, and each subsequent task 10 seconds after the previous one completes.
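
The difference can be seen in a small sketch (delays and durations are illustrative): with a 2-second task, a 3-second fixed rate yields starts 3 seconds apart, while a 3-second fixed delay yields starts roughly 5 seconds apart.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);

        // Period measured from each task's START: a 2s task with a 3s period
        // starts at t=1, 4, 7, ... (3s between starts, 1s idle between runs).
        service.scheduleAtFixedRate(() -> sleepSeconds(2), 1, 3, TimeUnit.SECONDS);

        // Delay measured from each task's END: a 2s task plus a 3s delay
        // means about 5s between consecutive starts.
        service.scheduleWithFixedDelay(() -> sleepSeconds(2), 1, 3, TimeUnit.SECONDS);
    }

    private static void sleepSeconds(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}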

  • SingleThreadScheduledExecutor

SingleThreadScheduledExecutor is similar to ScheduledThreadPool, but it has only one thread.

  • ForkJoinPool

Its basic usage is the same as the pools above: tasks are handed to the pool for execution, and the pool also has task queues to hold tasks. But ForkJoinPool differs from its predecessors in two very big ways. The first is that it is very well suited to tasks that can generate subtasks, as in the Fibonacci example below.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class Fibonacci extends RecursiveTask<Integer> {
    int n;

    public Fibonacci(int n) {
        this.n = n;
    }

    @Override
    public Integer compute() {
        if (n <= 1) {
            return n;
        }
        // Fork two subtasks, then join their results.
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        f2.fork();
        return f1.join() + f2.join();
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    for (int i = 0; i < 10; i++) {
        ForkJoinTask<Integer> task = forkJoinPool.submit(new Fibonacci(i));
        System.out.println(task.get());
    }
}

4. Why create thread pools manually with ThreadPoolExecutor instead of using the auto-created ones?

  • FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

By passing these arguments to the constructor, we create a thread pool whose core thread count equals its maximum thread count, both set to the parameter we pass in. The important point is that the queue used is an unbounded LinkedBlockingQueue. If tasks keep accumulating in the queue, they eventually occupy a large amount of memory and an OOM (OutOfMemoryError) occurs, which affects almost the entire program, with serious consequences.

  • SingleThreadExecutor

newSingleThreadExecutor works the same way as newFixedThreadPool, except that the core and maximum thread counts are both set directly to 1; the task queue is still an unbounded LinkedBlockingQueue. This causes the same problem: when tasks pile up, they can take up a lot of memory and result in an OOM.

  • CachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

CachedThreadPool differs from the previous two thread pools in that its task queue is a SynchronousQueue. SynchronousQueue does not store tasks but forwards them directly, which is fine by itself. However, notice that the constructor's second parameter, the maximum thread count, is set to Integer.MAX_VALUE. Since CachedThreadPool does not limit the number of threads, too many tasks can lead to the creation of a huge number of threads, until the operating system's limit is exceeded and new threads cannot be created, or memory runs out.

  • ScheduledThreadPool and SingleThreadScheduledExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledThreadPoolExecutor uses an unbounded DelayedWorkQueue internally, so here too a pile-up of scheduled tasks can occupy a large amount of memory.

5. How do we set reasonable parameters when using thread pools?

  • Core threads

First, let's understand the basis for choosing the core thread count.

For CPU-intensive tasks, such as encryption, decryption, compression, and computation, which require a lot of CPU resources, the optimal number of threads is roughly 1 to 2 times the number of CPU cores. Setting more threads than that does not help: the computation is heavy and every core is already working at essentially full capacity, so extra threads competing for the CPU only cause unnecessary context switches, and more threads can actually reduce performance. In this case it is also wise to consider what other programs on the same machine may be using significant CPU, and balance resource usage as a whole.

The second type is time-consuming IO tasks, such as database and file reads and writes, and network communication. These tasks do not consume much CPU, but the IO operations take a lot of time. For this type, the thread count is usually set to many times the number of CPU cores, because IO is slow compared to the CPU. If we set the thread count too low, CPU resources may be wasted; if we set it higher, then while some threads are waiting on IO and do not need the CPU, other threads can use the CPU to execute other tasks. The threads complement each other, the task queue shrinks, and resources are used more fully. See the sizing sketch below.
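
As a rough sketch of these rules of thumb (the wait/compute numbers are illustrative, and real values should come from load testing):

// Rough starting points only; tune with load testing.
int cpuCores = Runtime.getRuntime().availableProcessors();

// CPU-bound work: roughly 1-2x the number of cores.
int cpuBoundThreads = cpuCores + 1;

// IO-bound work: a common rule of thumb scales by the wait/compute ratio,
// e.g. threads = cores * (1 + waitTime / computeTime). With 50ms of IO wait
// per 5ms of computation, this suggests about cores * 11 threads.
int ioBoundThreads = cpuCores * (1 + 50 / 5);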

  • Blocking queue

When creating thread pools manually, we typically use ArrayBlockingQueue as the blocking queue. It is backed by an array and cannot be expanded after the object is created. Compared with an unbounded queue, this bounded blocking queue puts a cap on the memory the queue can occupy, as in the sketch below.
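
A minimal sketch of a manually created pool with a bounded queue (all parameter values are illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Bounded queue of 100 tasks: once it is full and all threads are busy,
// new submissions hit the rejection policy instead of growing memory.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 8, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(100),
        new ThreadPoolExecutor.CallerRunsPolicy());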

  • Thread factory

For the threadFactory parameter, we can use the default defaultThreadFactory, or pass in a custom thread factory with extra capabilities. Since we may have multiple thread pools, it is useful to distinguish them by name, so we can pass in a thread factory that names threads according to the business. Later we can tell different businesses apart by thread name and quickly locate problem code. For example, with Guava's com.google.common.util.concurrent.ThreadFactoryBuilder:

ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory rpcFactory = builder.setNameFormat("rpc-pool-%d").build();

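If Guava is not available, a minimal hand-rolled equivalent might look like this (NamedThreadFactory is an illustrative name, not a JDK class):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(0);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false); // match the default factory: non-daemon threads
        return t;
    }
}
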
  • Rejection policies

The last parameter is the rejection policy. We can use one of the four policies described earlier, AbortPolicy, DiscardPolicy, DiscardOldestPolicy, or CallerRunsPolicy, based on business requirements. We can also implement the RejectedExecutionHandler interface and its rejectedExecution method to perform a user-defined rejection policy, such as printing a log, temporarily storing the task, or re-executing it, to meet service requirements. This is shown in the code below.

private static class CustomRejectionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Custom rejection handling: print a log, store the task temporarily, re-execute it, etc.
    }
}
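
Wiring the custom handler into a pool might look like this (parameter values are illustrative, assuming the CustomRejectionHandler above is in scope):

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 10L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10),
        new CustomRejectionHandler()); // rejected tasks go through rejectedExecution above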