A preface.

After writing several literacy articles on Java data structures, I felt that they were all too theoretical and not practical. The original intention of writing a blog is to improve personal skills, and at the same time, to better apply the skills to daily study and work. This article introduces you to a thread pool tool class that a blogger likes to use in his daily work. This class is in my combination of many daily business scenarios under the integration of a tool class, out of the box, I hope to help you

If you use this thread pool class in your company’s production environment, please refer to this blog post

Introduction to thread pools

As always, let’s take care of those of you who are new to Java or who are learning thread pools. A systematic introduction to the concept of thread pools.

2.1. What is a thread pool

A thread pool maintains a set of reusable threads and can scale up the reusable threads to a certain extent.

2.2. Why thread pools

Let me ask you what are the common ways to create asynchronous threads? OK, let’s not keep it in suspense

  1. Inherits the Thread class to create a Thread
  2. Implement the Runnable interface to create threads
  3. Create threads using Callable and Future
  4. Using thread pools for example using the Executor framework

We put 123 in one category and 4 in a separate category. 123 The way to create a thread is displayed in the code call to create a one-time use thread, if the corresponding business interface is frequently accessed, then the new thread will be a lot of, but this thread often has a long life cycle, the creation and destruction of the thread will occupy a lot of time. Therefore, the emergence of the thread pool, the same class of tasks need to be executed, put into the thread pool, let the thread pool to reuse thread execution, reduce the number of threads created and destroyed, but also can make full use of multi-core CPU to execute tasks, full performance.

2.3. Thread pools provided by the JDK

1. Fixed thread pool (newFixedThreadPool)

2. Cache thread pool (newCachedThreadPool)

3. Thread pool for a single thread (newSingleThreadExecutor)

4. Fixed number of thread pools (newScheduledThreadPool)

The thread ChiJieShao than this paper, don’t do in detail, I still warm heart with an introduction of links: www.cnblogs.com/frankyou/p/…

2.4. Thread pool parameters

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Copy the code

1. CorePoolSize – The size of the thread pool core pool

2. MaximumPoolSize – The maximum number of threads in the thread pool.

3. KeepAliveTime – When the number of threads is greater than the core, this is the maximum time for the extra idle threads to wait for a new task before terminating.

4. Unit-keepalivetime Time unit.

5. WorkQueue – Stores queues for tasks to be executed.

Queue type

ArrayBlockingQueue: A bounded blocking queue composed of array structures. LinkedBlockingQueue: A bounded blocking queue consisting of a linked list structure. PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting. DelayQueue: an unbounded blocking queue implemented using a priority queue. SynchronousQueue: A blocking queue that does not store elements. LinkedTransferQueue: An unbounded blocking queue consisting of a linked list structure. LinkedBlockingDeque: A bidirectional blocking queue consisting of a linked list structure.

Queue analytic: www.cnblogs.com/guoyu1/p/13…

6. ThreadFactory – threadFactory [for custom created threads]

7. Handler-reject policy

ThreadPoolExecutor. AbortPolicy: discard task and throw RejectedExecutionException anomalies. (default) ThreadPoolExecutor. DiscardPolicy: task is discarded, but does not throw an exception. ThreadPoolExecutor. DiscardOldestPolicy: discard queue in front of the task, and then to try to perform a task (repeat) ThreadPoolExecutor. CallerRunsPolicy: handle the tasks by the calling thread

Refused to policy resolution: blog.csdn.net/suifeng629/…

How does a thread pool graph with thread pool parameters

2.5. Interpretation of Ali’s development code

【 Mandatory 】 Thread pools cannot be created by Executors. Use ThreadPoolExecutor to clear the running rules of the thread pool and avoid resource depletion.

Open your compiler and take a look at the constructor of the thread pool provided by the JDK in 2.3, then take a look at the corresponding work queue in 2.4, and finally combine it with the diagram in 2.4. I think you get the idea?

1) newFixedThreadPool and newSingleThreadExecutor: The main problem is the use of unbounded queues, which can be very expensive in memory, or even OOM, in case of exceptions. 2) newCachedThreadPool and newScheduledThreadPool: The main problem is that the maximum number of threads is integer. MAX_VALUE.

Thread pool best practices

List of features supported by the best thread pool instance

