An overview of the Executor framework

Executor is a flexible and powerful asynchronous execution framework. It supports many different task execution strategies, provides a standard way to decouple task submission from task execution based on the producer-consumer model, and offers lifecycle support, statistics collection, application management, and performance monitoring mechanisms.

Its members fall into four groups: tasks, task executors, task execution results, and a factory/utility class:

  • Task: classes that implement the Callable or Runnable interface
  • Task execution: ThreadPoolExecutor and ScheduledThreadPoolExecutor
  • Task execution results: the Future interface and its FutureTask implementation
  • Factory: Executors

Executor

Java threads are both units of work and units of execution

 new Thread(new Runnable() {
     @Override
     public void run() {
         log.info("hello");
     }
 }).start();

Starting with JDK 5, the unit of work (Runnable and Callable) is separated from the execution mechanism, which is provided by the Executor framework.

The Executor interface decouples tasks from their execution and has only one method

public interface Executor {
    /**
     * Executes the given Runnable task. How it is executed depends on the Executor implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be accepted for execution
     * @throws NullPointerException       if command is null
     */
    void execute(Runnable command);
}

Instead of creating a thread explicitly, as in new Thread(new RunnableTask()).start(), we can execute tasks as follows:

Executor executor = anExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

Executor is just an interface, so exactly how a task is executed depends on the implementation. The Executor interface does not require execution to be asynchronous; an implementation may be synchronous or asynchronous:

  • synchronous

 class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
 }

DirectExecutor is a synchronous task executor: for each incoming task, execute does not return until the task has finished running.
  • asynchronous

 class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
 }

ThreadPerTaskExecutor is an asynchronous task executor that creates a new thread for each task it executes.

Many Executor implementations impose some kind of restriction on how and when tasks are scheduled; in the following example we can already see the beginnings of a thread pool.

 class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   }

   public synchronized void execute(Runnable r) {
     tasks.add(() -> {
       try {
         r.run();
       } finally {
         scheduleNext();
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }
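
As a minimal usage sketch, a SerialExecutor can wrap any other executor (here the ThreadPerTaskExecutor shown above) so that tasks run one at a time, in submission order:

 Executor serial = new SerialExecutor(new ThreadPerTaskExecutor());
 serial.execute(() -> System.out.println("first"));
 serial.execute(() -> System.out.println("second")); // starts only after "first" finishes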

Summary: The purpose of an Executor is to decouple the task itself from its execution

ExecutorService

ExecutorService extends Executor and provides enhanced control over tasks as well as lifecycle management. It mainly adds four kinds of capability:

  1. Shutting down the executor and preventing further task submission;
  2. Monitoring the executor's status;
  3. Submitting asynchronous tasks that return a Future;
  4. Executing batches of tasks.
public interface ExecutorService extends Executor {

    /**
     * Shuts down the executor:
     * 1. previously submitted tasks continue to execute, but no new tasks are accepted;
     * 2. calling it again on an already shut down executor has no additional effect.
     */
    void shutdown();

    /**
     * Attempts to stop all actively executing tasks (via Thread.interrupt, so tasks that do not
     * respond to interruption may never terminate) and halts processing of tasks that were
     * submitted but not yet started.
     *
     * @return the list of tasks that were submitted but never commenced execution
     */
    List<Runnable> shutdownNow();

    /** Returns true if this executor has been shut down. */
    boolean isShutdown();

    /**
     * Returns true only if the executor has been shut down and all tasks have completed.
     * Note: this never returns true unless shutdown or shutdownNow was called first.
     */
    boolean isTerminated();

    /**
     * Blocks the calling thread until the executor terminates, the timeout elapses,
     * or the thread is interrupted.
     *
     * @return true if the executor terminated, false if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Submits a value-returning task; the Future's get method returns the task's result
     * upon successful completion.
     *
     * @throws RejectedExecutionException if the task cannot be scheduled for execution
     * @throws NullPointerException       if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task; the Future's get method returns the given result
     * upon successful completion.
     */
    <T> Future<T> submit(Runnable task, T result);

    /** Submits a Runnable task; the Future's get method returns null upon successful completion. */
    Future<?> submit(Runnable task);

    /**
     * Executes all tasks in the given collection and, when all have completed, returns a list of
     * Futures holding their status and results; Future.isDone() is true for every element.
     * The Futures are in the same order as produced by the collection's iterator.
     *
     * @throws InterruptedException if interrupted while waiting, in which case unfinished tasks are cancelled
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    /**
     * Executes all tasks in the given collection and returns a list of Futures when all tasks
     * have completed or the timeout expires, whichever happens first.
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Executes the given tasks and returns the result of one that completed successfully
     * (without throwing an exception), if any did; unfinished tasks are cancelled on normal
     * or exceptional return.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * Executes the given tasks and returns the result of one that completed successfully
     * (without throwing an exception) before the given timeout expires.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
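
A minimal usage sketch (the pool size and the trivial task below are just for illustration):

import java.util.concurrent.*;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<Integer> future = pool.submit(() -> 1 + 1); // Callable<Integer>
        System.out.println(future.get());                  // blocks until the result is available
        pool.shutdown();                                   // stop accepting new tasks; queued tasks still run
        pool.awaitTermination(10, TimeUnit.SECONDS);       // wait up to 10 seconds for termination
    }
}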

ScheduledExecutorService

ScheduledExecutorService extends ExecutorService with a family of schedule methods that run a submitted task after a given delay, or repeatedly at a fixed rate or with a fixed delay between executions.

For example,

import static java.util.concurrent.TimeUnit.*;

/**
 * A "beeper" task is submitted with scheduleAtFixedRate and runs every 10 seconds.
 * After 1 hour, the beeping task is cancelled.
 */
class BeeperControl {
  private final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(1);

  public void beepForAnHour() {
    Runnable beeper = () -> System.out.println("beep");
    ScheduledFuture<?> beeperHandle =
        scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
    Runnable canceller = () -> beeperHandle.cancel(false);
    scheduler.schedule(canceller, 1, HOURS);
  }
}

The complete ScheduledExecutorService interface declaration is as follows:

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * Submits a task to be executed after the given delay.
     *
     * @param command the task to execute
     * @param delay   the delay before execution
     * @param unit    the time unit of the delay
     */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Submits a value-returning task to be executed after the given delay.
     *
     * @param callable the task to execute
     * @param delay    the delay before execution
     * @param unit     the time unit of the delay
     * @param <V>      the type of the task's result
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Submits a task for repeated execution: it first runs after initialDelay, then at
     * initialDelay + period, initialDelay + 2 * period, and so on.
     *
     * @param command      the task to execute
     * @param initialDelay the delay before the first execution
     * @param period       the period between successive executions
     * @param unit         the time unit of initialDelay and period
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                  long period, TimeUnit unit);

    /**
     * Submits a task for repeated execution: it first runs after initialDelay, and each
     * subsequent execution starts the given delay after the previous one terminates.
     * If any execution throws an exception, subsequent executions are cancelled; otherwise
     * the task stops only when it is cancelled or the executor is terminated.
     *
     * @param command      the task to execute
     * @param initialDelay the delay before the first execution
     * @param delay        the delay between the end of one execution and the start of the next
     * @param unit         the time unit of initialDelay and delay
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                     long delay, TimeUnit unit);
}

Summary

  1. Executor: Submits ordinary executable tasks
  2. ExecutorService: Provides support for thread pool lifecycle management and asynchronous tasks
  3. ScheduledExecutorService: Supports the periodic execution of tasks

Executor, ExecutorService, and ScheduledExecutorService are all interface definitions.

The interface and class relationships mentioned above are shown below:

  1. Executor

    The executor interface and top-level core abstraction; it separates a task from its execution.

  2. ExecutorService

    Extends Executor with executor lifecycle management and asynchronous task execution.

  3. ScheduledExecutorService

    Deferred/periodic execution of tasks is provided on an ExecutorService basis.

  4. Executors

    A static factory that produces concrete executors.

  5. ThreadFactory

    Thread factory, used to create threads; it removes the tedium of creating threads manually and lets a factory's features (such as thread naming) be reused.

  6. AbstractExecutorService

    The abstract implementation of ExecutorService provides the basis for implementing various executor classes.

  7. ThreadPoolExecutor

    The thread pool executor, the most commonly used Executor implementation; it manages threads as a pool.

  8. ScheduledThreadPoolExecutor

    Adds support for delayed and periodic task scheduling on top of ThreadPoolExecutor.

  9. ForkJoinPool

    Fork/Join thread pool, introduced in JDK1.7, is the core class that implements the Fork/Join framework.

The Executor process is as follows:

Executors

Executors provides a simple factory and a series of utility methods. All of its methods are static, and users can pick the appropriate one to create executor instances for their needs:

  • Methods that create and return an ExecutorService set up with commonly useful configurations
  • Methods that create and return a ScheduledExecutorService set up with commonly useful configurations
  • Methods that create and return a "wrapped" ExecutorService that hides the implementation-specific methods of the concrete class and exposes only the interface methods
  • Methods that create and return a ThreadFactory that sets newly created threads to a known state
  • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods that require a Callable

We can also roughly see it from the method signature in the figure below:

Why is there a wrapper class?

Because a class like ThreadPoolExecutor, if returned directly, exposes configuration methods such as setCorePoolSize. Sometimes we do not want callers to use those methods (for example, newSingleThreadExecutor should always have exactly one thread), so the returned pool is wrapped in a class that exposes only the ExecutorService methods.

  • DelegatedExecutorService is a wrapper around an ExecutorService that exposes only the ExecutorService interface methods to the user and hides the methods specific to the concrete implementation.
  • DelegatedScheduledExecutorService is the corresponding wrapper around a ScheduledExecutorService, exposing only the ScheduledExecutorService interface methods. FinalizableDelegatedExecutorService additionally wraps an ExecutorService to add automatic recycling: when the JVM garbage-collects the wrapper and calls its finalize method, the underlying thread pool that the user forgot to close is shut down and reclaimed.
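
Conceptually, the wrapper is just a delegate that forwards calls to the underlying pool. The sketch below only illustrates the idea (the real DelegatedExecutorService is a private static class inside Executors, not this code):

import java.util.List;
import java.util.concurrent.*;

// Illustrative sketch of the delegation idea: callers see only ExecutorService methods,
// never the setters of the underlying ThreadPoolExecutor.
class DelegatedExecutorServiceSketch extends AbstractExecutorService {
    private final ExecutorService e;

    DelegatedExecutorServiceSketch(ExecutorService executor) { this.e = executor; }

    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    public boolean isShutdown() { return e.isShutdown(); }
    public boolean isTerminated() { return e.isTerminated(); }
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return e.awaitTermination(timeout, unit);
    }
}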

ThreadPoolExecutor

The most common scenario for executors in everyday use is thread pools.

What is a thread pool?

A thread pool is a tool for managing threads based on the idea of pooling.

What does a thread pool do?

The main job of a thread pool is to control the number of running threads: submitted tasks are placed in a queue and handed to threads for execution. If the number of running threads reaches the maximum, additional tasks wait in the queue until another thread finishes its work and pulls the next task from the queue. The main characteristics are thread reuse, control over the maximum number of concurrent threads, and thread management.

What are the benefits of using thread pools?

Thread pooling enables uniform thread allocation, tuning, and monitoring:

  • Reduced resource consumption: Reuse of created threads through pooling techniques to reduce wastage from thread creation and destruction.
  • Improved response time: Tasks can be executed immediately when they arrive without waiting for threads to be created.
  • Improve manageability of threads: Threads are scarce resources. If they are created without limit, they will not only consume system resources, but also cause resource scheduling imbalance due to unreasonable distribution of threads, which reduces system stability. Thread pools allow for uniform allocation, tuning, and monitoring.
  • Extensibility: thread pools are extensible, allowing developers to add functionality to them. For example, the delayed/periodic thread pool ScheduledThreadPoolExecutor allows tasks to be deferred or executed on a schedule.

What problem does a thread pool solve?

The core problem solved by thread pools is resource management. In a concurrent environment, the system cannot determine at any given moment how many tasks need to be executed or how many resources need to be invested. This uncertainty brings several problems:

  1. The additional cost of applying/destroying resources and scheduling resources frequently can be significant.
  2. The lack of means to suppress unlimited resource applications may lead to the risk of system resource exhaustion.
  3. The system cannot properly manage internal resource distribution, which reduces system stability.

To solve the problem of resource allocation, thread Pooling adopts the idea of Pooling.

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management. – Wikipedia

Pooling, as the name suggests, is the idea of managing resources together in order to maximize returns and minimize risks.

Pooling is applied not only in computing, but also in finance, equipment management, personnel management, work management, and other fields.

In computing, pooling manifests as the unified management of IT resources, including servers, storage, network resources, and so on. By sharing resources, users benefit from a lower investment.

In addition to thread pools, several other typical usage strategies include:

  1. Memory pooling: pre-allocate memory to speed up memory allocation and reduce memory fragmentation.
  2. Connection pooling: pre-create database connections to speed up obtaining a connection and reduce system overhead.
  3. Object pooling: recycle objects to reduce the expensive cost of initializing and releasing resources.

Executors provides a series of factory methods for creating thread pools. Let’s take a look at the core class, ThreadPoolExecutor, and then come back.

How does ThreadPoolExecutor work? How does it maintain threads and execute tasks at the same time?

Its operating mechanism is shown in the figure below:

Internally, a thread pool builds a producer-consumer model that decouples threads from tasks, so that tasks can be buffered effectively and threads reused.

The thread pool runs in two main parts:

  • Task management
  • Thread management

The task management part acts as a producer, and when a task is submitted, the thread pool determines the subsequent flow of the task:

  1. Directly allocate a thread to execute the task;
  2. Buffer the task in a queue, waiting for a thread to execute it;
  3. Reject the task.

The thread management part is the consumer. Threads are maintained uniformly inside the pool and allocated according to task demand; when a thread finishes a task, it goes on to fetch a new task to execute.

How does a thread pool maintain its state?

The running state of the thread pool is not set explicitly by the user; it is maintained internally as the pool runs. Internally, a single variable packs two key values: the run state (runState) and the number of threads (workerCount), as shown in the following code:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


ctl is an AtomicInteger field that controls both the running state of the pool and the number of effective threads in it: the high-order 3 bits store runState and the low-order 29 bits store workerCount, and the two do not interfere with each other. Packing the two values into one variable avoids inconsistencies when decisions depend on both, without having to hold a lock to keep them consistent; reading the thread pool source code shows that the run state and the thread count frequently have to be checked together. The pool also provides several methods for users to obtain the current run state and thread count, all implemented with bit operations, which are much faster than maintaining the values separately.

The internal helpers that extract the lifecycle state and the thread count from ctl are computed as follows:

private static int runStateOf(int c)     { return c & ~CAPACITY; } // Calculate the current running status
private static int workerCountOf(int c)  { return c & CAPACITY; }  // Count the number of current threads
private static int ctlOf(int rs, int wc) { return rs | wc; }   // Generate CTLS from state and thread count
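For reference, the related constants in the JDK source (ThreadPoolExecutor, Java 8) define the bit layout these helpers rely on:

private static final int COUNT_BITS = Integer.SIZE - 3;      // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits hold workerCount

// runState is stored in the high-order 3 bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;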

ThreadPoolExecutor can run in five states:
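
As described in the JDK documentation, these are:

  • RUNNING: accepts new tasks and processes queued tasks.
  • SHUTDOWN: does not accept new tasks, but still processes queued tasks.
  • STOP: does not accept new tasks, does not process queued tasks, and interrupts in-progress tasks.
  • TIDYING: all tasks have terminated and workerCount is zero; the thread transitioning to TIDYING runs the terminated() hook.
  • TERMINATED: terminated() has completed.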

Its lifecycle transformation is shown below:

What is the task scheduling process?

Task scheduling is the main entry point of the thread pool: when a user submits a task, this stage decides how the task will be executed. Understanding this part is equivalent to understanding the core working mechanism of the thread pool.

All tasks are scheduled through the execute method. Its job is to check the current run state of the pool, the number of running threads, and the configured policy, and then decide the next step: directly allocate a thread to run the task, buffer it in the queue, or reject it outright. The process is as follows:

  1. First, check the RUNNING status of the thread pool. If it is not RUNNING, reject it directly. Ensure that the thread pool executes tasks in the RUNNING state.
  2. If workerCount < corePoolSize, a thread is created and started to execute the newly submitted task.
  3. If workerCount >= corePoolSize and the blocking queue in the thread pool is not full, the task is added to the blocking queue.
  4. If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue in the thread pool is full, a thread is created and started to execute the newly submitted task.
  5. If workerCount >= maximumPoolSize and the blocking queue in the thread pool is full, the task is processed according to the reject policy. The default is to throw an exception directly.

Its execution process is shown in the figure below:
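
For reference, this flow corresponds to the execute method in the JDK source (Java 8 ThreadPoolExecutor, comments added):

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {          // fewer than corePoolSize threads: add a worker
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { // core threads busy: try to enqueue the task
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command)) // pool shut down meanwhile: roll back and reject
            reject(command);
        else if (workerCountOf(recheck) == 0)       // no live workers: start one to drain the queue
            addWorker(null, false);
    }
    else if (!addWorker(command, false))            // queue full: try a non-core worker, else reject
        reject(command);
}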

How are tasks buffered?

The task buffering module is the core part that allows the thread pool to manage tasks. The essence of a thread pool is the management of tasks and threads, and the key to achieving this is to decouple tasks from threads instead of tying them together directly, so that the pool can allocate work afterwards. Thread pools implement this with the producer-consumer pattern via a blocking queue: the queue caches tasks, and worker threads take tasks from it.

A BlockingQueue is a queue that supports two additional operations: when the queue is empty, a thread taking an element waits until the queue becomes non-empty; when the queue is full, a thread adding an element waits until space becomes available. Blocking queues are commonly used in producer-consumer scenarios, where the producer is the thread that adds elements to the queue and the consumer is the thread that takes elements from it: the queue is the container that producers put elements into and consumers take elements out of.

The figure below shows thread 1 adding elements to the blocking queue and thread 2 removing elements from the blocking queue
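
A minimal, self-contained sketch of that picture using an ArrayBlockingQueue (the names and sizes are illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // bounded queue with capacity 2

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put("task-" + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println("got " + queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}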

Different queues implement different task buffering and access policies; for example, LinkedBlockingQueue is unbounded by default, ArrayBlockingQueue is bounded, SynchronousQueue does no buffering at all and hands each task directly to a thread, and PriorityBlockingQueue orders tasks by priority.

Worker threads need to obtain tasks from the task buffering module; helping threads fetch tasks from the blocking queue is how the thread management part communicates with the task management part. This strategy is implemented by the getTask method, whose flow is shown in the figure below:

How are tasks rejected?

The task rejection module is the protective part of the thread pool. A thread pool has a maximum capacity: when the task queue is full and the number of threads has reached maximumPoolSize, new tasks must be rejected, and the configured rejection policy is applied to protect the pool.

Users can implement this interface to customize the rejection policy:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
} 


You can also choose one of the four rejection policies built into the JDK, which behave as follows:
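
  • AbortPolicy (the default): throws a RejectedExecutionException, letting the caller catch it and handle the failure.
  • CallerRunsPolicy: runs the rejected task directly in the thread that called execute, which slows down the submitter.
  • DiscardPolicy: silently discards the rejected task.
  • DiscardOldestPolicy: discards the oldest task waiting in the queue and retries execute with the new task.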

How are threads managed?

The thread pool maintains a set of worker threads (workerSet) and a blocking queue (workQueue). When a user submits a task to the thread pool, the pool first places the task in the workQueue; the threads in the workerSet continuously take tasks from the workQueue and execute them. When there are no tasks in the workQueue, a worker blocks until new tasks arrive in the queue, then takes one and continues executing.

The task execution model of Worker is shown in the figure below:

Core parameters

With that in mind, it’s easy to understand the constructor and core parameters of ThreadPoolExecutor:

  • corePoolSize: the number of core threads.
  • maximumPoolSize: the maximum number of threads.
  • keepAliveTime: how long idle non-core threads are kept alive.
  • unit: the time unit of keepAliveTime.
  • workQueue: the blocking queue of Runnable tasks; tasks that cannot be run immediately wait here.
  • threadFactory: the thread factory used to create new threads.
  • handler: the rejection policy.
 /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

A few things to note:

  • In general, corePoolSize and maximumPoolSize are set only at construction time, but they can be changed dynamically with setCorePoolSize(int) and setMaximumPoolSize(int).
  • By default, even core threads are only created and started when new tasks arrive. However, prestartCoreThread() (start one idle core thread waiting for work) or prestartAllCoreThreads() (start all core threads) can be used to start them ahead of time.
  • If the pool currently has more than corePoolSize threads, excess threads that stay idle for longer than keepAliveTime are terminated, which reduces resource consumption while the pool is inactive. By default this keep-alive policy applies only to threads beyond corePoolSize (non-core threads), but allowCoreThreadTimeOut(boolean) can apply the same timeout policy to core threads as well, as long as keepAliveTime is non-zero.
  • ThreadPoolExecutor provides hook methods that run around each task: overriding beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) lets you manipulate the execution environment, for example reinitializing ThreadLocals, gathering statistics, or logging. The terminated() method can also be overridden to perform special processing once the thread pool has fully terminated.
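
A minimal sketch of overriding these hooks to time each task (the class and the logging below are illustrative, not part of the JDK):

import java.util.concurrent.*;

class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    TimingThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                     BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime()); // record the start time on the worker thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsedNanos = System.nanoTime() - startTime.get();
        System.out.printf("task took %d ns%n", elapsedNanos);
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        System.out.println("pool terminated");
        super.terminated();
    }
}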

How to properly configure the number of threads

Various thread-count formulas circulate in the industry, but generally speaking they are not reliable; the configuration should be decided based on the actual characteristics of your own business.

How to shut down

When a thread pool is no longer referenced by any other code and there are no threads left in the pool, it is shut down automatically.

The specific code is here:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // This is actually the main operation of thread recycling, removing the thread pool reference so that it can be recycled by the JVM normally
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();
    // A worker can exit for many possible reasons, so the pool also needs to determine what
    // caused the exit, whether the pool's current state should change, and whether a
    // replacement thread should be created based on the new state, hence the logic below
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

The pool can also be shut down manually, as follows:

  • shutdown

    Setting the thread pool state to SHUTDOWN does not stop it immediately:

    1. Stop accepting externally submitted tasks
    2. Tasks already running and tasks waiting in the queue run to completion
    3. Only after step 2 completes does the pool really stop
  • shutdownNow

    Sets the thread pool state to STOP and attempts to stop immediately, though this is not guaranteed:

    1. Like shutdown(), stop receiving externally submitted tasks first
    2. Ignore tasks waiting in the queue
    3. Try to interrupt a running task
    4. Returns a list of unexecuted tasks

    It attempts to terminate threads by calling Thread.interrupt(), but this has limited effect: a thread that is not blocked in sleep, wait, Condition, a timed lock, or the like cannot be stopped this way. So shutdownNow() does not necessarily mean the pool exits immediately; it may still have to wait for running tasks to complete. In most cases, however, it exits promptly.

  • awaitTermination(long timeOut, TimeUnit unit)

    The calling thread blocks until one of the following happens:

    1. All submitted tasks (both running and queued) have completed, in which case it returns true
    2. The timeout expires, in which case it returns false
    3. The calling thread is interrupted, in which case an InterruptedException is thrown

Note:

  • shutdown() and awaitTermination() are similar in effect: after they are used, the pool still waits for all previously submitted tasks to finish executing.
  • After shutdown(), no new tasks can be submitted; after awaitTermination() alone, tasks can still be submitted.
  • awaitTermination() blocks and reports whether the thread pool has terminated (true/false); shutdown() does not block.
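
The pattern recommended in the JDK documentation combines these calls; the following is adapted from the ExecutorService class javadoc:

void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // Disable new tasks from being submitted
    try {
        // Wait a while for existing tasks to terminate
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // Cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // (Re-)Cancel if the current thread was also interrupted
        pool.shutdownNow();
        // Preserve interrupt status
        Thread.currentThread().interrupt();
    }
}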

How to close gracefully?

The first way

First take a look at the source comments:

A pool that is no longer referenced in a program AND has no remaining threads may be reclaimed (garbage collected) without being explicitly shutdown. You can configure a pool to allow all unused threads to eventually die by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting allowCoreThreadTimeOut(boolean).

If a thread pool reference is no longer held in the program and there are no threads in the pool, the thread pool is automatically closed.

Two conditions for automatic thread pool shutdown:

  • The thread pool reference is not reachable.
  • There are no threads in the thread pool.

To clarify condition 2: having no threads in the pool means that all of its threads have timed out and died on their own. If the core threads of our thread pool have no timeout policy, the pool will never shut down automatically.

So you need to set:

// The thread pool will release all idle threads after the timeout period, so that the process can exit
pool.allowCoreThreadTimeOut(true);

The second way

Close gracefully using Runtime.getRuntime().addShutdownHook together with Guava's MoreExecutors:

static {
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("==== Start closing the thread pool");
            CommonThreadPool.gracefulShutdown(pool, 10, TimeUnit.SECONDS);
            System.out.println("==== Finished closing the thread pool");
        }
    }));
}

public static boolean gracefulShutdown(ExecutorService threadPool, int shutdownTimeout,
        TimeUnit timeUnit) {
    return threadPool == null || MoreExecutors
        .shutdownAndAwaitTermination(threadPool, shutdownTimeout, timeUnit);
}

A common pitfall

Do not make thread pool threads daemon threads. Although daemon threads do not prevent the JVM from exiting, any unfinished tasks will be abandoned abruptly when the JVM exits, which can cause problems.

Creating thread pools with Executors

Different types of thread pools can be created via the static methods of Executors, although this is not recommended (see below).

  • newFixedThreadPool(int nThreads): creates a thread pool with a fixed number of threads
    • At any point in time there are at most nThreads active threads. Additional tasks wait in an unbounded LinkedBlockingQueue (with an upper limit of Integer.MAX_VALUE) until a current task finishes and a thread becomes free. Mostly suitable for servers with a stable, predictable level of concurrency.
  • newSingleThreadExecutor(): creates an executor with a single worker thread
    • When multiple tasks are submitted to the single-thread executor, they are queued and executed one at a time.
  • newCachedThreadPool(): creates a cacheable thread pool; execute reuses previously constructed threads when they are available, and otherwise creates a new thread and adds it to the pool. Threads that have been idle for 60 seconds are terminated and removed from the cache.
  • newScheduledThreadPool(int corePoolSize): creates a thread pool that supports delayed and periodic task execution and can replace the Timer class in most cases

Other problems

Why not use Executors to create thread pools? What is recommended?

The thread pool objects returned by Executors have the following drawbacks:

  1. FixedThreadPool and SingleThreadPool:

The allowed queue length is Integer.MAX_VALUE, which may accumulate a large number of requests and result in an OOM.

  2. CachedThreadPool and ScheduledThreadPool:

The number of threads allowed to be created is Integer.MAX_VALUE, which may create a large number of threads and result in an OOM.

Therefore, do not use Executors to create thread pools; use ThreadPoolExecutor directly instead, which makes the pool's runtime rules explicit and avoids the risk of resource exhaustion.
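
A sketch of such manual construction (the pool sizes, queue capacity, thread-name prefix, and rejection handler below are illustrative choices, not prescribed values):

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ManualPool {
    public static void main(String[] args) {
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                // named threads make troubleshooting easier
                return new Thread(r, "biz-pool-" + seq.getAndIncrement());
            }
        };

        ExecutorService pool = new ThreadPoolExecutor(
                4,                                         // corePoolSize
                8,                                         // maximumPoolSize
                60L, TimeUnit.SECONDS,                     // keepAliveTime for non-core threads
                new ArrayBlockingQueue<>(100),             // bounded queue avoids an unbounded backlog
                namedFactory,
                new ThreadPoolExecutor.CallerRunsPolicy()  // push back on the submitter when saturated
        );

        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}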
