When using a particular technology, I think we can think about a few questions:

(1) What is this technology?

(2) What problem is this technology designed to solve?

(3) The comparison between this technology and the same type, why do we choose it?

The introduction

What is a thread pool

Thread Pool is a tool to manage threads based on pooling idea, which often appears in multi-threaded server, such as MySQL.

Too many lines will bring extra costs, including the cost of creating and destroying threads, the cost of scheduling threads, etc., and also reduce the overall performance of the computer.

A thread pool maintains multiple threads waiting for a supervisor to assign tasks that can be executed concurrently.

This approach, on the one hand, avoids the cost of creating and destroying threads while processing tasks, on the other hand, avoids the excessive scheduling problem caused by the expansion of the number of threads, and ensures the full utilization of the kernel.

Thread pools are an implementation of the pooling idea. The implementation in Java is TreadPoolExecutor.

Of course, there are a number of benefits to using thread pools:

  • Reduced resource consumption: Reuse of created threads through pooling techniques to reduce wastage from thread creation and destruction.
  • Improved response time: Tasks can be executed immediately when they arrive without waiting for threads to be created.
  • Improve manageability of threads: Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also cause resource scheduling imbalance due to unreasonable distribution of threads, which reduces system stability. Thread pools allow for uniform allocation, tuning, and monitoring.
  • Provide more and more powerful featuresThread pools are extensible, allowing developers to add more functionality to them. Such as a delayed timed thread poolScheduledThreadPoolExecutorAllows the task to be deferred or executed periodically.

What problem does a thread pool solve

The core problem solved by thread pools is resource management.

In a concurrent environment, the system cannot determine how many tasks need to be executed or how many resources need to be invested at any given time. This uncertainty raises several questions:

  1. The additional cost of applying/destroying resources and scheduling resources frequently can be significant.
  2. The lack of means to suppress unlimited resource applications may lead to the risk of system resource exhaustion.
  3. The system cannot properly manage internal resource distribution, which reduces system stability.

To solve the problem of resource allocation, thread Pooling adopts the idea of Pooling. Pooling, as the name suggests, is the idea of managing resources together in order to maximize returns and minimize risks.

Typical usage strategies include:

  1. Memory pool (Memory Pooling) : Applies for memory in advance to improve memory application speed and reduce memory fragmentation.
  2. The connection pool (Connection Pooling) : Apply for database connections in advance to improve the speed of applying for connections and reduce system overhead. For example, alibaba’s database connection pool:Druid.
  3. Instance pool (Object Pooling) : Recycle objects to reduce expensive consumption of resources during initialization and release.SpringIOCThat’s the same idea.

Why did we choose it

All things considered, ThreadPoolExecutor is the choice.

function

The dynamic tuning parameter

The ThreadPoolExecutor thread pool class provides a set method to set the number of core threads, the maximum number of threads, and the lifetime of the thread pool.

We need to know how adjustments are made within the thread pool so that problems can be quickly identified and resolved.

Real-time monitoring

The ThreadPoolExecutor thread pool class provides methods to read the running status and parameters of the current thread pool.

Mission control

The custom thread pool class inherits the ThreadPoolExecutor class, overriding its beforeExecute() and afterExecute() methods and writing the appropriate code logic in the corresponding methods to log the health of thread pool tasks.

Load the alarm

(1) Before, count the activity of the thread pool in the beforeExecutor() method. If it exceeds a certain threshold, it alarms and sends a message to the corresponding developer.

(2) after the event, in afterExecute () method of the length of the statistics in the queue waiting for tasks, if above a certain threshold for alarm, and by capturing RejectedExecutionException abnormalities, if captured, made the alarm, send a message to the appropriate developer.

Code implementation

Thread pool configuration class:

