
A thread pool is a pooling technique that avoids the performance cost of frequently creating and destroying threads. Threads are created in advance and kept in a "pool"; when a task arrives, an existing thread is reused instead of waiting for a new one to be created, which effectively improves the response speed of the program. Any discussion of thread pools in Java comes down to ThreadPoolExecutor.

* If a thread pool is obtained from Executors, it has the following disadvantages:

  • 1) FixedThreadPool and SingleThreadPool: the permitted request queue length is Integer.MAX_VALUE, so a large number of requests may accumulate and cause an OOM (Out Of Memory) error.
  • 2) CachedThreadPool and ScheduledThreadPool: the permitted number of threads is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM error.

In fact, if we read the source of **Executors**, we find that **Executors.newFixedThreadPool()**, **Executors.newSingleThreadExecutor()** and **Executors.newCachedThreadPool()** are all implemented on top of ThreadPoolExecutor.
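The limits described above can be checked with a small sketch. The class name BoundedPoolSketch and the sizes 2, 4 and 100 are illustrative assumptions, and the casts to ThreadPoolExecutor rely on how the JDK currently implements these two factory methods rather than on a documented guarantee.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    // Remaining queue capacity of the pool returned by Executors.newFixedThreadPool().
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumption: the JDK returns a plain ThreadPoolExecutor here.
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Maximum thread count of the pool returned by Executors.newCachedThreadPool().
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Assumption: the JDK returns a plain ThreadPoolExecutor here.
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    // A hand-built pool with explicit, bounded limits (the sizes are illustrative).
    static ThreadPoolExecutor boundedPool() {
        return new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity() == Integer.MAX_VALUE);
        System.out.println(cachedPoolMaxThreads() == Integer.MAX_VALUE);
        ThreadPoolExecutor pool = boundedPool();
        System.out.println(pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

Building the pool by hand makes both limits visible at the call site, which is the main argument for using ThreadPoolExecutor directly.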

process

The core parameters of ThreadPoolExecutor are the arguments passed to its constructor, which is defined as follows:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            // maximumPoolSize must be greater than 0 and not less than corePoolSize
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • Parameter 1: **corePoolSize** is the number of resident core threads in the thread pool. If it is 0, all threads are reclaimed once the pool has been idle for the keep-alive time. If it is greater than 0, up to this many threads, once created, are kept alive even when there are no tasks. Note that if the value is too small, threads will be created and destroyed frequently; if it is too large, system resources are wasted, so developers need to tune it for their actual workload.
  • Parameter 2: **maximumPoolSize** is the maximum number of threads the pool may create under peak load. It must be greater than 0 and greater than or equal to corePoolSize. Threads beyond corePoolSize are created only when there are many tasks and the task queue cannot hold any more of them.
  • Parameter 3: **keepAliveTime** is the time an idle thread may survive. When threads have been idle for longer than this time, the extra threads are destroyed until the number of threads in the pool equals corePoolSize. If maximumPoolSize equals corePoolSize, there are no extra threads, so the pool does not destroy any threads when idle.
  • Parameter 4: **unit** is the time unit of the survival time; it is used together with **keepAliveTime**.
  • Parameter 5: **workQueue** is the task queue of the thread pool. When all core threads are busy, new tasks are placed in this queue and wait to be executed.
  • Parameter 6: **threadFactory** is the factory used to create threads. It is rarely specified when creating a thread pool; when it is omitted, the default factory returned by Executors.defaultThreadFactory() is used. The related source code is as follows:
  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
​
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    // The default thread factory implements the ThreadFactory interface
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        // Create a thread
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                // Make sure the thread is a non-daemon thread
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                // Reset the thread priority to the default
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

We can also customize a ThreadFactory by implementing the **ThreadFactory** interface, which allows us to customize the name of the thread or the priority of thread execution.
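A minimal sketch of such a custom factory might look like the following. The class name NamedThreadFactory, the "order-worker" prefix and the pool sizes are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final int priority;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix, int priority) {
        this.prefix = prefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads "<prefix>-<n>" so they are easy to spot in logs and thread dumps.
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);
        t.setPriority(priority);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new NamedThreadFactory("order-worker", Thread.NORM_PRIORITY));
        executor.execute(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With this factory the single task above runs on a thread named order-worker-1 instead of pool-1-thread-1.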

  • Parameter 7: **handler** (a RejectedExecutionHandler) specifies the rejection policy of the thread pool. The policy is applied when the task queue is already full and no new thread can be created to execute the task; it acts as a traffic limiting mechanism. The workflow of the thread pool is best understood from the source of **execute()**:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task. The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread. If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // The number of worker threads is smaller than the number of core threads
        if (workerCountOf(c) < corePoolSize) {
            // Create a new thread to execute the task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // If the thread pool is running, add the task to the queue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Check the state again in case the pool was shut down after the first check;
            // if it is no longer running, remove the newly queued task and reject it
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If the pool has no threads (this happens when corePoolSize is 0)
            else if (workerCountOf(recheck) == 0)
                // Create a new thread to execute the queued task
                addWorker(null, false);
        }
        // The core threads are busy and the queue is full: try to start a new thread
        else if (!addWorker(command, false))
            // Starting a new thread failed, so apply the rejection policy
            reject(command);
    }

    Parameters of **addWorker(Runnable firstTask, boolean core)**:

    • firstTask: the first task to be executed by the new thread, or null if there is none
    • core: selects the threshold (maximum) used to decide whether a thread may be created. true means **corePoolSize** is used as the threshold; false means **maximumPoolSize** is used as the threshold
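The three steps of execute() can be observed from the outside with a small experiment. This is only a sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, queue capacity 1, default AbortPolicy); the class name ExecuteFlowSketch is hypothetical. Each task blocks on a latch so that the pool state stays fixed while the next task is submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowSketch {

    // Submits four blocking tasks and records what happened to each submission.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> events = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocking);
                events.add("task " + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                events.add("task " + i + ": rejected");
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task 1: threads=1, queued=0   (step 1: a core thread is started)
        // task 2: threads=1, queued=1   (step 2: the task is queued)
        // task 3: threads=2, queued=1   (step 3: queue full, a non-core thread is started)
        // task 4: rejected              (queue full and maximumPoolSize reached)
    }
}
```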

extension

execute() and submit()

Both execute() and submit() are used to run tasks in the thread pool. The main difference between them is that submit() lets the caller receive the task's return value, while execute() does not.

Specific use of two methods:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
    // Use execute()
    executor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Hello,execute");
        }
    });
    // Use submit()
    Future<String> future = executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("Hello,submit");
            return "Success";
        }
    });
    System.out.println(future.get());
    // Shut the pool down so the JVM can exit
    executor.shutdown();
}

Program execution result:

Hello,execute
Hello,submit
Success

From the above results, you can see that submit() can be used together with Future to receive the return value of a task. Another difference is that execute() is declared in the Executor interface, while submit() is declared in the ExecutorService interface.

Thread pool rejection policy

When the task queue in the thread pool is full, the system checks whether the number of threads in the current thread pool is greater than or equal to the maximum number of threads in the thread pool. If yes, the system triggers the reject policy of the thread pool.

Java has four built-in rejection policies:

  • AbortPolicy: rejects the task by throwing a RejectedExecutionException. It is the default rejection policy.
  • CallerRunsPolicy: hands the task back to the thread that called execute(), which runs it itself.
  • DiscardPolicy: silently ignores the rejected task (the newest task).
  • DiscardOldestPolicy: discards the oldest task (the one that joined the queue first) and then retries the new task.
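As an illustration of one of these policies, the sketch below shows CallerRunsPolicy handing a rejected task back to the submitting thread. The class name CallerRunsSketch and the pool sizes (one thread, queue capacity 1) are assumptions chosen to saturate the pool quickly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsSketch {

    // Returns the name of the thread that ended up running the rejected task.
    static String threadThatRanRejectedTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocking); // occupies the only worker thread
        pool.execute(blocking); // fills the queue (capacity 1)

        AtomicReference<String> ranOn = new AtomicReference<>();
        // The pool is saturated, so this task is run synchronously by the caller.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        // Prints "main" when invoked from the main thread.
        System.out.println(threadThatRanRejectedTask());
    }
}
```

Because the caller is busy running the task, it cannot submit more work in the meantime, which is why this policy is often described as a simple form of back-pressure.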

An example of the AbortPolicy rejection policy is as follows:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

Program running results:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task example.ThreadPoolExecutorExample$$Lambda$1/1149319664@4dd8dc3 rejected from java.util.concurrent.ThreadPoolExecutor@6d03e736[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at example.ThreadPoolExecutorExample.main(ThreadPoolExecutorExample.java:34)

As you can see, when the sixth task arrives, the thread pool applies the AbortPolicy rejection policy and throws an exception. The queue stores at most two tasks and the pool can create at most three threads, so at most 2 + 3 = 5 tasks can be held at once; when the sixth task arrives while the earlier ones are still running or queued, the thread pool is saturated.

User-defined rejection policy

To customize the rejection policy, simply implement the RejectedExecutionHandler interface and override its rejectedExecution() method, as shown below:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        // Add a user-defined rejection policy
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // Business handling code goes here
                System.out.println("Execute custom reject policy");
            }
        });
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

Program execution result:

Execute custom reject policy
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3

As you can see, the thread pool executed the custom rejection policy. You can add your own business handling code to rejectedExecution().

ThreadPoolExecutor extension

ThreadPoolExecutor is extended primarily by overriding its beforeExecute() and afterExecute() methods. We can add logging to the extension method or implement statistics such as thread execution times, as shown in the following code:

public class ThreadPoolExtend {
    public static void main(String[] args) {
        // Use the extended thread pool
        MyThreadPoolExtend executor = new MyThreadPoolExtend(2, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }
        // Shut the pool down so the JVM can exit
        executor.shutdown();
    }

    /**
     * Thread pool extension
     */
    static class MyThreadPoolExtend extends ThreadPoolExecutor {
        // Stores the time at which the current thread started executing a task
        private final ThreadLocal<Long> localTime = new ThreadLocal<>();

        public MyThreadPoolExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Called before a task starts executing
         * @param t the thread that will run the task
         * @param r the task
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            // Start time (in nanoseconds)
            long sTime = System.nanoTime();
            localTime.set(sTime);
            System.out.println(String.format("%s|before|time=%s", t.getName(), sTime));
            super.beforeExecute(t, r);
        }

        /**
         * Called after a task has finished executing
         * @param r the task
         * @param t the exception thrown by the task, or null if there was none
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            // End time (in nanoseconds)
            long eTime = System.nanoTime();
            long totalTime = eTime - localTime.get();
            System.out.println(String.format("%s|after|time=%s|elapsed: %s milliseconds.", Thread.currentThread().getName(), eTime, totalTime / 1000000.0));
            super.afterExecute(r, t);
        }
    }
}

Program execution result:

pool-1-thread-1|before|time=95659085427600
pool-1-thread-1
pool-1-thread-2|before|time=95659085423300
pool-1-thread-2
pool-1-thread-2|after|time=95659113130200|elapsed: 27.7069 milliseconds.
pool-1-thread-1|after|time=95659112193700|elapsed: 26.7661 milliseconds.
pool-1-thread-2|before|time=95659117635200
pool-1-thread-2
pool-1-thread-2|after|time=95659117822000|elapsed: 0.1868 milliseconds.

summary

A thread pool should be created with ThreadPoolExecutor directly, so that its running rules are explicit and the risk of resource exhaustion is avoided. When the task queue is full and the number of threads in the pool has reached the maximum, new tasks are rejected. Java has four built-in rejection policies. Users can also customize the rejection policy by overriding rejectedExecution(), and can extend the functionality of ThreadPoolExecutor by overriding beforeExecute() and afterExecute().