One piece of advice in the Alibaba handbook:

【Mandatory】 Do not create thread pools with Executors. Use ThreadPoolExecutor instead, which makes the pool's running rules explicit and avoids the risk of resource exhaustion.

If you routinely create thread pools through the factory methods of Executors, it is easy to overlook how the pool is implemented. The rejection policy in particular tends to be ignored, because a pool created through Executors silently uses the default.

Let's take a look at the implementation principles, APIs, and examples related to thread pools.

The role of thread pools

Thread pools are created in practice to:

  • Reduce resource overhead: Reduce the overhead of creating and destroying threads each time;
  • Improve response speed: When the request comes, the thread has been created and can be directly executed, improving the response speed;
  • Improve the manageability of threads: Threads are scarce resources and need to be limited according to the situation to ensure stable system operation.

ThreadPoolExecutor

ThreadPoolExecutor enables the creation of thread pools. The ThreadPoolExecutor class is shown below:

As you can see from the class diagram, ThreadPoolExecutor ultimately implements the Executor interface and is the true implementer of thread pool creation.

Executor two-level scheduling model

In the HotSpot virtual machine, Java threads are mapped one-to-one to operating system threads. At the JVM level, users submit tasks to the Executor framework, which assigns them to threads for execution. At the operating system level, the OS schedules these threads onto processors.

Three roles for ThreadPoolExecutor

Task

ThreadPoolExecutor accepts two types of tasks: Callable and Runnable.

  • Callable: returns a result and can throw checked exceptions. Submit it with the submit method, which returns a Future; call get on the Future to obtain the result.
  • Runnable: only executes and returns no result, and its run method cannot throw checked exceptions. Submit it with either the execute or the submit method.
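The difference between the two task types can be sketched as follows. This is a minimal illustration, not code from the handbook: the class name TaskTypesSketch, the pool sizes, and the "boom" message are assumptions chosen for the example. It shows that a Callable's result comes back through Future.get(), and that an unchecked exception thrown by a Runnable submitted with submit surfaces as the cause of an ExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TaskTypesSketch {

    // Hypothetical helper: a one-thread pool with a small bounded queue.
    private static ExecutorService newPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    }

    /** Submits a Callable and reads its result through the returned Future. */
    static int runCallable() {
        ExecutorService pool = newPool();
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = pool.submit(task);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Submits a failing Runnable; the failure is reported by Future.get(). */
    static String runFailingRunnable() {
        ExecutorService pool = newPool();
        try {
            Runnable task = () -> {
                throw new IllegalStateException("boom");
            };
            Future<?> future = pool.submit(task);
            future.get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runCallable());
        System.out.println(runFailingRunnable());
    }
}
```

With execute instead of submit there is no Future, so an exception thrown by the Runnable is not captured this way and instead reaches the worker thread's uncaught-exception handling.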

Task executor

The core interface of the Executor framework is Executor, which represents the executor of a task.

As the class diagram above shows, ExecutorService is a subinterface of Executor. It has two main implementation classes: ThreadPoolExecutor and ScheduledThreadPoolExecutor (which extends ThreadPoolExecutor).

The execution result

The Future interface represents asynchronous execution results and has an implementation class called FutureTask.

The processing logic diagram between the three roles is as follows:

Thread pool processing flow

A task goes through the following steps from submission to execution:

  • The thread pool checks whether the number of worker threads has reached the core pool size. If not, it creates a new worker thread to execute the task. If the core pool is full, it moves to the next step;
  • The thread pool checks whether the work queue is full. If not, the newly submitted task is stored in the work queue. If the queue is full, it moves to the next step;
  • The thread pool checks whether the number of threads has reached the maximum. If not, it creates a new worker thread to execute the task. If it has, the task is handed to the saturation policy.

In terms of the execute method, the thread pool behaves as follows:

  • If fewer threads are running than corePoolSize, a new thread is created to run the task (this requires acquiring a global lock);
  • If the number of running threads is equal to or greater than corePoolSize, the task is added to the BlockingQueue;
  • If the task cannot be added to the BlockingQueue (the queue is full), a new thread is created to run the task (this requires acquiring a global lock);
  • If creating a new thread would push the number of running threads beyond maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called.

The flow is designed this way to minimize how often the global lock is acquired. Once the thread pool has warmed up (the number of running threads is greater than or equal to corePoolSize), almost every call to execute takes the second step.
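The four steps can be observed with a deliberately tiny pool. The sketch below is an illustration under assumed settings (corePoolSize 1, maximumPoolSize 2, queue capacity 1; the class name ExecuteFlowSketch is hypothetical). Every task blocks on a latch, so the pool cannot drain while the four submissions are made.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowSketch {

    // Formats the pool as "<threads>/<queued tasks>".
    private static String snapshot(ThreadPoolExecutor pool) {
        return pool.getPoolSize() + "/" + pool.getQueue().size();
    }

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task waits on the latch, so workers stay busy during the demo.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker);                    // step 1: new core thread
            log.append(snapshot(pool)).append(' ');
            pool.execute(blocker);                    // step 2: core full, task is queued
            log.append(snapshot(pool)).append(' ');
            pool.execute(blocker);                    // step 3: queue full, new non-core thread
            log.append(snapshot(pool)).append(' ');
            try {
                pool.execute(blocker);                // step 4: queue and pool full
                log.append("accepted");
            } catch (RejectedExecutionException e) {
                log.append("rejected");               // default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "1/0 1/1 2/1 rejected"
    }
}
```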

The state flow of a thread

To review Thread state transitions, the JDK provides an enumeration class for Thread states:

    public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }

There are six enumerated values, which represent five kinds of thread state:

  • NEW: the thread has been created but not yet started.
  • RUNNABLE: the thread is running or ready to run.
  • BLOCKED: the thread is blocked waiting to acquire a monitor lock.
  • WAITING: the thread is waiting. WAITING and TIMED_WAITING are both waiting states; the difference is that TIMED_WAITING has a timeout.
  • TERMINATED: the thread has finished executing.

Thread relationship transition diagram:

After new Thread(), the thread is in the NEW state; calling Thread.start() moves it to RUNNABLE.

RUNNABLE actually covers two sub-states: READY and RUNNING. After start() the thread is READY and waits for a CPU time slice; once it gets one, it is RUNNING.

If the thread reaches a synchronized block while running and cannot acquire the lock, it becomes BLOCKED; when it acquires the lock, it goes back to RUNNING. During execution the thread may also call a wait method such as Object.wait(), which moves it to WAITING until it is woken up. When Object.notifyAll() wakes it, the thread returns to RUNNABLE once it has reacquired the lock. When the run method finishes, the thread becomes TERMINATED.
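These transitions can be observed with Thread.getState(). The sketch below is illustrative only: the class name ThreadStateSketch and the polling helper awaitState are assumptions made for the example. It forces a thread through NEW, BLOCKED, WAITING, and TERMINATED; RUNNABLE is passed through in between but is too short-lived to record reliably.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadStateSketch {

    // Hypothetical helper: polls until the thread reports the expected state.
    private static void awaitState(Thread t, Thread.State expected) throws InterruptedException {
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (t.getState() != expected) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("thread never reached " + expected);
            }
            Thread.sleep(1);
        }
    }

    static List<Thread.State> observe() {
        Object lock = new Object();
        boolean[] released = {false}; // guarded by lock
        Thread t = new Thread(() -> {
            synchronized (lock) {
                while (!released[0]) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        List<Thread.State> seen = new ArrayList<>();
        try {
            seen.add(t.getState());                  // NEW: created, not started
            synchronized (lock) {
                t.start();
                awaitState(t, Thread.State.BLOCKED); // t cannot enter the synchronized block
                seen.add(t.getState());
            }
            awaitState(t, Thread.State.WAITING);     // t now holds the lock and calls wait()
            seen.add(t.getState());
            synchronized (lock) {
                released[0] = true;
                lock.notifyAll();                    // wakes t, which then finishes run()
            }
            t.join();
            seen.add(t.getState());                  // TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "[NEW, BLOCKED, WAITING, TERMINATED]"
    }
}
```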

Status of the thread pool

The thread pool keeps its state in the high three bits of an int, which represent the five states of the pool: RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED:

  • RUNNING: the normal working state. The pool accepts new tasks and processes the tasks in the queue.
  • SHUTDOWN: entered by calling shutdown(). The pool no longer accepts new tasks but continues to execute the tasks already in the queue.
  • STOP: entered by calling shutdownNow(). The pool neither accepts new tasks nor processes tasks already in the queue. It sends an interrupt request to worker threads that are still executing a task and drains the queue of tasks that have not started.
  • TIDYING: entered when all tasks have terminated and the set of worker threads is empty.
  • TERMINATED: the final state, entered after the terminated() hook method has completed.
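The internal states are not exposed directly, but part of the lifecycle can be followed through the public methods isTerminating() and isTerminated() and the terminated() hook. The sketch below is an illustration under assumptions: the class name PoolStateSketch is hypothetical, and the label SHUTTING_DOWN is invented for the example because the public API does not distinguish SHUTDOWN, STOP, and TIDYING.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PoolStateSketch {

    // Maps the public query methods to a coarse label (labels are illustrative).
    private static String describe(ThreadPoolExecutor pool) {
        if (pool.isTerminated()) {
            return "TERMINATED";
        }
        if (pool.isTerminating()) {
            return "SHUTTING_DOWN"; // SHUTDOWN, STOP, or TIDYING
        }
        return "RUNNING";
    }

    static List<String> lifecycle() {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1)) {
            @Override
            protected void terminated() {
                hookCalled.set(true); // runs before the pool becomes TERMINATED
            }
        };
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        List<String> seen = new ArrayList<>();
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            seen.add(describe(pool));   // pool is accepting tasks
            pool.shutdown();
            seen.add(describe(pool));   // a task is still running
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            seen.add(describe(pool));
            seen.add("hook=" + hookCalled.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lifecycle()); // prints "[RUNNING, SHUTTING_DOWN, TERMINATED, hook=true]"
    }
}
```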

ThreadPoolExecutor API

The full constructor of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

Parameter Description:

  • corePoolSize: the number of resident core threads. A newly created pool contains no threads; a thread is created when a task arrives. Each submitted task creates a new thread until the number of threads reaches corePoolSize; after that, new tasks are added to the blocking queue.

  • maximumPoolSize: the maximum number of threads the pool may create. When the queue is full, additional threads are created to run tasks until the number of threads equals maximumPoolSize.

  • keepAliveTime: when there are more threads than corePoolSize, the maximum time an excess idle thread waits for a new task before terminating.

  • unit: the time unit of keepAliveTime. Options: DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS and NANOSECONDS.

  • workQueue: the queue that holds tasks waiting to be executed.

  • threadFactory: the factory that creates the pool's threads. Use it to set the thread name prefix, the daemon flag, and the priority. A meaningful name prefix tells you which thread factory created a thread when you analyze the virtual machine, for example in a thread dump.

  • handler: the rejection policy object. When the queue is full (the capacity given to workQueue) and the number of threads has reached maximumPoolSize, the task cannot be processed and the pool applies the rejection policy. The JDK has provided four saturation policies since 1.5:

    • AbortPolicy: the default; throws an exception directly.
    • CallerRunsPolicy: runs the task on the caller's thread.
    • DiscardOldestPolicy: discards the oldest task in the queue, then retries the current task by calling execute() again.
    • DiscardPolicy: discards the current task.
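Putting the seven parameters together, a pool can be created explicitly as sketched below. The values (2 core threads, 4 maximum, 30 seconds, queue capacity 100), the class name ExplicitPoolSketch, and the "order-pool" prefix are assumptions for illustration, not recommendations from the handbook.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExplicitPoolSketch {

    /** Builds a pool in which every parameter is spelled out. */
    static ThreadPoolExecutor newPool(String namePrefix) {
        AtomicInteger sequence = new AtomicInteger();
        // Thread factory that names threads "<prefix>-<n>" so they are easy to spot in a thread dump.
        ThreadFactory factory = task -> {
            Thread t = new Thread(task, namePrefix + "-" + sequence.incrementAndGet());
            t.setDaemon(false);
            return t;
        };
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),          // bounded workQueue
                factory,                                // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler (the default, made explicit)
    }

    /** Returns the name of the thread that ran the first task. */
    static String firstWorkerName() {
        ThreadPoolExecutor pool = newPool("order-pool");
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName()); // prints "order-pool-1"
    }
}
```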

Appropriate blocking queues

When the number of threads reaches corePoolSize, new tasks are added to the BlockingQueue, which holds the Runnable objects waiting to be executed.

Common blocking queue types:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array. When the queue is full, a new (non-core) thread is created to run the task; if the queue is full and the number of threads has reached maximumPoolSize, the task is rejected.
  • LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list; created without a capacity, it holds up to Integer.MAX_VALUE elements and is effectively unbounded. When a task arrives and the number of threads is below corePoolSize, a new (core) thread is created to run it; once the number of threads equals corePoolSize, the task is queued. Because the default queue has no practical limit, every task beyond the core threads is queued, which makes maximumPoolSize ineffective: the number of threads never exceeds corePoolSize.
  • PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering.
  • DelayQueue: an unbounded blocking queue built on a priority queue. Its elements must implement the Delayed interface, so submitted tasks must implement Delayed. A task joins the queue first and can only be taken and executed after its delay has expired.
  • SynchronousQueue: a blocking queue that stores no elements. A submitted task is handed directly to a thread; if all threads are busy, a new thread is created to handle it. With this queue, maximumPoolSize is therefore usually set to Integer.MAX_VALUE (effectively unlimited), so that tasks are not rejected because the number of threads has reached maximumPoolSize and no new thread can be created.
  • LinkedTransferQueue: an unbounded blocking queue backed by a linked list.
  • LinkedBlockingDeque: a double-ended blocking queue backed by a linked list.
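The effect of the queue choice on the thread count can be checked with a small experiment. In the sketch below (class name QueueChoiceSketch and the sizes 1 core thread and 4 maximum threads are assumptions for illustration), every task blocks on a latch, and the method reports the pool as "threads/queued tasks" once all tasks have been submitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoiceSketch {

    /** Submits blocking tasks to a 1-core/4-max pool and reports "<threads>/<queued>". */
    static String fill(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 30L, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize() + "/" + pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks are queued, the pool stays at one thread.
        System.out.println(fill(new LinkedBlockingQueue<>(), 5)); // prints "1/4"
        // SynchronousQueue: nothing is stored, each task gets its own thread.
        System.out.println(fill(new SynchronousQueue<>(), 4));    // prints "4/0"
    }
}
```

With the SynchronousQueue a fifth blocking task would be rejected, because the pool in this sketch is capped at four threads.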

Clear rejection strategies

When a task cannot be accepted, the thread pool applies its rejection policy.

Supported rejection policies:

  • ThreadPoolExecutor.AbortPolicy: rejects the task and throws RejectedExecutionException (the default).
  • ThreadPoolExecutor.DiscardPolicy: also discards the task, but without throwing an exception.
  • ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then tries to submit the new task again (repeating the process if necessary).
  • ThreadPoolExecutor.CallerRunsPolicy: runs the task on the calling thread.
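The four policies can be compared by saturating a pool and watching what happens to the third task. The sketch below is an illustration under assumed settings (one thread, queue capacity 1; the class name RejectionPolicySketch and the task labels A, B, C are hypothetical). Task A occupies the only thread, B fills the queue, and C triggers the policy; the method returns the order in which tasks recorded themselves.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicySketch {

    static String outcome(RejectedExecutionHandler handler) {
        List<String> executed = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            // A holds the only worker until released, then records itself.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                executed.add("A");
            });
            pool.execute(() -> executed.add("B")); // fills the queue
            try {
                pool.execute(() -> executed.add("C")); // pool is saturated: policy decides
            } catch (RejectedExecutionException e) {
                executed.add("C rejected");
            }
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
        return String.join(",", executed);
    }

    public static void main(String[] args) {
        System.out.println(outcome(new ThreadPoolExecutor.AbortPolicy()));         // C rejected,A,B
        System.out.println(outcome(new ThreadPoolExecutor.DiscardPolicy()));       // A,B
        System.out.println(outcome(new ThreadPoolExecutor.DiscardOldestPolicy())); // A,C
        System.out.println(outcome(new ThreadPoolExecutor.CallerRunsPolicy()));    // C,A,B
    }
}
```

With CallerRunsPolicy, C appears first because it runs immediately on the submitting thread while A is still blocked.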

Thread pool shutdown

  • shutdown: sets the pool state to SHUTDOWN without stopping immediately. The pool stops accepting new tasks; running tasks and tasks waiting in the queue are executed to completion, and only then does the pool stop.
  • shutdownNow: sets the pool state to STOP and tries to stop running tasks by interrupting the worker threads. A thread that is not blocked in sleep, wait, a Condition, or a timed lock is not stopped by the interrupt unless the task checks its interrupt status. Returns the list of tasks that never started.
  • awaitTermination(long timeout, TimeUnit unit): blocks the current thread until all submitted tasks (both running and queued) have finished, the timeout expires, or the current thread is interrupted (in which case InterruptedException is thrown). Returns true if the pool terminated after the shutdown request, or false if the timeout elapsed first.
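The three methods are commonly combined into a two-phase shutdown: ask politely first, then interrupt. The sketch below follows that general pattern; the class name ShutdownSketch, the helper name shutdownGracefully, and the timeouts are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    /** Returns true if the pool terminated within the given timeouts. */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt workers and drop tasks that never started
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    /** Demo: a task that would sleep for minutes is cut short by shutdownNow. */
    static boolean demo() {
        ExecutorService pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
        pool.execute(() -> {
            try {
                TimeUnit.MINUTES.sleep(10);
            } catch (InterruptedException e) {
                // interrupted by shutdownNow: simply return
            }
        });
        return shutdownGracefully(pool, 200, TimeUnit.MILLISECONDS) && pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The demo only terminates because the task reacts to the interrupt; a task that ignores interrupts would keep its worker thread alive after shutdownNow.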

Executors

Executors is a utility class that provides factory methods for several pre-configured thread pools: newSingleThreadExecutor, newFixedThreadPool, newCachedThreadPool, and so on.

These factory methods are essentially ThreadPoolExecutor with default settings. The Alibaba development manual advises against using Executors and recommends that users create pools directly with ThreadPoolExecutor.

Executors.newSingleThreadExecutor()

Create a single thread pool. This thread pool has only one thread working, which is equivalent to a single thread executing all tasks in serial. If the unique thread terminates due to an exception, a new thread will replace it. This thread pool ensures that all tasks are executed in the order in which they were submitted.

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

The structure of this type of thread pool:

The characteristics of this thread pool:

  • Only one worker thread is created to process the task;
  • The blocking queue used is LinkedBlockingQueue;

Executors.newFixedThreadPool()

Create a thread pool of fixed size. A thread is created each time a task is submitted until the thread reaches the maximum size of the thread pool. The size of the thread pool stays the same once it reaches its maximum size, and if a thread terminates due to execution exceptions, a new thread is added to the pool.

new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

The structure of this type of thread pool:

The characteristics of this thread pool:

  • Fixed size;
  • corePoolSize and maximumPoolSize are both set to the number of threads specified by the user;
  • keepAliveTime is 0, which would mean that excess idle threads are stopped immediately; but since the pool never has excess threads, keepAliveTime has no effect;
  • The blocking queue is LinkedBlockingQueue, an unbounded queue;
  • Because the queue is unbounded, a task is never rejected while the pool is running;
  • Because the queue is unbounded, the number of threads never exceeds nThreads, so maximumPoolSize and keepAliveTime have no effect.

Executors.newCachedThreadPool()

Create a cacheable thread pool. If the size of the thread pool exceeds the number of threads needed to process the task, the pool can reclaim some of the idle (60-second non-executing) threads and intelligently add new threads to process the task as the number of tasks increases. This thread pool has no limit on the thread pool size, which is entirely dependent on the maximum thread size that the operating system (or JVM) can create.

new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

The structure of this type of thread pool:

The characteristics of this thread pool:

  • It can grow without limit;
  • It suits tasks with relatively short execution times;
  • corePoolSize is 0 and maximumPoolSize is Integer.MAX_VALUE, meaning the number of threads is effectively unlimited;
  • keepAliveTime is 60 seconds, meaning a thread is terminated once it has been idle for more than 60 seconds;
  • SynchronousQueue is the work queue. It has no storage space, so whenever a task arrives a worker thread must be found to process it, and a new thread is created if no thread is idle.

Executors.newScheduledThreadPool()

Creates a thread pool with a fixed number of core threads that supports scheduled and periodic task execution.

new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());

The thread pool class diagram:

The characteristics of this thread pool:

  • Tasks are submitted with scheduleAtFixedRate or scheduleWithFixedDelay and wrapped as ScheduledFutureTask. A ScheduledFutureTask holds the following fields:

    • time: the time at which the task should run
    • sequenceNumber: the sequence number of the task
    • period: the period of task execution
  • Waiting tasks are stored in a DelayedWorkQueue, which behaves like a DelayQueue;

  • The queue is ordered as a priority queue by time; if time is equal, tasks are ordered by sequenceNumber;

  • The queue is also unbounded;

  • A worker thread takes tasks whose time has expired from the queue and executes them. After a periodic task has run, its next execution time is set and it is put back into the queue.
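A periodic task submitted with scheduleAtFixedRate can be sketched as follows. The example creates a ScheduledThreadPoolExecutor directly rather than through Executors; the class name ScheduledSketch, the 10 ms period, and the 5 second safety timeout are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledSketch {

    /** Runs a periodic task until it has executed the requested number of times. */
    static int runPeriodically(int times) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        try {
            // First run immediately, then every 10 ms; the task is re-queued after each run.
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
                if (runs.get() < times) {
                    runs.incrementAndGet();
                    done.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // stop further executions
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(3));
    }
}
```

A periodic task keeps running until it is cancelled, the executor is shut down, or one of its executions throws an exception.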

Executors.newWorkStealingPool()

Introduced in JDK 8. Creates a thread pool that maintains enough threads to support the given level of parallelism and may use multiple queues to reduce contention.

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null, true);
}

Executors method disadvantages

1) newFixedThreadPool and newSingleThreadExecutor: the request queue may grow to Integer.MAX_VALUE entries, so a large backlog of requests can accumulate and cause an OOM.

2) newCachedThreadPool and newScheduledThreadPool: the number of threads allowed is Integer.MAX_VALUE, so a large number of threads can be created and cause an OOM.
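Both limits can be inspected at runtime. The sketch below is a rough illustration: the class name ExecutorsDefaultsSketch and the method names are hypothetical, and the casts to ThreadPoolExecutor rely on an implementation detail of current OpenJDK builds (the factory methods return a plain ThreadPoolExecutor), which the Javadoc does not guarantee. The last method shows one possible bounded alternative, with the capacity and the CallerRunsPolicy chosen only as an example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorsDefaultsSketch {

    /** Free capacity of the queue behind Executors.newFixedThreadPool. */
    static int fixedPoolQueueCapacity() {
        // Assumption: the returned object is a ThreadPoolExecutor (true for OpenJDK).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Maximum thread count of Executors.newCachedThreadPool. */
    static int cachedPoolMaxThreads() {
        // Assumption: the returned object is a ThreadPoolExecutor (true for OpenJDK).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** A fixed-size pool with a bounded queue; overflow runs on the submitting thread. */
    static ThreadPoolExecutor boundedAlternative(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);
        ThreadPoolExecutor bounded = boundedAlternative(2, 100);
        System.out.println(bounded.getQueue().remainingCapacity());
        bounded.shutdown();
    }
}
```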

Configure the thread pool size properly

To properly configure a thread pool, you need to analyze the task characteristics from the following perspectives:

  • Nature of tasks: CPU intensive tasks, IO intensive tasks and hybrid tasks.
  • Task priority: high, medium and low.
  • Task execution time: long, medium and short.
  • Task dependencies: Whether they depend on other system resources, such as database connections.

In addition, you also need to check the number of system cores:

Runtime.getRuntime().availableProcessors();

By the CPU and I/O resources they need, tasks can be classified as follows:

  • CPU-intensive tasks: mainly perform computation, respond quickly, and keep the CPU busy. General formula: number of threads = number of CPU cores + 1. The speed-up is only achievable on real multi-core CPUs. The advantage is that thread switching is kept to a minimum, which improves CPU utilization and reduces the cost of context switches.
  • I/O-intensive tasks: mainly perform I/O operations. The CPU is idle while waiting for I/O, which takes a long time, so configure more threads to keep the CPU busy. General formula: number of threads = number of CPU cores x 2.
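The two rules of thumb translate directly into code. The sketch below is only a starting point for tuning; the class and method names are hypothetical, and real workloads should be measured rather than sized by formula alone.

```java
class PoolSizingSketch {

    /** CPU-intensive rule of thumb from the text: cores + 1. */
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    /** I/O-intensive rule of thumb from the text: cores x 2. */
    static int ioBoundThreads(int cores) {
        return cores * 2;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + " cpuBound=" + cpuBoundThreads(cores)
                + " ioBound=" + ioBoundThreads(cores));
    }
}
```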

Usage example

Task implementation class:

import java.util.concurrent.TimeUnit;

/**
 * @author SEC
 * @version 1.0
 * @date 2021/10/30
 **/
public class MyThread implements Runnable {

    private final Integer number;

    public MyThread(int number) {
        this.number = number;
    }

    public Integer getNumber() {
        return number;
    }

    @Override
    public void run() {
        try {
            // Simulate one second of work
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Hello! ThreadPoolExecutor - " + getNumber());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

A custom ThreadPoolExecutor with blocking submission:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Custom ThreadPoolExecutor with blocking submission.
 *
 * @author SEC
 * @version 1.0
 * @date 2021/10/30
 **/
public class CustomBlockThreadPoolExecutor {

    private ThreadPoolExecutor pool = null;

    /**
     * Initializes the thread pool.
     */
    public void init() {
        // Core pool size
        int poolSize = 2;
        // Maximum pool size
        int maxPoolSize = 4;
        // Keep-alive time for idle non-core threads; the TimeUnit is given below
        long keepAliveTime = 30L;
        // Capacity of the ArrayBlockingQueue<Runnable>
        int arrayBlockingQueueSize = 30;
        pool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(arrayBlockingQueueSize),
                new CustomThreadFactory(), new CustomRejectedExecutionHandler());
    }

    /**
     * Shuts the thread pool down immediately.
     */
    public void destroy() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    public ExecutorService getCustomThreadPoolExecutor() {
        return this.pool;
    }

    /**
     * Custom thread factory that gives each thread a recognizable name.
     */
    private static class CustomThreadFactory implements ThreadFactory {

        private final AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = CustomBlockThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
            t.setName(threadName);
            return t;
        }
    }

    /**
     * Custom rejection policy.
     */
    private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Blocking submission: wait until the queue has room
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * When a submitted task is rejected, the rejection handler puts it into the
     * queue with the blocking put method. Submission therefore blocks, which keeps
     * the queue from growing too large and causing an OOM.
     */
    public static void main(String[] args) {
        CustomBlockThreadPoolExecutor executor = new CustomBlockThreadPoolExecutor();
        // Initialize
        executor.init();
        ExecutorService pool = executor.getCustomThreadPoolExecutor();
        for (int i = 1; i < 51; i++) {
            MyThread myThread = new MyThread(i);
            System.out.println("commit " + i);
            pool.execute(myThread);
        }
        pool.shutdown();
        try {
            // Block until all tasks have finished, the timeout expires, or the thread is interrupted
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                // Timed out: shut down immediately
                executor.destroy();
            }
        } catch (InterruptedException e) {
            executor.destroy();
        }
    }
}

Summary

Creating a thread pool looks simple but involves a lot of knowledge. Choosing parameters that fit the specific scenario is what produces the best results.

To sum up:

  • Customize the thread pool with ThreadPoolExecutor according to its purpose. If the number of tasks is small, an unbounded queue is acceptable; if it is large, use a bounded queue to avoid OOM.
  • If the number of tasks is large and every task must be processed, override the rejection policy so that submission blocks when the pool is saturated. This ensures that no task is discarded;
  • The maximum number of threads is generally set to 2N+1, where N is the number of CPU cores.
  • The number of core threads depends on whether the tasks are CPU-intensive or I/O-intensive. If the task only runs once a day, set it to 0, so that the threads stop once the work is done.
  • To obtain task results, use CompletionService, but retrieve the results in a separate thread. If the main thread retrieves them, it only does so after all tasks have been submitted, so a large number of results pile up in the completion queue, which can grow large enough to cause an OOM. It is better to start a thread that retrieves results asynchronously.
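The last point can be sketched with ExecutorCompletionService and a dedicated consumer thread that drains results while tasks are still being submitted. The class name CompletionServiceSketch, the pool sizes, and the squaring task are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionServiceSketch {

    /** Computes 0^2 + 1^2 + ... + (n-1)^2, collecting results on a separate thread. */
    static int sumOfSquares(int n) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16),
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the submitter
        CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
        AtomicInteger sum = new AtomicInteger();

        // The consumer takes each result as soon as it completes,
        // so finished results do not pile up while submission continues.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    sum.addAndGet(service.take().get());
                }
            } catch (InterruptedException | ExecutionException e) {
                // stop consuming on interruption or task failure
            }
        }, "result-consumer");
        consumer.start();

        try {
            for (int i = 0; i < n; i++) {
                final int value = i;
                service.submit(() -> value * value);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(5)); // prints "30"
    }
}
```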

About the blogger: Author of the technology book SpringBoot Inside Technology, loves to delve into technology and writes technical articles.

