1. Content summary

This article introduces how the JDK’s ThreadPoolExecutor is constructed and how it manages its internal state. Most of the space is then devoted to the source code: thread allocation, task processing, rejection policies, and startup and shutdown. The emphasis is on the Worker inner class, covering both how it works and the design ideas behind it. Besides the source code walkthrough, the article also covers design ideas and secondary development practice.

2. Constructing ThreadPoolExecutor

2.1 Thread pool parameter list

You can create a thread pool with the following constructor (there are other constructors, which you can find in the source code, but they all end up calling this one):

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }

The constructor parameters are as follows:

  • corePoolSize: the number of core threads. When a task is submitted and the number of threads in the pool is smaller than corePoolSize, a new core thread is created to execute it. Once the number of threads has reached corePoolSize, the task is added to the task queue instead.

  • maximumPoolSize: the maximum number of threads. When a task is submitted, the task queue is full, and the total number of threads is smaller than maximumPoolSize, the pool creates a non-core thread to execute the task. Once the total has reached maximumPoolSize, the rejection policy is executed.

  • keepAliveTime: how long a non-core thread may stay idle before it is destroyed.

  • unit: the time unit of keepAliveTime.

  • workQueue: the task queue (a blocking queue).

  • threadFactory: the factory that the thread pool uses to create new threads.

  • handler: the rejection policy, which decides what happens to a task that the pool cannot accept (for example, throwing an exception or discarding the task).

2.2 Overview of execution process

From the constructor parameters we can see that a thread pool has several important components: the core thread pool, the idle (non-core) thread pool, and the blocking queue. The core execution flow chart of the thread pool is given here first, so that we have an impression of it; the source code analysis afterwards will then be easier to follow.

Some notes on the flowchart: for a pool, CAP is its capacity and SIZE is the number of threads running in it. For the blocking queue, CAP is the queue capacity and SIZE is the number of tasks already enqueued. CpS < CPC means that the number of running core threads is less than the number of core threads configured for the thread pool.

1) When the core thread pool is not “full”, a new core thread is created to execute the submitted task. “Not full” means that the number of threads (SIZE) in the core thread pool is less than its capacity (CAP); in that case the submitted task is executed by a thread newly created by the thread factory.

2) When the core thread pool is “full”, the submitted task is pushed into the task queue to wait for a core thread to become free. Once a core thread is free, it pulls the next task from the task queue and executes it. Because the queue is a blocking queue, a free core thread blocks while waiting for a task to arrive.

3) When the task queue is also full (really full, so this never happens with an unbounded queue), a thread is created in the idle thread pool to execute the submitted task. Threads in the idle thread pool have a lifetime, keepAliveTime: once such a thread has finished its task and has been idle for keepAliveTime, it is destroyed.

4) If the idle thread pool keeps growing until the total number of threads in ThreadPoolExecutor reaches maximumPoolSize, further tasks are rejected and handed to RejectedExecutionHandler for subsequent processing.

The core thread pool and the free thread pool are just abstractions that will be examined later.
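The four steps above can be reproduced with a deliberately tiny pool. This is a minimal sketch rather than code from the JDK: the class name SubmissionFlowDemo, the sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1) and the latch that keeps the tasks busy are all assumptions, chosen so that each branch is taken exactly once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionFlowDemo {
    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] run() {
        // Hypothetical sizing: 1 core thread, 2 threads at most, 1 queue slot.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch opens, so no thread becomes free in between.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            pool.execute(blocker);                 // 1) below corePoolSize: new core thread
            observed[0] = pool.getPoolSize();
            pool.execute(blocker);                 // 2) core pool full: task goes to the queue
            observed[1] = pool.getQueue().size();
            pool.execute(blocker);                 // 3) queue full: new non-core thread
            observed[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);             // 4) maximumPoolSize reached: rejected
            } catch (RejectedExecutionException e) {
                observed[3] = 1;                   // the default handler (AbortPolicy) throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return observed;
    }

    public static void main(String[] args) {
        int[] o = run();
        System.out.println("pool=" + o[0] + " queue=" + o[1] + " pool=" + o[2] + " rejected=" + o[3]);
    }
}
```

With these sizes the program should print pool=1 queue=1 pool=2 rejected=1. Changing the queue to an unbounded LinkedBlockingQueue would mean steps 3 and 4 are never reached.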

2.3 Common thread pools

Before diving into the ThreadPoolExecutor source code, let’s look at the common thread pools (“common” here just means that they ship with the JDK). They can be created through the Executors utility class, which acts as a thread pool factory.

2.3.1 FixedThreadPool

A fixed thread pool is created as follows: the number of core threads is fixed and equal to the maximum number of threads, using an unbounded blocking queue with a linked list as the underlying structure.

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

Features:

  • The number of core threads equals the maximum number of threads, so no non-core threads are created and the keepAliveTime setting has no effect.

  • With an unbounded queue, tasks are added indefinitely until memory runs out (OOM).

  • Since the unbounded queue cannot be filled, the task cannot be rejected before execution (provided that the thread pool is always running).

Application Scenarios:

  • Recommended when the number of threads should stay fixed.

  • Suitable for servers with heavy loads.

2.3.2 SingleThreadExecutor

A single-threaded thread pool is created as follows: the number of core threads and the maximum number of threads are both 1, using an unbounded blocking queue with a linked list as the underlying structure.

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

Features:

  • This is similar to a FixedThreadPool, except that the number of threads is 1.

Application Scenarios:

  • This applies to single-thread scenarios.

  • This method is applicable to scenarios that require sequential processing of submitted tasks.

2.3.3 CachedThreadPool

A cached thread pool is created as follows: the number of core threads is 0 and the maximum number of threads is Integer.MAX_VALUE (which can be treated as unbounded). It uses a synchronous blocking queue (SynchronousQueue).

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

Features:

  • The number of core threads is 0, so every thread is a non-core thread that waits at most 60 seconds for a new task. If no task arrives within 60 seconds, the idle thread is destroyed.

  • The maximum number of threads is effectively unbounded, so a burst of tasks can start a very large number of threads at the same time, overloading the CPU and possibly crashing the application.

  • A synchronous blocking queue is used, that is, the queue does not store tasks: each submitted task must be handed directly to a thread. Since the maximum number of threads is unbounded, every submitted task is picked up (until the application crashes).

Application Scenarios:

  • Suitable for short-lived asynchronous tasks.

  • This mode is applicable to servers with light loads.
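The configurations described above can be read back from a live pool. The sketch below assumes that newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor, which matches the factory source shown above but is not promised by the ExecutorService return type; the class name FactoryConfigDemo and the describe helper are hypothetical. newSingleThreadExecutor is left out because it wraps the pool in a delegating service, so the cast would fail.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FactoryConfigDemo {
    /** Summarizes the constructor arguments a factory method chose, then shuts the pool down. */
    static String describe(ExecutorService service) {
        // Assumption: the factory handed back a bare ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String text = "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS)
                + " queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return text;
    }

    public static void main(String[] args) {
        System.out.println("fixed:  " + describe(Executors.newFixedThreadPool(4)));
        System.out.println("cached: " + describe(Executors.newCachedThreadPool()));
    }
}
```

For the fixed pool this reports equal core and maximum sizes with a LinkedBlockingQueue; for the cached pool it reports 0 core threads, Integer.MAX_VALUE as the maximum, a 60 second keep-alive and a SynchronousQueue.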

3. Thread pool state and number of active threads

ThreadPoolExecutor maintains two very important pieces of state: the thread pool state (rs, short for runState) and the number of active threads (wc, short for workerCount). The former identifies the current state of the thread pool and controls what the pool is allowed to do. The latter counts the active threads and determines whether a new thread should be created in the core or the idle thread pool.

ThreadPoolExecutor stores both values in a single int variable (ctl). A Java int is 32 bits on every platform; ThreadPoolExecutor uses the high 3 bits (bits 31 to 29) for the thread pool state and the low 29 bits (bits 28 to 0) for the number of active threads.

What is the purpose of this setting?

Maintaining two variables at the same time in a concurrent scenario is expensive, because a lock is usually needed to make a change to both of them atomic. When the two values are packed into a single variable, a single atomic operation (for example one CAS) updates both consistently. This greatly reduces the concurrency problems that can arise.

With that in mind, let’s look at the source code for several states of ThreadPoolExecutor, and how ThreadPoolExecutor operates on both state and number of active threads.

ThreadPoolExecutor’s source code for state initialization is as follows:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor defines ctl as an AtomicInteger, which packs the active thread count and the run state into one int. As a consequence, the number of threads in ThreadPoolExecutor is limited to 2^29-1 (about 500 million) rather than 2^31-1 (about 2 billion), because the high 3 bits are used for the state. If that ever becomes too few, ctl can be changed to an AtomicLong and the masks adjusted accordingly.

COUNT_BITS conceptually marks the boundary between the state bits and the thread count bits; in practice it is the shift distance used to build the state constants. Its value is Integer.SIZE - 3 = 32 - 3 = 29.

CAPACITY is the maximum number of threads ThreadPoolExecutor can hold. As the figure below shows, after the shift and subtraction the low 29 bits of the int are all 1s, which is the largest value the thread count can take. The high 3 bits are 0, so the value carries only thread count information and no state, which makes it convenient as a bit mask in the operations that follow.

RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED are the five states of ThreadPoolExecutor. Their meanings are as follows:

RUNNING: new tasks are accepted and tasks in the blocking queue are processed.

SHUTDOWN: no new tasks are accepted, but tasks already in the blocking queue are still processed.

STOP: no new tasks are accepted, tasks in the queue are not processed, and tasks in progress are interrupted.

TIDYING: all tasks have terminated and the number of active threads is 0; the terminated() hook is about to run.

TERMINATED: terminated() has completed and the thread pool is closed.

The calculation of these 5 states is shown in the figure below. After the shift, the low 29 bits of each value are all 0 and the high 3 bits identify the state.

With these definitions, ThreadPoolExecutor keeps the state and the thread count in separate bit ranges of one int, which makes the following operations much easier.

Let’s take a look at how ThreadPoolExecutor gets state and number of threads.

3.1 runStateOf(c) method

private static int runStateOf(int c) {
    return c & ~CAPACITY;
}

The runStateOf() method extracts the thread pool state. The parameter c is normally the value of ctl, which contains both the state and the thread count. The bit calculation performed by runStateOf() is shown in the figure below.

CAPACITY has its high 3 bits set to 0 and its low 29 bits set to 1. The method inverts it (high 3 bits 1, low 29 bits 0) and ANDs the result with ctl. ANDing a bit with 1 keeps its original value and ANDing it with 0 gives 0, so the high 3 bits of ctl are kept and the low 29 bits become 0. This separates the state value from ctl.

3.2 workerCountOf(c) method

private static int workerCountOf(int c) {
    return c & CAPACITY;
}

workerCountOf(c) works along the same lines: it ANDs ctl with CAPACITY so that only the low 29 bits remain, which gives the number of active threads. The figure below shows the calculation; the details are not repeated here.

3.3 ctlOf(rs, wc) method

private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

ctlOf(rs, wc) builds the ctl value from the state value and the thread count. rs is short for runState and wc is short for workerCount. The low 29 bits of rs are 0 and the high 3 bits of wc are 0, so the OR of the two keeps the high 3 bits of rs and the low 29 bits of wc, which is exactly the ctl value.

ThreadPoolExecutor has several other ctl helper methods that work in a similar way.
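The packing and unpacking can be tried outside the JDK. The sketch below copies the constants and the three helper methods from the excerpts into a standalone class so that it runs on its own; the class name CtlPackingDemo and the bits helper, which left-pads the binary form to 32 characters, are additions for illustration only.

```java
class CtlPackingDemo {
    // Copied from the ThreadPoolExecutor excerpts so the sketch is self-contained.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** Hypothetical helper: the 32-bit binary form of v, padded with leading zeros. */
    static String bits(int v) {
        return String.format("%32s", Integer.toBinaryString(v)).replace(' ', '0');
    }

    public static void main(String[] args) {
        System.out.println("CAPACITY   " + bits(CAPACITY));
        System.out.println("RUNNING    " + bits(RUNNING));
        System.out.println("SHUTDOWN   " + bits(SHUTDOWN));
        System.out.println("STOP       " + bits(STOP));
        System.out.println("TIDYING    " + bits(TIDYING));
        System.out.println("TERMINATED " + bits(TERMINATED));

        // Pack a state and a worker count, then split them again.
        int ctl = ctlOf(STOP, 7);
        System.out.println("ctl        " + bits(ctl));
        System.out.println("state is STOP: " + (runStateOf(ctl) == STOP));
        System.out.println("workers: " + workerCountOf(ctl));
    }
}
```

The printed bit patterns make the layout visible: each state constant differs only in its three leading bits, and CAPACITY is the mask that covers the remaining 29.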

This section concludes with the path along which ThreadPoolExecutor moves between states, also known as its lifecycle.
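The lifecycle follows these transitions: RUNNING to SHUTDOWN on shutdown(); RUNNING or SHUTDOWN to STOP on shutdownNow(); SHUTDOWN to TIDYING once both the queue and the pool are empty; STOP to TIDYING once the pool is empty; and TIDYING to TERMINATED when the terminated() hook has completed. The sketch below watches the SHUTDOWN path from the outside through the public query methods. The class name LifecycleDemo and the latches are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    /** Returns the five observations in order; every entry is expected to be true. */
    static boolean[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        boolean[] seen = new boolean[5];
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();                      // the task is now running
            seen[0] = !pool.isShutdown();         // RUNNING
            pool.shutdown();                      // RUNNING -> SHUTDOWN
            seen[1] = pool.isShutdown();
            seen[2] = pool.isTerminating();       // shut down, but a task is still in progress
            release.countDown();                  // let the task finish: SHUTDOWN -> TIDYING -> TERMINATED
            seen[3] = pool.awaitTermination(5, TimeUnit.SECONDS);
            seen[4] = pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

TIDYING is not observable separately through the public API here; isTerminating() simply reports that the pool has left RUNNING but has not yet reached TERMINATED.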

4. Execution process of execute()

4.1 execute() method

The execute() source code is as follows:

public void execute(Runnable command) {
  // A null task cannot be executed, so fail fast with a NullPointerException.
  if (command == null) throw new NullPointerException();
  // ctl packs runState and workerCount into one int
  int c = ctl.get();
  // If workerCount < the number of core threads
  if (workerCountOf(c) < corePoolSize) {
    // Try to add a worker with the command as its first task. addWorker() is examined in detail below. Passing true means the worker count is checked against corePoolSize, that is, a core thread is added.
    if (addWorker(command, true))
      // The worker was added and started, so return
      return;
    // addWorker() failed: re-read ctl, because the state may have changed in the meantime
    c = ctl.get();
  }
  // Reaching this point means the core threads are full (or addWorker() failed). If the pool is running, try to put the task into the task queue
  if (isRunning(c) && workQueue.offer(command)) {
    // Re-read ctl and double-check
    int recheck = ctl.get();
    // If the pool is no longer running, try to remove the task from the task queue
    if (!isRunning(recheck) && remove(command))
      // The task was removed: execute the rejection policy
      reject(command);
    // The pool is still running (or the removal failed) but there are no workers left
    else if (workerCountOf(recheck) == 0)
      // Add a non-core worker without a first task so that the queued task gets executed
      addWorker(null, false);
  }
  // The pool is not running or the task queue is full: try to add a non-core worker
  else if (!addWorker(command, false))
    // addWorker() failed: execute the rejection policy
    reject(command);
}

For the source code analysis, simply read the comments: almost every line has one, and they are very, very detailed.

execute() encapsulates the thread creation logic of ThreadPoolExecutor: it decides when core and idle threads are created and when the rejection policy is executed. The following flow chart summarizes the source code above.

The logic that creates a thread to execute the submitted task is encapsulated in the addWorker() method, which the next section examines. execute() also calls a few other methods, which are shown here first.

4.1.1 workerCountOf()

Gets the number of active threads from ctl, as described in Section 3.2.

4.1.2 isRunning()

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

isRunning() determines from the ctl value whether ThreadPoolExecutor is running. The source simply checks whether ctl < SHUTDOWN. This works because the highest bit of ctl is 1 in the RUNNING state, so the value is negative, while in every other state the highest bit is 0, so the value is non-negative (SHUTDOWN itself is 0). Comparing the values is therefore enough to tell whether the pool is running.

4.1.3 reject()

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

reject() directly calls rejectedExecution() on the RejectedExecutionHandler that was passed in when the pool was constructed. This is a typical use of the strategy pattern: the actual rejection behavior is encapsulated in a class that implements the RejectedExecutionHandler interface. The implementations are not expanded on here.
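As an illustration of the strategy pattern at work, here is a minimal sketch of a custom handler. CollectingHandler is a hypothetical strategy invented for this example; it remembers rejected tasks instead of throwing. The JDK ships its own strategies as nested classes of ThreadPoolExecutor: AbortPolicy (the default), CallerRunsPolicy, DiscardPolicy and DiscardOldestPolicy.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CollectingHandlerDemo {
    /** Hypothetical strategy: keep rejected tasks in a list instead of throwing. */
    static final class CollectingHandler implements RejectedExecutionHandler {
        final List<Runnable> rejected = new CopyOnWriteArrayList<>();

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            rejected.add(task);
        }
    }

    /** Submits two tasks to a pool that can only take one and returns how many were rejected. */
    static int rejectedCount() {
        CollectingHandler handler = new CollectingHandler();
        // One thread and a queue that stores nothing, so the second task has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only thread
            pool.execute(() -> { });                   // execute() ends in reject(), which calls the handler
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejected.size();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + rejectedCount());
    }
}
```

Swapping the handler argument for new ThreadPoolExecutor.CallerRunsPolicy() would instead run the second task on the submitting thread, without any change to the pool itself.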

4.2 addWorker() method

The addWorker() source code is as follows:

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  // Execute logic in an infinite loop. Ensure that a multithreaded environment exits the loop under expected conditions.
  for (;;) {
    // Get the CTL value and extract the thread pool running state from it
    int c = ctl.get();
    int rs = runStateOf(c);
    // If rs > SHUTDOWN, no new tasks are accepted and queued tasks are no longer executed, so no worker is added.
    // If rs == SHUTDOWN, a worker may only be added when firstTask is null and the work queue is not empty,
    // because a SHUTDOWN pool accepts no new tasks but still has to execute the tasks left in the work queue.
    if (rs >= SHUTDOWN &&
        !(rs == SHUTDOWN &&
          firstTask == null &&
          !workQueue.isEmpty()))
      return false;
    for (;;) {
      // Get the number of active threads
      int wc = workerCountOf(c);
      // If the number of active threads >= CAPACITY, no new worker may be added
      // If core is true, a core thread is being created: it is not allowed once the number of active threads >= corePoolSize
      // If core is false, an idle thread is being created: it is not allowed once the number of active threads >= maximumPoolSize
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // Try to increment the number of active threads via CAS; on success, break out of the outermost loop and start creating the worker
      // If the CAS fails, the logic below decides which loop to retry
      if (compareAndIncrementWorkerCount(c))
        break retry;
      // Get the CTL value to check whether the running status is changed
      c = ctl.get();
      // If the running state has changed, the outer loop is restarted
      // If the running status does not change, continue the inner loop
      if (runStateOf(c) != rs)
        continue retry;
    }
  }
  // Flags recording whether the worker was added and started
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // new a new worker thread, each worker holds the thread that actually performs the task.
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
      // Add a lock to ensure atomicity of workerAdded status changes
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Get the thread pool state
        int rs = runStateOf(ctl.get());
        // If it is running, create the worker thread
        // If the state is SHUTDOWN and firstTask == NULL, a thread will be created to execute the task in the task queue.
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          // Throw an exception if the thread is already alive before it has been started here
          if (t.isAlive())
            throw new IllegalThreadStateException();
          // Cache worker threads locally
          workers.add(w);
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;
          // The worker thread was added successfully and changed to true
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      // Start the worker thread after the status change succeeds
      if (workerAdded) {
        // Start the worker thread
        t.start();
        // Change the startup state
        workerStarted = true;
      }
    }
  } finally {
    // If the worker thread was not started, run the failure handling logic
    if (!workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

addWorker() checks the state of ThreadPoolExecutor in two nested loops and increments the number of active threads through CAS. The loops guarantee that, when several threads call addWorker() concurrently, each of them leaves the loop only under the expected conditions.

The method then creates a new Worker and starts the thread held by that Worker. Two flags, workerAdded and workerStarted, record whether the Worker was successfully cached and started.

workerAdded is modified while holding ThreadPoolExecutor’s mainLock, so that adding to workers and reading its size cannot interleave unexpectedly in a multi-threaded environment.

addWorker() starts the worker thread by first creating a Worker object, obtaining the thread from it, and then starting that thread. The real thread startup logic therefore lives in the Worker object.

Here is a summary of addWorker() through a flow chart:

addWorker() also calls several methods that are analyzed here:

4.2.1 runStateOf()

Gets the ThreadPoolExecutor state from ctl, as detailed in Section 3.

4.2.2 workerCountOf()

Gets the number of active ThreadPoolExecutor threads from ctl, as detailed in Section 3.

4.2.3 compareAndIncrementWorkerCount()

int c = ctl.get();
if (compareAndIncrementWorkerCount(c)) { ... }

private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

This method increments the number of active threads in ctl by 1 through CAS. Adding 1 to the ctl value is enough to change the thread count: the count is stored in the low 29 bits, so as long as it does not overflow them, +1 only increases the thread count by 1 and leaves the thread pool state untouched.
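The claim that +1 touches only the count bits, even under contention, can be checked with a small experiment. This is a sketch under assumptions: WorkerCountCasDemo and its run method are hypothetical, the constants are copied from the excerpts, and the retry loop mirrors the read, CAS, retry shape that addWorker() uses around compareAndIncrementWorkerCount().

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerCountCasDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;

    /** Same body as the JDK method, but operating on a ctl passed in by the caller. */
    static boolean compareAndIncrementWorkerCount(AtomicInteger ctl, int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /** Increments the count from several threads; returns {run state, worker count} afterwards. */
    static int[] run(int threads, int incrementsPerThread) {
        AtomicInteger ctl = new AtomicInteger(RUNNING | 0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    // Retry until our CAS wins, re-reading ctl after every failure.
                    while (!compareAndIncrementWorkerCount(ctl, ctl.get())) {
                        // another thread changed ctl first; loop and try again
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        int c = ctl.get();
        return new int[] {c & ~CAPACITY, c & CAPACITY};
    }

    public static void main(String[] args) {
        int[] result = run(4, 1000);
        System.out.println("state still RUNNING: " + (result[0] == RUNNING));
        System.out.println("worker count: " + result[1]);
    }
}
```

No increment is lost and the three state bits stay at RUNNING, because every successful CAS replaces the whole int with a value that differs only in the low bits.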

4.2.4 addWorkerFailed()

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // Remove the worker
            workers.remove(w);
        // Number of active threads - 1
        decrementWorkerCount();
        // Try to terminate the thread pool
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

private void decrementWorkerCount() {
    do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}

This method is executed when the worker thread fails to start. When does this happen? For example, after the number of active threads has been incremented and the new Worker has been created, the state of the thread pool changes to > SHUTDOWN, so the pool can neither accept new tasks nor execute the remaining tasks in the task queue. In this case the thread pool should be stopped directly.

In this case the method:

  • removes the newly created Worker from the workers cache pool;

  • decrements the number of active threads by 1 via an infinite loop plus CAS;

  • tries to stop the thread pool by calling the tryTerminate() method.

tryTerminate() moves the thread pool to the TERMINATED state if the conditions for termination are met.

4.2.5 tryTerminate()

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Return if the pool cannot terminate yet or is already terminating: it is still RUNNING, it is already TIDYING or TERMINATED, or it is SHUTDOWN with tasks left in the queue
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        // If the number of active threads is not 0, interrupt at most one idle worker (ONLY_ONE) and return. This is explained in detail below, together with the reason why Worker inherits AQS but does not use its CLH queue.
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // Acquire the global lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // First change ctl to (rs=TIDYING, wc=0) via CAS; the checks above guarantee that the pool can reach this state.
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Hook method: subclasses of ThreadPoolExecutor can override it.
                    terminated();
                } finally {
                    // Change ctl to (rs=TERMINATED, wc=0); the thread pool is now closed.
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up the threads waiting in awaitTermination().
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // Release the global lock
            mainLock.unlock();
        }
    }
}

5. Analysis of the Worker inner class

5.1 Worker Object Analysis

Worker object source code analysis:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  // The worker thread
  final Thread thread;
  // The submitted task to run first
  Runnable firstTask;
  // The number of tasks completed
  volatile long completedTasks;

  Worker(Runnable firstTask) {
    // Initial state: -1 inhibits interrupts until runWorker() starts
    setState(-1);
    this.firstTask = firstTask;
    // Create the thread through the thread factory, passing the worker itself as the task
    this.thread = getThreadFactory().newThread(this);
  }

  // The execution logic is encapsulated in runWorker(), which runs once addWorker() calls t.start()
  public void run() {
    runWorker(this);
  }

  // Implement some methods in AQS
  protected boolean isHeldExclusively() { ... }
  protected boolean tryAcquire(int unused) { ... }
  protected boolean tryRelease(int unused) { ... }
  public void lock()        { ... }
  public boolean tryLock()  { ... }
  public void unlock()      { ... }
  public boolean isLocked() { ... }

  // Interrupt the held thread if it has been started
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try { t.interrupt(); }
      catch (SecurityException ignore) {}
    }
  }
}

The source code above shows that Worker implements the Runnable interface, so a Worker is itself a task; Worker also extends AQS, so it has the nature of a lock as well. Unlike ReentrantLock and other locking tools, however, Worker makes little use of the CLH wait queue in AQS: within the thread pool, a Worker is locked by its own thread, and other threads only probe it with tryLock() rather than queueing for it. What Worker mainly uses is the state maintenance provided by AQS. This will be explained in detail below.

Each Worker object holds a worker thread. When the Worker is initialized, the worker thread is created through the thread factory and the Worker itself is passed to that thread as its task. Running a task in the thread pool therefore does not call the run() method of the submitted task directly: the thread executes the Worker’s run() method, which in turn executes the run() method of the submitted task.

The run() method in Worker delegates the specific logic to runWorker() in ThreadPoolExecutor.

Here’s a picture to summarize:

  • The Worker itself is a task, and it holds both the task submitted by the user and the worker thread.

  • The worker thread holds the Worker (this) as its task, so calling the worker thread’s start() method ends up executing the Worker’s own run() method.

  • The Worker’s run() delegates the concrete logic to the runWorker() method of ThreadPoolExecutor.

  • runWorker() calls the run() method of the user-submitted task, which executes the user’s logic.
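The chain of delegation summarized above can be sketched in a few dozen lines. The following is not the JDK implementation: MiniWorkerDemo is a hypothetical, stripped-down pool with a single worker, no AQS locking and no state handling, and the 200 ms idle timeout is an arbitrary choice so that the worker thread ends on its own. It only shows how the Worker passes itself to its thread and how run() hands control back to the enclosing class.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MiniWorkerDemo {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    /** Has the same shape as ThreadPoolExecutor.Worker, without the AQS part. */
    final class Worker implements Runnable {
        final Thread thread;
        Runnable firstTask;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            // The worker hands itself to the thread as the task to run.
            this.thread = new Thread(this, "mini-worker");
        }

        @Override
        public void run() {
            runWorker(this); // delegate to the enclosing "pool"
        }
    }

    void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        try {
            // Run the first task, then keep pulling from the queue; give up after 200 ms without work.
            while (task != null || (task = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                task.run(); // a plain method call on the worker thread, not Thread.start()
                task = null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs three tasks and returns the name of the thread each one ran on. */
    static List<String> run() {
        MiniWorkerDemo pool = new MiniWorkerDemo();
        List<String> ranOn = new CopyOnWriteArrayList<>();
        Runnable record = () -> ranOn.add(Thread.currentThread().getName());
        pool.queue.add(record);
        pool.queue.add(record);
        Worker worker = pool.new Worker(record);
        worker.thread.start();
        try {
            worker.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

All three tasks report the same thread name, which is the point of the design: tasks are ordinary Runnable objects whose run() is invoked by a long-lived worker thread.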

5.2 runWorker method

The runWorker() source code is as follows:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  // Copy the submitted task and set firstTask in the Worker to null so that the reference is released.
  Runnable task = w.firstTask;
  w.firstTask = null;
  // Allow interrupts from now on (the state goes from -1 to 0)
  w.unlock();
  boolean completedAbruptly = true;
  try {
    // After the first task has run, getTask() keeps fetching tasks from the task queue
    while (task != null || (task = getTask()) != null) {
      w.lock();
      try {
        // Hook method of ThreadPoolExecutor. Subclasses can override beforeExecute() to run custom logic before the task is executed.
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          // Execute the run() method of the submitted task
          task.run();
        } catch (RuntimeException x) {
          ...
        } finally {
          // Hook method of ThreadPoolExecutor, like beforeExecute(), but executed after the task has finished.
          afterExecute(task, thrown);
        }
      } finally {
        // Help the task get garbage collected
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    // Reaching this point means that getTask() returned null (no task left to run, or the pool is shutting down) or that a task threw; the worker is removed from the workers cache
    processWorkerExit(w, completedAbruptly);
  }
}

runWorker() is the method that actually executes the submitted task. It does not start a new thread for the task with Thread.start(); it calls the task’s run() method directly on the worker thread.

runWorker() keeps fetching tasks from the task queue and executing them.

runWorker() provides two hook methods. If the JDK’s ThreadPoolExecutor does not meet a developer’s needs, the developer can extend ThreadPoolExecutor and override beforeExecute() and afterExecute() to run custom logic before and after each task, for example to record monitoring metrics or print logs.
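A minimal sketch of such a subclass follows. HookedPool and its events list are hypothetical names used only for this example; beforeExecute(), afterExecute() and terminated() are the protected hook methods of ThreadPoolExecutor that appear in the excerpts above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    /** Records the order in which the hooks and the task run. */
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");      // called by runWorker() just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        events.add("after");       // called by runWorker() in the finally block around task.run()
    }

    @Override
    protected void terminated() {
        super.terminated();
        events.add("terminated");  // called by tryTerminate() once the pool is TIDYING
    }

    /** Runs one task, shuts the pool down and returns the recorded events. */
    static List<String> run() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded order is before, task, after, terminated. In a real pool the same three overrides are natural places for timing, logging or metrics, as long as they stay cheap, since they run on the worker thread around every task.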

5.2.1 getTask()

private Runnable getTask() {
    boolean timedOut = false;
    // Loop until a task is obtained (the elided part decides when the worker should exit instead)
    for (;;) {
        ...
        try {
            // Get a task from the task queue: wait at most keepAliveTime if this worker may time out, otherwise block
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
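The timed poll() is what makes surplus threads disappear after keepAliveTime, while the untimed take() keeps core threads alive. The sketch below observes this from the outside. KeepAliveDemo, the 100 ms keep-alive and the 5 second polling deadline are assumptions made for the example; the pool size is polled rather than read after a fixed sleep so that the result does not depend on exact timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    /** Returns {pool size while both tasks run, pool size after the surplus thread timed out}. */
    static int[] run() {
        // 1 core thread, 2 threads at most, idle surplus threads live for 100 ms (hypothetical values).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);   // runs on the core thread
            pool.execute(blocker);   // the queue cannot store it, so a non-core thread is created
            int busy = pool.getPoolSize();
            release.countDown();     // both workers now go idle and wait in getTask()

            // Wait (up to 5 s) for one worker to time out in workQueue.poll(keepAliveTime, ...).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] {busy, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("while busy: " + sizes[0] + ", after idling: " + sizes[1]);
    }
}
```

The pool shrinks from 2 threads back to 1 and then stays there: once the worker count is no longer above corePoolSize, the remaining worker blocks in take() with no timeout.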

5.2.2 processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    ...
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // Remove the worker from the cache
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Try to stop the thread pool
    tryTerminate();
    ...
}

Vi. Execution process of shutdown(

Thread pools have two active closing methods;

Shutdown () : closes all idle Worker threads in the thread pool and changes the state of the thread pool to shutdown.

ShutdownNow () : Closes all Worker threads in the thread pool, changes the thread pool state to STOP, and returns a list of all pending tasks.

Why is there a distinction between idle and non-idle Worker threads?

From the runWorker() method above, we know that a Worker thread ideally keeps fetching tasks from the task queue and executing them in its while loop; while it is executing a task, the Worker thread is non-idle. A Worker thread that is not executing a task is idle. In the SHUTDOWN state the thread pool accepts no new tasks but still executes the tasks remaining in the queue, so only the idle Worker threads are interrupted, and the non-idle threads continue to work through the queue until it is empty. In the STOP state the thread pool neither accepts new tasks nor executes the remaining ones, so all Worker threads, including those that are running a task, are interrupted.
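The difference between the two states can be observed from the outside with a small experiment. The sketch below is an illustration under assumptions: ShutdownDemo and its run() method are hypothetical names, and a single-thread pool is used so that the queued tasks are guaranteed to still be waiting when the pool is shut down.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only the ThreadPoolExecutor calls are JDK API.
class ShutdownDemo {
    // Submits one blocking task plus `queued` quick tasks to a single-thread pool.
    // Returns {number of tasks that completed, number of tasks returned by shutdownNow()}.
    static int[] run(boolean now, int queued) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        int pending = 0;
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();               // keeps the only worker busy (non-idle)
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts running workers; the task gives up here
                }
            });
            for (int i = 0; i < queued; i++) {
                pool.execute(completed::incrementAndGet);  // these wait in the queue
            }
            started.await();
            if (now) {
                List<Runnable> notRun = pool.shutdownNow(); // STOP: drain queue, interrupt all
                pending = notRun.size();
            } else {
                pool.shutdown();                            // SHUTDOWN: queue is still processed
            }
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), pending };
    }

    public static void main(String[] args) {
        int[] graceful = run(false, 3);
        int[] abrupt = run(true, 3);
        System.out.println("shutdown():    completed=" + graceful[0] + ", returned=" + graceful[1]);
        System.out.println("shutdownNow(): completed=" + abrupt[0] + ", returned=" + abrupt[1]);
    }
}
```

With shutdown() the busy worker is left alone and then drains the queue; with shutdownNow() the running task is interrupted and the queued tasks are handed back to the caller.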

6.1 shutdown()

The shutdown() method:

public void shutdown() {
  // Acquire the global lock
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // Check whether the caller is permitted to shut down the pool. If a SecurityManager is installed, it checks the caller's "modifyThread" permission and its access to each Worker thread
    checkShutdownAccess();
    // Change the thread pool state
    advanceRunState(SHUTDOWN);
    // Interrupt all idle threads
    interruptIdleWorkers();
    // Hook method: a no-op in ThreadPoolExecutor, overridden by ScheduledThreadPoolExecutor (it is package-private, so code outside java.util.concurrent cannot override it)
    onShutdown();
  } finally {
    mainLock.unlock();
  }
  // Try to terminate the thread pool
  tryTerminate();
}

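shutdown() splits the shutdown procedure of ThreadPoolExecutor into several methods and uses the global lock to ensure that only one thread at a time can actively shut the pool down. It also calls the hook method onShutdown(), which lets a subclass add its own shutdown logic; for example, ScheduledThreadPoolExecutor uses it to clean up its task queue on shutdown. Note that onShutdown() is package-private in the JDK, so only subclasses inside java.util.concurrent can override it; application code that needs custom cleanup can override the protected terminated() hook instead.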

The following is an analysis of the methods.

checkShutdownAccess()

private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");

private void checkShutdownAccess() {
  SecurityManager security = System.getSecurityManager();
  if (security != null) {
    // Check the caller's permission; shutdownPerm is a RuntimePermission named "modifyThread"
    security.checkPermission(shutdownPerm);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      for (Worker w : workers)
        // Verify that the caller is allowed to modify each worker thread
        security.checkAccess(w.thread);
    } finally {
      mainLock.unlock();
    }
  }
}

advanceRunState()

// targetState = SHUTDOWN
private void advanceRunState(int targetState) {
  for (;;) {
    int c = ctl.get();
    // If the current state is already >= targetState, do nothing; otherwise update the state through CAS
    if (runStateAtLeast(c, targetState) ||
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
      break;
  }
}

private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}

This method relies on the way the thread pool states are encoded to test whether the current state is already >= SHUTDOWN. The run state occupies the high 3 bits of ctl, and the states are ordered TERMINATED (011) > TIDYING (010) > STOP (001) > SHUTDOWN (000) > RUNNING (111, which is negative as a signed int). Therefore, for two different states, regardless of the number of worker threads stored in the low bits, the run state alone determines which ctl value is larger. For example, a ctl value in the TERMINATED state is always greater than a ctl value in the TIDYING state.
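The ordering argument can be checked with plain integer arithmetic. The sketch below copies the constant definitions and the small helper methods from ThreadPoolExecutor into a standalone class; the class name CtlDemo is hypothetical, and the mask is named CAPACITY here as in JDK 8 (newer JDKs name it differently).

```java
// Standalone copy of the ctl layout for experimentation; CtlDemo is a hypothetical name.
class CtlDemo {
    // High 3 bits hold the run state, low 29 bits hold the worker count.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    static final int RUNNING    = -1 << COUNT_BITS;  // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;  // 000
    static final int STOP       =  1 << COUNT_BITS;  // 001
    static final int TIDYING    =  2 << COUNT_BITS;  // 010
    static final int TERMINATED =  3 << COUNT_BITS;  // 011

    static int ctlOf(int rs, int wc)             { return rs | wc; }
    static int workerCountOf(int c)              { return c & CAPACITY; }
    static boolean runStateAtLeast(int c, int s) { return c >= s; }

    public static void main(String[] args) {
        // Even with the maximum worker count, a RUNNING ctl stays below SHUTDOWN.
        System.out.println(runStateAtLeast(ctlOf(RUNNING, CAPACITY), SHUTDOWN));
        // A TIDYING ctl with the maximum worker count is still below TERMINATED with none.
        System.out.println(ctlOf(TIDYING, CAPACITY) < ctlOf(TERMINATED, 0));
    }
}
```

Since the worker count can never overflow into the top 3 bits, a single signed comparison on ctl is enough to compare run states.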

interruptIdleWorkers()

private void interruptIdleWorkers() {
  interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      // If the worker thread has not already been interrupted, try to acquire the worker's lock
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          // Interrupt the thread
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      // If onlyOne is true, at most one thread is interrupted
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

This method attempts to acquire the Worker's lock and interrupts the thread only if the lock is acquired. This relates to the earlier statement that Worker inherits AQS but does not use the CLH queue, which is analyzed later.

The tryTerminate() method has been analyzed previously and will not be covered here.

6.2 shutdownNow()

public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // Check whether the caller is permitted to shut down the pool
    checkShutdownAccess();
    // Change the thread pool state to STOP
    advanceRunState(STOP);
    // Interrupt all threads
    interruptWorkers();
    // Get the list of all tasks still waiting in the queue
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  // Try to terminate the thread pool
  tryTerminate();
  // Return the task list
  return tasks;
}

This method is similar to shutdown() in that it splits the core steps into several methods, of which checkShutdownAccess() and advanceRunState() are the same ones used by shutdown(). The methods that differ are described below.

interruptWorkers()

private void interruptWorkers() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // Walk through all the workers and interrupt every Worker that has started
    for (Worker w : workers)
      w.interruptIfStarted();
  } finally {
    mainLock.unlock();
  }
}

void interruptIfStarted() {
  Thread t;
  // state >= 0 means the worker has started. If the worker has started, its thread is not null, and the thread has not already been interrupted, interrupt it
  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    try {
      t.interrupt();
    } catch (SecurityException ignore) {
    }
  }
}

Unlike interruptIdleWorkers(), this method does not try to acquire the Worker's lock; it interrupts every started thread directly. This matches the STOP state, in which the thread pool must not process the tasks still waiting in the task queue and running workers are interrupted as well.

drainQueue()

// drainTo() usually works, but if the queue is a DelayQueue or another kind of queue from which drainTo() may fail to remove some tasks, the remaining tasks are transferred one by one
private List<Runnable> drainQueue() { ... }

VII. Why Worker inherits AQS

First, the conclusion: Worker inherits AQS in order to use its state management, and unlike ReentrantLock it does not use the CLH queue in AQS.

Let’s first look at the methods related to AQS in Worker:

// The parameter is not used, as its name indicates
protected boolean tryAcquire(int unused) {
  // Change the state from 0 to 1 through CAS
  if (compareAndSetState(0, 1)) {
    // Record the current thread as the exclusive owner
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
  }
  return false;
}
// lock() is only used in runWorker()
public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }

tryAcquire() in Worker only ever changes the state from 0 to 1 and ignores its parameter, so the state of a Worker is either 0 or 1. The initial state of -1 is left aside here to avoid confusion.

Now look at lock(). The only place it is called is in runWorker(), just before the worker executes a task. runWorker() is called from the Worker's run() method. As a Runnable, the Worker is passed only to its own thread, so its run() method is invoked only by that thread after start(), and therefore runWorker() and lock() are called by that single thread as well. Multiple threads never compete through lock(), so the situation in which one thread holds the lock while the other waiting threads are added to the CLH queue does not arise. In that sense Worker does not use the CLH functionality.

This also explains why tryAcquire() ignores its parameter: a Worker can only be in one of two states, locked (not idle, state=1) or unlocked (idle, state=0), so there is no need to pass a parameter to set any other state.

final void runWorker(Worker w) {
  ...
  try {
    while (task != null || (task = getTask()) != null) {
      // The only place lock() is called
      w.lock();
      ...
    }
  }
}

The analysis above shows that Worker does not use the CLH functionality of AQS. How, then, does Worker use its state management?

One step of shutdown() is to interrupt all idle Worker threads. Before interrupting a Worker thread, it checks whether the Worker can be locked: through tryLock() -> tryAcquire() it tests whether the Worker's state is 0, and only Workers whose lock can be acquired are interrupted. A Worker that can be locked is an idle Worker (state=0). A Worker that cannot be locked has already called lock(); it is executing a task inside the while loop of runWorker() and is not interrupted by shutdown().

private void interruptIdleWorkers(boolean onlyOne) {
  ...
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        ...
      }
    }
  }
}

Therefore, Worker's state management comes down to judging whether the Worker is idle from the value of state (0 or 1). If it is idle, it can be interrupted when the thread pool is shut down; otherwise it keeps fetching tasks from the blocking queue in the while loop and executing them until the queue is empty.
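The idle/busy flag can be reproduced with a few lines on top of AbstractQueuedSynchronizer. The class below is a simplified stand-in for the locking part of Worker, not the JDK class itself: the name IdleFlag and the helper isBusy() are assumptions made for this sketch, while tryAcquire() and tryRelease() follow the shape shown in the article.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Simplified stand-in for Worker's lock; IdleFlag and isBusy() are hypothetical names.
class IdleFlag extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        // 0 -> 1 means "idle -> busy"; the argument is ignored on purpose
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);  // back to idle
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }
    boolean isBusy()  { return getState() != 0; }

    public static void main(String[] args) {
        IdleFlag flag = new IdleFlag();
        flag.lock();                          // what runWorker() does before running a task
        System.out.println(flag.tryLock());   // what interruptIdleWorkers() would try
        flag.unlock();
        System.out.println(flag.tryLock());
    }
}
```

Because tryAcquire() only accepts the 0 to 1 transition, the lock is not reentrant: a second tryLock() fails even on the owning thread, which is what lets tryLock() double as an "is this worker idle?" test.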

VIII. Rejection strategy

This chapter discusses only the four rejection policy handlers built into ThreadPoolExecutor.

8.1 CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {
  public CallerRunsPolicy() {}
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // If the thread pool has not been shut down, run the task directly in the calling thread
    if (!e.isShutdown()) {
      r.run();
    }
  }
}

This policy executes the rejected task directly in the thread that called execute(). The task is executed as long as the thread pool is in the RUNNING state. If the state is not RUNNING, the task is silently dropped, which is consistent with the rule that a pool that has been shut down accepts no new tasks.
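A small experiment makes the "calling thread" part visible. The sketch below is an illustration under assumptions: CallerRunsDemo and rejectedTaskThread() are hypothetical names, and the pool is deliberately tiny (one thread, queue capacity 1) so that the third submission is guaranteed to be rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only the ThreadPoolExecutor calls are JDK API.
class CallerRunsDemo {
    // Returns the name of the thread that ended up running the rejected task.
    static String rejectedTaskThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> {                 // occupies the single worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });             // fills the queue (capacity 1)
            // Pool and queue are full, so this task is rejected and run by the caller
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + rejectedTaskThread());
    }
}
```

A side effect worth keeping in mind is that the submitting thread is busy while it runs the task, which naturally slows down further submissions.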

8.2 AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {
  public AbortPolicy() {}
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // Throw a rejection exception
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " + e.toString());
  }
}

When a task is rejected, a RejectedExecutionException is thrown to the caller of execute().
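On the caller's side this means execute() can throw, so submissions to a bounded pool may need a try/catch. The following is a minimal sketch under assumptions: AbortPolicyDemo and thirdTaskRejected() are hypothetical names, and the pool is sized (one thread, queue capacity 1) so that the third submission is certain to be rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the ThreadPoolExecutor calls are JDK API.
class AbortPolicyDemo {
    // Returns true if the third submission was rejected with an exception.
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {         // occupies the single worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });     // fills the queue
            pool.execute(() -> { });     // no worker and no queue slot left: rejected
            return false;
        } catch (RejectedExecutionException e) {
            // The caller decides what to do: retry later, log, degrade, and so on
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```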

8.3 DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {
  public DiscardPolicy() {}
  // Empty method, does nothing
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

This policy discards the task silently. The rejection method is empty, so nothing is executed and the rejected task is simply dropped.

8.4 DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  public DiscardOldestPolicy() {}
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
      // Remove the head of the blocking queue (the oldest waiting task)
      e.getQueue().poll();
      // Try to execute the current task again
      e.execute(r);
    }
  }
}

This policy removes the task that entered the blocking queue first (the queue head) and then calls execute() again to enqueue the current task. It is a classic "out with the old, in with the new" strategy.
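The effect can be seen by recording which tasks actually run. The sketch below is an illustration under assumptions: DiscardOldestDemo and run() are hypothetical names, and the pool has one thread and a queue of capacity 1 so that task "A" is waiting in the queue when task "B" is rejected.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the ThreadPoolExecutor calls are JDK API.
class DiscardOldestDemo {
    // Returns the labels of the tasks that were actually executed.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> executed = new CopyOnWriteArrayList<>();
        pool.execute(() -> {                    // occupies the single worker
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> executed.add("A"));  // waits in the queue
        pool.execute(() -> executed.add("B"));  // rejected: "A" is dropped, "B" takes its place
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println("executed tasks: " + run());
    }
}
```

Task "A" is lost without any notification, so this policy only suits workloads where the newest task supersedes older ones.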

IX. Secondary development of ThreadPoolExecutor

Having introduced the core principles of ThreadPoolExecutor, let's look at how vivo's in-house NexTask concurrency framework uses thread pools to speed up both business development and code execution.

NexTask abstracts common business patterns, algorithms, and scenarios and implements them as components. It offers a fast, lightweight, easy-to-use way to write concurrent programs, so that developers can be productive without dealing with the underlying technical details.

The NexTask architecture diagram is presented first, and then we examine in detail where ThreadPoolExecutor is used in the diagram.

// Executor part of the code:
public class Executor {
  ...
  private static DefaultTaskProcessFactory taskProcessFactory =
    new DefaultTaskProcessFactory();
  // External API that lets users quickly create task processors
  public static TaskProcess getCommonTaskProcess(String name) {
    return TaskProcessManager.getTaskProcess(name, taskProcessFactory);
  }
  public static TaskProcess getTransactionalTaskProcess(String name) {
    return TaskProcessManager.getTaskProcessTransactional(name, taskProcessFactory);
  }
  ...
}

Executor is the external entry point: developers use its simple API to quickly obtain a task processor (TaskProcess) through TaskProcessManager.

// TaskProcessManager
public class TaskProcessManager {
  // Cache map: <business name, task processor for that business>
  private static Map<String, TaskProcess> taskProcessContainer =
      new ConcurrentHashMap<String, TaskProcess>();
  ...
}

TaskProcessManager holds a local cache, a ConcurrentHashMap of task processors, each mapped to a specific business name. A task processor is looked up in the cache by its business name. This keeps the task processing of different businesses isolated from each other and avoids the resource cost of repeatedly creating and destroying thread pools.

// TaskProcess
public class TaskProcess {
  // Thread pool
  private ExecutorService executor;
  // Thread pool initialization
  private void createThreadPool() {
    executor = new ThreadPoolExecutor(coreSize, poolSize, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2048), new DefaultThreadFactory(domain),
        new ThreadPoolExecutor.AbortPolicy());
  }
  // Submit tasks to be processed by multiple threads
  public <T> List<T> executeTask(List<TaskAction<T>> tasks) {
    int size = tasks.size();
    // Create a CountDownLatch sized to the number of tasks so the caller can wait for all of them
    final CountDownLatch latch = new CountDownLatch(size);
    // Initialize the result containers
    List<Future<T>> futures = new ArrayList<Future<T>>(size);
    List<T> resultList = new ArrayList<T>(size);
    // Iterate over all tasks and submit them to the thread pool
    for (final TaskAction<T> runnable : tasks) {
      Future<T> future = executor.submit(new Callable<T>() {
        @Override
        public T call() throws Exception {
          // Run the task logic
          try { return runnable.doInAction(); }
          // When processing completes, count the latch down by one
          finally { latch.countDown(); }
        }
      });
      futures.add(future);
    }
    try {
      // Wait for all tasks to complete
      latch.await(50, TimeUnit.SECONDS);
    } catch (Exception e) {
      log.info("Executing Task is interrupt.");
    }
    // Collect the results and return them
    for (Future<T> future : futures) {
      try {
        T result = future.get(); // wait
        if (result != null) {
          resultList.add(result);
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    return resultList;
  }
  ...
}

Each TaskProcess holds a thread pool. As the initialization code shows, TaskProcess uses a bounded blocking queue that holds at most 2048 tasks. Once the queue is full and the pool has reached its maximum size, further tasks are rejected and a RejectedExecutionException is thrown.

TaskProcess iterates through the list of tasks submitted by the user and hands each one to the thread pool with submit(). Under the hood submit() calls ThreadPoolExecutor#execute(); it merely wraps the task in a RunnableFuture first. That wrapping belongs to the FutureTask framework and is not covered further here.

For each batch of tasks, TaskProcess creates a CountDownLatch and calls CountDownLatch.countDown() as each task finishes, so the calling thread blocks until all tasks are done (or the 50-second wait expires) and only then collects and returns the results.
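For comparison, a similar "submit a batch, wait for all, collect results" flow can be sketched with the JDK's own ExecutorService#invokeAll(), which blocks until every task completes or the timeout expires. This is a swapped-in technique shown for illustration and is not how NexTask is implemented; the class BatchRunner, its methods runAll() and demo(), and the pool sizes are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; only the java.util.concurrent calls are JDK API.
class BatchRunner {
    // Runs all tasks on the pool and returns their non-null results in submission order.
    static <T> List<T> runAll(ExecutorService pool, List<Callable<T>> tasks, long timeoutSeconds) {
        List<T> results = new ArrayList<>(tasks.size());
        try {
            // Blocks until all tasks are done or the timeout elapses
            List<Future<T>> futures = pool.invokeAll(tasks, timeoutSeconds, TimeUnit.SECONDS);
            for (Future<T> future : futures) {
                if (future.isCancelled()) {
                    continue;               // task did not finish before the timeout
                }
                T result = future.get();    // already completed, returns immediately
                if (result != null) {
                    results.add(result);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        return results;
    }

    // Squares 1..3 on a bounded pool configured like the one in the article.
    static List<Integer> demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2048), new ThreadPoolExecutor.AbortPolicy());
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                final int n = i;
                tasks.add(() -> n * n);
            }
            return runAll(pool, tasks, 5);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Compared with a hand-written CountDownLatch, invokeAll() also cancels the tasks that have not finished when the timeout expires, so the later get() calls never block.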

X. Summary

Although the JDK provides the Executors utility class and its prebuilt thread pools, those pools are of limited use in increasingly complex business scenarios. Alibaba's official Java coding guidelines also recommend that developers not use the JDK's prebuilt thread pools directly, but instead create thread pools with ThreadPoolExecutor according to their own business scenarios. An understanding of the inner workings of ThreadPoolExecutor is therefore essential for using thread pools proficiently in everyday development.

This article explores the core principles of ThreadPoolExecutor. It introduces the constructor and the meaning of each construction parameter, as well as how the thread pool's core ctl field is encoded and decoded. It then walks through the ThreadPoolExecutor source code at length to explain how the thread pool starts and shuts down, how the core built-in Worker class works, and so on. ThreadPoolExecutor has other methods that are not covered here; readers can explore them in the source code on their own.

Author: Xu Weiteng, Vivo Internet Server Team