Overview of thread pools

  • Pooling techniques such as thread pooling, database connection pooling, and constant pooling are probably the most commonly used techniques for improving program performance

  • Creating and destroying threads is time-consuming, which hurts Java programs that must handle high concurrency. A thread pool addresses this by maintaining a set of ready threads. When a task arrives, it is assigned to a pooled thread right away, which improves performance. If all threads in the pool are busy, the task can be placed in a task queue, or a new thread can be created and added to the pool to handle it

  • The benefits of using thread pools

    • Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.

    • The “Concurrent processing” chapter of the Alibaba Java Development Manual states explicitly that threads must be provided through a thread pool and that applications must not create threads explicitly on their own.

      Why is that?

      Thread pools reduce the time spent creating and destroying threads and the associated system resource overhead, which addresses the problem of insufficient resources. Without a thread pool, the system may create a large number of threads of the same kind, which can exhaust memory or cause excessive context switching.

    • Improve response speed. When a task arrives, it can execute immediately without waiting for a thread to be created.

    • Improve thread manageability. Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring. (The original crude server implementation was to bind a socket request and then open a new thread to handle it. If the requests were heavy, the server would crash due to lack of management of thread resources.)

      • Thread pool monitoring methods:

        • Actuator component in SpringBoot

        • Get thread pool information through ThreadPoolExecutor’s own interface
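The second monitoring option above can be sketched with the getters that ThreadPoolExecutor already exposes. This is only a minimal sketch: the class name PoolMonitorSketch, the snapshot helper, and the pool sizes are assumptions made for illustration, not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolMonitorSketch {

    /** Formats a one-line snapshot using ThreadPoolExecutor's own getters. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("poolSize=%d active=%d queued=%d completed=%d largest=%d",
                pool.getPoolSize(),            // threads currently in the pool
                pool.getActiveCount(),         // approximate number of threads running tasks
                pool.getQueue().size(),        // tasks waiting in the work queue
                pool.getCompletedTaskCount(),  // approximate number of finished tasks
                pool.getLargestPoolSize());    // high-water mark of the pool size
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical sizes: 2 core threads, at most 4 threads, queue capacity 10
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();           // keep the task busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        System.out.println(snapshot(pool));    // two threads busy, three tasks queued
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(snapshot(pool));
    }
}
```

In a real service the snapshot would more likely be logged periodically or exported as metrics than printed.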

A thread pool usage scenario in a real project

  • Thread pools are generally used to execute multiple unrelated time-consuming tasks. In the absence of multi-threading, tasks are executed sequentially. With thread pools, multiple unrelated tasks can be executed simultaneously.

  • Here’s an example of a project in action:

    • A scheduled thread pool is used in the cache of a DockerHub project to periodically write cached data to the database (performing asynchronous tasks)
    • The rest are mostly similar
  • General rules to pay attention to when using

    • Use thread pools instead of creating individual threads

    • Use the ThreadPoolExecutor constructor instead of the Executors tool class, as explained below

    • Give each thread pool an explicit name that identifies the service it belongs to, which makes fault locating easier

      • You can use a custom ThreadFactory

        import java.util.concurrent.Executors;
        import java.util.concurrent.ThreadFactory;
        import java.util.concurrent.atomic.AtomicInteger;
        /** Thread factory that sets the thread name, which helps us locate problems. */
        public final class NamingThreadFactory implements ThreadFactory {
        
            private final AtomicInteger threadNum = new AtomicInteger();
            private final ThreadFactory delegate;
            private final String name;
        
            /** Creates a thread factory whose threads carry the given name. */
            public NamingThreadFactory(ThreadFactory delegate, String name) {
                this.delegate = delegate;
                this.name = name; // TODO consider uniquifying this
            }
        
            @Override 
            public Thread newThread(Runnable r) {
                Thread t = delegate.newThread(r);
                t.setName(name + "[#" + threadNum.incrementAndGet() + "]");
                return t;
            }
        }
      • Use Guava’s ThreadFactoryBuilder

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                                .setNameFormat(threadNamePrefix + "-%d")
                                .setDaemon(true).build();
        ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    • Different businesses use different thread pools

      • It is recommended that different services use different thread pools. Configure each pool according to the characteristics of its service, because services differ in concurrency and resource usage, and focus tuning on the services that are the system's performance bottleneck
    • Running dependent tasks in the same thread pool can cause a logical deadlock even under only slightly higher concurrency. Typically a parent task A submits a subtask B to the pool it is running in and waits for the result. Once parent tasks occupy all core threads, the subtasks cannot start, so the parents never finish and never release their threads, while the subtasks can only pile up in the task queue. With an unbounded queue, subtasks accumulate until an OOM occurs. For details, see the reference on improper thread pool usage
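The logical deadlock described in the last rule can be reproduced with a single-thread pool. The sketch below is illustrative only: SharedPoolDeadlockSketch, singleThreadPool, and runParent are hypothetical names, and a 300 ms timeout is used so that the demo reports the starvation instead of hanging forever as a real parent task would.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SharedPoolDeadlockSketch {

    /** One core thread and an unbounded queue, built with the ThreadPoolExecutor constructor. */
    static ExecutorService singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    /** Runs a parent task on parentPool that waits for a child submitted to childPool. */
    static String runParent(ExecutorService parentPool, ExecutorService childPool)
            throws Exception {
        Future<String> parent = parentPool.submit(() -> {
            Future<String> child = childPool.submit(() -> "child done");
            try {
                // A real parent would typically call get() without a timeout and block forever
                return child.get(300, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                child.cancel(false);
                return "child starved";
            }
        });
        return parent.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService shared = singleThreadPool();
        ExecutorService childOnly = singleThreadPool();
        try {
            // Same pool: the only thread is held by the parent, so the child never runs
            System.out.println(runParent(shared, shared));     // prints "child starved"
            // Separate pools: the child gets its own thread
            System.out.println(runParent(shared, childOnly));  // prints "child done"
        } finally {
            shared.shutdownNow();
            childOnly.shutdownNow();
        }
    }
}
```

Giving parent and child tasks separate pools, as the previous rule recommends, is one way to remove the circular wait.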

Inheritance and implementation relationships of thread pool classes

Executor framework

  • The Executor framework was introduced in Java 5. Since then, starting threads through an Executor is preferable to calling Thread's start method directly, and it also helps avoid the this escape problem.

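    Note: this escape means that another thread obtains a reference to an object before its constructor has returned. Calling methods on an object that is not yet fully constructed can cause confusing errors. Declaring fields volatile does not by itself prevent this; the usual fix is to avoid publishing this (for example, by starting a thread) inside the constructor. How does the Executor framework help with this problem? Presumably by moving thread start-up out of constructors and into the pool, but I'm not sure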

    The Executor framework not only manages thread pools, but also provides thread factories, queues, and rejection policies. The Executor framework makes concurrent programming easier

  • The Executor framework also contains ForkJoinPool, a thread pool that is probably used less often. This class extends AbstractExecutorService and is described at the end of this article

  • In addition to the Executor framework, there is also the JUC framework, which is the general name for all the multithreaded related classes in the java.util.concurrent package

The Executor framework

Task submission

There are two ways to submit a task to a thread pool:

  1. The execute method

    1. It accepts only Runnable tasks and provides no return value. See the source code analysis below (as the entry point of the thread pool, it deserves careful analysis)
  2. The submit method

    public Future<?> submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<Void> ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
    }
    
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task, result);
      execute(ftask);
      return ftask;
    }
    
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
    }
    
    
    • ThreadPoolExecutor does not implement its own submit method. Instead, it inherits the implementation from its superclass AbstractExecutorService

    • Accepts a Runnable or Callable task and provides a Future type return value

      • Internally, submit wraps every incoming task as a RunnableFuture, a type that combines the Runnable and Future interfaces (quite the stitched-together hybrid)

      • The difference is that the Future returned for a Runnable task may not carry a meaningful value, whereas a Callable task can return a result

        • When submitting a Runnable task, you can also specify a return result for the Future, but this result is obviously not the return value of the task’s completion, but a value passed in by the programmer, acting like a flag value

          public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
          }
          
          public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
              throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
          }
          
          static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
              this.task = task;
              this.result = result;
            }
            public T call() {
              task.run();
              return result;
            }
          }
          • As the code shows, an incoming Runnable task is converted to a Callable. If a result value was passed in, call returns that value; if nothing was passed in, it returns null
    • Internally, submit still calls the execute method

    • The difference between Callable and Runnable is added here:

      • Callable's call method returns a value and may throw a checked exception when no valid result can be produced; Runnable's run method returns nothing and cannot throw a checked exception
    • More on the Future interface
      • isDone checks whether the task is complete
      • get retrieves the execution result
        1. Note that get is a blocking method: it returns only after the task has completed
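The submission variants above can be tried side by side. The following is a minimal sketch in which SubmitVariantsSketch, demo, and the pool sizes are names and values chosen for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitVariantsSketch {

    /** Returns the values obtained from the three submit overloads, in order. */
    static List<Object> demo() throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(16));
        try {
            AtomicInteger counter = new AtomicInteger();
            Runnable increment = counter::incrementAndGet;
            Callable<Integer> compute = () -> 6 * 7;

            pool.execute(increment);                                  // no return value at all
            Future<?> plain = pool.submit(increment);                 // Future whose get() yields null
            Future<String> flagged = pool.submit(increment, "flag");  // get() yields the value passed in
            Future<Integer> computed = pool.submit(compute);          // get() yields the task's own result

            // get() blocks until the corresponding task has completed
            List<Object> results = Arrays.asList(plain.get(), flagged.get(), computed.get());
            System.out.println("computed.isDone() = " + computed.isDone());
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [null, flag, 42]
    }
}
```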

Destroy (close) the thread pool

  1. The shutdown method closes the thread pool and changes its state to SHUTDOWN. The pool no longer accepts new tasks, but tasks already in the queue still run to completion.

    1. After calling shutdown, you can call awaitTermination, which waits up to the specified time for the thread pool to terminate. It returns true if the pool terminated within that time and false otherwise

    2. Shutdown source code analysis

      public void shutdown() {
              final ReentrantLock mainLock = this.mainLock;
              // lock
              mainLock.lock();
              try {
                  // determine if the caller has permission to shutdown the thread pool
                  checkShutdownAccess();
                  // CAS sets the thread pool state to SHUTDOWN
                  advanceRunState(SHUTDOWN);
                  // Interrupts all idle threads
                  interruptIdleWorkers();
                  // The hook function
                  onShutdown(); // hook for ScheduledThreadPoolExecutor
              } finally {
                  // unlock
                  mainLock.unlock();
              }
              // Try to terminate the thread pool
              tryTerminate();
          }
      • The interruptIdleWorkers and tryTerminate functions are analyzed later
  2. The shutdownNow method closes the thread pool and changes the pool's state to STOP. The pool attempts to stop running tasks by interrupting their threads, stops processing queued tasks, and returns the list of tasks that were waiting to be executed.

    public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock;
      // lock
      mainLock.lock();
      try {
        // determine if the caller has permission to shutdown the thread pool
        checkShutdownAccess();
        // CAS sets the thread pool state to STOP
        advanceRunState(STOP);
        // Interrupt all threads
        interruptWorkers();
        // Get a list of the remaining unexecuted work from the queue
        tasks = drainQueue();
      } finally {
        mainLock.unlock();
      }
      // Try to terminate the thread pool
      tryTerminate();
      // Returns a list of unexecuted tasks
      return tasks;
    }
    • interruptWorkers is analyzed later in the article
  3. Use the following two methods to determine whether the thread pool is completely closed

    1. isTerminated() returns true once shutdown() has been called and all submitted tasks have completed, or once shutdownNow() has been called, all threads in the pool have been interrupted, and the number of worker threads has dropped to 0
    2. isShutdown() returns true once shutdown() has been called.
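The shutdown methods above are commonly combined into a two-phase shutdown: try shutdown first and fall back to shutdownNow if the pool does not terminate in time. A sketch follows, where GracefulShutdownSketch and shutdownGracefully are hypothetical names and the timeouts are arbitrary.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    /** Returns true if the pool terminated within the allowed time. */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                   // stop accepting tasks, finish queued ones
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            List<Runnable> neverRun = pool.shutdownNow();  // interrupt workers, drain the queue
            System.out.println("Dropped " + neverRun.size() + " queued task(s)");
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();            // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(16));
        pool.execute(() -> System.out.println("working"));
        boolean clean = shutdownGracefully(pool, 2, TimeUnit.SECONDS);
        System.out.println("terminated=" + clean
                + " isShutdown=" + pool.isShutdown()
                + " isTerminated=" + pool.isTerminated());
    }
}
```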

Executor framework usage diagram

  1. The main thread first creates a task object that implements the Runnable or Callable interface.
  2. The Runnable object can be handed directly to ExecutorService.execute(Runnable command) for execution. Alternatively, a Runnable or Callable object can be submitted via ExecutorService.submit(Runnable task) or ExecutorService.submit(Callable<T> task).
  3. ExecutorService.submit(...) returns an object that implements the Future interface (the difference between execute() and submit() was described earlier: submit() returns a FutureTask object).
  4. Finally, the main thread can call FutureTask.get() to wait for the task to complete. It can also call FutureTask.cancel(boolean mayInterruptIfRunning) to cancel the task.
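The four steps can be walked through in code. The sketch below is illustrative: FutureTaskUsageSketch and its helper methods are hypothetical, and the 10-second sleep only stands in for a long-running task that gets cancelled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FutureTaskUsageSketch {

    private static ExecutorService newPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(4));
    }

    /** Steps 1, 2 and 4: wrap a Callable in a FutureTask, execute it, wait with get(). */
    static String runAndGet() throws Exception {
        ExecutorService pool = newPool();
        try {
            Callable<String> work = () -> "done";
            FutureTask<String> task = new FutureTask<>(work);
            pool.execute(task);          // FutureTask is a Runnable, so execute() accepts it
            return task.get();           // blocks until the task completes
        } finally {
            pool.shutdown();
        }
    }

    /** Steps 3 and 4: submit, then cancel while running. Returns true if get() reports it. */
    static boolean cancelRunning() throws Exception {
        ExecutorService pool = newPool();
        try {
            CountDownLatch started = new CountDownLatch(1);
            Callable<String> slow = () -> {
                started.countDown();
                Thread.sleep(10_000);    // stands in for long-running work
                return "finished";
            };
            Future<String> future = pool.submit(slow);
            started.await();             // make sure the task is really running
            boolean cancelled = future.cancel(true);   // true: interrupt the worker thread
            try {
                future.get();
                return false;
            } catch (CancellationException expected) {
                return cancelled && future.isCancelled();
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndGet());      // prints "done"
        System.out.println(cancelRunning());  // prints "true"
    }
}
```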

Use of thread pools

Create a thread pool by using the Executors tool class

  • The most convenient way to create a thread pool is the Executors utility class, which can create both ordinary thread pools and thread pools that run scheduled tasks. However, convenient creation means heavy encapsulation, which leaves little room for customization and even carries some risk

A normal thread pool

  • A thread pool with a fixed number of threads

    • The number of threads in this thread pool is always the same. When a new task is submitted, it is executed immediately if there are idle threads in the thread pool. If no, the new task is temporarily stored in a task queue. When a thread is idle, the task in the task queue will be processed.

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
      }
      // The default task queue length is Integer.MAX_VALUE
      public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
      }
      • The core thread count equals the maximum thread count, so every thread in the pool is a core thread
      • Because the queue is unbounded, tasks are never rejected, but this has its pitfalls.
      • With an unbounded queue, maximumPoolSize is effectively ignored: the task queue can never fill up, so the pool never creates threads beyond corePoolSize. This is why the FixedThreadPool source sets corePoolSize and maximumPoolSize to the same value.
        • For the same reason, keepAliveTime is effectively ignored when an unbounded queue is used, because there are no threads beyond the core threads. It does matter if idle core threads are allowed to time out, although allowCoreThreadTimeOut(true) requires a non-zero keepAliveTime.
      • Once the number of threads in the pool reaches corePoolSize, new tasks wait in the unbounded queue, so the thread count never exceeds corePoolSize. If corePoolSize is set too low, many tasks will sit waiting and the machine will not be fully utilized
      • The number of threads that can be created is limited, but the request queue can grow to Integer.MAX_VALUE entries, so a backlog of requests can pile up and eventually cause an OOM
  • A thread pool with only one thread

    • Can be considered a special case of the fixed-size thread pool in which nThreads is 1

      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        // Wrapped in a delegating class so that the returned executor
        // cannot be cast to ThreadPoolExecutor and reconfigured
        return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory));
      }
  • Thread pool that dynamically allocates the number of threads

    • This method returns a thread pool that can adjust the number of threads as needed. The number of threads in the thread pool is uncertain, but if there are free threads that can be reused, reusable threads are preferred. If all threads are working and a new task is submitted, a new thread is created to process the task. All threads will return to the thread pool for reuse after completing the current task.

      public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>(),
                                            threadFactory);
      }
      • The core thread count is set to 0, so no idle threads are kept around and the pool wastes no resources when there are no tasks
      • Since the core thread count is 0, every thread in the pool is a non-core thread and is reclaimed after sitting idle for 60 seconds
      • Dynamic allocation and on-demand creation are made possible by using a SynchronousQueue as the task queue. This queue does not buffer tasks: if an idle thread exists, the task is handed to it directly; otherwise a new thread is created:
        • The execute method first calls SynchronousQueue.offer(Runnable task) to submit the task to the queue. If an idle thread in the pool is currently executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), the main thread's offer pairs with the idle thread's poll, the task is handed over to the idle thread, and execute() returns.
        • When the pool is initially empty or has no idle thread, no thread is executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS). In that case offer returns false, CachedThreadPool creates a new thread to run the task, and execute returns.
      • Although the queue itself holds no tasks, the maximum number of threads is Integer.MAX_VALUE, so the pool keeps accepting tasks and creating threads until memory is exhausted (OOM)
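The contrast between the fixed and cached configurations can be observed directly. The sketch below builds both pools with the same constructor arguments that the Executors factory methods use and saturates each with ten blocking tasks; FixedVsCachedSketch and saturate are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedVsCachedSketch {

    /** Submits blocking tasks and returns {pool size, queued tasks} while all are still blocked. */
    static int[] saturate(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();   // hold the thread so none becomes idle
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();           // let the tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Same arguments as newFixedThreadPool(2)
        ThreadPoolExecutor fixed = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // Same arguments as newCachedThreadPool()
        ThreadPoolExecutor cached = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
                TimeUnit.SECONDS, new SynchronousQueue<>());

        int[] f = saturate(fixed, 10);
        int[] c = saturate(cached, 10);
        System.out.println("fixed:  threads=" + f[0] + " queued=" + f[1]);  // threads=2 queued=8
        System.out.println("cached: threads=" + c[0] + " queued=" + c[1]);  // threads=10 queued=0

        // With no thread waiting in poll/take, offer on a SynchronousQueue fails immediately
        System.out.println(new SynchronousQueue<Runnable>().offer(() -> { })); // prints false
    }
}
```

The fixed pool pushes its backlog into the queue, while the cached pool turns it into threads; both can exhaust memory under sustained load, which is the risk the bullets above describe.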

Pool of threads that perform scheduled tasks

Creation of a thread pool for scheduled tasks
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
  int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}


public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue(), threadFactory, handler);
}
  • Although the maximum number of threads is Integer.MAX_VALUE, DelayedWorkQueue is an unbounded queue, so the pool never grows beyond corePoolSize threads. It keeps accepting tasks into the queue, however, which can continue until memory is exhausted (OOM)

  • Note that although the ScheduledThreadPoolExecutor constructor calls the constructor of its parent class ThreadPoolExecutor, its core entry method is no longer ThreadPoolExecutor's execute method but ScheduledThreadPoolExecutor's delayedExecute method

  • The implementation of scheduled tasks relies on DelayedWorkQueue

  • Note that in Spring Boot, scheduled tasks can be run with the @Scheduled annotation, which uses a scheduled-task thread pool underneath. In practice this thread pool is rarely used directly, because there are other solutions for scheduled tasks, such as the Spring Boot annotation and Quartz

    Note: Quartz is a task scheduling library written in Java and open-sourced by the OpenSymphony organization. It is widely used in real projects and is recommended: in theory it can schedule tens of thousands of tasks at once, and it offers a rich feature set, including task scheduling, task persistence, clustering, and plug-ins

Perform different kinds of scheduled tasks
  • The schedule method runs a one-shot delayed task

    public ScheduledFuture<?> schedule(Runnable command, long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    • It is worth adding that ScheduledThreadPoolExecutor overrides the execute and submit methods; internally, both simply call the schedule method
  • The scheduleAtFixedRate method runs a task at a fixed interval measured from the start of the previous execution

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
  • The scheduleWithFixedDelay method executes a task at a fixed interval based on the end of the last task

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
  • The difference between the latter two is shown in the figure

    • Two questions arise:

      • If a task takes longer to run than the interval in fixed-rate mode, how is it scheduled? Answer: the next execution starts only after the previous one has completed, regardless of the interval. Executions are delayed, not run concurrently
      • If one execution of a periodic task throws an exception, will later executions still run? Answer: no. Once an exception is thrown, that task's subsequent executions are suppressed. Exceptions can be handled in a custom afterExecute method so that only the task that threw is cancelled while other tasks continue to run
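Both answers above can be checked with a small experiment. The sketch below is an illustration under assumed values (10 ms period, 30 ms task time, a failure on the third run); PeriodicTaskSketch and its two methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskSketch {

    /** Fixed-rate task that is slower than its period; returns the peak number of overlapping runs. */
    static int maxConcurrency() throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        pool.scheduleAtFixedRate(() -> {
            max.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(30);                      // three times the 10 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            running.decrementAndGet();
            threeRuns.countDown();
        }, 0, 10, TimeUnit.MILLISECONDS);
        threeRuns.await(5, TimeUnit.SECONDS);
        pool.shutdownNow();
        return max.get();
    }

    /** Fixed-rate task that throws on its third run; returns how many runs happened in total. */
    static int runsBeforeSuppression() throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
            if (runs.incrementAndGet() == 3) {
                throw new IllegalStateException("boom");
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get();                              // returns only when the task fails
        } catch (ExecutionException e) {
            System.out.println("task failed with: " + e.getCause());
        }
        Thread.sleep(100);                             // ample time for further runs, if any
        pool.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxConcurrency());          // prints 1: runs never overlap
        System.out.println(runsBeforeSuppression());   // prints 3: no run after the exception
    }
}
```

Wrapping the task body in try/catch is another common way to keep a periodic task alive after a failure.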
An understanding of the general workings of a scheduled task thread pool
  • As mentioned above, the core entry point of the scheduled-task thread pool is delayedExecute, which all three submission methods above call. Before looking at this key method, note that each submission method first wraps the submitted task. The wrapping involves quite a few classes, so it helps to get a rough picture from a class diagram first

  • First wrapped as ScheduledFutureTask

    // Used to wrap tasks submitted by schedule(Runnable)
    // Result is null and ns is the system time to trigger the task execution in nanoseconds
    ScheduledFutureTask(Runnable r, V result, long ns) {
      super(r, result);
      this.time = ns;
      this.period = 0;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // Used to wrap tasks submitted by scheduleWithFixedDelay and scheduleAtFixedRate
    // result is null
    // ns is the system time, in nanoseconds, at which the next execution is triggered
    // period is the task period in nanoseconds
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
      super(r, result);
      this.time = ns;
      this.period = period;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // Used to wrap tasks submitted by schedule(Callable)
    // Ns is the system time in nanoseconds to trigger the execution of the task
    ScheduledFutureTask(Callable<V> callable, long ns) {
      super(callable);
      this.time = ns;
      this.period = 0;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    
    // The key run method
    public void run() {
      // First determine whether the task is periodic
      boolean periodic = isPeriodic();
      // If the pool cannot run this task in its current state, cancel the task
      if (!canRunInCurrentRunState(periodic))
        cancel(false);
      // A non-periodic task, i.e. a one-shot scheduled task, is simply executed
      else if (!periodic)
        ScheduledFutureTask.super.run();
      // A periodic task is executed and its state is reset so that it can run again
      else if (ScheduledFutureTask.super.runAndReset()) {
        // Calculate the time of the next execution
        setNextRunTime();
        // Resubmit the current task to the delay queue for the next period
        reExecutePeriodic(outerTask);
      }
    }

    // Calculate the next time to execute the task
    // time is the time of the next execution; period is used to compute it
    private void setNextRunTime() {
      long p = period;
      if (p > 0)
        // scheduleAtFixedRate
        // The next execution time is the previous trigger time plus the period, regardless of when the execution actually completed (i.e. now)
        time += p;
      else
        // scheduleWithFixedDelay
        // The next execution time is computed from the current time, i.e. the time the previous execution finished
        time = triggerTime(-p);
    }
    
    // Used to sort the delay queue in the order of the next trigger
    public int compareTo(Delayed other) {
      if (other == this) // compare zero if same object
        return 0;
      if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
          return -1;
        else if (diff > 0)
          return 1;
        // The trigger time is consistent, according to the order of submission
        else if (sequenceNumber < x.sequenceNumber)
          return -1;
        else
          return 1;
      }
      long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
    
    // Calculate the time between the current time and the next execution
    public long getDelay(TimeUnit unit) {
      return unit.convert(time - now(), NANOSECONDS);
    }
    • scheduleWithFixedDelay and scheduleAtFixedRate differ in the period passed in during wrapping: the former passes unit.toNanos(-delay), the latter unit.toNanos(period)
      • The sign is interpreted in the setNextRunTime method; see the comments in that method
    • Besides period, periodic tasks differ from one-shot tasks in one more field set during wrapping: outerTask
      • Periodic tasks hold this field so that the current task can be resubmitted to the delay queue for the next cycle. For details, see the reExecutePeriodic method
    • getDelay is mainly used by the delay queue's take and poll methods, where it controls when a task may be taken
      • In short, two features are key to how the scheduled-task thread pool works: the delay queue orders tasks by trigger time, and it uses getDelay to hold a task back until its delay has elapsed
  • The second wrapping step produces a RunnableScheduledFuture

    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
      return task;
    }
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
      return task;
    }
    • The default implementation simply returns the RunnableScheduledFuture passed in. I did not see at first why such a method is needed; it is a protected extension hook that subclasses can override to modify or replace the task
  • The entry method to the scheduled task thread pool is delayedExecute

    private void delayedExecute(RunnableScheduledFuture<?> task) {
      // 1. If the thread pool is already shut down, reject the task
      if (isShutdown())
        reject(task);
      else {
        // 2. Otherwise, first add the task to the DelayedWorkQueue
        super.getQueue().add(task);
        // 3. Recheck the pool state after enqueueing, as a regular thread pool also does
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
          task.cancel(false);
        else
          // 4. The recheck passed: make sure a worker thread is running
          ensurePrestart();
      }
    }

    void ensurePrestart() {
      int wc = workerCountOf(ctl.get());
      // Below corePoolSize: start a core thread
      if (wc < corePoolSize)
        addWorker(null, true);
      // corePoolSize is 0 and no worker exists: start one non-core thread
      else if (wc == 0)
        addWorker(null, false);
    }
    Copy the code
    • If the thread pool state is notSHUTDOWNInstead of having the thread execute the task directly, add the task directly to the queue
  • From addWorker onward, thread management and task fetching follow the same path as the standard thread pool. In other words, the scheduled-task thread pool differs from an ordinary thread pool mainly in task scheduling. The delay queue, which connects task management to thread management, is therefore also worth a brief look

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {
     
     // The method of submitting tasks during task scheduling is the add method
     public boolean add(Runnable e) {
       return offer(e);
     }
      
      public boolean offer(Runnable x) {
        if (x == null)
          throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          int i = size;
          if (i >= queue.length)
            grow();
          size = i + 1;
          if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
          } else {
            // According to the sorting rules, select the appropriate position to insert into the queue
            siftUp(i, e);
          }
          if (queue[0] == e) {
            leader = null;
            available.signal();
          }
        } finally {
          lock.unlock();
        }
        return true;
      }
      // According to the sorting rules, select the appropriate position to insert into the queue
      private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
          int parent = (k - 1) >>> 1;
          RunnableScheduledFuture<?> e = queue[parent];
          // Sort by the time attribute of RunnableScheduledFuture
          if (key.compareTo(e) >= 0)
            break;
          queue[k] = e;
          setIndex(e, k);
          k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
      }
      
      // In getTask, the core thread fetches the task (no timeout)
      // If not currently available, block and wait
      public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
              available.await();
            else {
              // Call the getDelay method to get the amount of time to wait
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                return finishPoll(first);
              first = null; // don't retain ref while waiting
              if (leader != null)
                available.await();
              else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                  available.awaitNanos(delay);
                } finally {
                  if (leader == thisThread)
                    leader = null;
                }
              }
            }
          }
        } finally {
          if (leader == null && queue[0] != null)
            available.signal();
          lock.unlock();
        }
      }

      // In getTask, a non-core thread, or a core thread that is allowed to time out, fetches the task (with timeout)
      public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
              if (nanos <= 0)
                return null;
              else
                nanos = available.awaitNanos(nanos);
            } else {
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                return finishPoll(first);
              if (nanos <= 0)
                return null;
              first = null; // don't retain ref while waiting
              if (nanos < delay || leader != null)
                nanos = available.awaitNanos(nanos);
              else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                  long timeLeft = available.awaitNanos(delay);
                  nanos -= delay - timeLeft;
                } finally {
                  if (leader == thisThread)
                    leader = null;
                }
              }
            }
          }
        } finally {
          if (leader == null && queue[0] != null)
            available.signal();
          lock.unlock();
        }
      }
    }
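    • Internally, DelayedWorkQueue stores its tasks in an array of RunnableScheduledFuture, maintained as a binary heap ordered by trigger time (siftUp restores the order on insertion)
    • Submitting tasks (offer) and fetching tasks (take and poll) all use the same ReentrantLock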
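
The effect of the heap ordering can be checked from outside with a short sketch: tasks submitted out of order to a single-threaded pool should still run in order of their delays. DelayOrderDemo and runOrder are hypothetical names, and the delays are arbitrary values chosen for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayOrderDemo {
    // Schedules three tasks out of order and returns the order in which they ran.
    static List<String> runOrder() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        List<String> order = new CopyOnWriteArrayList<>();
        Runnable c = () -> { order.add("C"); };
        Runnable a = () -> { order.add("A"); };
        Runnable b = () -> { order.add("B"); };
        // Submitted out of order on purpose; the queue re-sorts them by trigger time.
        pool.schedule(c, 300, TimeUnit.MILLISECONDS);
        pool.schedule(a, 100, TimeUnit.MILLISECONDS);
        pool.schedule(b, 200, TimeUnit.MILLISECONDS);
        // By default, delayed tasks that are already queued still run after shutdown().
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder());
    }
}
```

Because the single worker always takes the head of the heap, the execution order depends on the trigger times, not on the order of submission.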

Java thread pool source code analysis and summary (ii)

References

  1. Implementation principle of Java Thread Pool and its Practice in Meituan business — Meituan technical team
  2. Java thread pool learning summary
  3. Java thread pool