In the previous post, "Why Ali Advises You Not to Use Executors to Create a Thread Pool?", we took a closer look at thread pools. Here is the main flow:

  1. Submit a task, and if the number of worker threads in the thread pool is smaller than corePoolSize, create a new worker thread to execute the task

  2. If the number of worker threads in the thread pool has reached corePoolSize, the new task is put into the work queue, where it waits to be executed

  3. If the work queue is full and the number of worker threads is smaller than maximumPoolSize, a new worker thread is created to execute the task

  4. If the number of worker threads in the thread pool has reached maximumPoolSize and a new task cannot be added to the work queue, the configured rejection policy is used to handle the task (the default is AbortPolicy, which rejects the task by throwing a RejectedExecutionException).

  5. When the pool has more threads than corePoolSize, a thread that has been idle for keepAliveTime without receiving a task is terminated, until the pool shrinks back to the number of core threads. (Pass true to the allowCoreThreadTimeOut method to let the pool reclaim idle core threads as well.)
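
The first four steps above can be observed directly on a small pool. The following is a minimal sketch that uses only java.util.concurrent; the class name PoolFlowDemo and the sizes (one core thread, a maximum of two threads, a queue of capacity one) are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolFlowDemo {

    /** Returns the pool size after each of three submissions, then 1 if the fourth was rejected. */
    static int[] run() {
        // Illustrative sizes: corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            pool.execute(blocker);               // step 1: a core thread is created
            observed[0] = pool.getPoolSize();
            pool.execute(blocker);               // step 2: the task is queued
            observed[1] = pool.getPoolSize();
            pool.execute(blocker);               // step 3: queue full, a non-core thread is created
            observed[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);           // step 4: pool and queue full, default AbortPolicy
            } catch (RejectedExecutionException e) {
                observed[3] = 1;
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

The pool size should stay at 1 after the second submission and only grow to 2 once the queue is full, which is exactly the ordering the rest of this post sets out to change.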

One of the JDK's built-in policies, CallerRunsPolicy, is easy to overlook: when a task is rejected because the task queue is full and the pool has reached maximumPoolSize, the task is run by the thread that submitted it. For example, if we submit to the thread pool from a Tomcat thread and the pool's queue is full, the task is handed back to that Tomcat thread. The Tomcat thread is then tied up until the task finishes, which can stall whatever else it was handling synchronously and, under load, may even exhaust the calling thread pool.
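
To make the hand-off visible, here is a small sketch that records which thread ends up running the overflow task. The class name CallerRunsDemo and the pool sizes are illustrative assumptions; only standard JDK classes are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns true if the overflow task ran on the thread that called execute. */
    static boolean overflowRunsOnCaller() {
        // One worker, queue capacity 1, CallerRunsPolicy as the rejection handler
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // Pool and queue are both full, so the policy runs this task
            // synchronously on the submitting thread
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + overflowRunsOnCaller());
    }
}
```

If the submitting thread is a request thread, the third call does not return until the task has finished, which is the blocking effect described above.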

There are also ways to modify the default behavior of the thread pool, such as allowing core threads to be reclaimed, or calling the prestartAllCoreThreads method immediately after creating the thread pool to start all core threads up front.
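
Both adjustments are plain method calls on java.util.concurrent.ThreadPoolExecutor. The sketch below is only an illustration: the class name CoreThreadTuningDemo, the pool sizes and the 100 ms keep-alive are assumptions, and the wait loop is there only so the demo can observe the idle core threads going away.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreThreadTuningDemo {

    /** Returns {size before prestart, threads started, size after prestart, size after idle timeout}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            int before = pool.getPoolSize();             // threads are created lazily by default
            int started = pool.prestartAllCoreThreads(); // start every core thread right away
            int after = pool.getPoolSize();

            // Let idle core threads time out too (requires a keepAliveTime greater than zero)
            pool.allowCoreThreadTimeOut(true);

            // Wait up to 5 seconds for the idle core threads to be reclaimed
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return new int[] {before, started, after, pool.getPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```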

In the standard flow, threads beyond corePoolSize are created only after the work queue is full. Can we reverse that order, growing the pool up to the maximum number of threads first and only then putting tasks into the work queue? In other words, can we make the thread pool more flexible by enabling more threads first?

Of course. Tomcat's ThreadPoolExecutor does just that.

Tomcat's thread pool starts all core threads (prestartAllCoreThreads) when it is created, and it prefers adding threads over queuing tasks.

Tomcat's ThreadPoolExecutor inherits from java.util.concurrent.ThreadPoolExecutor, and its task queue is a TaskQueue (which inherits from LinkedBlockingQueue<Runnable>):



public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

  public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    // Call the parent class's method to start all the core threads up front
    prestartAllCoreThreads();
  }

  // This listing follows the flow of java.util.concurrent.ThreadPoolExecutor.execute
  public void execute(Runnable command) {
    if (command == null)
      throw new NullPointerException();

    int c = ctl.get();
    // 1. If the number of worker threads is smaller than corePoolSize, create a worker thread to execute the task
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
        return;
      c = ctl.get();
    }

    // 2. If the pool is running and the task can be added to the task queue
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      // 2.1 If the pool is no longer running (the user may have called shutdown),
      // remove the task just added to the task queue and reject it
      if (!isRunning(recheck) && remove(command))
        reject(command);
      // 2.2 If there is no worker thread, create one to execute the task
      // (the task is already in the task queue, so no first task is passed)
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
    // 3. If the task could not be queued (the queue is full, or offer returned false),
    // try to start a new worker thread to execute it; reject the task if that fails too
    else if (!addWorker(command, false))
      reject(command);
  }

}





public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  // Set to the owning thread pool through the setParent method
  private volatile ThreadPoolExecutor parent = null;

  @Override
  public boolean offer(Runnable o) {
    // No owning pool: behave like a plain LinkedBlockingQueue
    if (parent == null) return super.offer(o);

    // 1. The number of threads has reached the upper limit, so the task is added to the queue normally
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);

    // 2. There are idle threads, so the task is queued for one of them to pick up
    if (parent.getSubmittedCount() <= parent.getPoolSize()) return super.offer(o);

    // 3. The current number of threads is below the maximum: return false so that
    // a new worker thread is created instead of the task being queued
    if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;

    // Otherwise, join the queue
    return super.offer(o);
  }

}


For example, suppose the number of core threads is 1, the maximum number of threads is 2, and each task takes 5 seconds to execute. When the first task is submitted, submittedCount=1, a core worker thread executes the task, and poolSize is 1 (the core thread is either already prestarted by the constructor or created in the first step of the execute method).

Now the second task is submitted and submittedCount becomes 2. The task does not meet the first condition of the offer method (poolSize is 1, not the maximum of 2) or the second (submittedCount of 2 is greater than poolSize of 1), but it meets the third, so offer returns false. To execute, this looks the same as a full queue: the task was not queued.

At this point, we fall through to the third branch of execute, which directly starts a new worker thread to execute the task.

This gives priority to scaling up to the maximum number of threads. Only the tasks that still cannot be handled once the pool has reached that maximum are put into the queue.
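
The same thread-first idea can be tried out on top of the plain JDK executor. What follows is a rough sketch under stated assumptions, not Tomcat's actual code: EagerQueue, EagerPool, submittedCount() and force() are hypothetical names, the submitted-task counter is maintained in execute and afterExecute, and a rejected task is pushed into the queue as a fallback in case adding a worker fails after offer has already returned false.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class EagerPoolDemo {

    /** Hypothetical queue that refuses tasks while the pool can still grow. */
    static class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private volatile EagerPool parent;

        void setParent(EagerPool parent) { this.parent = parent; }

        @Override
        public boolean offer(Runnable task) {
            EagerPool p = parent;
            if (p == null) return super.offer(task);
            int poolSize = p.getPoolSize();
            if (poolSize == p.getMaximumPoolSize()) return super.offer(task); // pool at max: queue
            if (p.submittedCount() <= poolSize) return super.offer(task);     // idle thread available: queue
            return false; // pool can still grow: report "full" so execute adds a worker
        }

        /** Bypasses the checks above; used when adding a worker failed after all. */
        boolean force(Runnable task) { return super.offer(task); }
    }

    /** Hypothetical executor that tracks submitted-but-unfinished tasks for the queue. */
    static class EagerPool extends ThreadPoolExecutor {
        private final AtomicInteger submitted = new AtomicInteger();

        EagerPool(int core, int max, EagerQueue queue) {
            super(core, max, 60, TimeUnit.SECONDS, queue);
            queue.setParent(this);
        }

        int submittedCount() { return submitted.get(); }

        @Override
        public void execute(Runnable command) {
            submitted.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException e) {
                // offer said "grow" but no worker could be added: fall back to queuing
                if (isShutdown() || !((EagerQueue) getQueue()).force(command)) {
                    submitted.decrementAndGet();
                    throw e;
                }
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            submitted.decrementAndGet();
        }
    }

    /** Returns the pool size after each of three submissions, then the final queue size. */
    static int[] run() {
        EagerPool pool = new EagerPool(1, 2, new EagerQueue());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            pool.execute(blocker);
            observed[0] = pool.getPoolSize();    // first task: core thread
            pool.execute(blocker);
            observed[1] = pool.getPoolSize();    // second task: pool grows before queuing
            pool.execute(blocker);
            observed[2] = pool.getPoolSize();    // third task: pool is at max
            observed[3] = pool.getQueue().size(); // so the third task waits in the queue
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

With one core thread and a maximum of two, the second task should start a second thread even though the queue is unbounded, and only the third task should end up queued. A stock ThreadPoolExecutor with the same unbounded queue would never grow past its core size.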