1. Lazy loading thread pools are supported

2. ThreadLocal thread variables are passed from parent thread to child thread

3. Supports link tracing traceId printing

4. Batch task submission is supported

5. Support thread pool task running exception capture

3.1. Thread pool interface classes

import java.util.List; import java.util.concurrent.Future; /** * thread pool interface ** @author baiyan * @since 2021/05/11 */ public interface ThreadPoolService {/** * add task * @param task to thread pool Task * @return task (must implement Runnable interface) */ Future<? > addTask(Runnable task); /** * Asynchronously execute a batch of tasks until the task is completed * @param Task */ void runTasksUntilEnd(List<Runnable> task); /** * Add a looped task to the thread pool * @param task (must implement Runnable interface) * @param interval Time interval, */ void loopTask(Runnable task, long interval); /** * Add a loop task to the thread pool * @param task (must implement the Runnable interface) * @param interval Interval in milliseconds * @param delay Delay in execution in milliseconds, */ void loopTask(Runnable task, long interval, long delay); */ void loopTask(Runnable task, long interval, long delay); /** * stop thread pool */ void stop(); /** * getActiveCount(); /** * getActiveCount(); }Copy the code

Define thread pool methods commonly used on the business side

3.2. Thread pool implementation classes

import com.alibaba.ttl.TtlRunnable; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.sleuth.instrument.async.LazyTraceThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; ** @author baiyan * @since 2021/05/11 */ @slf4j public class ThreadPoolServiceImpl implements ThreadPoolService {/** * corePoolSize = 20; /** * private int maximumPoolSize = 60; Setter private int keepAliveTime = 60; Setter private int keepAliveTime = 60; / * * * * / private singleton thread pool LazyTraceThreadPoolTaskExecutor threadPoolExecutor; /** * Private ScheduledExecutorService ScheduledExecutorService; Setter private int queueSize = 100; Setter private int queueSize = 100; @Setter private boolean inited = false; /** * block when thread pool is full */ @setter private Boolean blockWhenFull = true; Public synchronized void init() {if(inited) {return; } ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(this.corePoolSize); threadPoolTaskExecutor.setMaxPoolSize(this.maximumPoolSize); threadPoolTaskExecutor.setQueueCapacity(this.queueSize); threadPoolTaskExecutor.setKeepAliveSeconds(this.keepAliveTime); threadPoolTaskExecutor.setThreadNamePrefix("baiyan-Thread-"); threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true); threadPoolTaskExecutor.initialize(); this.threadPoolExecutor = new LazyTraceThreadPoolTaskExecutor(SpringContextUtil.getApplicationContext(), threadPoolTaskExecutor); inited = true; } @Override public Future<? > addTask(Runnable task) { if(! inited) { init(); } return threadPoolExecutor.submit(TtlRunnable.get(task)); } /** * Private class BlockingQueuePut implements RejectedExecutionHandler {/** * @param r * @param executor */ @override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if(blockWhenFull) { try { executor.getQueue().put(r); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } } @Override public int getActiveCount() { return threadPoolExecutor.getActiveCount(); } @Override public void stop() { threadPoolExecutor.shutdown(); if(scheduledExecutorService ! = null) { scheduledExecutorService.shutdownNow(); } } @Override public synchronized void loopTask(Runnable task, long interval) { loopTask(task, interval, 0); } @Override public void loopTask(Runnable task, long interval, long delay) { if(scheduledExecutorService == null) { ThreadFactory threadFactory = new ThreadPoolServiceImpl.ScheduledThreadFactory("schedule-pool-%d-%s"); scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory); } int minInterval=100; If (interval < minInterval) {throw new IllegalArgumentException(" Loop tasks within 100ms are not allowed to be scheduled "); } scheduledExecutorService.scheduleAtFixedRate(TtlRunnable.get(task), delay, interval, TimeUnit.MILLISECONDS); } @Override public void runTasksUntilEnd(List<Runnable> tasks) { List<Future<? >> futures = new ArrayList<Future<? > > (); for(Runnable task : tasks) { futures.add(addTask(task)); } for(Future<? > f : futures) { try { f.get(); } catch (Exception e) { log.warn("", e); }}} / * * * for singleton thread pool example * @ return * / protected LazyTraceThreadPoolTaskExecutor getExecutorService () {return threadPoolExecutor; } /** * static class ScheduledThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; ScheduledThreadFactory(String namePrefix) { SecurityManager s = System.getSecurityManager(); group = (s ! = null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.namePrefix = String.format(namePrefix, poolNumber.getAndIncrement(), "%d"); } String getThreadName() { return String.format(namePrefix, threadNumber.getAndIncrement()); @override public Thread newThread(Runnable r) {Thread t = newThread(group, r, getThreadName(), 0); if (! t.isDaemon()){ t.setDaemon(true); } if (t.getPriority() ! = Thread.NORM_PRIORITY){ t.setPriority(Thread.NORM_PRIORITY); } return t; }}}Copy the code

3.2.1. Initialize the thread pool

When a business application calls the addTask method, it makes a judgment call

if(! inited) { init(); }Copy the code

Whether the current thread pool has been initialized. If not, initialize the thread pool first

Call the init() method

Public synchronized void init() {if(inited) {return; } ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(this.corePoolSize); threadPoolTaskExecutor.setMaxPoolSize(this.maximumPoolSize); threadPoolTaskExecutor.setQueueCapacity(this.queueSize); threadPoolTaskExecutor.setKeepAliveSeconds(this.keepAliveTime); threadPoolTaskExecutor.setThreadNamePrefix("baiyan-Thread-"); threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true); threadPoolTaskExecutor.initialize(); this.threadPoolExecutor = new LazyTraceThreadPoolTaskExecutor(SpringContextUtil.getApplicationContext(), threadPoolTaskExecutor); inited = true; }Copy the code

ThreadPoolTaskExecutor for spring thread pool, the internal implementation is also based on ThreadPoolExecutor, using LazyTraceThreadPoolTaskExecutor reason is here, To access the SpringCloud component SLEut [link Tracking], it is easy to record links when thread pools are called by requests or scheduled tasks.

3.2.2. Thread pool task submission

Thread pool task submission is supported in several ways

/** * Add tasks to the thread pool * @param task * @return task (must implement Runnable interface) */ Future<? > addTask(Runnable task); /** * Asynchronously execute a batch of tasks until the task is completed * @param Task */ void runTasksUntilEnd(List<Runnable> task); /** * Add a looped task to the thread pool * @param task (must implement Runnable interface) * @param interval Time interval, */ void loopTask(Runnable task, long interval); /** * Add a loop task to the thread pool * @param task (must implement the Runnable interface) * @param interval Interval in milliseconds * @param delay Delay in execution in milliseconds, */ void loopTask(Runnable task, long interval, long delay); */ void loopTask(Runnable task, long interval, long delay);Copy the code

