1 Thread pool: why? Mainly to solve the cost of repeatedly creating and destroying threads: a pool reuses a fixed set of worker threads and puts an upper bound on resource usage.

2.1 Number of core threads

When a new task is submitted via execute(Runnable) and fewer than corePoolSize threads are running, a new thread is created to handle the request even if other worker threads are idle. If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only when the queue is full. By setting corePoolSize and maximumPoolSize to the same value, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Core and maximum pool sizes are set upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize. By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using prestartCoreThread or prestartAllCoreThreads; the latter starts all core threads, causing them to idly wait for work, overriding the default policy of starting core threads only when new tasks are executed. Why would you want that? For example, some components of Tomcat call prestartAllCoreThreads when initializing their thread pools so that the core threads are already warm.
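As a minimal sketch of the points above (sizes are arbitrary), the following builds a fixed-size pool by setting corePoolSize == maximumPoolSize and prestarts its core threads the way a component might at initialization time:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartExample {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4,                        // core == max -> fixed-size pool
                0L, TimeUnit.MILLISECONDS,   // keepAliveTime is irrelevant while core == max
                new LinkedBlockingQueue<>());

        // Without this call, core threads would be created lazily, one per submitted task.
        int started = pool.prestartAllCoreThreads();
        System.out.println("prestarted core threads: " + started); // prints 4

        pool.shutdown();
    }
}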

2.2 Source code parsing – number of core threads

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first task.
     * The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue the task, then we try to add a new
     * thread. If that fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {           // fewer than corePoolSize: try to add a core worker
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {  // pool running: try to enqueue the task
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))  // pool shut down after enqueuing: roll back and reject
            reject(command);
        else if (workerCountOf(recheck) == 0)        // no worker threads left: add a non-core worker to drain the queue
            addWorker(null, false);
    } else if (!addWorker(command, false))           // queue full: try a non-core worker; if that fails, apply the rejection policy
        reject(command);
}

2.3 The thread pool ctl field

     ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl is the pool's lifecycle control state.

* RUNNING: accept new tasks and process queued tasks
* SHUTDOWN: don't accept new tasks, but process queued tasks
* STOP: don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
* TIDYING: all tasks have terminated, workerCount is zero; the thread transitioning to state TIDYING will run the terminated() hook method
* TERMINATED: terminated() has completed

Threads waiting in awaitTermination() will return when the state reaches TERMINATED. ctl packs together the workerCount (the number of worker threads, capped at (2^29)-1, roughly 500 million) and the runState (the state of the thread pool); workerCount may temporarily differ from the actual number of live threads.
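A minimal sketch of how ctl packs these two values, mirroring the COUNT_BITS/ctlOf/runStateOf/workerCountOf helpers in the OpenJDK source:

// High 3 bits: runState; low 29 bits: workerCount.
static final int COUNT_BITS = Integer.SIZE - 3;        // 29
static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // (2^29) - 1, the workerCount ceiling

static final int RUNNING    = -1 << COUNT_BITS;
static final int SHUTDOWN   =  0 << COUNT_BITS;
static final int STOP       =  1 << COUNT_BITS;
static final int TIDYING    =  2 << COUNT_BITS;
static final int TERMINATED =  3 << COUNT_BITS;

static int runStateOf(int c)     { return c & ~CAPACITY; } // keep only the high 3 bits
static int workerCountOf(int c)  { return c & CAPACITY; }  // keep only the low 29 bits
static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and count into one int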

The high 3 bits of ctl hold the runState; the low 29 bits hold the workerCount:

RUNNING    111 (-1)
SHUTDOWN   000 ( 0)
STOP       001 ( 1)
TIDYING    010 ( 2)
TERMINATED 011 ( 3)

The workerCount is the number of workers that have been permitted to start and not permitted to stop.

2.4 Thread pool startup parameter – thread factory

Why? The thread factory is the class used to create new threads. The default thread factory produces non-daemon threads with default priority. (The main thread is a non-daemon thread; daemon threads, like GC threads, are terminated as soon as the last non-daemon thread ends, so their try/finally blocks are not guaranteed to run.) In practice, projects usually plug in a custom thread factory so they can alter the thread's name, thread group, priority, and daemon status; meaningful thread names make it much easier to locate problems in online logs. If the thread factory fails when asked to create a thread, the executor will continue, but it might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission; if worker threads or other threads using the pool lack this permission, service may be degraded, and a shut-down pool may remain in a state in which termination is possible but not completed.
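A minimal sketch of a custom thread factory (the name prefix, daemon choice, and priority below are illustrative, not prescribed by the JDK):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger seq = new AtomicInteger(1);
    private final String prefix;

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + seq.getAndIncrement());
        t.setDaemon(false);                  // keep workers as user threads (the default)
        t.setPriority(Thread.NORM_PRIORITY); // explicit default priority
        return t;
    }
}

Thread names such as order-pool-3 then appear directly in logs and thread dumps, which is what makes problems easy to locate.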

2.5 Daemon threads vs. non-daemon threads

Non-daemon threads are also called "user" threads. A new thread inherits the priority, thread group, and daemon status of the thread that creates it (its parent thread) by default. A thread is really an object that holds, among other things, its virtual machine stack, the task it runs, its own id (which synchronizers use to record lock ownership), and its stackSize. As long as at least one non-daemon (user) thread is still alive, the JVM will not exit. You can make a thread a daemon or a non-daemon thread by calling setDaemon(boolean), but this must be done before the thread is started. When the Java virtual machine starts, there is a single non-daemon thread that calls the main() method of the designated class. This is why, by default, all child threads created by the main thread are non-daemon (the main thread itself is non-daemon), and why the JVM does not exit while a thread pool created in main still has live (non-daemon) worker threads.
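A small illustrative sketch of these rules (the sleep time is arbitrary): setDaemon must be called before start(), and the JVM exits as soon as only daemon threads remain, so the daemon thread below may be killed mid-sleep and its finally block may never run.

public class DaemonDemo {
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
            } finally {
                // May never print: daemon threads are terminated abruptly when the last user thread ends.
                System.out.println("cleanup");
            }
        });
        worker.setDaemon(true); // must be set before start(), otherwise IllegalThreadStateException
        worker.start();
        // main (a user thread) returns here, so the JVM exits without waiting for the daemon thread.
    }
}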

2.6 Thread pool startup parameter – keep-alive time

If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(TimeUnit)). This lets the pool scale its resource usage down when it is not being actively used. By default, the keep-alive policy applies only when there are more than corePoolSize threads, but allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, as long as the keepAliveTime value is non-zero. With allowCoreThreadTimeOut set, the pool size may drop to 0 and the main worker loop stops, waiting for tasks to arrive. Each worker spins in its main loop, taking tasks from the blocking queue; in java.util.concurrent.ThreadPoolExecutor#getTask, the flag boolean timed = allowCoreThreadTimeOut || wc > corePoolSize marks whether this worker is subject to culling, i.e. whether it may exit when its wait times out.
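A minimal sketch (pool sizes and timeouts are arbitrary) of letting the pool shrink to zero threads when idle:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,
                1, TimeUnit.SECONDS,        // keepAliveTime must be non-zero for core time-out to apply
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);  // idle core threads may now time out as well

        pool.execute(() -> System.out.println("task on " + Thread.currentThread().getName()));

        Thread.sleep(3_000);                // wait longer than keepAliveTime
        System.out.println("pool size after idle period: " + pool.getPoolSize()); // typically 0
        pool.shutdown();
    }
}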

Keep-alive times

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // (above maximumPoolSize, or eligible to time out and already timed out)
        // and (more than one worker left, or the task queue is empty) -> this worker should exit
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c)) // workerCount--
                return null;                       // returning null makes the worker exit (non-core shrink)
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; // woken by an interrupt while waiting; loop again and re-check whether this worker should be culled
        }
    }
}

A worker exits when both timed == true and timedOut == true. Separately, interruptIdleWorkers sets the interrupt flag on all idle workers (those whose interrupt flag is not already set). A worker blocked waiting for a task then immediately exits the blocking call: poll()/take() throws InterruptedException, the thread's interrupted status is reset to false, and nothing else happens; the worker loops again and re-evaluates whether it should exit.
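A tiny sketch of the behavior just described: interrupting a thread blocked in poll() wakes it immediately, and the InterruptedException is thrown with the interrupted status already cleared.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InterruptPollDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Thread waiter = new Thread(() -> {
            try {
                queue.poll(1, TimeUnit.MINUTES);  // blocks waiting for a task
            } catch (InterruptedException e) {
                // The interrupted status was cleared before the exception was thrown.
                System.out.println("interrupted flag now: " + Thread.currentThread().isInterrupted()); // false
            }
        });
        waiter.start();
        Thread.sleep(200);
        waiter.interrupt();                       // wakes the blocked poll() immediately
        waiter.join();
    }
}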


Interrupts are covered in more detail in a separate article; the relevant method here is java.util.concurrent.ThreadPoolExecutor#interruptIdleWorkers(boolean).

2.8 Source code parsing – BlockingQueue

Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case the task is rejected. Note that the default rejection policy throws an exception, which, if uncaught, causes the submitting (outer) thread to terminate; even if the outer thread that created the pool terminates, the pool itself does not terminate, because its worker threads are non-daemon.
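A hedged sketch of swapping the default AbortPolicy for CallerRunsPolicy (queue capacity and pool sizes are arbitrary), so that rejected tasks run in the submitting thread instead of throwing:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionExample {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),                // tiny bounded queue to force rejection
                new ThreadPoolExecutor.CallerRunsPolicy()); // instead of the default AbortPolicy

        for (int i = 0; i < 5; i++) {
            int n = i;
            pool.execute(() ->
                    System.out.println("task " + n + " ran on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}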

Queuing strategies: there are three.

The first: SynchronousQueue –> direct handoffs. Why? A direct handoff passes tasks straight to threads without otherwise holding them. Here, if no thread is immediately available to run a task, an attempt to queue the task fails, so a new thread is constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require an unbounded maximumPoolSize to avoid rejecting newly submitted tasks; that in turn admits the possibility of unbounded thread growth when commands continue to arrive, on average, faster than they can be processed. Problem: the number of threads can surge, and resource consumption is uncontrollable.

The second: LinkedBlockingQueue –> unbounded queue. Why? It can absorb temporary bursts of requests, but grows without limit if requests keep arriving faster than they are processed. New tasks wait in the queue when all corePoolSize threads are busy, so no more than corePoolSize threads will ever be created; the implication is that the maximumPoolSize configuration has no effect. Problem: the queue length can grow rapidly, and resource consumption is uncontrollable.

The third: a bounded queue such as ArrayBlockingQueue. Why? A bounded queue helps prevent resource exhaustion when used with finite maximumPoolSizes, but it can be harder to tune: queue sizes and pool sizes trade off against each other. Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput; if tasks frequently block (for example if they are I/O bound), the system may be able to schedule time for more threads than you otherwise allow. Using small queues generally requires larger pool sizes, which keeps CPUs busier but may incur unacceptable scheduling overhead, which also decreases throughput. (A sketch constructing all three variants follows below.)
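A sketch constructing the three variants side by side (all sizes are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueStrategies {
    // 1. Direct handoff: no queuing; a task either meets a free/new thread or is rejected.
    static ThreadPoolExecutor directHandoff() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<>()); // same shape as Executors.newCachedThreadPool
    }

    // 2. Unbounded queue: never more than corePoolSize threads; maximumPoolSize has no effect.
    static ThreadPoolExecutor unboundedQueue() {
        return new ThreadPoolExecutor(4, 8,                       // the 8 here is effectively ignored
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // 3. Bounded queue: the pool grows toward maximumPoolSize once the queue fills, then rejects.
    static ThreadPoolExecutor boundedQueue() {
        return new ThreadPoolExecutor(4, 8,
                30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
    }
}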

2.9 Why does Worker implement AQS? ThreadPoolExecutor members and inner classes

Key ThreadPoolExecutor fields and their access rules (several are accessed only when holding mainLock):

final ReentrantLock mainLock              // lock held on access to the workers set and related bookkeeping
final HashSet<Worker> workers             // set containing all worker threads in the pool; accessed only when holding mainLock
final BlockingQueue<Runnable> workQueue   // the queue used for holding tasks and handing them off to worker threads
final AtomicInteger ctl                   // the main pool control state (runState + workerCount)
final Condition termination               // wait condition to support awaitTermination
int largestPoolSize                       // tracks the largest attained pool size; accessed only under mainLock
long completedTaskCount                   // counter for completed tasks; updated only on termination of worker threads, accessed only under mainLock
volatile ThreadFactory threadFactory      // factory for new threads

All user control parameters are declared as volatiles so that ongoing actions are based on the freshest values, but without need for locking, since no internal invariants depend on them changing synchronously with respect to other actions.

volatile RejectedExecutionHandler handler   // handler called when saturated or shut down in execute
volatile long keepAliveTime                 // timeout in nanoseconds for idle threads waiting for work
volatile boolean allowCoreThreadTimeOut     // false: core threads stay alive even when idle; true: core threads use keepAliveTime to time out waiting for work
volatile int corePoolSize                   // minimum number of workers to keep alive, unless allowCoreThreadTimeOut is set, in which case the minimum is zero
volatile int maximumPoolSize                // maximum pool size
static final RejectedExecutionHandler defaultHandler = new AbortPolicy()  // the default rejected execution handler

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

Why does Worker rely on AQS? It locks, in exclusive mode, around the work the thread performs, so the interrupt state of the bound thread and the worker's internal variables can only be modified while holding the lock; this prevents other threads (such as interruptIdleWorkers) from interrupting a task that is in progress.
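A minimal sketch of the idea (illustrative, not the JDK's exact code): a non-reentrant lock built on AQS where state 0 means idle and state 1 means running a task, so interruptIdleWorkers can use tryLock() to skip workers that are busy.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleWorkerLock extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {      // 0 = idle, 1 = running a task
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;                        // non-reentrant: even the owner cannot acquire twice
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;              // "locked" == currently running a task
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }
}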

2.10 ctl control-state methods

/*
 * Methods for setting control state
 */

private void advanceRunState(int targetState)
final void tryTerminate()
private void checkShutdownAccess()
private void interruptWorkers()
private void interruptIdleWorkers()
private void interruptIdleWorkers(boolean onlyOne)
final void reject(Runnable command)
void onShutdown()                                   // hook
final boolean isRunningOrShutdown(boolean shutdownOK)
private List<Runnable> drainQueue()

2.11 Main worker loop

Scaling up (setCorePoolSize): the pool does not immediately start as many new workers as the configured increase; it starts min(delta, workQueue.size()) workers, so for a while after the call the pool may not grow at all, even though the configured pool size has already changed. Scaling down is done via interruptIdleWorkers(): it does not take effect immediately; workers blocked waiting for tasks can all be reclaimed, and core threads can be reclaimed too (when allowCoreThreadTimeOut is set). A worker only exits after checking that there is no task for it, so the pool can indeed drop to pool size = 0.
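A small usage sketch of dynamic resizing (sizes are arbitrary), matching the behavior described above: after setCorePoolSize the pool starts at most min(delta, workQueue.size()) new workers immediately, and shrinking only takes effect as idle workers are interrupted or time out.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeExample {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 8, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        pool.setMaximumPoolSize(16); // affects future growth decisions
        pool.setCorePoolSize(6);     // immediately starts only min(4, current queue size) new workers

        System.out.println("configured core = " + pool.getCorePoolSize()
                + ", actual pool size = " + pool.getPoolSize()); // the actual size may lag the configuration

        pool.setCorePoolSize(1);     // excess idle workers are interrupted and drain away gradually
        pool.shutdown();
    }
}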

getTask() responds to the interrupt issued by interruptIdleWorkers(); once a worker wakes up from waiting for a task, how is the interrupt handled? See the main worker loop below.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true; // stays true only if the loop is left via an exception
    try {
        while (task != null || (task = getTask()) != null) { // (task = getTask()) == null is the signal for the worker to exit
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false; // the worker left the main loop normally
    } finally {
        processWorkerExit(w, completedAbruptly); // cleanup after the worker exits the main loop, e.g. adding this worker's completed-task count to the pool-wide completedTaskCount
    }
}

2.12 Worker thread exit processing source code:

final void runWorker(Worker w) is the main worker run loop and the most important core method: it repeatedly gets tasks from the queue and executes them, while coping with a number of issues:

  1. We may start out with an initial task, in which case we don't need to get the first one; otherwise, as long as the pool is running, we get tasks from getTask.

  2. Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless the pool is stopping, this thread does not have its interrupt set.

  3. Each task run is preceded by a call to beforeExecute, which might throw an exception, in which case we cause the thread to die (breaking the loop with completedAbruptly true) without processing the task. // In that case the task is lost!

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly == false: normal exit; don't adjust workerCount, because getTask()
    // already decremented it (see compareAndDecrementWorkerCount(c) / decrementWorkerCount()
    // in java.util.concurrent.ThreadPoolExecutor#getTask)
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) { // completedAbruptly == false: normal exit
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty()) // keep at least one worker while the queue is non-empty
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false); // add a replacement non-core worker
    }
}

2.13 Summary of Other methods:

/*
 * Methods for creating, running and cleaning up after workers
 */

private boolean addWorker(Runnable firstTask, boolean core)
private void addWorkerFailed(Worker w)
private void processWorkerExit(Worker w, boolean completedAbruptly)
private Runnable getTask()                          // one of the core methods: fetches a task from the queue

// Public constructors and methods

public void execute(Runnable command)
public void shutdown()
public List<Runnable> shutdownNow()
public boolean isShutdown()
public boolean isTerminating()
public boolean isTerminated()
public boolean awaitTermination(long timeout, TimeUnit unit)  // waits for the given time: blocks until all tasks have completed after a shutdown request, or the timeout elapses, or the current thread is interrupted
protected void finalize()
public void setThreadFactory(ThreadFactory threadFactory)
public ThreadFactory getThreadFactory()
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
public RejectedExecutionHandler getRejectedExecutionHandler()
public void setCorePoolSize(int corePoolSize)
public int getCorePoolSize()
public boolean prestartCoreThread()
void ensurePrestart()
public int prestartAllCoreThreads()
public boolean allowsCoreThreadTimeOut()
public void allowCoreThreadTimeOut(boolean value)
public int getMaximumPoolSize()
public void setKeepAliveTime(long time, TimeUnit unit)
public long getKeepAliveTime(TimeUnit unit)
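A common usage sketch of the lifecycle methods listed above (timeout values are arbitrary), along the lines of the pattern shown in the ExecutorService Javadoc:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();                                     // stop accepting new tasks, keep draining the queue
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                List<Runnable> dropped = pool.shutdownNow(); // interrupt workers, return tasks still queued
                System.out.println("dropped " + dropped.size() + " queued tasks");
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.out.println("pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();              // preserve the interrupt status
        }
    }
}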

/* User-level queue utilities */
public BlockingQueue<Runnable> getQueue()
public boolean remove(Runnable task)  // a task handed to submit() is wrapped in a Future before being queued, so remove() with the original Runnable fails
public void purge()                   // removes cancelled Future tasks from the queue; normally iterates and removes cancelled entries, but takes a slow path (copy for traversal, then remove the copy's cancelled entries) if it encounters interference, which can be O(N*N); this is a storage-reclamation operation, since cancelled tasks would otherwise accumulate in the queue and never be executed
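A small sketch of the remove()/purge() point (the task bodies are trivial placeholders): remove(task) only matches the exact queued object, and submit() wraps the Runnable in a FutureTask, so cancelling the Future and calling purge() is the way to clear such tasks.

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RemoveVsPurge {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        Runnable blocker = () -> { try { Thread.sleep(5_000); } catch (InterruptedException ignored) {} };
        pool.execute(blocker);                       // occupies the single worker so later tasks stay queued

        Runnable queuedTask = () -> {};
        Future<?> f = pool.submit(queuedTask);       // wrapped in a FutureTask before being queued

        System.out.println(pool.remove(queuedTask)); // false: the queue holds the FutureTask, not queuedTask
        f.cancel(false);                             // mark the wrapped task as cancelled
        pool.purge();                                // the cancelled FutureTask is now removed from the queue

        pool.shutdownNow();
    }
}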

/* Statistics */

The toString() method reports:

pool size       = workers.size()
active threads  = number of workers for which w.isLocked()      // what getActiveCount() returns; a locked worker is one currently running a task, since isLocked() delegates to isHeldExclusively(), i.e. getState() != 0
queued tasks    = workQueue.size()
completed tasks = completedTaskCount + sum(w.completedTasks)    // tasks finished by workers that have already exited plus tasks finished so far by live workers

3 Summary

Worker's entire workflow: why does Worker inherit AQS? Because it gives a low-overhead, thread-safe way to interrupt only idle workers, which matters when the pool is shut down or scaled down. AQS source-code details: next page.