ThreadPoolExecutor is a thread pool management tool class that provides task management, thread scheduling, and related hook methods to control thread pool state.

1. Method description

The main methods of task management are as follows:

public void execute(Runnable command); public <T> Future<T> submit(Callable<T> task); public <T> Future<T> submit(Runnable task, T result); public Future<? > submit(Runnable task); public void shutdown(); public List<Runnable> shutdownNow();Copy the code

In the above methods, execute() and submit() immediately invoke the thread to execute the task if there is an idle thread. The difference is that execute() ignores the execution result of the task, while submit() can obtain the result. ThreadPoolExecutor also provides the shutdown() and shutdownNow() methods to shutdown the thread pool. The shutdownNow() method directly closes the thread pool and returns the tasks from the task queue exported to a list.

In addition to the above methods for performing tasks, ThreadPoolExecutor provides the following hook methods:

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();
Copy the code

In ThreadPoolExecutor these methods are empty by default. BeforeExecute () is called before each task, afterExecute() is called after each task, and terminated() is called when the thread pool is terminated. The way to use these methods is to declare a subclass that inherits ThreadPoolExecutor, override the custom hook methods in the subclass, and then use the subclass instance when creating the thread pool.

2. Task scheduling

A. Related parameters

Instantiation of ThreadPoolExecutor takes the following important parameters:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit, BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, RejectedExecutionHandler handler);
Copy the code
  • CorePoolSize: the number of core threads in the thread pool;
  • MaximumPoolSize: the maximum number of threads that a thread pool can create;
  • keepAliveTime: The core thread is destroyed when the number of threads exceeds the number specified by corePoolSize and the idle thread has been idle for as long as the current parameter specifies. If allowCoreThreadTimeOut(Boolean Value) allows the core thread to expire, Then the policy also works for core threads;
  • Unit: specifies the keepAliveTime unit. It can be milliseconds, seconds, minutes, hours, etc.
  • WorkQueue: a queue that stores unexecuted tasks.
  • ThreadFactory: factory for creating threads, if not specified the default threadFactory is used;
  • Handler: Specifies the processing policy for newly added tasks when the task queue is full and there are no available threads to execute the tasks;
B. Scheduling policies

When a thread pool is initialized, there are no active threads in the pool for users to execute tasks. When a new task arrives, its main execution tasks are as follows according to the configured parameters:

  • If the number of threads in the thread pool is less than the number specified by corePoolSize, a new thread will be created to execute each task, regardless of whether there are idle threads in the thread pool.
  • When the number of threads specified by corePoolSize is reached, that is, when all core threads are executing tasks, new tasks will be stored in the task queue specified by workQueue.
  • When all core threads are executing tasks and the task queue is full, a new task will be created in the thread pool to execute the task.
  • If all threads (the number of threads specified by maximumPoolSize) are executing and the task queue is full, new tasks are processed in the manner specified by handler.
C. Pay attention to scheduling policies
  • In step 2, a new thread is created to execute the task when the current core thread is executing the task and the task queue is full. Note that the total number of tasks that need to be executed when the new thread is created is (corePoolSize + workQueueSize). There is no single corePoolSize task;
  • In step 3, there are three main types of workQueue: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, ArrayBlockingQueue, ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, ArrayBlockingQueue, ArrayBlockingQueue, ArrayBlockingQueue, SynchronousQueue, ArrayBlockingQueue ArrayBlockingQueue specifies the size of the queue. When the queue is full, a new thread is created to execute the task. ArrayBlockingQueue, if bounded, is similar to ArrayBlockingQueue. If it does not specify a limit, then it can theoretically store an unlimited number of tasks, but can actually store integer.max_value tasks. Since LinkedBlockingQueue is never full of tasks, For the SynchronousQueue, there is no internal structure to store tasks. When a task is added to the queue, the current thread and subsequent threads are blocked. The current thread is not released until a thread pulls a task from the queue, so if the thread pool uses the queue, corePoolSize will be small and maxPoolSize will be large because the queue is suitable for large and short tasks.
  • In step 4, DiscardPolicy and DiscardOldestPolicy are not used in conjunction with SynchronousQueue because the task is discarded when the synchronization queue blocks it. AbortPolicy needs to be used with care because it throws an exception if the queue is full. For CallerRunsPolicy, since the calling thread is used to execute the current task when a new task arrives, its effect on the server response needs to be considered when using it. It also needs to be noted that compared with the other policies, this policy does not discard the task when the task arrives. If incoming tasks fill up the queue and you have to use calling threads to perform tasks, then the thread pool is not designed properly, and if left unattended, all calling threads can be blocked by tasks that need to be executed, causing problems for the server.