/** * Thread pool configuration *@ClassName TreadPoolConfig
 * @Author YH
 * @Date 2021/12/4
 * @Version1.0 * /
@Configuration
public class TreadPoolConfig {

    /** * Number of thread pool core threads */
    private int coreSize = 2 * Runtime.getRuntime().availableProcessors();

    /** * Maximum number of threads in the pool */
    private int maxPoolSize = 25 * Runtime.getRuntime().availableProcessors();

    /** * Maximum queue length */
    private int queueCapacity = 100;

    /** * The thread pool maintains the free time allowed by threads */
    private int keepAliveSeconds = 300;


    /** * Performs periodic or scheduled tasks */
    @Bean(name = "scheduledExecutorService")
    protected ScheduledExecutorService scheduledExecutorService(a) {
        / * * * ScheduledThreadPoolExecutor inherited from ThreadPoolExecutor. * It is mainly used to run tasks after a given delay, or to perform tasks periodically. ScheduledThreadPoolExecutor * function similar to the Timer, but ScheduledThreadPoolExecutor function more powerful, more flexible. Timer is the corresponding individual background threads, * and ScheduledThreadPoolExecutor can be specified in the constructor more corresponding background thread count. * new BasicThreadFactory. Builder (). NamingPattern (" the schedule - the pool - % d ") * take a name to the thread pool works * /
        return new ScheduledThreadPoolExecutor(coreSize,
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d")
                    .daemon(true).build());
    }

    /** * Custom thread pool *@return* /
    @Bean(name = "threadPoolExecutor")
    protected ThreadPoolExecutor threadPoolExecutor(a) {
        return new MyThreadPoolExecutor(
                this.coreSize,
                this.maxPoolSize,
                this.keepAliveSeconds,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(this.queueCapacity),
                Executors.defaultThreadFactory(),
                newThreadPoolExecutor.AbortPolicy()); }}Copy the code

The dynamic tuning parameter

Interface design:

/** * Thread pool parameters dynamic * modify thread pool parameters *@paramThreadPoolUpdateVO modifies the thread pool parameter view class *@return* /
@apiOperation (" Modify thread pool parameters ")
@PutMapping("/thread_pool")
public Result updateThreadPool(
        @apiParam (" Thread pool name ") @Validated
        @RequestBody ThreadPoolUpdateVO threadPoolUpdateVO) {

    ThreadPoolExecutor threadPoolExecutor = null;
    try {
        // Get the thread pool instance from the Spring container by name
        threadPoolExecutor = SpringUtil.getBean(threadPoolUpdateVO.getThreadPoolName());
    } catch (NoSuchBeanDefinitionException e) {
        return Result.failure().message("Cannot find corresponding thread pool instance by thread pool name");
    }

    // Change the core thread count of the thread pool
    threadPoolExecutor.setCorePoolSize(threadPoolUpdateVO.getCorePoolSize());
    // Change the maximum number of threads in the thread pool
    threadPoolExecutor.setMaximumPoolSize(threadPoolUpdateVO.getMaximumPoolSize());
    // Change the thread pool lifetime and unit
    switch (threadPoolUpdateVO.getUnit()) {
        case 0: {
            threadPoolExecutor.setKeepAliveTime(
                    threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.NANOSECONDS);
        }
        break;
        case 1: {
            threadPoolExecutor.setKeepAliveTime(
                    threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.MICROSECONDS);
        }
        case 2: {
            threadPoolExecutor.setKeepAliveTime(
                    threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.MILLISECONDS);
        }
        case 3: {
            threadPoolExecutor.setKeepAliveTime(
                    threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.SECONDS);
        }
        case 4: {
            threadPoolExecutor.setKeepAliveTime(
                    threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.MINUTES);
        }
        case 5: {
            threadPoolExecutor.setKeepAliveTime(
                    threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.HOURS);
        }
        case 6: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.DAYS); }}return Result.success().message("Thread pool parameters modified successfully");
}
Copy the code

Thread pool parameters modify view class:

/** * Thread pool parameters modify view class *@ClassName ThreadPoolUpdateVO
 * @Author YH
 * @Date 2021/12/14
 * @Version1.0 * /
@Data
public class ThreadPoolUpdateVO {

    private static final long serialVersionUID = 1L;

    /** * Thread pool name */
    @notblank (groups = {}, message = "Thread pool name cannot be empty ")
    private String threadPoolName;

    /** * Number of core threads */
    private Integer corePoolSize;

    /** * Maximum number of threads */
    private Integer maximumPoolSize;

    /**
     * 存活时间
     */
    private Long keepAliveTime;

    /** * The survival time unit converted to the corresponding enumeration type according to the corresponding index * 0 NANOSECONDS; 1 MICROSECONDS; 2 MILLISECONDS. * 3 SECONDS; 4 MINUTES; 5 HOURS; 6 DAYS */
    private Integer unit;

}
Copy the code

Real-time monitoring

Interface design:

