Why thread pools

To answer this question, think about the simplest life cycle of a thread: thread creation -> thread execution (including waiting) -> thread destruction. Let the creation time be T1, the execution time be T2, and the destruction time be T3; the total lifecycle cost is T = T1 + T2 + T3. To improve the throughput of a service, we want as large a share of that time as possible to be spent on real work. The simplest way is to eliminate T1 and T3: create threads up front and keep reusing them, so creation and destruction are not paid again for every task. At the same time we try to reduce the waiting portion of T2, so the CPU switches back and forth between threads less often.

The overhead of thread creation: allocating stack space for the thread (configurable via the JVM -Xss option) and scheduling it; on thread switches, memory paging occurs, the CPU caches are flushed, and information must be re-read from memory when switching back, which breaks data locality.

The overhead of thread destruction: reclaiming stack space, re-scheduling the remaining threads, and more CPU cache churn.
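
To make the T1/T3 point concrete, here is a minimal sketch (class and task names are made up, and this is not a benchmark): the thread-per-task loop pays creation and destruction for every task, while the pooled version pays them once per worker.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolVsThreadPerTask {
    public static void main(String[] args) {
        Runnable task = () -> Math.sqrt(12345.6789); // stand-in for real work (T2)

        // Thread-per-task: pays T1 (creation) and T3 (destruction) for every single task.
        for (int i = 0; i < 1000; i++) {
            new Thread(task).start();
        }

        // Pooled: T1/T3 are paid once per worker thread, then the workers are reused.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            pool.execute(task);
        }
        pool.shutdown();
    }
}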

What is a thread pool

A pattern for using threads. Too many threads bring scheduling overhead, which in turn hurts cache locality and overall performance.

Advantages of thread pools

  • Improve manageability of threads: threads are a scarce resource; if they are created without limit, they not only consume system resources but also cause scheduling imbalance due to unreasonable distribution, reducing system stability. A thread pool allows unified allocation, tuning, and monitoring.
  • This ensures that the kernel is fully utilized and prevents overscheduling.
  • Reduces the cost of creating and destroying threads – time, space.

How the JDK’s ThreadPoolExecutor works

Let’s take a closer look at the implementation of thread pools by reading JDK source code.

ThreadPoolExecutor basic structure

Let’s briefly describe it from top to bottom:

  • Executor: an interface with a single execute method that takes a Runnable. It gives callers a minimal contract: they only submit tasks, while the implementation decides how they actually run — whether in the calling thread or via a thread pool, how tasks are scheduled, and how the threads in the pool are managed.
  • ExecutorService: also an interface, extending Executor. An unused ExecutorService should be shut down to reclaim its resources. It mainly adds methods for managing termination and methods that return Futures for tracking the progress of one or more asynchronous tasks.
  • AbstractExecutorService: an abstract class that provides default implementations of the ExecutorService execution methods. Its submit, invokeAny, and invokeAll methods wrap tasks in the RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package.
  • ThreadPoolExecutor: An implementation class of Executor that uses a thread pool to perform each task. This class provides a number of tunable parameters and extendable hooks.

The class structure of ThreadPoolExecutor is relatively simple and is good material for design study. Now that we have some understanding of the classes/interfaces in the hierarchy, let's follow the reference to FutureTask mentioned above before continuing with ThreadPoolExecutor.
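
Before reading the source, a minimal usage sketch may help place the layers; pool sizes and task bodies here are illustrative, but the types and methods are the standard java.util.concurrent ones.

import java.util.concurrent.*;

public class ExecutorHierarchyDemo {
    public static void main(String[] args) throws Exception {
        // ThreadPoolExecutor is the concrete implementation; callers usually
        // program against the Executor / ExecutorService interfaces.
        ExecutorService pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100));

        // Executor.execute: fire-and-forget submission of a Runnable.
        pool.execute(() -> System.out.println("running in " + Thread.currentThread().getName()));

        // ExecutorService.submit: returns a Future that tracks the asynchronous result.
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("result = " + future.get());

        // ExecutorService adds lifecycle management on top of Executor.
        pool.shutdown();
    }
}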

FutureTask

  • Future: an interface that represents the result of an asynchronous computation. It provides methods to check whether the computation is complete, wait for it to complete, retrieve its result, and cancel it.
  • RunnableFuture: an interface, a Future that can be run. Successful completion of the run method completes the Future and allows access to its result.
  • FutureTask: the implementation class of RunnableFuture, a cancellable asynchronous computation (a short usage sketch follows this list).
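
As a quick illustration of the list above (a minimal sketch; the sleep and the value 42 are arbitrary), a FutureTask can be handed to a plain Thread as a Runnable and later queried as a Future:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        // The wrapped Callable simulates some work and produces a result.
        Callable<Integer> work = () -> {
            Thread.sleep(100);
            return 42;
        };

        // FutureTask is both a Runnable (can be given to a Thread or a pool)
        // and a Future (the result can be retrieved later).
        FutureTask<Integer> task = new FutureTask<>(work);
        new Thread(task).start();

        // get() blocks until run() has completed and the state is settled.
        System.out.println("result = " + task.get());
    }
}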

get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // Block until the task has completed
        s = awaitDone(false, 0L);
    return report(s);
}

As you can see, the key is waiting for state to advance past COMPLETING, so state must be changed somewhere in the run method. Let's take a look.

run

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        // FutureTask holds the callable internally
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

You can see that the run method calls the Callable's call method internally and, if execution succeeds, sets the result. Note also that this is an override of Runnable's run method, which is why ThreadPoolExecutor's execute can always take a Runnable regardless of whether submit was given a Callable or a Runnable — a neat and important detail.

ThreadPoolExecutor fields

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^29 - 1 is the maximum capacity; the lower 29 bits of the int store the number of worker threads.
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits.
// RUNNING is 1110 0000 0000 0000 0000 0000 0000 0000 in binary:
// accepts newly submitted tasks and also processes tasks in the blocking queue.
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN is 0000 0000 0000 0000 0000 0000 0000 0000 in binary:
// no longer accepts newly submitted tasks, but keeps processing tasks already in the blocking queue.
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: accepts no new tasks, does not process queued tasks, and interrupts in-progress task threads.
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING: all tasks have terminated and workerCount is zero.
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: the thread pool is fully terminated.
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s)  { return c >= s; }
private static boolean isRunning(int c)               { return c < SHUTDOWN; }

/** Attempts to CAS-increment the workerCount field of ctl (callers spin on this). */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/** Attempts to CAS-decrement the workerCount field of ctl (callers spin on this). */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

/**
 * Decrements the workerCount field of ctl. Called only on abrupt termination of a
 * thread (see processWorkerExit). Other decrements are performed within getTask.
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

/** The queue used to hold tasks and hand them off to worker threads. */
private final BlockingQueue<Runnable> workQueue;

/**
 * Lock held on access to the workers set and related bookkeeping. Although some kind
 * of concurrent collection could be used, it turns out to be generally preferable to
 * use a lock: one reason is that it serializes worker-thread interrupts and so avoids
 * unnecessary interrupt storms, especially during shutdown (otherwise exiting threads
 * would concurrently interrupt those that have not yet been interrupted). It also
 * simplifies statistics such as largestPoolSize. mainLock is also held on shutdown and
 * shutdownNow to keep the workers set stable while separately checking permission to
 * interrupt and actually interrupting. In short: the critical sections make a single
 * lock more convenient than a concurrent collection.
 */
private final ReentrantLock mainLock = new ReentrantLock();

/** Set containing all worker threads in the pool. Accessed only when holding mainLock. */
private final HashSet<Worker> workers = new HashSet<Worker>();

/** Wait condition supporting awaitTermination. */
private final Condition termination = mainLock.newCondition();

/** Tracks the largest attained pool size. Accessed only under mainLock. */
private int largestPoolSize;

/** Counter for completed tasks. Updated only on termination of worker threads. Accessed only under mainLock. */
private long completedTaskCount;

/** Factory for new threads. */
private volatile ThreadFactory threadFactory;

/** Rejection handler called when the pool is saturated or shut down. */
private volatile RejectedExecutionHandler handler;

/**
 * Timeout in nanoseconds for idle threads waiting for work. Threads use this timeout
 * when there are more than corePoolSize workers or when allowCoreThreadTimeOut is set;
 * otherwise they wait forever for new work.
 */
private volatile long keepAliveTime;

/**
 * If false (the default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting for work.
 */
private volatile boolean allowCoreThreadTimeOut;

/**
 * Core pool size: the minimum number of workers kept alive (without timing out)
 * unless allowCoreThreadTimeOut is set, in which case the minimum is zero.
 */
private volatile int corePoolSize;

/** Maximum pool size. The effective maximum is internally bounded by CAPACITY. */
private volatile int maximumPoolSize;

/** The default rejection policy. */
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

/** Permission required of callers of shutdown and shutdownNow. */
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");

/** The context to be used when executing the finalizer, or null. */
private final AccessControlContext acc;

All of the above fields have been explained in the comments, so let’s talk about the main fields here.

ctl

ctl packs the thread pool's run state and the worker count into a single value; this works because the states are fixed constants. AtomicInteger uses spin + CAS (with volatile semantics) to keep increments and decrements thread-safe. Cleverly, one int carries two pieces of information: the high 3 bits hold the running state of the pool and the low 29 bits (room for over 500 million) hold the number of worker threads.
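
The same bit trick in a standalone, runnable form (constant and method names mirror the JDK fields above; this sketch exists only to make the packing visible):

public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 536,870,911 workers at most

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int runStateOf(int ctl)    { return ctl & ~CAPACITY; } // high 3 bits
    static int workerCountOf(int ctl) { return ctl & CAPACITY; }  // low 29 bits

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 7); // a RUNNING pool with 7 workers
        System.out.println(runStateOf(ctl) == RUNNING); // true
        System.out.println(workerCountOf(ctl));         // 7
    }
}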

Thread pool running status

  • RUNNING: Can accept newly submitted tasks and also process tasks in a blocking queue.
  • SHUTDOWN: The state is closed. New submitted tasks are no longer accepted, but saved tasks in the blocking queue can continue to be processed.
  • STOP: new tasks cannot be accepted and queued tasks are not processed. The processing thread is interrupted.
  • TIDYING: All tasks terminated, workerCount is 0.
  • TERMINATED: The thread pool is TERMINATED completely.

mainLock

Guards access to the workers collection and related statistics.

handler – rejection policy

The default is AbortPolicy.

  • ThreadPoolExecutor.AbortPolicy: the default and most commonly used rejection policy (since pool parameters are usually configured to what the server can withstand); when the task cannot be submitted it is rejected by throwing RejectedExecutionException.
  • ThreadPoolExecutor.CallerRunsPolicy: the task is handled by the calling thread (the thread that was about to submit the asynchronous task), as used in the sketch after this list. This ensures every task gets executed, but the submitting interface responds more slowly.
  • ThreadPoolExecutor.DiscardPolicy: silently discards the task without throwing an exception. Generally unsuitable for business code.
  • ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue and then resubmits the rejected task. Also rarely used, because tasks would have to be classified into those that may be dropped and those that may not, and most asynchronous tasks cannot simply be thrown away.
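
A minimal sketch of plugging in a rejection policy when constructing a pool (the sizes are deliberately tiny so the policy actually triggers; all values are illustrative):

import java.util.concurrent.*;

public class RejectionPolicyDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                // CallerRunsPolicy: when saturated, the submitting thread runs the task itself,
                // which throttles submission instead of throwing an exception.
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 5; i++) {
            final int id = i;
            pool.execute(() ->
                System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}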

submit

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

The submit methods. There isn't much to see here, but it matches the description of Executor in the earlier section: task submission is decoupled from task execution. submit is only responsible for wrapping the task; the later steps — task scheduling, task buffering, thread allocation, task rejection and so on — happen elsewhere. execute is implemented in ThreadPoolExecutor, so let's first look at the newTaskFor method used here.

newTaskFor – Creates a new task

/**
 * Returns a RunnableFuture for the given runnable and default value.
 *
 * @param runnable the runnable task being wrapped
 * @param value the default value for the returned future
 * @param <T> the type of the given value
 * @return a RunnableFuture which, when run, runs the underlying runnable and which,
 *         as a Future, yields the given value as its result and provides for
 *         cancellation of the underlying task
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

You can see that this method also returns a FutureTask. A value has to be supplied here because a Runnable, unlike a Callable, cannot produce a result, so the given value is simply returned as-is once the task completes.

/**
 * The constructor takes the Runnable as the thing to execute. result is a fixed value,
 * because unlike a Callable a Runnable produces no result; the fixed value is only
 * returned once the Runnable has completed.
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

It’s already clear at this point that RunnableAdapter decorates Runnable as a Callable, and the call method returns the value passed in.

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

That covers submit for a Runnable; the corresponding submit for a Callable works the same way and is not described here.

execute

Executes the given task at some time in the future. The task may run in a new thread or in an existing pooled thread. If the task cannot be submitted because the executor has been shut down or because its capacity has been reached, it is handled by the current RejectedExecutionHandler.

The execution of the task is divided into three steps:

  1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks the run state and workerCount, and so prevents false alarms — adding threads when it shouldn't — by returning false.
  2. If the task can be queued successfully, we still need to double-check whether we should have added a thread (because an existing one may have died since the last check) or whether the pool shut down since entry into this method. So we recheck the state: if stopped, we roll back the enqueuing if necessary; if there are no workers, we start a new thread.
  3. If the task cannot be queued, try to add a new thread. If that fails, we know we are shut down or saturated, so the task is rejected.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // If fewer than corePoolSize workers are running, try to add a core worker
    // with this command as its first task.
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The core size has been exceeded: if the pool is still running, try to put the
    // task into the blocking queue and wait for a thread to become free.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer running, roll back the enqueuing and apply the rejection policy.
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If the pool is running but there are no worker threads at all, add a non-core worker.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Could not enqueue (queue full or pool not running): try to add a non-core worker;
    // if that also fails, apply the rejection policy.
    else if (!addWorker(command, false))
        reject(command);
}

The step-by-step comments in the source are clearer than many of the diagrams found in articles, some of which are not quite accurate, so it is recommended to read the source directly. The general flow is:

  1. Check whether the number of core threads has been reached; if not, add a core thread directly to execute the task.
  2. If it has been reached, try to enqueue the task; if the queue is full, add a non-core thread to execute the task directly.
  3. If all of the above fail, or an intermediate state check fails, the task is simply rejected (a small experiment after this list walks through each branch).
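
The experiment referenced above (pool sizes and sleep times are chosen only to make each branch observable; output interleaving may vary):

import java.util.concurrent.*;

public class ExecuteOrderDemo {
    public static void main(String[] args) {
        // core=1, max=2, queue capacity=1: the 4th task can neither be queued nor given a thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));

        Runnable slow = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
        };

        pool.execute(slow); // 1st: below corePoolSize -> new core worker
        pool.execute(slow); // 2nd: core worker busy -> enqueued
        pool.execute(slow); // 3rd: queue full -> new non-core worker (max not yet reached)
        try {
            pool.execute(slow); // 4th: queue full and max reached -> rejected (AbortPolicy)
        } catch (RejectedExecutionException e) {
            System.out.println("4th task rejected: " + e);
        }
        pool.shutdown();
    }
}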

Task execution is triggered when a new thread is added, so addWorker involves adding a new thread and executing the task. Let’s look at the addWorker method in detail.

addWorker – Adds a worker thread

Checks the state of the current thread pool and whether a new worker thread can be added under the given bound (core size or maximum size). If so, the worker count is adjusted accordingly and, if possible, a new worker thread is created and started, running firstTask as its first task. This method returns false if the pool has been stopped or is eligible to shut down. It also returns false if the thread factory fails to create a thread when asked. If thread creation fails, either because the factory returned null or because of an exception (typically an OutOfMemoryError in Thread.start()), we roll back cleanly.

private boolean addWorker(Runnable firstTask, boolean core) {
    // Phase 1: bump workerCount
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // At SHUTDOWN or later no new tasks are accepted; the only case in which a worker
        // may still be added is SHUTDOWN with no firstTask and a non-empty queue.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Fail if the worker count is at CAPACITY (note the >=, guarding the extreme case),
            // or at the bound we were asked to respect (corePoolSize or maximumPoolSize
            // depending on the core flag).
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS-increment the worker count
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // Phase 2: create and start the worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);      // creates a new thread via the thread factory
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding the lock: back out on ThreadFactory failure or
                // if the pool was shut down before the lock was acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread could not be created or started, roll back and try to terminate
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The above describes adding a new worker; if the worker is given a first task it executes it directly. Now that we know how tasks are submitted and how worker threads are added, let's look at how a worker actually runs its tasks.

runWorker – Performs tasks

public void run() {
    runWorker(this);
}

The main worker run loop. It repeatedly fetches tasks from the queue and executes them, while dealing with a number of issues:

  1. We may start out with an initial task, in which case we don't need to get the first one from the queue. Otherwise, as long as the pool is running, we get tasks from getTask. If it returns null, the worker exits due to changed pool state or configuration parameters. Other exits result from exceptions thrown by external code, in which case completedAbruptly holds, which usually leads processWorkerExit to replace this thread.
  2. Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that, unless the pool is stopping, this thread does not have its interrupt set.
  3. Each task run is preceded by a call to beforeExecute, which might throw an exception, in which case we cause the thread to die (breaking the loop with completedAbruptly true) without processing the task.
  4. Assuming beforeExecute completes normally, we run the task, gathering any exception it throws to send to afterExecute. We separately handle RuntimeException, Error (both of which we are guaranteed to trap) and arbitrary Throwables. Because we cannot rethrow Throwables within Runnable.run, we wrap them in Errors on the way out (to the thread's UncaughtExceptionHandler). Any thrown exception also causes the thread to die.
  5. After task.run completes, we call afterExecute, which may also throw an exception, which will likewise cause the thread to die. According to JLS Sec 14.20, this exception is the one that takes effect even if task.run throws. The net effect of the exception mechanics is that afterExecute and the thread's UncaughtExceptionHandler have as accurate information as we can provide about any problems encountered by user code.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts -- interrupts only take effect while the worker is running a task
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure the thread is interrupted;
            // if not, ensure the thread is not interrupted. This requires a recheck in the
            // second case to deal with a shutdownNow race while clearing the interrupt --
            // Thread.interrupted() clears the flag, but the pool state may have changed.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // empty in this class; an extensible hook
                Throwable thrown = null;
                try {
                    task.run(); // execute the task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // empty in this class; an extensible hook
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Here you can see that the runWorker will continue to execute tasks using the current thread until there are no more tasks in the queue or an exception occurs.

Let’s look first at the getTask method, and then at the processWorkerExit method, which will follow if no task is executed or an exception occurs.

getTask – Gets a task (keepAliveTime works here)

Depending on the current configuration, this either blocks for a task or waits with a timeout. It returns null (causing runWorker to exit its while loop and the current thread to be destroyed afterwards) if the worker must exit for any of the following reasons:

  1. There are more worker threads than maximumPoolSize (due to calls to setMaximumPoolSize).
  2. The thread pool has stopped.
  3. The thread pool is closed and the queue is empty.
  4. The worker timed out waiting for a task, and timed-out workers are subject to termination (that is, allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, and, if the queue is non-empty, this worker is not the last thread in the pool.
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Decrement the worker count and bail out only if the pool is SHUTDOWN with an
        // empty queue, or is at least STOP.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling via a timed poll?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // If the worker count exceeds the maximum, or a timed wait has already timed out,
        // terminate this worker (decrement wc) -- as long as the queue is empty or this
        // is not the last worker in the pool.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Decrement via CAS; the enclosing for(;;) provides the retry loop.
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

You can see:

  • If core threads are not allowed to time out, then whenever wc is greater than corePoolSize and the timed poll finds no task within keepAliveTime, getTask returns null, causing the polling thread to exit.

    The thread that times out may be a non-core thread added later or a core thread created earlier: worker threads themselves do not distinguish core from non-core. Being "core" only means that, when no timeout is configured, that many threads remain in the pool.

  • If core threads are allowed to time out, then any thread that times out while fetching a task will exit (a small configuration sketch follows).
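
A small sketch of the timeout behaviour discussed above (the keepAliveTime and sleep values are arbitrary; the expected pool size of 0 assumes no new tasks arrive while waiting):

import java.util.concurrent.*;

public class KeepAliveDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 500L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        // By default only the threads beyond corePoolSize time out after keepAliveTime.
        // With this flag set, core threads are also reclaimed when idle.
        pool.allowCoreThreadTimeOut(true);

        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { /* short task */ });
        }

        Thread.sleep(2000); // wait longer than keepAliveTime with no new work
        System.out.println("pool size after idle timeout: " + pool.getPoolSize()); // expected 0
        pool.shutdown();
    }
}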

processWorkerExit – Handles worker thread exit

Performs cleanup and bookkeeping for a dying worker. Called only from worker threads. Unless completedAbruptly is set, it assumes workerCount has already been adjusted to account for the exit. It removes the thread from the workers set, and may terminate the thread pool or replace the worker if it exited due to a user task exception, or if fewer than corePoolSize workers are running, or if the queue is non-empty but there are no workers.

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted in getTask
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Fold this worker's completed-task count into the pool-wide total
        completedTaskCount += w.completedTasks;
        // Remove the worker from the set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // If (SHUTDOWN with both pool and queue empty) or (STOP with pool empty),
    // move the pool towards TIDYING/TERMINATED.
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Replace the worker with a non-core thread
        addWorker(null, false);
    }
}

A replacement thread is added when the worker exited because of an exception, or when the remaining worker count is below the minimum the pool needs while the queue still has work. Why is the replacement always added with addWorker(null, false), i.e. as a "non-core" addition? Workers themselves carry no core/non-core flag; the core parameter only selects which bound (corePoolSize or maximumPoolSize) addWorker checks, so using the looser bound here simply ensures the pool can always restore itself to the minimum number of workers it requires.

Task buffering – Blocking queue selection
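
As a rough orientation for choosing a work queue (a sketch, not a recommendation; the bounded sizes are arbitrary), these are the pairings the JDK's own Executors factory methods use, plus a common bounded choice:

import java.util.concurrent.*;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        // Unbounded LinkedBlockingQueue: what Executors.newFixedThreadPool uses.
        // maximumPoolSize never matters because the queue never fills up.
        ExecutorService fixed = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        // SynchronousQueue: what Executors.newCachedThreadPool uses.
        // No buffering at all; every task needs an idle or newly created thread.
        ExecutorService cached = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        // Bounded ArrayBlockingQueue: buffers up to a fixed number of tasks; overflow then
        // goes to non-core threads and finally to the rejection policy.
        ExecutorService bounded = new ThreadPoolExecutor(
                2, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(128));

        fixed.shutdown();
        cached.shutdown();
        bounded.shutdown();
    }
}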