3. Source code explanation

A. Main attributes
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 32 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111 private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000 00000000 00000000 00000000 private static final int STOP = 1 << COUNT_BITS; // 00100000 00000000 00000000 00000000 private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000 private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000 00000000Copy the code

Since ThreadPoolExecutor needs to manage multiple states and keep track of the number of threads currently executing tasks, it can be complicated to manage concurrent updates if you use multiple variables, ThreadPoolExecutor uses a variable CTL of type AtomicInteger to store all major information. CTL is a 32-bit integer with an initial value of 0. The highest bits of CTL are used to store information about the current thread pool. The TERMINATED bits are RUNNING, SHUTDOWN, STOP, TIDING, and TERMINATED. The value information for each of these states is shown in the code above. It is important to note that in ThreadPoolExecutor, these states increase in value from small to large, and the state flow is also downward, which provides a convenient way to determine the state information. If you need to check whether the thread pool is in the SHUTDOWN state, you only need to check whether the value representing the status bit is equal to SHUTDOWN. In CTL, except for the top three bits, which represent the state, the remaining bits specify the number of threads executing tasks in the current thread pool. Here’s how to manipulate the CTL attribute:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
  return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}

private static boolean isRunning(int c) {
  return c < SHUTDOWN;
}

private boolean compareAndIncrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect - 1);
}
Copy the code
  • RunStateOf (int C): Used to get the state of the current thread pool. C is the CTL property value of the current thread pool.
  • WorkerCountOf (int c): used to get the number of working threads in the current thread pool. C is the CTL property value of the current thread pool.
  • CtlOf (int RS, int WC): rs represents the working state of the current thread, and WC represents the number of working threads. This method is used to assemble these two parameters into a CTL attribute value.
  • RunStateLessThan (int c, int s): Checks whether the current thread pool state has not reached the specified state.
  • RunStateAtLeast (int C, int s): Determines whether the current thread pool is in at least one state.
  • IsRunning (int c): used to check whether the current thread pool isRunning properly.
  • CompareAndIncrementWorkerCount (int expect) : to increase the number of the current thread pool of worker threads value;
  • CompareAndDecrementWorkerCount (int expect) : reduce the amount of the current thread pool of worker threads.
B. Main methods

For the execute() and submit() methods of the thread pool, the underlying submit() method encapsulates the incoming task as a FutureTask. Since the FutureTask implements the Runnable interface, it can be executed as a task. This is where the encapsulated FutureTask object is passed to the execute() method. The execute() method is implemented as follows:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); If (workerCountOf(c) < corePoolSize) {if (workerCountOf(c) < corePoolSize) { If (addWorker(command, true)) return; c = ctl.get(); } // If the thread pool fails to be added, update the current thread pool state} // If the thread pool fails to be added, update the current thread pool state. If (isRunning(c) &&workqueue.offer (command)) {int recheck = ctl.get(); // Double verify that the thread pool is in the correct state if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); // If there are no threads in the pool, create a new thread to perform the added task. addWorker(command, false)) reject(command); // The thread pool is at least in the SHUTDOWN state.Copy the code

In the execute () method, it first determines whether a thread pool worker thread count is less than the core number of threads, is the core is to create a thread to perform tasks, add failure or job number greater than or equal to the core number of threads, thread, add tasks to the task queue, add after the success will be double validation to ensure that the current thread pool is in the correct position, And make sure there are currently available threads to execute the newly added tasks. Therefore, it can be seen that the core method for the implementation of execute() method is the addWorker() method, and the implementation of addWorker() method is as follows:

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); If (rs >= SHUTDOWN &&!) if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); Determine whether work / / the number of threads is greater than the maximum number of threads can be recorded, or worker threads more than the specified core thread or the maximum number of threads the if (wc > = CAPACITY | | wc > = (core? corePoolSize : maximumPoolSize)) return false; // This step indicates that the current thread pool is in the RUNNING state, or there are tasks in the task queue, and the number of worker threads does not exceed the specified number of threads. Failed, repeat the above steps to add the if (compareAndIncrementWorkerCount (c)) break retry. c = ctl.get(); if (runStateOf(c) ! = rs) continue retry; Boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // Create a worker object final Thread t = w.htread; if (t ! = null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // Recheck the thread pool state, or check whether the current state is SHUTDOWN and firstTask is empty, This suggests that the task queue at this time is not null if (rs < SHUTDOWN | | (rs = = SHUTDOWN && firstTask = = null)) {if (t.i sAlive ()) throw new IllegalThreadStateException(); workers.add(w); Int s = worker.size (); if (s > largestPoolSize) largestPoolSize = s; // Update the maximum number of threads used workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // After the worker object is successfully created, the worker is called to execute the task workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }Copy the code

