

A thread pool is a pooling technique that avoids the performance cost of frequently creating and destroying threads. Threads are created once and kept in the “pool”; when a task arrives, an existing thread is reused instead of waiting for a new one to be created, which effectively improves the program's response time. You can’t talk about thread pools without talking about ThreadPoolExecutor, and Alibaba’s Java Development Manual has this to say about it:

Thread pools must not be created with Executors; create them with ThreadPoolExecutor instead. Doing so makes the pool's operating rules explicit to whoever writes the code and avoids the risk of resource exhaustion.

* Thread pool objects returned by Executors have the following disadvantages:

  • FixedThreadPool and SingleThreadPool: The allowed request queue length is Integer.MAX_VALUE, which may accumulate a large number of requests and result in OOM.

  • CachedThreadPool and ScheduledThreadPool: The number of threads allowed to be created is Integer.MAX_VALUE, which may create a large number of threads, resulting in OOM.

In fact, if we look at the source code of Executors, the Executors.newFixedThreadPool(), Executors.newSingleThreadExecutor() and Executors.newCachedThreadPool() methods are all implemented on top of ThreadPoolExecutor.

So this article focuses on learning more about ThreadPoolExecutor: what are its core parameters? How does it work?
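To make the manual's warning concrete, the following is a minimal sketch that inspects the pools returned by Executors and contrasts them with an explicitly bounded ThreadPoolExecutor. The class and method names (ExecutorsDefaults, boundedPool and so on) and the sizes 2, 4 and 100 are illustrative assumptions, not recommendations. The casts rely on the JDK returning a ThreadPoolExecutor directly from newFixedThreadPool() and newCachedThreadPool().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorsDefaults {

    /** Remaining capacity of the queue behind Executors.newFixedThreadPool(). */
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // An empty, unbounded LinkedBlockingQueue reports Integer.MAX_VALUE here.
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Maximum number of threads allowed by Executors.newCachedThreadPool(). */
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** Hypothetical alternative: both the thread count and the queue are bounded. */
    static ThreadPoolExecutor boundedPool() {
        return new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity()); // 2147483647
        System.out.println(cachedPoolMaxThreads());   // 2147483647
        ThreadPoolExecutor pool = boundedPool();
        System.out.println(pool.getQueue().remainingCapacity()); // 100
        pool.shutdown();
    }
}
```

With the bounded pool, a burst of tasks ends in a rejection instead of unbounded memory growth, which is the behaviour the manual is asking for.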

Seven core parameters

The core parameters of ThreadPoolExecutor are the arguments passed to its constructor, which is defined as follows:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize must be greater than 0 and greater than or equal to corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • The first parameter: corePoolSize is the number of core threads kept resident in the thread pool. If it is set to 0, all threads are reclaimed once there are no tasks and they have been idle for keepAliveTime. If it is greater than 0, the pool keeps up to this many threads alive even when there are no tasks (core threads are created on demand as tasks arrive, not up front). Note that if this value is too small, threads will be created and destroyed frequently; if it is too large, system resources are wasted, so developers need to tune it for their actual workload.

  • The second parameter: maximumPoolSize is the maximum number of threads the pool may create under peak load. It must be greater than 0 and greater than or equal to corePoolSize. Threads beyond corePoolSize are created only when there are so many tasks that the task queue can no longer hold them.

  • The third parameter: keepAliveTime is how long an idle thread may stay alive. When threads have been idle for longer than this, the extra threads are destroyed until the number of threads in the pool equals corePoolSize. If maximumPoolSize equals corePoolSize, the thread pool does not destroy any threads when idle.

  • The fourth parameter: unit is the time unit of keepAliveTime and is used together with it.

  • The fifth parameter: workQueue is the queue of tasks waiting to be executed. When all threads in the pool are busy, new tasks are placed in this queue to wait for execution.

  • The sixth parameter: threadFactory is the factory used to create threads. It is rarely specified; when it is omitted, the default thread factory is used.

The source code is as follows:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    // Executors.defaultThreadFactory() is the default thread creation factory
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
// The default thread creation factory needs to implement the ThreadFactory interface
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    // Create a thread
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false); // Create a non-daemon thread
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); // Set thread priority to default
        return t;
    }
}

We can also customize a ThreadFactory by implementing the ThreadFactory interface, which allows us to customize the name of a thread or the priority of thread execution.

  • The seventh parameter: handler (a RejectedExecutionHandler) is the rejection policy of the thread pool. It is applied when the workQueue is already full and no new thread can be created to execute the task. It acts as a rate-limiting safeguard.
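Putting the seven parameters together, the sketch below builds a pool with every argument spelled out, including a custom ThreadFactory that only changes the thread names. NamedThreadFactory, the "order-worker" prefix and all the sizes are hypothetical choices made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolExample {

    /** Hypothetical factory that names threads "<prefix>-1", "<prefix>-2", ... */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(false);                  // non-daemon, like the default factory
            t.setPriority(Thread.NORM_PRIORITY); // default priority
            return t;
        }
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<>(10),           // workQueue (bounded)
                new NamedThreadFactory("order-worker"), // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    /** Runs one task and reports the name of the thread that executed it. */
    static String runAndGetThreadName() throws Exception {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> result = pool.submit(task);
            return result.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndGetThreadName()); // order-worker-1
    }
}
```

Readable thread names make thread dumps and log lines much easier to attribute to a specific pool than the default "pool-N-thread-M" pattern.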

The thread pool workflow starts with its execute() method.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // The number of working threads is smaller than the number of core threads
    if (workerCountOf(c) < corePoolSize) {
        // Create a new thread to perform this task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Check if the thread pool is running, and if so, add tasks to the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check whether the thread pool is running again to prevent the thread pool from closing after the first validation
        // If the status is not running, the newly queued task is removed
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If the number of threads in the thread pool is 0 (this happens when corePoolSize is set to 0)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // Create a new thread to execute the task
    }
    // The core threads are busy and the queue is full: try to start a new (non-core) thread
    else if (!addWorker(command, false))
        // Starting the thread failed, so execute the reject policy
        reject(command);
}

The addWorker(Runnable firstTask, boolean core) method has the following parameters:

firstTask: the task the new thread should run first, or null if there is none;

core: which limit applies when creating the thread. true means corePoolSize is used as the upper bound, false means maximumPoolSize is used as the upper bound.
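The order of decisions in execute() (core thread first, then the queue, then a non-core thread, then rejection) can be observed from the outside. The following is a small sketch under assumed sizes (1 core thread, 2 maximum threads, a queue of capacity 1); ExecuteFlowDemo and trace() are hypothetical names. Each task blocks on a latch so the pool state stays stable while it is sampled.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {

    /** Records "poolSize/queueSize" after each accepted task, or "rejected". */
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append(pool.getPoolSize())
                       .append('/')
                       .append(pool.getQueue().size())
                       .append(' ');
                } catch (RejectedExecutionException e) {
                    log.append("rejected"); // default AbortPolicy
                }
            }
        } finally {
            release.countDown(); // let all blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // Task 1 starts a core thread, task 2 is queued,
        // task 3 starts a non-core thread, task 4 is rejected.
        System.out.println(trace()); // 1/0 1/1 2/1 rejected
    }
}
```

Note that the second thread appears only after the queue is full, which is why a pool with an unbounded queue never grows beyond corePoolSize.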

Key point analysis

This interview question tests your grasp of thread pools and ThreadPoolExecutor, which is basic Java knowledge.

Here are a few more interview questions related to ThreadPoolExecutor:

  • How many ways are there to submit a task to ThreadPoolExecutor? What’s the difference between them?

  • What is a thread rejection policy?

  • What are the categories of rejection strategies?

  • How do I customize a rejection policy?

  • Can ThreadPoolExecutor be extended? How to implement the extension?

Knowledge extension

execute() VS submit()

Both execute() and submit() are used to run tasks on a thread pool. The main difference between them is that submit() returns a Future through which the task's return value can be received, while execute() returns nothing.

Let’s see two methods in action:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
// Use execute
executor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello, execute.");
    }
});
// Use submit
Future<String> future = executor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println("Hello, submit.");
        return "Success";
    }
});
System.out.println(future.get());

The output of the above program is as follows (the first two lines may appear in either order, because the two tasks run on different threads):

Hello, submit.
Hello, execute.
Success

From the above results, you can see that the submit() method can be used together with Future to receive the return value of the task. Another difference is that execute() is declared on the Executor interface, while submit() is declared on the ExecutorService interface. ExecutorService extends Executor, and ThreadPoolExecutor implements ExecutorService by extending AbstractExecutorService, so it offers both methods.
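The interface split can be illustrated by viewing the same pool through both types. The sketch below is an illustration only: SubmitVsExecute and demo() are hypothetical names, and the pool sizes are arbitrary. It also shows that submit() accepts a Runnable, in which case the Future completes with null.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitVsExecute {

    static String demo() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            Executor asExecutor = pool;       // exposes only execute(Runnable)
            ExecutorService asService = pool; // adds submit(...), shutdown(), ...

            // execute(): fire-and-forget, so completion is signalled by hand.
            CountDownLatch done = new CountDownLatch(1);
            asExecutor.execute(done::countDown);
            done.await();

            // submit(Runnable): the Future completes with null.
            Runnable noop = () -> { };
            Future<?> noResult = asService.submit(noop);

            // submit(Callable): the Future carries the computed value.
            Callable<Integer> sum = () -> 1 + 2;
            Future<Integer> withResult = asService.submit(sum);

            return noResult.get() + "," + withResult.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // null,3
    }
}
```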

Thread pool rejection policy

When the task queue of the thread pool is full and a new task arrives, the pool checks whether the current number of threads is greater than or equal to the maximum number of threads. If it is, the rejection policy of the thread pool is triggered.

Java has four built-in rejection policies:

  • AbortPolicy: the abort policy. It throws a RejectedExecutionException to the caller of execute(), while the pool itself keeps running. It is the default rejection policy.

  • CallerRunsPolicy: runs the rejected task on the thread that called execute();

  • DiscardPolicy: silently discards the rejected task (the latest task);

  • DiscardOldestPolicy: discards the oldest task (the one at the head of the queue) and then retries submitting the new task.
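As an illustration of one of the non-default policies, here is a minimal sketch of CallerRunsPolicy. The pool is deliberately tiny (one thread, a queue of capacity one) so that the third task is guaranteed to be rejected; CallerRunsDemo and rejectedTaskThread() are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns the name of the thread that ended up running the rejected task. */
    static String rejectedTaskThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the worker until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // The pool is saturated, so this task runs synchronously on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(rejectedTaskThread()); // main
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which gives CallerRunsPolicy a simple back-pressure effect.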

For example, let’s demonstrate the AbortPolicy rejection policy as follows:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.AbortPolicy()); // Use AbortPolicy as the rejection policy
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

The output of the above program is:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)
 at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)

As you can see, when the sixth task arrives, the thread pool applies the AbortPolicy rejection policy and throws an exception. The queue holds at most two tasks and the pool can create at most three threads, so at most 2 + 3 = 5 tasks can be accepted at the same time; when the sixth task arrives, the thread pool is saturated.

User-defined rejection policy

To customize the rejection policy, simply implement the RejectedExecutionHandler interface and override its rejectedExecution() method, as shown below:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new RejectedExecutionHandler() {  // Use a user-defined rejection policy
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // Business processing method
                System.out.println("Execute custom reject policy");
            }
        });
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

The result of this code execution is as follows:

Execute custom reject policy
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2

As you can see, the thread pool executed the custom rejection policy; you can put your own business processing code in rejectedExecution().
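As one possible form of such business processing, the sketch below keeps rejected tasks in a queue so that they could be counted, logged, or retried later. RecordingHandler and rejectedCount() are hypothetical names, and the pool sizes mirror the example above (1 core thread, 3 maximum threads, a queue of capacity 2). The tasks block on a latch so that the number of rejections is predictable.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RecordingRejectionDemo {

    /** Hypothetical handler: remembers rejected tasks instead of throwing. */
    static final class RecordingHandler implements RejectedExecutionHandler {
        final Queue<Runnable> rejected = new ConcurrentLinkedQueue<>();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.add(r); // could also log, raise an alert, or schedule a retry
        }
    }

    /** Submits the given number of blocking tasks and counts the rejections. */
    static int rejectedCount(int tasks) {
        RecordingHandler handler = new RecordingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep threads busy so the pool stays saturated
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(blocker);
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejected.size();
    }

    public static void main(String[] args) {
        // 3 threads + 2 queue slots accept 5 tasks; the rest are rejected.
        System.out.println(rejectedCount(6)); // 1
        System.out.println(rejectedCount(8)); // 3
    }
}
```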

ThreadPoolExecutor extension

ThreadPoolExecutor is extended primarily by overriding its beforeExecute() and afterExecute() methods. We can add logging or collect statistics in these hook methods, for example to measure the execution time of each task, as follows:

public class ThreadPoolExtend {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Thread pool extension call
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 4, 10,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                Thread.currentThread().getName();
            });
        }
    }

    /**
     * Thread pool extension
     */
    static class MyThreadPoolExecutor extends ThreadPoolExecutor {
        // Save the thread execution start time
        private final ThreadLocal<Long> localTime = new ThreadLocal<>();

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                    TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Before execution begins
         * @param t the thread
         * @param r the task
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            Long sTime = System.nanoTime(); // Start time (in nanoseconds)
            localTime.set(sTime);
            System.out.println(String.format("%s | before | time=%s",
                    t.getName(), sTime));
            super.beforeExecute(t, r);
        }

        /**
         * After the execution is complete
         * @param r the task
         * @param t the exception thrown
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            Long eTime = System.nanoTime(); // End time (in nanoseconds)
            Long totalTime = eTime - localTime.get(); // Total execution time
            System.out.println(String.format("%s | after | time=%s | elapsed: %s ms",
                    Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));
            super.afterExecute(r, t);
        }
    }
}

The execution result of the above program is as follows:

pool-1-thread-1 | before | time=4570298843700
pool-1-thread-2 | before | time=4570298840000
pool-1-thread-1 | after | time=4570327059500 | elapsed: 28.2158 ms
pool-1-thread-2 | after | time=4570327138100 | elapsed: 28.2981 ms
pool-1-thread-1 | before | time=4570328467800
pool-1-thread-1 | after | time=4570328636800 | elapsed: 0.169 ms

Summary

In summary, thread pools should be created with ThreadPoolExecutor, which makes the pool's operating rules explicit and avoids the risk of resource exhaustion. When the task queue is full and the number of threads in the pool has reached the maximum number of threads, new tasks are rejected according to the rejection policy. Java has four built-in rejection policies, and users can also define their own by implementing rejectedExecution(). ThreadPoolExecutor itself can be extended by overriding beforeExecute() and afterExecute().