Preface
- An {@link ExecutorService} that executes each submitted task using one of possibly several pooled threads, normally configured using {@link Executors} factory methods.
- Thread pools address two different problems:
- They usually improve performance when executing large numbers of asynchronous tasks, because the per-task invocation overhead is reduced.
- They provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.
- Each {@code ThreadPoolExecutor} also maintains some basic statistics, such as the number of completed tasks.
- To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks.
- However, programmers are urged to use the more convenient {@link Executors} factory methods, which preconfigure settings for the most common usage scenarios:
- {@link Executors#newCachedThreadPool} (unbounded thread pool with automatic thread reclamation),
- {@link Executors#newFixedThreadPool} (fixed-size thread pool), and
- {@link Executors#newSingleThreadExecutor} (single background thread).
- Otherwise, use the following guide when manually configuring and tuning this class:
-
- Core and maximum pool sizes
- A {@code ThreadPoolExecutor} automatically adjusts the pool size (see {@link #getPoolSize}) according to the bounds set by corePoolSize (see {@link #getCorePoolSize}) and maximumPoolSize (see {@link #getMaximumPoolSize}).
- When a new task is submitted in method {@link #execute(Runnable)} and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.
- If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only if the queue is full.
- Setting corePoolSize and maximumPoolSize to the same value creates a fixed-size thread pool.
- Setting maximumPoolSize to an essentially unbounded value such as {@code Integer.MAX_VALUE} allows the pool to accommodate an arbitrary number of concurrent tasks.
- Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using {@link #setCorePoolSize} and {@link #setMaximumPoolSize}.
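As a rough illustration of the sizing rules above, the following sketch builds a fixed-size pool and then resizes it at runtime. The class and method names (`PoolSizingSketch`, `resize`) are hypothetical and the sizes are arbitrary; only the `ThreadPoolExecutor` calls come from the JDK. The maximum is raised before the core size because newer JDK versions reject a core size larger than the current maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSizingSketch {
    /** Builds a fixed-size pool, resizes it, and returns {core, max} after the change. */
    static int[] resize() {
        // corePoolSize == maximumPoolSize gives a fixed-size pool of 2 threads.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            // Raise the maximum first so the core size never exceeds it.
            pool.setMaximumPoolSize(8);
            pool.setCorePoolSize(4);
            return new int[] {pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `PoolSizingSketch.resize()` should report a core size of 4 and a maximum of 8; no threads are created here because no task is ever submitted.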
- On-demand construction
- By default, even core threads are initially created and started only when new tasks arrive,
- but this can be overridden dynamically using method {@link #prestartCoreThread} or {@link #prestartAllCoreThreads}.
- You probably want to prestart threads if you construct the pool with a non-empty queue.
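A minimal sketch of prestarting, assuming a pool of three core threads (the class name `PrestartSketch` is hypothetical). It records the pool size before and after `prestartAllCoreThreads()` to show that no thread exists until a task arrives or a prestart is requested.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PrestartSketch {
    /** Returns {pool size before prestart, threads started, pool size after prestart}. */
    static int[] prestart() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            int before = pool.getPoolSize();              // no tasks yet, so no threads
            int started = pool.prestartAllCoreThreads();  // starts every core thread now
            int after = pool.getPoolSize();
            return new int[] {before, started, after};
        } finally {
            pool.shutdown(); // idle workers are interrupted and exit
        }
    }
}
```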
- Creating new threads
- New threads are created using a {@link ThreadFactory}.
- If not otherwise specified, {@link Executors#defaultThreadFactory} is used,
- which creates threads that are all in the same {@link ThreadGroup}, with the same {@code NORM_PRIORITY} priority and non-daemon status.
- By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, and so on.
- If a {@code ThreadFactory} fails to create a thread when asked, by returning null from {@code newThread}, the executor will continue, but might not be able to execute any tasks.
- Threads should possess the "modifyThread" {@code RuntimePermission}.
- If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner,
- and a shut-down pool may remain in a state in which termination is possible but not completed.
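One way to supply a custom factory is sketched below. The helper `named` and the `worker` prefix are assumptions made for illustration; the factory names each thread and marks it as a daemon, two of the attributes the text says a factory can change.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactorySketch {
    /** Hypothetical helper: names threads "prefix-1", "prefix-2", ... and makes them daemons. */
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + "-" + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    /** Runs one task and returns the name of the pool thread that executed it. */
    static String runOnce() {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), named("worker"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Descriptive thread names tend to make thread dumps and log lines easier to attribute to a particular pool.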
- Keep-alive times
- If the pool currently has more than corePoolSize threads, excess threads are terminated once they have been idle for longer than keepAliveTime
- (see {@link #getKeepAliveTime(TimeUnit)}).
- This provides a means of reducing resource consumption when the pool is not being actively used.
- If the pool becomes more active later, new threads are constructed.
- This parameter can also be changed dynamically using method {@link #setKeepAliveTime(long, TimeUnit)}.
- Using a value of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively prevents idle threads from ever terminating before shutdown.
- By default, the keep-alive policy applies only when there are more than corePoolSize threads.
- However, method {@link #allowCoreThreadTimeOut(boolean)} can be used to apply this time-out policy to core threads as well, as long as the keepAliveTime value is non-zero.
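The keep-alive settings can be adjusted after construction, roughly as follows. This is a configuration-only sketch with arbitrary values and a hypothetical class name; it does not wait for threads to time out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveSketch {
    /** Reconfigures the keep-alive policy and returns a short description of the result. */
    static String configure() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            // Shorten the idle timeout for threads above corePoolSize.
            pool.setKeepAliveTime(10L, TimeUnit.SECONDS);
            // Apply the same timeout to core threads; requires a non-zero keep-alive time.
            pool.allowCoreThreadTimeOut(true);
            return pool.getKeepAliveTime(TimeUnit.SECONDS) + "s core-timeout="
                    + pool.allowsCoreThreadTimeOut();
        } finally {
            pool.shutdown();
        }
    }
}
```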
- Queuing
- Any {@link BlockingQueue} may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
- If fewer than corePoolSize threads are running, the executor always prefers adding a new thread rather than queuing.
- If corePoolSize or more threads are running, the executor always prefers queuing a request rather than adding a new thread.
- If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case the task is rejected.
- There are three general strategies for queuing:
- Direct handoffs. A good default choice for a work queue is a {@link SynchronousQueue}, which hands off tasks to threads without otherwise holding them.
- Here, an attempt to queue a task fails if no thread is immediately available to run it, so a new thread is constructed.
- This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of newly submitted tasks.
- This in turn admits the possibility of unbounded thread growth when commands continue to arrive, on average, faster than they can be processed.
- Unbounded queues. Using an unbounded queue (for example, a {@link LinkedBlockingQueue} without a predefined capacity) causes new tasks to wait in the queue when all corePoolSize threads are busy.
- Thus, no more than corePoolSize threads will ever be created.
- (The value of maximumPoolSize therefore has no effect.) This may be appropriate when each task is completely independent of the others, so tasks cannot affect each other's execution;
- for example, in a web page server.
- While this style of queuing can be useful for smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive, on average, faster than they can be processed.
- Bounded queues. A bounded queue (for example, an {@link ArrayBlockingQueue}) helps prevent resource exhaustion when used with finite maximumPoolSizes,
- but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off against each other:
- Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput.
- If tasks frequently block (for example, if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow.
- Using small queues generally requires larger pool sizes, which keeps CPUs busier but may incur unacceptable scheduling overhead, which also decreases throughput.
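The interaction between pool size and a bounded queue can be observed with a deliberately tiny configuration. The sketch below assumes one core thread, a maximum of two threads, and a queue capacity of one; the class name and the latch-based blocking task are illustrative choices, not part of the original text.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedQueueSketch {
    /** Submits four blocking tasks and reports how the pool absorbed them. */
    static String saturate() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // below corePoolSize: starts thread 1
            pool.execute(blocker); // core thread busy: task waits in the queue
            pool.execute(blocker); // queue full: starts thread 2, up to maximumPoolSize
            try {
                pool.execute(blocker); // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return "pool=" + pool.getPoolSize()
                    + " queue=" + pool.getQueue().size()
                    + " rejected=" + rejected;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Note the order: the queue fills before the second thread is created, which is why a large queue combined with a small core size can leave maximumPoolSize effectively unused.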
- Rejected tasks
- New tasks submitted in method {@link #execute(Runnable)} are rejected when the executor has been shut down,
- and also when the executor uses finite bounds for both maximum threads and work queue capacity, and is saturated.
- In either case, the {@code execute} method invokes the {@link RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)} method of its {@link RejectedExecutionHandler}.
- Four predefined handler policies are provided:
- In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a runtime {@link RejectedExecutionException} upon rejection.
- In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes {@code execute} itself runs the task. This provides a simple feedback control mechanism that slows down the rate at which new tasks are submitted.
- In {@link ThreadPoolExecutor.DiscardPolicy}, a task that cannot be executed is simply dropped.
- In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated).
- It is possible to define and use other kinds of {@link RejectedExecutionHandler} classes.
- Doing so requires some care, especially when a policy is designed to work only under particular capacity or queuing policies.
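To make the feedback effect of `CallerRunsPolicy` concrete, here is a small sketch that saturates a one-thread pool with a one-slot queue and checks which thread ends up running the overflow task. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsSketch {
    /** Returns true if the task submitted to a saturated pool ran on the submitting thread. */
    static boolean overflowRanInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the single queue slot
            // Saturated: the handler runs this task synchronously on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which is the throttling effect described above.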
- Hook methods
- This class provides {@code protected} overridable {@link #beforeExecute(Thread, Runnable)} and {@link #afterExecute(Runnable, Throwable)} methods, which are called before and after the execution of each task.
- These can be used to manipulate the execution environment; for example, to reinitialize ThreadLocals, gather statistics, or add log entries.
- In addition, method {@link #terminated} can be overridden to perform any special processing that needs to be done once the executor has fully terminated.
- If a hook or callback method throws an exception, internal worker threads may in turn fail and terminate abruptly.
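A subclass that uses all three hooks might look like the following sketch. `CountingPool` and its counters are hypothetical names; the overrides simply count invocations and record that `terminated()` ran.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    volatile boolean terminatedCalled;

    CountingPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        before.incrementAndGet(); // runs on the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        after.incrementAndGet(); // t is non-null if the task threw
    }

    @Override
    protected void terminated() {
        super.terminated();
        terminatedCalled = true; // called once, when the pool has fully terminated
    }

    /** Runs two trivial tasks, shuts down, and reports the hook counters. */
    static String demo() {
        CountingPool pool = new CountingPool();
        pool.execute(() -> { });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "before=" + pool.before.get() + " after=" + pool.after.get()
                + " terminated=" + pool.terminatedCalled;
    }
}
```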
- Queue maintenance
- Method {@link #getQueue()} allows access to the work queue for purposes of monitoring and debugging.
- Use of this method for any other purpose is strongly discouraged.
- Two supplied methods, {@link #remove(Runnable)} and {@link #purge}, are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
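The effect of `purge()` on cancelled tasks can be sketched as follows. The example assumes a single worker that is kept busy so that later submissions stay queued; all names other than the JDK calls are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PurgeSketch {
    /** Returns {queue size after cancelling, queue size after purge()}. */
    static int[] cancelAndPurge() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the only worker busy so the following tasks remain queued.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            List<Future<?>> queued = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                queued.add(pool.submit(() -> { }));
            }
            for (Future<?> f : queued) {
                f.cancel(false);
            }
            int afterCancel = pool.getQueue().size(); // cancelled tasks still occupy the queue
            pool.purge();                             // drops cancelled Future tasks
            int afterPurge = pool.getQueue().size();
            return new int[] {afterCancel, afterPurge};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```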
- Finalization
- A pool that is no longer referenced in a program and has no remaining threads is {@code shutdown} automatically.
- If you would like to ensure that unreferenced pools are reclaimed even if users forget to call {@link #shutdown},
- then you must arrange for unused threads to eventually die,
- by setting appropriate keep-alive times together with a lower bound of zero core threads, and/or by setting {@link #allowCoreThreadTimeOut(boolean)}.
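Rather than relying on finalization, callers usually shut a pool down explicitly. The sketch below uses zero core threads, as suggested above, and then performs an explicit shutdown; the class name and the five-second wait are assumptions made for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExplicitShutdownSketch {
    /** Shuts a pool down explicitly and reports what happened afterwards. */
    static String shutDownAndReport() {
        // Zero core threads: an idle pool eventually holds no threads at all.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> { });
        pool.shutdown(); // already-submitted tasks still run; new ones are refused

        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true; // default AbortPolicy after shutdown
        }

        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rejected=" + rejected + " terminated=" + terminated;
    }
}
```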
The source code
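Before reading the listing, it may help to see in isolation how a single int can carry both the run state and the worker count. The standalone sketch below copies the bit layout used by the class (three high bits for state, 29 low bits for the count) into a hypothetical `CtlSketch` class so that the helpers can be called directly.

```java
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // mask for the low 29 bits

    // Run states live in the high 3 bits; RUNNING is negative, so it sorts first.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** A packed value is "running" exactly when it compares below SHUTDOWN. */
    static boolean isRunning(int c) { return c < SHUTDOWN; }
}
```

For example, `ctlOf(RUNNING, 5)` unpacks to a worker count of 5 and a state of `RUNNING`, and incrementing the packed value by one changes only the count. This is why the class can adjust the worker count with a plain compare-and-set on `ctl`.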
package java.util.concurrent;

public class ThreadPoolExecutor extends AbstractExecutorService {
/*
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields:
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down, etc.
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than the (2^31)-1
* (2 billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to an AtomicLong and the
* shift/mask constants below adjusted. But until the need arises,
* this code is a bit faster and simpler using an int.
*
* workerCount is the number of workers that have been permitted to
* start and not permitted to stop. The value may be transiently
* different from the actual number of live threads, for example
* when a ThreadFactory fails to create a thread when asked, and
* when exiting threads are still performing bookkeeping before
* terminating. The user-visible pool size is reported as the
* current size of the workers set.
*
* runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck; see
* below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
private final AccessControlContext acc;
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
void onShutdown() {
}
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* Main worker run loop. Repeatedly gets tasks from the queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as the pool is
* running, we get tasks from getTask. If it returns null, the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless the pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause the thread to die
* (breaking the loop with completedAbruptly true) without
* processing the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap), and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes the thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause the thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If the pool is stopping, ensure the thread is interrupted;
// if not, ensure the thread is not interrupted.
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// Public constructors and methods
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and the default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general-purpose constructor.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and the default rejected execution handler.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and the default thread factory.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue the task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return !isRunning(ctl.get());
}
/**
* Returns true if this executor is in the process of terminating
* after {@link #shutdown} or {@link #shutdownNow} but has not
* completely terminated. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* Invokes {@code shutdown} when this executor is no longer
* referenced and it has no threads.
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
/**
* Returns the thread factory used to create new threads.
*
* @return the current thread factory
* @see #setThreadFactory(ThreadFactory)
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
* Returns the core number of threads.
*
* @return the core number of threads
* @see #setCorePoolSize
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
/**
* Returns true if this pool allows core threads to time out and
* terminate if no tasks arrive within the keepAlive time, being
* replaced if needed when new tasks arrive. When true, the same
* keep-alive policy applying to non-core threads applies also to
* core threads. When false (the default), core threads are never
* terminated due to lack of incoming tasks.
*
* @return {@code true} if core threads are allowed to time out,
* else {@code false}
*
* @since 1.6
*/
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
/**
* Sets the policy governing whether core threads may time out and
* terminate if no tasks arrive within the keep-alive time, being
* replaced if needed when new tasks arrive. When false, core
* threads are never terminated due to lack of incoming
* tasks. When true, the same keep-alive policy applying to
* non-core threads applies also to core threads. To avoid
* continual thread replacement, the keep-alive time must be
* greater than zero when setting {@code true}. This method
* should in general be called before the pool is actively used.
*
* @param value {@code true} if should time out, else {@code false}
* @throws IllegalArgumentException if value is {@code true}
* and the current keep-alive time is not greater than zero
*
* @since 1.6
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
/**
* Sets the maximum allowed number of threads. This overrides any
* value set in the constructor. If the new value is smaller than
* the current value, excess existing threads will be
* terminated when they next become idle.
*
* @param maximumPoolSize the new maximum
* @throws IllegalArgumentException if the new maximum is
* less than or equal to zero, or
* less than the {@linkplain #getCorePoolSize core pool size}
* @see #getMaximumPoolSize
*/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
/**
* Returns the maximum allowed number of threads.
*
* @return the maximum allowed number of threads
* @see #setMaximumPoolSize
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* Sets the time limit for which threads may remain idle before
* being terminated. If there are more than the core number of
* threads currently in the pool, after waiting this amount of
* time without processing a task, excess threads will be
* terminated. This overrides any value set in the constructor.
*
* @param time the time to wait. A time value of zero will cause
* excess threads to terminate immediately after executing tasks.
* @param unit the time unit of the {@code time} argument
* @throws IllegalArgumentException if {@code time} less than zero or
* if {@code time} is zero and {@code allowsCoreThreadTimeOut}
* @see #getKeepAliveTime(TimeUnit)
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
/**
* Returns the thread keep-alive time, which is the amount of time
* that threads in excess of the core pool size may remain
* idle before being terminated.
*
* @param unit the desired time unit of the result
* @return the time limit
* @see #setKeepAliveTime(long, TimeUnit)
*/
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
/* User-level queue utilities */
/**
* Returns the task queue used by this executor. Access to the
* task queue is intended primarily for debugging and monitoring.
* This queue may be in active use. Retrieving the task queue
* does not prevent queued tasks from executing.
*
* @return the task queue
*/
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
*
* <p>This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using {@code submit} might be
* converted into a form that maintains {@code Future} status.
* However, in such cases, method {@link #purge} may be used to
* remove those Futures that have been cancelled.
*
* @param task the task to remove
* @return {@code true} if the task was removed
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
/* Statistics */
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have ever been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation.
*
* @return the number of tasks
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state and estimated worker and
* task counts.
*
* @return a string identifying this pool, as well as its state
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
/* Extension hooks */
/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread {@code t} that
* will execute task {@code r}, and may be used to re-initialize
* ThreadLocals, or to perform logging.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.beforeExecute} at the end of
* this method.
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
*
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
*   // ...
*   protected void afterExecute(Runnable r, Throwable t) {
*     super.afterExecute(r, t);
*     if (t == null && r instanceof Future<?>) {
*       try {
*         Object result = ((Future<?>) r).get();
*       } catch (CancellationException ce) {
*         t = ce;
*       } catch (ExecutionException ee) {
*         t = ee.getCause();
*       } catch (InterruptedException ie) {
*         Thread.currentThread().interrupt(); // ignore/reset
*       }
*     }
*     if (t != null)
*       System.out.println(t);
*   }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
/* Predefined RejectedExecutionHandlers */
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
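The shutdown methods in the listing above (`shutdownNow`, `awaitTermination`, `isTerminated`) can be tried out with a small program. The following is a minimal sketch and not part of the JDK source; the class name `ShutdownNowDemo` and the method `drainPending` are hypothetical names chosen for illustration. It assumes a pool with a single worker and an unbounded queue, so that the queued tasks are guaranteed to still be waiting when `shutdownNow` is called.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ShutdownNowDemo {

    /** Returns how many queued tasks shutdownNow() hands back to the caller. */
    static int drainPending() {
        // One core thread, one maximum thread, unbounded queue (illustrative choice).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);

        // The first task occupies the only worker until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Returning here is what "responding to the interrupt" means:
                // a task that ignored it could keep the pool from terminating.
            }
        });

        try {
            started.await();
            // The worker is busy, so these two tasks can only wait in the queue.
            pool.execute(() -> { });
            pool.execute(() -> { });

            // Interrupts the running task and drains the queue.
            List<Runnable> pending = pool.shutdownNow();

            // shutdownNow() does not wait; awaitTermination() does.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS) || !pool.isTerminated())
                throw new IllegalStateException("pool did not terminate in time");
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // prints: pending tasks returned: 2
        System.out.println("pending tasks returned: " + drainPending());
    }
}
```

The latch is there only to make the result deterministic: without it, the two empty tasks might be submitted before the first task has started running.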