Today's servers are mostly multi-processor, multi-core machines with ample resources. To make full use of server performance, decouple calling threads from asynchronous work, and improve response speed, concurrent programming is often the better choice. This article shows how to use the thread pool provided by the JDK, with a file upload feature as the example.

Introduction to thread pools

The core implementation class of the thread pool provided by the JDK is ThreadPoolExecutor. Its class hierarchy, as rendered by IDEA's Show Diagrams feature:

  • The top-level interface Executor provides only a single method, void execute(Runnable command), which decouples task definition from task execution; the user only needs to define a Runnable task.
  • The ExecutorService interface extends Executor and, on top of task execution, adds the result-returning <T> Future<T> submit(Callable<T> task) method, as well as management functions such as batch execution of asynchronous tasks and thread pool startup and shutdown.
  • AbstractExecutorService implements ExecutorService and acts as a task template, wiring together the steps of task execution so that lower-level implementation classes only need to focus on executing tasks.
  • ThreadPoolExecutor implements task management, thread management, thread pool lifecycle management, and other functions.
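To make the division of labor above concrete, here is a minimal runnable sketch (our own demo class, not from the article) showing execute for a fire-and-forget Runnable and submit for a Callable whose result comes back through a Future:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorHierarchyDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Executor.execute: fire-and-forget, no return value
        pool.execute(() -> System.out.println("runnable task"));

        // ExecutorService.submit: returns a Future carrying the result
        Future<Integer> future = pool.submit(() -> 21 * 2);
        System.out.println(future.get()); // prints 42

        pool.shutdown();
    }
}
```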

Task execution process

Let's look at the default thread pool execution flow in the source code:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // The number of worker threads is less than the number of core threads
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The pool is still running, so try to enqueue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check the running state again; if it changed (e.g. shutdownNow),
        // remove the task and invoke the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no worker threads (in the extreme case, threads were
        // reclaimed while the task joined the queue), start one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The running thread count is at or above corePoolSize and the queue is
    // full; try to add a non-core worker, and reject if that fails
    else if (!addWorker(command, false))
        reject(command);
}

Flow chart:

Of course, this is the default thread pool execution flow, which suits CPU-intensive applications, and a lot of middleware does secondary development on top of ThreadPoolExecutor. For example, Tomcat, Netty, and Dubbo all have their own implementations. Tomcat changes the execution flow so that the thread count grows to the maximum before tasks enter the queue, which reduces resource waste when IO-intensive applications block.
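As an illustration of Tomcat's approach, here is a hedged sketch of the core trick (this is NOT Tomcat's actual TaskQueue; the class and field names are invented for the demo): make offer() return false while the pool can still grow, so ThreadPoolExecutor creates threads up to maximumPoolSize before tasks start queueing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Sketch of the Tomcat idea, not Tomcat's real TaskQueue
class EagerQueue extends LinkedBlockingQueue<Runnable> {
    volatile ThreadPoolExecutor pool; // wired up after the pool is built

    EagerQueue(int capacity) { super(capacity); }

    @Override
    public boolean offer(Runnable r) {
        // Refusing to enqueue makes ThreadPoolExecutor call addWorker() instead
        if (pool != null && pool.getPoolSize() < pool.getMaximumPoolSize()) {
            return false;
        }
        return super.offer(r);
    }
}
```

When the pool is already at maximumPoolSize, the failed offer triggers the rejection handler; Tomcat's real implementation additionally retries the enqueue there.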

Custom thread pools

2.1 Creating a thread pool

The JDK itself provides some out-of-the-box thread pools, such as FixedThreadPool and CachedThreadPool, but their parameter settings are fixed and some of them use unbounded queues. When system concurrency is too high or the program design is flawed, it is very easy to run out of memory or hit other unexpected exceptions.
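You can verify the unbounded queue directly: Executors.newFixedThreadPool is backed by a LinkedBlockingQueue with capacity Integer.MAX_VALUE, so under sustained overload tasks pile up in memory instead of being rejected. A small sketch (the demo class name is ours):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedPoolQueueDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor fixed =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        // Effectively unbounded: every excess task is queued, never rejected
        System.out.println(fixed.getQueue().remainingCapacity()); // 2147483647
        fixed.shutdown();
    }
}
```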

Here we use the following constructor of ThreadPoolExecutor to create the thread pool.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

Thread pool creation code is as follows:

/**
 * @author winsonWu
 * @Description: thread pool creation configuration
 * @date 2021.04.13 16:00
 */
@Configuration
public class ThreadPoolCreator {

    /** Number of core threads */
    private static int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;

    /** Maximum number of threads */
    private static int maximumPoolSize = corePoolSize;

    /** Keep-alive time for idle threads beyond the core size */
    private static long keepAliveTime = 3;

    /** Time unit of keepAliveTime */
    private static TimeUnit unit = TimeUnit.MINUTES;

    /** Bounded blocking queue */
    private static BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>(500);

    /** Thread factory; a named factory makes it easier to distinguish business threads and troubleshoot production problems */
    private static ThreadFactory threadFactory = new NamedThreadFactory("taskResolver");

    /** Rejection policy; choose or customize according to the business */
    private static RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory, handler);
    }
}

2.2 Number of core threads

Concurrent tasks are generally divided into CPU-intensive tasks and IO-intensive tasks.

CPU-intensive tasks require the CPU to perform complex, high-density computation. Do not create too many threads for this type of task; otherwise, frequent context switching will reduce resource utilization and task processing speed. IO-intensive tasks do not have strict CPU requirements and may spend most of their time blocked on IO, so increasing the number of threads improves concurrency and lets the pool process as many tasks as possible. A common empirical configuration:

CPU-intensive: N + 1 (but no more than 2N)
IO-intensive:  2N + 1
N is the number of server cores.

In production, you are advised to set these parameters based on load test results.
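The rule of thumb above can be written down directly. A sketch (the variable names are ours, and the numbers are only starting points for load testing, not final values):

```java
public class PoolSizing {
    public static void main(String[] args) {
        // N = number of server cores
        int n = Runtime.getRuntime().availableProcessors();
        int cpuIntensive = Math.min(n + 1, 2 * n); // N + 1, capped at 2N
        int ioIntensive = 2 * n + 1;               // 2N + 1
        System.out.println("cores=" + n
                + " cpuIntensive=" + cpuIntensive
                + " ioIntensive=" + ioIntensive);
    }
}
```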

2.3 Blocking queue

With a BlockingQueue, a thread fetching an element blocks when the queue is empty, waiting for it to become non-empty, and a thread storing an element blocks when the queue is full, waiting for the queue to be consumed, which naturally supports the producer-consumer model used by thread pools. Common blocking queues include ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue, and DelayQueue. In most scenarios LinkedBlockingQueue is enough to solve the problem, so we choose LinkedBlockingQueue here as well.
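A small runnable sketch (demo names are ours) of the blocking behavior: take() blocks on an empty queue, and put() blocks when the capacity-2 queue is full until the consumer drains it:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);

        // Consumer: take() blocks while the queue is empty
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println("took " + queue.take());
                }
            } catch (InterruptedException ignored) { }
        });
        consumer.start();

        // Producer: put() blocks when the queue is full (capacity 2)
        for (int i = 0; i < 3; i++) {
            queue.put("task-" + i);
        }
        consumer.join();
    }
}
```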

2.4 Rejection policies

By default, when the thread pool's blocking queue is full and the pool has reached its maximum number of threads, the rejection policy is triggered. The JDK provides four built-in rejection policies: AbortPolicy (throw a RejectedExecutionException; the default), CallerRunsPolicy (run the task on the calling thread), DiscardPolicy (silently drop the task), and DiscardOldestPolicy (drop the oldest queued task and retry).
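A small runnable sketch (our own demo, using the default AbortPolicy) showing the rejection path: one busy thread, a one-slot queue that fills up, and a third task that gets rejected:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),           // room for one waiting task
                new ThreadPoolExecutor.AbortPolicy()); // the default policy

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };
        pool.execute(blocker); // occupies the single thread
        pool.execute(blocker); // fills the queue
        try {
            pool.execute(blocker); // no thread, no queue space -> rejected
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getClass().getSimpleName());
        }
        release.countDown();
        pool.shutdown();
    }
}
```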

Let's illustrate with two scenarios:

  • Scenario 1: System monitoring. A custom annotation plus AOP intercepts the monitored interface, captures its input and output parameters, and then stores the logs to HDFS via asynchronous tasks.

    In log collection scenarios, losing a small amount of data has little impact on the business, so we can set the number of core threads to 1 to reduce server resource usage, and size the blocking queue appropriately according to load test results. If AbortPolicy is used, tasks submitted with execute(...) should handle exceptions with a try/catch (or an UncaughtExceptionHandler, which is not recommended), while for submit(...) the exception is obtained and handled via Future.get().

  • Scenario 2: Message queue consumption. Piled-up messages can be processed in batches via asynchronous threads.

    In this scenario the data cannot be lost, so we use CallerRunsPolicy to let the calling thread run the message-processing logic. However, this approach interferes with whatever the calling thread is doing; a better approach is a custom rejection policy that persists the task or re-enqueues it. Let's first look at the implementation of the CallerRunsPolicy rejection policy:

    public static class CallerRunsPolicy implements RejectedExecutionHandler {

        /** Creates a {@code CallerRunsPolicy}. */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    To customize a rejection policy, implement the RejectedExecutionHandler interface and override its rejectedExecution(Runnable r, ThreadPoolExecutor e) method:

    /**
     * @author winsonWu
     * @Description: persistence rejection policy
     * @date 2022.04.14 9:38
     */
    public class DataBaseStoragePolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // todo persist the task or similar
        }
    }
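Returning to scenario 1's note on exception handling, here is a runnable sketch (demo class and messages are ours) of how exceptions surface differently for submit(...) and execute(...):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolExceptionDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // submit(...): the task's exception is captured by the Future and
        // re-thrown from get(), wrapped in an ExecutionException
        Callable<Void> failing = () -> {
            throw new IllegalStateException("boom");
        };
        Future<Void> future = pool.submit(failing);
        try {
            future.get();
        } catch (ExecutionException e) {
            System.out.println("caught: " + e.getCause().getMessage()); // caught: boom
        }

        // execute(...): there is no Future, so catch inside the task itself
        // (or install an UncaughtExceptionHandler through the thread factory)
        pool.execute(() -> {
            try {
                throw new IllegalStateException("boom");
            } catch (IllegalStateException e) {
                System.out.println("handled inside the task");
            }
        });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```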

2.5 Thread preheating and recycling

  • Thread preheating

    If the server is expected to receive a large number of requests right after startup, such as the piled-up message processing scenario mentioned in the rejection policy section, we can warm up the thread pool by creating core threads ahead of time, which speeds up the service accordingly. ThreadPoolExecutor provides three methods for this:

    // Start one core thread
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }

    // Start all core threads, returning the number started
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    // Ensure that at least one thread is started
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
  • Thread recycling

    By default, if workerCount is greater than corePoolSize, the thread pool automatically reclaims a thread once it has been idle for longer than keepAliveTime. Core threads can also be reclaimed, by calling allowCoreThreadTimeOut(true), to further improve resource utilization.
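A minimal sketch (the demo class name and timings are ours) showing core thread reclamation with allowCoreThreadTimeOut: after the threads sit idle past keepAliveTime, the pool shrinks to zero:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreThreadTimeoutDemo {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 100, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // let idle core threads die too
        pool.prestartAllCoreThreads();
        System.out.println("after prestart: " + pool.getPoolSize()); // 2

        Thread.sleep(1000); // well past keepAliveTime, threads sit idle
        System.out.println("after idling:   " + pool.getPoolSize()); // 0
        pool.shutdown();
    }
}
```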

2.6 Thread pool monitoring

ThreadPoolExecutor itself provides a number of query methods for retrieving thread pool state information. Let's modify the previous Bean definition; the methods are explained in the code comments:

@Bean
public ThreadPoolExecutor threadPoolExecutor() {
    return new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler) {

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("current pool size: " + this.getPoolSize());
            System.out.println("core thread count: " + this.getCorePoolSize());
            System.out.println("peak pool size: " + this.getLargestPoolSize());
            System.out.println("active thread count: " + this.getActiveCount());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("active thread count: " + this.getActiveCount());
            System.out.println("total task count: " + this.getTaskCount());
        }

        @Override
        protected void terminated() {
            // Completed task count at the time the pool terminated
            System.out.println("completed task count: " + this.getCompletedTaskCount());
        }
    };
}

2.7 Thread pool life cycle

There are five main states in the thread pool, defined in the code as follows:

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

State transitions are as follows:

The important thing to note here is that after we call shutdown(), the pool no longer accepts new tasks but still processes the remaining tasks in the blocking queue, whereas shutdownNow() stops accepting new tasks, interrupts the running worker threads, and returns the tasks still waiting in the queue.
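A minimal sketch (the demo class name is ours) of the shutdownNow() side of this contrast: the running worker is interrupted, and the tasks still waiting in the queue are handed back:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the running task
                System.out.println("running task interrupted");
            }
        });
        pool.execute(() -> { }); // queued task #1
        pool.execute(() -> { }); // queued task #2
        started.await();

        // shutdownNow: stop accepting tasks, interrupt workers,
        // and return the tasks still waiting in the queue
        List<Runnable> pending = pool.shutdownNow();
        System.out.println("pending tasks: " + pending.size()); // pending tasks: 2
    }
}
```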

Thread pool practices

3.1 Basic usage

Now that we have defined the thread pool, let's try out the basic methods:

@Resource
private ThreadPoolExecutor threadPoolExecutor;

@Test
public void testMultiThread() throws InterruptedException {
    // Start all core threads ahead of time
    threadPoolExecutor.prestartAllCoreThreads();
    StopWatch stopWatch = new StopWatch("thread pool test");

    // execute(Runnable command)
    stopWatch.start("execute");
    CountDownLatch forExecute = new CountDownLatch(1);
    threadPoolExecutor.execute(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            System.out.println("interrupted, ignore");
        }
        System.out.println("execute(Runnable command) test");
        forExecute.countDown();
    });
    forExecute.await();
    stopWatch.stop();

    // submit(Callable<T> task)
    stopWatch.start("submit");
    CountDownLatch forSubmit = new CountDownLatch(1);
    final Future<String> future = threadPoolExecutor.submit(() -> {
        System.out.println("submit(Callable<T> task) test");
        forSubmit.countDown();
        return "submit(Callable<T> task) test";
    });
    try {
        final String result = future.get();
        System.out.println("result: " + result);
    } catch (ExecutionException e) {
        // todo custom exception handling
    }
    forSubmit.await();
    stopWatch.stop();
    System.out.println(stopWatch.prettyPrint());
}

The result is as follows:

The difference between execute(Runnable command) and submit(Callable<T> task) has been described previously. Also, you can see that we use CountDownLatch for thread coordination, and that submit is executed after execute completes.

3.2 File Upload Practices

Next we demonstrate asynchronous task orchestration with CompletableFuture through a file upload feature. The requirements are as follows:

  1. Upload files in batches
  2. After the upload completes, return the list of file names and file IDs
  3. Print logs to console when exceptions occur (demo, production environment can be customized)

The implementation code is as follows:

Define the return object:

@Data
public class FileEntry implements Serializable {
    /** file id */
    private String fileId;
    /** file name */
    private String fileName;
}

File upload logic:

/**
 * Test code: file upload logic
 * @param eachFile the uploaded file
 * @return the file entry
 */
private FileEntry createFileEntry(MultipartFile eachFile) {
    // Generate the file id
    String fileId = UUID.randomUUID().toString().replace("-", "");
    File desFile = new File(FILE_LOCATION + fileId + "_" + eachFile.getOriginalFilename());
    try {
        eachFile.transferTo(desFile);
    } catch (IOException e) {
        throw new BizException("file upload failed");
    }
    FileEntry fileEntry = new FileEntry();
    fileEntry.setFileName(eachFile.getOriginalFilename());
    fileEntry.setFileId(fileId);
    return fileEntry;
}

The main logic:

public ArrayList<FileEntry> uploadFile(MultipartFile[] files) {
    // Initialize the return value
    ArrayList<FileEntry> fileEntryList = new ArrayList<>(files.length);
    List<CompletableFuture<FileEntry>> futureList = new ArrayList<>(files.length);
    for (MultipartFile eachFile : files) {
        // Run the file upload logic defined earlier on the thread pool
        CompletableFuture<FileEntry> future =
                CompletableFuture.supplyAsync(() -> createFileEntry(eachFile), threadPoolExecutor);
        futureList.add(future);
    }
    CompletableFuture<Void> fileUploadFuture = CompletableFuture
            .allOf(futureList.toArray(new CompletableFuture[futureList.size()]))
            .whenComplete((v, t) -> futureList.forEach(future ->
                    // Collect each result into the return list
                    fileEntryList.add(future.getNow(null))))
            .exceptionally(exception -> {
                // todo custom logic
                System.out.println("error occurred: " + exception.getMessage());
                return null;
            });
    // Block the calling thread until all files are uploaded
    fileUploadFuture.join();
    return fileEntryList;
}

At this point we have completed a simplified demonstration of CompletableFuture; for more advanced usage, see the reference materials.

Reference materials

  • Insight – How to master thread pool design and principles from the outside
  • Full of things – Count CompletableFuture’s fancy gameplay
  • Implementation principle of Java thread pool and its practice in Meituan business
  • Why does Tomcat need to extend the native thread pool when it is so powerful?