/** * get thread pool state *@paramThreadName Name of the thread pool *@return* /
@apiOperation (" Get thread pool state ")
@GetMapping("/thread_pool")
public Result getThreadPoolStatus(
        @apiParam (" Thread pool name ")
        @RequestParam("threadPoolName") String threadName
) {
    ThreadPoolExecutor threadPoolExecutor = null;

    try {
        threadPoolExecutor = SpringUtil.getBean(threadName);
    } catch (NoSuchBeanDefinitionException e) {
        return Result.failure().message("Cannot find corresponding thread pool instance by thread pool name")
                .errorMessage("Cannot find corresponding thread pool instance by thread pool name");
    }

    // Number of core threads in the thread pool
    int corePoolSize = threadPoolExecutor.getCorePoolSize();
    // The maximum number of threads allowed by the thread pool
    int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
    // The number of active threads in the pool
    int activeCount = threadPoolExecutor.getActiveCount();
    // The thread pool rejection policy
    RejectedExecutionHandler handler = threadPoolExecutor.getRejectedExecutionHandler();
    String handlerName = handler.getClass().getSimpleName();
    // The number of current threads in the thread pool
    int poolSize = threadPoolExecutor.getPoolSize();
    // The queue type of the thread pool
    BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
    String queueName = queue.getClass().getSimpleName();
    // Total number of tasks executed by the thread pool
    long taskCount = threadPoolExecutor.getTaskCount();
    // The thread factory of the thread pool
    ThreadFactory threadFactory = threadPoolExecutor.getThreadFactory();
    String threadFactoryName = threadFactory.getClass().getSimpleName();
    // The maximum number of threads in the pool. This parameter can be used to determine whether the pool is full at a certain point
    int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
    // The total number of tasks completed by the thread pool
    long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
    // The lifetime of the thread in the thread pool, in seconds
    long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
    // Whether the thread pool is closed
    boolean shutdown = threadPoolExecutor.isShutdown();
    // Whether the thread pool terminates
    boolean terminated = threadPoolExecutor.isTerminated();
    // Whether the thread pool is finished
    boolean terminating = threadPoolExecutor.isTerminating();

    Map<String, Object> map = new HashMap<>();
    map.put("corePoolSize", corePoolSize);
    map.put("maximumPoolSize", maximumPoolSize);
    map.put("activeCount", activeCount);
    map.put("poolSize", poolSize);
    map.put("handler", handlerName);
    map.put("queue", queueName);
    map.put("taskCount", taskCount);
    map.put("threadFactory", threadFactoryName);
    map.put("largestPoolSize", largestPoolSize);
    map.put("completedTaskCount", completedTaskCount);
    map.put("keepAliveTime", keepAliveTime);
    map.put("shutdown", shutdown);
    map.put("terminated", terminated);
    map.put("terminating", terminating);

    return Result.success(map).message("Thread pool state obtained successfully");
}
Copy the code

Mission control

Custom thread pool:

/** * The base thread pool class extends functionality *@ClassName MyThreadPoolExecutor
 * @Author YH
 * @Date 2021/12/15
 * @Version1.0 * /
@Slf4j
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    /** * The start time of the task */
    private ConcurrentHashMap<String, Date> startTimes;

    /** * Thread pool name * added to */ via constructor
    private String threadPoolName;

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        The super method must be placed on the first line of code
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.startTimes = new ConcurrentHashMap<>();
    }

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
    }

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.startTimes = new ConcurrentHashMap<>();
    }

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.startTimes = new ConcurrentHashMap<>();
    }

    /** * The method to execute before the task is executed * Record the start time *@param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
        double activity = (this.getActiveCount() * 1.0) / this.getMaximumPoolSize() * 100;
        // If thread pool activity exceeds the specified threshold, send a message to the development lead
        if (activity > 0.8) {
            // Send an email to the responsible person
        }
        log.info("Thread pool activity (activeCount/maximumPoolSize) :"
                + activity + "%");
        // If the queue length exceeds the specified threshold, a message is sent to the development lead
        log.info("Queue length:" + this.getQueue().size());
    }

    /** * The method to execute after the task is executed * records the end time and calculates the total time from the start time *@param r
     * @param t
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // Task start time
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        // Task end time
        Date endDate = new Date();
        // Task execution duration
        long executionTime = endDate.getTime() - startDate.getTime();


        // Count task time, number of initial threads, number of core threads, number of executing tasks, number of completed tasks, total tasks,
        // The number of tasks cached in the queue, the maximum number of threads in the pool, the maximum number of threads allowed, the idle time of threads, whether the thread pool is closed, whether the thread pool is terminated
        log.info(String.format("Thread pool monitoring:" + this.threadPoolName
                        + "-pool-monitor: Duration: %d ms, PoolSize: %d, CorePoolSize: " +
                        "%d, Active: %d, Completed: %d, Task: %d, Queue: %d, LargestPoolSize: " +
                        "%d, MaximumPoolSize: %d,KeepAliveTime: %d, isShutdown: %s, isTerminated: %s",
                executionTime, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(),
                this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS),
                this.isShutdown(), this.isTerminated())); }}Copy the code

Load the alarm

/** * The method to execute before the task is executed * Record the start time *@param t
 * @param r
 */