In the addWorker() method, it first checks whether the current thread pool is in the RUNNING state or SHUTDOWN state, but there are still tasks in the task queue, then it creates a new Worker object and adds it to the Worker object collection. Then call the thread maintained by the worker object to execute the task. The following is the implementation code of the worker object:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; final Thread thread; // The thread executing the task in the current worker Runnable firstTask; // The first task that needs to be executed is volatile long completedTasks; Worker(Runnable firstTask) {// Default is -1, so the current Worker's state does not change without calling the current Worker's run() method, and other threads cannot use the current Worker to perform tasks. In the runWorker() method called by the run() method, the unlock() method is called to keep the current worker in a normal state setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() {runWorker(this); } protected Boolean isHeldExclusively() {return getState()! = 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); Void interruptIfStarted() {Thread t; if (getState() >= 0 && (t = thread) ! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Copy the code

In the case of a worker object, it primarily maintains a worker thread for performing tasks. The worker object inherited AbstractQueuedSynchronizer, used to control the current state of the workers to work for, and it also implements the Runnable interface, the main task execution encapsulation to run () method. Here’s an implementation of the runWorker() method:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // Reset the state of the Worker object Boolean completedAbruptly = true; Try {// First executes the task in the worker thread, then loops to fetch the task from the task queue and executes while (task! = null || (task = getTask()) ! = null) { w.lock(); // Check the status of the current thread pool, If the Thread pool is terminated or in Thread pool and the current Thread has been interrupted the if ((runStateAtLeast (CTL) get (), STOP) | | (Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP))) && ! wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Thrown = null; try { task.run(); } catch (RuntimeException x) {thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); }} finally {task = null; // reset the worker's initial task w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}Copy the code

As you can see, in the runWorker() method, it first performs the initialization task of the worker object, and when it is finished, it continuously fetches the task execution in the task queue through an infinite loop. Here is the source of the getTask() method:

private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // check whether the current thread is stopped or SHUTDOWN, and the work queue is empty. Task is not return the if (rs > = SHUTDOWN && (rs > = STOP | | workQueue. IsEmpty ())) {decrementWorkerCount (); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // The number of worker threads is greater than the maximum number of allowed threads, or the thread cannot obtain new tasks from the work queue within a specified period of time. The destruction of the current thread if ((wc > maximumPoolSize | | (timed && timedOut)) && (wc > 1 | | workQueue. IsEmpty ())) {if (compareAndDecrementWorkerCount(c)) return null; continue; } try {Runnable r = timed? Runnable r = timed? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r ! = null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }}}Copy the code

As you can see, the getTask method first determines whether the current thread pool is in the STOP state or SHUTDOWN state and the task queue is empty. If so, it does not return the task. Otherwise, it obtains the task execution from the task queue according to relevant parameters.

The other most important method in ThreadPoolExecutor is the shutdown() method. Here is the main code for the shutdown() method:

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); AdvanceRunState (SHUTDOWN); // Check the thread state control permission advanceRunState(SHUTDOWN); // Update the current thread pool status to SHUTDOWN interruptIdleWorkers(); // Interrupt idle workers onShutdown(); } finally {mainlock. unlock(); } tryTerminate(); }Copy the code

The shutdown() method first checks if the current thread has permission to change the thread state, then changes the current thread pool state to shutdown, and calls interruptIdleWorkers() to interrupt all idle threads. Finally, the tryTerminate() method is called to attempt to change the state of the current thread pool from SHUTDOWN to TERMINATED. Here the interruptIdleWorkers() method eventually calls its overloaded interruptIdleWorkers(Boolean) method, which reads as follows:

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}
Copy the code

As you can see, this method iterates through all worker objects and terminates them if they are idle. For working threads, since the state of the current thread pool has been set to shutdown in the shutdown() method, the working thread will automatically destroy all the tasks in the task queue.

ThreadPoolExecutor (ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor)