Overview
1. Problem
Let's start with the problem we ran into. We used to create threads the simple way, new Thread(() -> {...}), and it is exactly this simple, crude approach that causes serious trouble. First of all, creating and destroying threads is expensive and wasteful. Why do we use threads at all? For asynchrony and for performance. Creating three or five threads with new is fine, but what if you need a thousand? Do you loop and create 1000 threads, then destroy them all after use? The cost of creating and destroying a thousand threads is terrible!
2. Solution
Thread pools were created to solve this problem. The core idea of a thread pool is thread reuse: a thread is not destroyed when its task finishes; it goes back to the pool and waits for the next task, so the same N threads execute all tasks, old and new. The only overhead is the creation of those N threads, instead of one full create-to-destroy life cycle per request.
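The reuse idea can be observed directly. The following is a minimal sketch, not part of the original article: the class name ThreadReuseSketch and the method distinctThreadsUsed are made up for illustration, and Executors.newFixedThreadPool is used only for brevity. It runs many small tasks on a small pool and counts how many distinct threads actually did the work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ThreadReuseSketch {

    /**
     * Runs taskCount tiny tasks on a pool of poolSize threads and returns
     * how many distinct threads actually executed them.
     */
    static int distinctThreadsUsed(int taskCount, int poolSize) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Each task only records the name of the thread that ran it.
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks, let queued ones finish
        }
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 1000 tasks are executed, but at most 4 threads are ever created.
        System.out.println(distinctThreadsUsed(1000, 4));
    }
}
```

With new Thread(...) per task the same workload would create and destroy 1000 threads; here the count never exceeds the pool size.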
Thread pools
1. Concept
The solution described above already explains the concept fairly clearly. For complete beginners, here is a real-life analogy:
A job interview involves two roles: the interviewer and the candidate. With thousands of candidates, does the company find a new interviewer for each one? Obviously not. The company has a pool of interviewers: say A, B, and C are the company's interviewers, and whoever comes in is interviewed by one of the three in turn. The analogy is not perfect, but the point is that instead of assigning a new thread (interviewer) to each request (candidate), a few threads handle all the requests, with no repeated creation and destruction.
2. Parameters
2.1. Source code
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}
2.2. Interpretation
- corePoolSize: number of core threads
After the thread pool is initialized there are, by default, no threads in it; the pool creates a thread only when a task arrives. Once created, a core thread is not destroyed even if it stays idle longer than the configured keep-alive time (unless allowCoreThreadTimeOut(true) has been set); it stays in the pool, waiting for new tasks to process.
- maximumPoolSize: maximum number of threads
If the core threads are all busy and the task queue is full, the pool keeps creating threads, but not without limit. Once the number of threads (core threads included) reaches maximumPoolSize, no new threads are created and further tasks are handled by the rejection policy.
- keepAliveTime: keep-alive time of idle threads
If the pool currently has more threads than corePoolSize, the extra threads (those beyond the core thread count) are reclaimed once they have been idle for longer than keepAliveTime.
- unit: time unit of keepAliveTime
For example, TimeUnit.MILLISECONDS or TimeUnit.SECONDS.
- workQueue: task queue
When all core threads are busy and tasks keep being submitted to the pool, the new tasks go into the workQueue first.
The usual choices for workQueue are:
LinkedBlockingQueue: called an unbounded queue. Strictly speaking it is bounded, but its default capacity is Integer.MAX_VALUE. You can also specify a capacity.
ArrayBlockingQueue: a bounded queue with a capacity you specify; once it is full, the pool starts new threads (up to maximumPoolSize).
SynchronousQueue: the queue used by default by Executors.newCachedThreadPool(). It is not really a queue, because it cannot store elements.
LinkedBlockingQueue is the usual choice, because it can also be given a capacity and can therefore replace ArrayBlockingQueue as a bounded queue.
- threadFactory: used to create a new thread whenever the thread pool needs one
The default is DefaultThreadFactory, which creates threads in its newThread() method. The threads it creates all belong to the same thread group and have the same priority.
- handler: rejection policy; its logic is executed when the number of tasks exceeds the limits configured for the thread pool, or when tasks are still submitted after shutdown has been called
AbortPolicy is used by default. With this policy the thread pool rejects the task outright by throwing a RejectedExecutionException.
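To make the seven parameters concrete, here is a small sketch that passes each of them explicitly and then reads them back through the executor's getters. The class name PoolParametersSketch and the values (2, 4, 30 seconds, capacity 100) are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolParametersSketch {

    /** Builds a pool with every constructor argument spelled out. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L,                                   // keepAliveTime
                TimeUnit.SECONDS,                      // unit
                new ArrayBlockingQueue<>(100),         // workQueue (bounded)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // No thread exists until the first task arrives.
        System.out.println("threads before first task: " + pool.getPoolSize());
        System.out.println("core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + ", free queue slots=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```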
3. Principle
3.1. Principle
- The pool starts with zero threads.
- When you submit a task to the thread pool, the pool starts a new thread to execute it.
- If the number of threads is less than corePoolSize, a new thread is created for each new task, even if other worker threads are idle.
- If the number of threads is greater than or equal to corePoolSize, the task is placed in the workQueue (the task queue).
- If the task queue is full and the number of threads is less than maximumPoolSize, a new thread is created to run the task.
- If the task queue is full and the number of threads is greater than or equal to maximumPoolSize, the rejection policy is applied.
3.2. Diagram
3.3. Example
Thread pool configuration: 5 core threads, 10 maximum threads, and a queue capacity of 100.
When the thread pool starts, no threads are created. Assume that no task finishes while the following requests arrive. If six requests come in, five core threads are created to handle five of them, and the sixth, which no thread is free to handle, is put in the queue. Then 99 more requests arrive; the pool sees that the core threads are all busy and the queue still has 99 free slots, so all 99 are queued, which together with the earlier one makes exactly 100. Five more requests arrive, and the pool creates five non-core threads to handle them. At this point the pool has 10 threads running tasks and the queue holds 100 tasks, so it is full. If one more request comes in now, the rejection policy is applied.
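The walkthrough above can be reproduced with a short sketch. Everything here is illustrative: the class SubmissionFlowSketch and its method are made-up names, and every task simply blocks on a latch so that no thread becomes free while requests are being submitted, which is the assumption the example relies on.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SubmissionFlowSketch {

    /**
     * Submits `requests` blocking tasks to a pool with 5 core threads,
     * 10 maximum threads and a queue of 100, and returns
     * {pool size, queued tasks, rejected tasks}.
     */
    static int[] submitBlockingTasks(int requests) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < requests; i++) {
                try {
                    pool.execute(blocking);
                } catch (RejectedExecutionException e) {
                    rejected++; // the default AbortPolicy throws
                }
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int requests : new int[] {6, 105, 110, 111}) {
            int[] r = submitBlockingTasks(requests);
            System.out.println(requests + " requests -> threads=" + r[0]
                    + ", queued=" + r[1] + ", rejected=" + r[2]);
        }
        // 6 requests   -> threads=5,  queued=1,   rejected=0
        // 105 requests -> threads=5,  queued=100, rejected=0
        // 110 requests -> threads=10, queued=100, rejected=0
        // 111 requests -> threads=10, queued=100, rejected=1
    }
}
```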
3.4. Source code
public void execute(Runnable command) {
    int c = ctl.get();
    // workerCountOf(c): number of worker threads.
    // Fewer workers than core threads: create a worker directly.
    if (workerCountOf(c) < corePoolSize) {
        // addWorker creates the thread and runs the task.
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The worker count has reached the core thread count: queue the task.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, shutdown has been called and the
        // newly queued task must be rejected. The recheck is needed because
        // the pool state may change while the task is being queued.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // The zero check is needed because the constructor allows a core
        // thread count of 0.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the pool is not running or the task could not be queued, try to
    // create a worker to run the task. Three points to note:
    // 1. When the pool is not running, addWorker checks the pool state itself.
    // 2. The second argument of addWorker says whether to create a core thread.
    // 3. If addWorker returns false, the task could not be run and must be rejected.
    else if (!addWorker(command, false))
        reject(command);
}
4. Executors
4.1. Concept
First of all, Executors is not a thread pool; it is a utility class for thread pools that makes it easy to create one. However, the Alibaba development manual does not recommend using Executors to create thread pools and advises defining your own instead. The reason is that every thread pool created by Executors carries a serious risk, as discussed below.
4.2. Fixed number of threads
4.2.1. Description
The number of core threads equals the maximum number of threads, hence the name fixed thread pool.
The defaults for the other parameters are: never time out (0ms), an unbounded queue (LinkedBlockingQueue), DefaultThreadFactory, and AbortPolicy.
4.2.2. API
Executors.newFixedThreadPool(n);
4.2.3. Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: Create 2 threads to execute 10 tasks.
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            // Only two thread names ever appear in the output; there is never a third.
            executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        // Stop accepting tasks so the JVM can exit once the queued tasks finish.
        executorService.shutdown();
    }
}
4.2.4. Problem
The problem is the unbounded queue: it can hold up to Integer.MAX_VALUE tasks. Under heavy concurrency the tasks keep piling up, and since the queue lives directly in JVM memory, an OOM is very likely. It is therefore better to define your own thread pool, choose a queue capacity that fits your requirements, and use a custom rejection policy that moves the tasks exceeding the queue capacity to external storage, such as Redis, for later compensation. Just don't overwhelm the business system.
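A minimal sketch of that suggestion follows. It assumes a pool of one thread with a queue capacity of two so that the overflow is easy to see, and it uses an in-memory ConcurrentLinkedQueue as a stand-in for external storage such as Redis; the class and method names are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {

    /** Submits blocking tasks and returns how many were diverted to the overflow store. */
    static int overflowCount(int submitted) {
        // Hypothetical stand-in for external storage such as Redis.
        Queue<Runnable> overflowStore = new ConcurrentLinkedQueue<>();
        // Custom rejection policy: keep the task for later compensation
        // instead of throwing RejectedExecutionException.
        RejectedExecutionHandler saveOverflow =
                (task, executor) -> overflowStore.add(task);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), // bounded: at most 2 waiting tasks
                saveOverflow);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < submitted; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return overflowStore.size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 1 task running + 2 queued, so 2 of the 5 tasks overflow.
        System.out.println(overflowCount(5)); // prints 2
    }
}
```

Memory use is now capped by the queue capacity, and what happens to the excess tasks is an explicit decision rather than an accident.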
4.2.5. Source
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  // Unbounded queue!! The crux of the fatal problem.
                                  new LinkedBlockingQueue<Runnable>());
}
4.3. Single thread
4.3.1. Description
The number of core threads and the maximum number of threads are both 1. This is hard-coded internally and cannot be changed, hence the name single-thread pool.
It is similar to Executors.newFixedThreadPool(1).
The defaults for the other parameters are: never time out (0ms), an unbounded queue (LinkedBlockingQueue), DefaultThreadFactory, and AbortPolicy.
4.3.2. API
Executors.newSingleThreadExecutor();
4.3.3. Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: Create 1 thread to execute 10 tasks.
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            // The thread name is always pool-1-thread-1; there is never a second one.
            executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        // Stop accepting tasks so the JVM can exit once the queued tasks finish.
        executorService.shutdown();
    }
}
4.3.4. Problem
The same problem as [4.2, fixed number of threads]: it is caused by the unbounded queue.
4.3.5. Source
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(
                // Core and maximum thread counts are both 1: hard-coded,
                // the caller cannot change them.
                1, 1,
                0L, TimeUnit.MILLISECONDS,
                // Unbounded queue!! The crux of the fatal problem.
                new LinkedBlockingQueue<Runnable>()));
}
4.4. Thread pools with caching
4.4.1. Description
What it does: when a task arrives and no idle thread is available, it creates a thread to run the task. Tasks are never queued, because SynchronousQueue does not store elements. This means that 100 million concurrent requests would make it try to open 100 million threads. keepAliveTime is 60 seconds: an idle thread is kept for reuse and is reclaimed once it has been idle for more than 60 seconds. That is why it is called a thread pool with caching.
The number of core threads is 0 and the maximum number of threads is Integer.MAX_VALUE. Both are internal defaults and cannot be changed.
The defaults for the other parameters are: a 1-minute timeout (60s), SynchronousQueue, DefaultThreadFactory, and AbortPolicy.
4.4.2. API
Executors.newCachedThreadPool();
4.4.3. Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description: Create a cacheable thread pool to execute 10 tasks.
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            // The output can contain up to 10 different thread names: the pool
            // opens a new thread for a task whenever no idle thread is available.
            executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        // Stop accepting tasks so the JVM can exit once the tasks finish.
        executorService.shutdown();
    }
}
4.4.4. Problem
The problem is that the maximum number of threads is Integer.MAX_VALUE and the internal queue is a SynchronousQueue, which cannot hold elements. The pool therefore starts a thread whenever a request arrives and no thread is idle. At peak load it could, in theory, try to create more than 2 billion threads.
4.4.5. Source
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
            // The core thread count is 0 and the maximum thread count is
            // Integer.MAX_VALUE. This can be fatal!!
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}
4.5. Thread pool with scheduling function
4.5.1. Description
RocketMQ uses this thread pool heavily internally, for example for heartbeat tasks.
The number of core threads is passed in by the caller. The maximum number of threads is Integer.MAX_VALUE, which is an internal default and cannot be changed.
The defaults for the other parameters are: never time out (0ns), a queue with delay support (DelayedWorkQueue), DefaultThreadFactory, and AbortPolicy.
4.5.2. API
Executors.newScheduledThreadPool(n);
4.5.3. Demo
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Description: Create a thread pool with scheduling support to execute tasks.
 *
 * @author TongWei.Chen 2020-07-09 21:28:34
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        // Run once, after a delay of 5 seconds.
        scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName()), 5, TimeUnit.SECONDS);
        // Run for the first time after 5 seconds, then once every 1 second.
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()), 5, 1, TimeUnit.SECONDS);
    }
}
4.5.4. Problem
Compare with [4.4, thread pools with caching]: here too the maximum number of threads is Integer.MAX_VALUE. Note, however, that DelayedWorkQueue is unbounded, so in practice the pool does not grow beyond corePoolSize. Under heavy load the danger is pending tasks piling up in the unbounded queue and exhausting memory, rather than more than 2 billion threads being created.
4.5.5. Source
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // The maximum thread count is Integer.MAX_VALUE, as in newCachedThreadPool,
    // and the DelayedWorkQueue is unbounded!!!
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
4.6. Stopping the thread pool
4.6.1. shutdown
Shuts the pool down gracefully. If the pool still has tasks running or waiting in the queue, it does not stop immediately; it waits until those tasks have completed. Once shutdown has been called, any task that is still submitted from outside is handled directly by the rejection policy.
4.6.2. isShutdown
Checks whether the thread pool has been shut down.
4.6.3. shutdownNow
Forcibly stops the thread pool. It does not matter whether tasks are still running or still piled up in the queue: running tasks are interrupted and the pool is stopped. Its return value is the list of tasks in the queue that were never executed, so you can log them if you want.
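The difference between the two shutdown methods can be seen in a small sketch. It assumes a single-thread pool with a bounded queue; the class ShutdownSketch and its method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {

    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    }

    /** After shutdown(), new tasks go to the rejection policy (AbortPolicy here). */
    static boolean rejectsAfterShutdown() {
        ThreadPoolExecutor pool = singleThreadPool();
        pool.execute(() -> { });
        pool.shutdown(); // already submitted tasks still run to completion
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return pool.isShutdown();
        }
    }

    /** shutdownNow() interrupts the running task and returns the queued ones. */
    static int pendingAfterShutdownNow() {
        ThreadPoolExecutor pool = singleThreadPool();
        pool.execute(() -> {
            try {
                Thread.sleep(60_000); // long-running task occupying the only thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdownNow()
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { }); // these three wait in the queue
        }
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println(rejectsAfterShutdown());    // prints true
        System.out.println(pendingAfterShutdownNow()); // prints 3
    }
}
```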
4.7. Question
Why do these thread pools use different queues? For example, why does newFixedThreadPool use LinkedBlockingQueue while newCachedThreadPool uses SynchronousQueue?
Because newFixedThreadPool has a limited number of threads and does not want to lose tasks, it has to use an unbounded queue. newCachedThreadPool allows up to Integer.MAX_VALUE threads, so it has no need to store tasks in a queue at all.
5. Summary of common questions
1. Thread pool states
- RUNNING: accepts new tasks and processes queued tasks.
- SHUTDOWN: does not accept new tasks, but still processes queued tasks. [See: 4.6.1, shutdown]
- STOP: does not accept new tasks, does not process queued tasks, and interrupts in-progress tasks.
- TIDYING: all tasks have terminated and the worker count is zero; the pool enters this state and runs the terminated() hook method.
- TERMINATED: the terminated() hook method has completed.
2. Should the thread pool be created automatically or manually?
Manually, without question, because the thread pools that Executors creates for you all have the fatal problems described above. When we create a thread pool manually, we control the number of threads and the queue capacity, and we can customize things such as thread names. That way there are no fatal surprises, and all the risks are under our control.
3. How many threads is appropriate?
- CPU-intensive work (such as encryption and complex computation): set the number of threads to the number of CPU cores + 1.
- Time-consuming I/O work (such as reading and writing databases, or compressing and decompressing large files): usually set the number of threads to twice the number of CPU cores. There is also a handy formula: number of threads = number of CPU cores * (1 + average waiting time / average working time)
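The two rules of thumb above can be written as a tiny helper. This is only a sketch: the class PoolSizing and its method names are made up, and rounding the result up with Math.ceil is an assumption, since the formula itself does not say how to round.

```java
final class PoolSizing {

    /** CPU-intensive work: number of cores + 1. */
    static int cpuBound(int cores) {
        return cores + 1;
    }

    /** I/O-heavy work: cores * (1 + average waiting time / average working time). */
    static int ioBound(int cores, double avgWaitMillis, double avgWorkMillis) {
        // Rounding up is an assumption of this sketch.
        return (int) Math.ceil(cores * (1 + avgWaitMillis / avgWorkMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBound(cores));
        // A task that waits 90 ms on I/O for every 10 ms of computation:
        System.out.println("I/O-bound pool size: " + ioBound(cores, 90, 10));
    }
}
```

When the waiting time equals the working time, the formula gives exactly twice the number of cores, which matches the simpler rule for I/O work. On 8 cores, a task that waits 90 ms for every 10 ms of work gives 8 * (1 + 9) = 80 threads.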
4. before & after
There are two hook methods, beforeExecute and afterExecute, for printing logs or doing other work before and after a task executes.
The source code is as follows:
// Before the task runs
beforeExecute(wt, task);
Throwable thrown = null;
try {
    // Actually run the task
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    // After the task runs
    afterExecute(task, thrown);
}
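To use these hooks you subclass ThreadPoolExecutor and override them. The sketch below is illustrative only: LoggingPool is a made-up name, and it counts calls where a real implementation would more likely write log lines.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LoggingPool extends ThreadPoolExecutor {

    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    LoggingPool(int threads, int queueCapacity) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet(); // e.g. log "task started" here
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet(); // e.g. log "task finished"; thrown is non-null on failure
    }

    /** Runs five empty tasks and returns {beforeExecute calls, afterExecute calls}. */
    static int[] runDemo() {
        LoggingPool pool = new LoggingPool(2, 10);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {pool.started.get(), pool.finished.get()};
    }

    public static void main(String[] args) {
        int[] counts = runDemo();
        System.out.println(counts[0] + " started, " + counts[1] + " finished"); // 5 started, 5 finished
    }
}
```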
6. Core source code (all)
1. Explanation of common variables
// 1. `ctl` can be treated as an int: the high 3 bits hold the state of the
//    thread pool, the low 29 bits hold the number of workers.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`: `Integer.SIZE` is 32, so `COUNT_BITS` is 29.
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`: the maximum number of threads the thread pool allows,
//    (1 << 29) - 1.
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 4. The thread pool has 5 states, ordered by value:
//    RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 5. `runStateOf()`: returns the pool state by masking out the low 29 bits.
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 6. `workerCountOf()`: returns the worker count by masking out the high 3 bits.
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 7. `ctlOf()`: builds a ctl value from a pool state and a worker count.
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
// 8. `runStateLessThan()`: the pool state is less than s.
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// 9. `runStateAtLeast()`: the pool state is greater than or equal to s.
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
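The bit packing is easier to follow when you can run it. The following standalone sketch copies the same arithmetic into a made-up class, CtlSketch, because the real fields are private to ThreadPoolExecutor. It packs a state and a worker count into one int and unpacks them again.

```java
final class CtlSketch {

    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set

    // The state lives in the high 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5); // a RUNNING pool with 5 workers
        System.out.println(workerCountOf(ctl));         // prints 5
        System.out.println(runStateOf(ctl) == RUNNING); // prints true
        // The states are ordered, so comparing ctl values compares states.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // prints true
    }
}
```

Keeping both values in a single AtomicInteger lets the pool read and update the state and the worker count together with one atomic operation.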
2. Constructor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Check the primitive parameters.
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // Null check.
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // Convert the keep-alive time to nanoseconds, based on `unit` and
    // `keepAliveTime`, and store it in the field `keepAliveTime`.
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
3. The process of submitting and executing a task
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // Fewer workers than core threads: create a worker to run the task.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The worker count has reached the core thread count: queue the task.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, shutdown has been called and the
        // newly queued task must be rejected. The recheck is needed because
        // the pool state may change while the task is being queued.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // The zero check is needed because the constructor allows a core
        // thread count of 0.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the pool is not running or the task could not be queued, try to
    // create a worker to run the task. Three points to note:
    // 1. When the pool is not running, addWorker checks the pool state itself.
    // 2. The second argument of addWorker says whether to create a core thread.
    // 3. If addWorker returns false, the task could not be run and must be rejected.
    else if (!addWorker(command, false))
        reject(command);
}
4. addWorker source code analysis
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // Outer spin
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // This condition is hard to read; it is equivalent to:
        //   (rs > SHUTDOWN) ||
        //   (rs == SHUTDOWN && firstTask != null) ||
        //   (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. The pool state is greater than SHUTDOWN: return false.
        // 2. The pool state is SHUTDOWN and firstTask is not null: return false.
        // 3. The pool state is SHUTDOWN and the queue is empty: return false.
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // Inner spin
        for (;;) {
            int wc = workerCountOf(c);
            // The worker count exceeds the capacity: return false.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increase the worker count with CAS. On success, break out of
            // the outer loop and continue with the second part.
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // The pool state has changed: spin on the outer loop.
            if (runStateOf(c) != rs)
                continue retry;
            // Otherwise just spin on the inner loop.
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Workers must be added serially, so a lock is required.
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // The pool state must be checked again here.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The worker's thread has already been started: do not add it.
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // The worker was created; add it to workers.
                    workers.add(w);
                    // Update the largestPoolSize variable.
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the worker thread.
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // The worker thread failed to start, which means the pool state has
        // changed (a shutdown was performed); run the related cleanup.
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
5. Worker, the thread pool's task unit
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // This is the key to Worker: the thread factory creates a thread,
        // and the argument passed in is the current worker.
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // omitted code...
}
6. Core worker execution logic: runWorker
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // unlock() is called so that the worker can be interrupted from outside.
    w.unlock(); // allow interrupts
    // This flag records whether the spin (the while loop) exited abnormally.
    boolean completedAbruptly = true;
    try {
        // This is the spin:
        // 1. If firstTask is not null, run firstTask.
        // 2. If firstTask is null, call getTask() to take a task from the queue.
        // 3. A blocking queue blocks the current thread while the queue is empty.
        while (task != null || (task = getTask()) != null) {
            // The worker is locked here in order to:
            // 1. Narrow the lock scope and improve performance.
            // 2. Make sure each worker runs its tasks serially.
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the pool is stopping, interrupt the current thread.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // Run the task; `beforeExecute()` and `afterExecute()` extend the
            // behavior before and after execution. Both methods have empty
            // implementations in this class.
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // Help GC
                task = null;
                // Increment the number of completed tasks.
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The spin has exited, which means this worker is finishing.
        processWorkerExit(w, completedAbruptly);
    }
}
7. Notes on building your own thread pool
- The capacity of the blocking task queue.
- The name of the thread pool, preferably related to the business.
- The core pool size, depending on the business situation. [See: How many threads is appropriate?]
- The maximum pool size, depending on the business situation. [See: How many threads is appropriate?]
- The rejection policy: I personally usually write a log, and for core business I run compensation based on that log.
For example:
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        cpuCores + 1,
        2 * cpuCores + 1,
        5, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(2000),
        // Thread name prefix: pay-account. Note that a DefaultThreadFactory
        // taking a name is not a public JDK class; it is assumed here to be
        // your own ThreadFactory or one provided by a library.
        new DefaultThreadFactory("pay-account"),
        (r, pool) -> {
            // Log the rejected task, then re-queue it later to compensate.
        });
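A self-contained variant of this configuration is sketched below. Since the JDK has no public thread factory that takes a name, the sketch defines its own: NamedThreadFactory, the name pattern "<prefix>-thread-<n>", and the helper methods are all assumptions made for illustration. The rejection handler only prints to stderr where a real one would log and compensate.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPoolSketch {

    /** Hypothetical factory that names threads "<prefix>-thread-<n>". */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger sequence = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "-thread-" + sequence.getAndIncrement());
        }
    }

    static ThreadPoolExecutor newPayAccountPool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpuCores + 1, 2 * cpuCores + 1,
                5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000),
                new NamedThreadFactory("pay-account"),
                // Stand-in rejection policy: record the task for later compensation.
                (task, pool) -> System.err.println("rejected: " + task));
    }

    /** Returns the name of the thread that runs the first submitted task. */
    static String firstWorkerName() {
        ThreadPoolExecutor pool = newPayAccountPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName()); // prints pay-account-thread-1
    }
}
```

Business-related thread names make thread dumps and logs much easier to read when several pools run in the same JVM.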