@Override
protected void beforeExecute(Thread t, Runnable r) {
    startTimes.put(String.valueOf(r.hashCode()), new Date());
    double activity = (this.getActiveCount() * 1.0) / this.getMaximumPoolSize() * 100;
    // If thread pool activity exceeds the specified threshold, send a message to the development lead
    if (activity > 0.8) {
        // Send an email to the responsible person
    }
    log.info("Thread pool activity (activeCount/maximumPoolSize) :"
            + activity + "%");
    // If the queue length exceeds the specified threshold, a message is sent to the development lead
    log.info("Queue length:" + this.getQueue().size());
}
Copy the code

The principle behind it

Running state

ThreadPoolExecutor can run in five states:

Task scheduling process

The execute() method is the entry. The scheduling process is as follows:

  1. First detect thread pool running status, if notRUNNING, is directly rejected, and the thread pool must be guaranteed inRUNNINGTo execute the task.
  2. ifworkerCount < corePoolSize, a thread is created and started to execute the newly submitted task.
  3. ifworkerCount >= corePoolSize, and the blocking queue in the thread pool is not full, the task is added to the blocking queue.
  4. ifworkerCount >= corePoolSize && workerCount < maximumPoolSize, and the blocking queue in the thread pool is full, a thread is created and started to execute the newly submitted task.
  5. ifworkerCount >= maximumPoolSize, and the blocking queue in the thread pool is full, the task is processed according to the reject policy. The default processing method is to throw an exception directly.

Task buffer

The task buffer module is the core part of the thread pool that can manage tasks.

The essence of thread pool is the management of tasks and threads, and the key idea to achieve this is to decouple the tasks and threads from the direct correlation, so that the subsequent allocation work can be done.

Thread pools are implemented in producer-consumer mode through a blocking queue. The blocking queue caches tasks from which the worker thread retrieves them.

A BlockingQueue is a queue that supports two additional operations. The two additional operations are:

(1) When the queue is empty, the thread that fetched the element will wait for the queue to become non-empty.

(2) When the queue is full, the thread that stores the element waits for the queue to become available.

Blocking queues are often used in producer and consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that takes elements from the queue. A blocking queue is a container in which producers hold elements, and consumers only take elements from the container.

Different queues can implement different task access strategies.

Task to apply for

There are two possibilities for task execution: one is that the task is directly executed by the newly created thread. The other is that the thread retrieves the task from the task queue and executes it, and the idle thread that completes the task will apply for the task from the queue again to execute it. The first is only when the thread is initially created, and the second is when the thread acquires most of the tasks.

The thread needs to fetch tasks from the task cache module to help the thread get tasks from the blocking queue, and realize the communication between the thread management module and the task management module. This part of the policy is implemented by the getTask() method, and its execution flow is shown in the figure below:

The getTask() section makes several judgments to control the number of threads to match the state of the thread pool.

Task refuse

The task rejection module is the protected part of the thread pool. The thread pool has a maximum capacity. When the task cache queue of the thread pool is full and the number of threads in the thread pool reaches maximumPoolSize, the task must be rejected and the task rejection policy is adopted to protect the thread pool.

A rejection policy is an interface designed as follows:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Copy the code

Users can implement this interface to customize rejection policies or choose from the four existing rejection policies provided by the JDK, which have the following features:

Refer to the link

Implementation principle of Java thread pool and its practice in Meituan business

Remember a thought about the use of thread pools triggered by failures

An online thread pool task problem processing process