The corresponding method implementation classes wrap TtlRunnable so that the parent thread’s ThreadLocal variable can be passed in.

Here’s a reference to my other blog post on ThreadLocal variable pass resolution in the thread pool: juejin.cn/post/695538…

3.3.3. Parameter Description

There is an online formula for setting the number of core threads

IO intensive = 2NCPU

CPU intensive =NCUP+1

I personally think so, if the running dependency on the thread pool is not that high, or if the thread pool is not used frequently, but some tasks are running, you can rely on the above formula to configure the number of core threads. I think it is still necessary to set parameters from the actual online environment of the machine configuration, the actual business running state of pressure measurement. Two articles are recommended below

1. About how the thread pool parameters according to actual business scenario Settings: www.cnblogs.com/superming/p…

2. Meituan SAO operation, online through setting the thread pool configuration center dynamic parameters: tech.meituan.com/2020/04/02/…

3.3. Business applications create business thread pools

/** * public class CommonThreadPool {/** ** private volatile thread pool ** @author baiyan * @date 2021/05/11 */ public class CommonThreadPool {/** ** private volatile thread pool ** static ThreadPoolService threadPoolService; Private CommonThreadPool(){} private CommonThreadPool(){} public static ThreadPoolService getBlockThreadPool() { if (threadPoolService == null) { synchronized (CommonThreadPool.class) { if (threadPoolService ==  null) { threadPoolService = new ThreadPoolServiceImpl(); } } } return threadPoolService; }}Copy the code

Service applications use singleton mode to create thread pools based on implementation classes and interfaces. You are advised to create one thread pool for class 1 services. The total number of thread pools in service applications is not more than three.

3.4. Service Usage Examples

Do not catch exceptions

@RestController @Slf4j public class DemoController { @GetMapping("/test") public void test(){ for (int i = 0; i < 10; i++) { int finalI = i; CommonThreadPool. GetBlockThreadPool (.) addTask (() - > {the info (" thread thread: "+ finalI); }); }}} the console output - the 2021-05-11 21:36:38. 596, INFO, [ad8d85b916f42108, 01 e67ff7c6edc0db ad8d85b916f42108], [baiyan - Thread - 4]. Com.examp. FileController - Thread Thread: 3-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108, 7 ab5e7047f3543e5, ad8d85b916f42108], [baiyan - Thread - 2), Com.examp. FileController - Thread Thread: 1-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108, 591 a8274b5ee66e5, ad8d85b916f42108], [baiyan - Thread - 7), Com.examp. FileController - Thread Thread: 6-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,f1760980349388b9,ad8d85b916f42108], [baiyan-Thread-6], Com.examp. FileController - Thread Thread: 5-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,fb6103c607690d63,ad8d85b916f42108], [baiyan-Thread-8], Com.examp. FileController - Thread Thread: 7-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,f67e9fdbb5f7dfa9,ad8d85b916f42108], [baiyan-Thread-9], Com.examp. FileController - Thread Thread: 8-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108, 55 cb195f3d0f0708, ad8d85b916f42108], [baiyan - Thread - 3]. Com.examp. FileController - Thread Thread: 2-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108, 0 f5fcc1ec768d003, ad8d85b916f42108], [baiyan - Thread - 5), Com.examp. FileController - Thread Thread: 4-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108, 78339 a17929239e4, ad8d85b916f42108], [baiyan - Thread - 1), Com.examp. FileController - Thread Thread: 0-2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108, 747 a5b593797f9c7, ad8d85b916f42108], [baiyan - Thread - 10], com. Examp. FileController - Thread Thread: 9Copy the code

Cases where exceptions need to be caught

@RestController @Slf4j public class FileController { @GetMapping("/test") public void test() throws ExecutionException, InterruptedException { List<Runnable> runnableList = new ArrayList<>(); for (int i = 0; i < 10; i++) { runnableList.add(()->{ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } throw new RuntimeException("1"); }); } CommonThreadPool.getBlockThreadPool().runTasksUntilEnd(runnableList); }}Copy the code

The runTasksUntilEnd method calls feature.get() internally to get the exception for task execution.

3.5. SpringSontextUtil code

import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author baiyan * @date 2021/05/11 */ @Component public class SpringContextUtil implements ApplicationContextAware {/** * Spring context */ private Static ApplicationContext ApplicationContext; /** * Implements the ApplicationContextAware interface callback method. Public void setApplicationContext(applicationContext) applicationContext) { SpringContextUtil.applicationContext = applicationContext; } /** * @return ApplicationContext */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * Obtain the Object * @param name * @return Object * @throws BeansException */ public static Object getBean(String name) throws  BeansException { return applicationContext.getBean(name); } public static <T> T getBean(Class<T> clazz) {public static <T> T getBean(Class<T> clazz); throws BeansException { return applicationContext.getBean(clazz); }}Copy the code

3.5. Three parties rely on POM

//TTL dependency <dependency> <groupId>com.alibaba</groupId> <artifactId> </artifactId> < version > 2.2.0 < / version > < / dependency > / / link tracking rely on < dependency > < groupId > org. Springframework. Cloud < / groupId > < artifactId > spring - the cloud - starter - sleuth < / artifactId > < version > 2.0.1. RELEASE < / version > < / dependency >Copy the code

Dependency on link tracing and TTL

4. Source code acquisition

Github.com/louyanfeng2…

The thread pool on Github is ThreadPoolExecutor provided by the JDK, so link tracing is not supported. You can modify it according to the blog. Of course, if there is no link tracing request, you can also directly use the code on Github.

Reference 5.

www.cnblogs.com/javanoob/p/…

www.jianshu.com/p/7726c70cd…

Six. Contact me

Nailing: louyanfeng25

WeChat: baiyan_lou