1. Thread pool Overview
Thread pools separate thread execution from tasks, which consist of Runnable and Callable interfaces and RunnableFuture interfaces.
2. Thread pool basics
Executor, the top-level interface of the thread pool, has only one method: execute(), which executes the task. ExecutorService extends executors, increases control over thread pools, and enhances thread execution.
2.1 Analyzing the ExecutorService method
- The difference between Shutdown and shutdownNow and its internal principle
Shutdown: notifies the thread pool to be closed, no more tasks will be added, and the thread pool that has already executed tasks will continue to execute. ShutdownNow: Immediately closes the thread pool (for how long immediately), no new tasks are accepted, and executed tasks are stopped immediately (are executed tasks really stopped immediately?)
public void shutdown(a) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Check if you have permission to close the thread pool
checkShutdownAccess();
// Change the state of the thread pool to SHUTDOWN
advanceRunState(SHUTDOWN);
// Mark all threads as interrupted
interruptIdleWorkers();
onShutdown(); // This method is not implemented. It is reserved to perform some cleanup
} finally {
mainLock.unlock();
}
tryTerminate();
}
Copy the code
Why is it possible to stop adding tasks with the shutdown() method, and the thread pool that has already executed the task continues to execute and will be closed? First of all: No new tasks are added by checking the thread pool state in the addWorker method, interruptIdleWorkers marks all idle threads as interrupted, and getTask() called in the runWorker() method checks the thread pool state, New quests will no longer be acquired. Close after all threads finish executing.
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// No new tasks will be added
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if(runStateOf(c) ! = rs)continue retry;
// else CAS failed due to workerCount change; retry inner loop}}Copy the code
Explore this question to see how a task is performed.
public List<Runnable> shutdownNow(a) {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// Mark the state as STOP
advanceRunState(STOP);
// Flag to interrupt all threads
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Copy the code
- Submit method of three different differences
Submit (Callable call) has a return value. Submit (Runnable run) has a return value. If it is RunnableFuture, it has a return value.
- The internal implementation principle of invokeAll method
InvokeAll stands for batch commit execution, which is not practical.
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if(! f.isDone()) {try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if(! done)for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true); }}Copy the code
3. Thinking about thread pool
- How do thread pools reuse threads?
The key of thread pool reuse: Thread pool wraps each thread into Worker class, which inherits AQS. When creating a Worker, the first task is usually specified, and then the task is obtained through the getTask() loop. If the task cannot be obtained, the thread is closed according to the policy
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally{ afterExecute(task, thrown); }}finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
- The type of thread pool
AllowCoreThreadTimeOut Allows the core thread to set whether or not to disable timeout.
- Advanced thread pool usage
- Check the number of active threads :getActiveCount()
- View the total number of threads
- View total number of tasks: getTaskCount()
- View the total number of completed tasks: getCompletedTaskCount()