You have probably read plenty of articles about thread pools, and some of them were probably full of nonsense. I hope that after reading this one you will have a complete grasp of Java thread pools.
I noticed that many readers first came to this site because of this article; I hope this first impression proves useful to you.
The main focus of this article is source code analysis, but what I most want to pass on is the design thinking behind the thread pool and some of the clever techniques in the author's implementation. The article walks through the key code line by line, so that readers who have trouble following the source on their own have something to refer to.
The thread pool is an important tool that you need to master if you want to be a good engineer. Even if you only care about getting a job, note that it is practically a must-ask interview topic, and interviewers can easily gauge a candidate's skill level from the answers.
This article is fairly long. I recommend reading it on a PC with the source code open alongside (Java 7 and Java 8 are the same here), and setting aside an uninterrupted block of at least 15 to 30 minutes. Of course, if you are only preparing for an interview, you can jump straight to the final summary.
Contents
- The overview
- The Executor interface
- ExecutorService
- FutureTask
- AbstractExecutorService
- ThreadPoolExecutor
- Executors
- conclusion
The overview
Let's start with the big picture. The following is the inheritance structure of the main classes in the Java thread pool:
Executor sits at the top and is the simplest: it defines a single interface method, execute(Runnable command).
ExecutorService is also an interface, adding many interface methods to the Executor interface, so we typically use this interface.
The next level down is AbstractExecutorService, which, as we know by its name, is an abstract class that implements some very useful methods that subclasses can use directly, but we’ll talk more about that later.
Then we come to the ThreadPoolExecutor class, which provides the very rich functionality needed for thread pools.
In addition, we will cover the following classes.
Executors (note the trailing s) is a utility class. Its methods create ThreadPoolExecutor instances, for example:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
In addition, because a thread pool supports retrieving the result of a task, the Future interface was introduced. RunnableFuture extends this interface, and the class we care about most is its implementation, FutureTask. Remember that when we use a thread pool, we submit tasks to it. As anyone who has used one knows, each task we submit implements the Runnable interface; what submit actually does is wrap the Runnable as a FutureTask and then hand it to the thread pool. This gives an easy way to remember the name FutureTask: it is first of all a Task, and it also carries the semantics of the Future interface, meaning its result can be obtained in the future.
Of course, BlockingQueue in the thread pool is also a very important concept. If the number of threads reaches corePoolSize, each of our tasks will be submitted to a wait queue for the thread in the thread pool to pick up the task and execute it. The BlockingQueue here is typically implemented using the LinkedBlockingQueue, ArrayBlockingQueue, and SynchronousQueue classes. Each implementation class has different characteristics that will be analyzed later in the usage scenario. Readers who want to take a closer look at the various BlockingQueues can refer to my previous article that took a closer look at the various BlockingQueue implementation classes.
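To make the differences between these queues a little more concrete, here is a minimal sketch of my own (the class name QueueOfferDemo and its helper method are made up for illustration, they are not from the JDK). It only uses the non-blocking offer method, which is the same call the thread pool makes when it tries to queue a task, and counts how many of two tasks each queue accepts when no consumer is waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the JDK.
class QueueOfferDemo {
    // Offers two no-op tasks without blocking and counts how many the queue accepted.
    static int acceptedOutOfTwo(BlockingQueue<Runnable> queue) {
        int accepted = 0;
        for (int i = 0; i < 2; i++) {
            if (queue.offer(() -> { })) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Unbounded by default: both offers succeed
        System.out.println(acceptedOutOfTwo(new LinkedBlockingQueue<Runnable>()));
        // Bounded with capacity 1: the second offer fails
        System.out.println(acceptedOutOfTwo(new ArrayBlockingQueue<Runnable>(1)));
        // No capacity at all: offer fails unless a consumer is already waiting
        System.out.println(acceptedOutOfTwo(new SynchronousQueue<Runnable>()));
    }
}
```

Whether offer succeeds or fails is what decides, later in execute, whether a task waits in the queue or the pool tries to create another thread.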
To round this off: besides the classes above there is one more important class, ScheduledThreadPoolExecutor, which implements scheduled tasks. It inherits from ThreadPoolExecutor, the focus of this article, and is used to run tasks on a schedule. This article does not cover its implementation, but I believe that after reading it you will be able to follow that source code without much trouble.
That is the ground this article covers. Enough preamble; let's get into the main text.
The Executor interface
/*
 * @author Doug Lea
 */
public interface Executor {
    void execute(Runnable command);
}
We can see that the Executor interface is very simple: a single void execute(Runnable command) method that represents submitting a task. To give you a sense of the overall design of Java thread pools, I'll follow Doug Lea's lead and say a few more words about it.
We often start a thread like this:
new Thread(new Runnable() {
    public void run() {
        // do something
    }
}).start();
With an Executor, we can write it like this instead:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
If we wanted the thread pool to execute each task synchronously, we could implement this interface like this:
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run(); // Run directly on the calling thread; no new Thread(r).start() here
    }
}
If we wanted every submitted task to run on a newly started thread, we could do this:
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start(); // Each task is executed by a new thread
    }
}
Now let's see how to combine two Executors. The implementation below adds all tasks to a queue, then takes them from the queue one at a time and hands them to the real executor:
class SerialExecutor implements Executor {
    // The task queue
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    // The real executor
    final Executor executor;
    // The task currently being executed
    Runnable active;

    // Initialization: specify the real executor
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    // Add a task: put it into the task queue; scheduleNext then triggers
    // the executor to fetch a task from the queue
    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            // Hand the task to the real executor
            executor.execute(active);
        }
    }
}
Of course, the Executor interface only has the ability to submit tasks. It’s too simple. We want to know the execution result, we want to know how many threads are alive in the current thread pool, how many tasks have been completed, and so on. The next step is the ExecutorService interface, which inherits from the Executor interface, which provides a wealth of functionality and is the one we use most often.
ExecutorService
When we define a thread pool, we usually use this interface:
// args... stands for the arguments that each factory method takes
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
We do so because the methods defined in this interface are sufficient for most of our needs.
So let’s take a quick look at the methods in this interface:
public interface ExecutorService extends Executor {

    // Shut down the pool: already submitted tasks keep running, new tasks are not accepted
    void shutdown();

    // Shut down the pool and attempt to stop the tasks that are currently running.
    // Compared with shutdown(), it additionally tries to stop running tasks.
    List<Runnable> shutdownNow();

    // Whether the thread pool has been shut down
    boolean isShutdown();

    // Returns true if all tasks have finished after shutdown() or shutdownNow() was called.
    // It can only return true after one of those two methods has been called.
    boolean isTerminated();

    // Wait for all tasks to complete, with a timeout.
    // In practice: call shutdown or shutdownNow first, then call this to wait for completion.
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    // Submit a Callable task
    <T> Future<T> submit(Callable<T> task);

    // Submit a Runnable task; the second parameter is placed in the Future as the return value,
    // because Runnable's run method itself returns nothing
    <T> Future<T> submit(Runnable task, T result);

    // Submit a Runnable task
    Future<?> submit(Runnable task);

    // Execute all tasks and return a list of Futures
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;

    // Execute all tasks, with a timeout
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;

    // Return as soon as any one of the tasks completes successfully, with that task's result
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;

    // Same as above with a timeout; throws TimeoutException if the timeout elapses first
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
These methods are all easy to understand. A simple thread pool comes down to exactly this: you can submit tasks, get results, and shut the pool down. That is why we use this interface so often.
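As a quick illustration of the typical life cycle through this interface (submit, get the result, shut down, wait for termination), here is a minimal sketch. The class name ExecutorServiceDemo, the pool size of 2 and the 5 second timeout are my own choices for the example, not anything the JDK prescribes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecutorServiceDemo {
    // Submits one Callable, waits for its result, then shuts the pool down.
    static int submitAndShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> 1 + 2;
            Future<Integer> future = pool.submit(task);
            int result = future.get();          // blocks until the task has finished
            pool.shutdown();                    // no new tasks; queued ones still run
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();             // give up waiting and interrupt the workers
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndShutdown()); // prints 3
    }
}
```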
FutureTask
Before continuing with the ExecutorService implementation classes, let’s talk about the related class FutureTask.
Future   -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture -> FutureTask
FutureTask implements the Runnable interface indirectly through RunnableFuture, so each Runnable is typically wrapped as a FutureTask first and then submitted to the thread pool by calling executor.execute(Runnable command).
As we know, Runnable’s void run() method does not return a value, so we usually specify a second parameter as the return value in submit if necessary:
<T> Future<T> submit(Runnable task, T result);
In fact, it will be wrapped as a Callable with these two parameters.
The Callable interface itself exists because of the needs of thread pools. It differs from Runnable in that run() returns nothing, whereas Callable's call() method returns a value, and if something goes wrong during execution, call() can throw an exception.
public interface Callable<V> {
V call() throws Exception;
}
I won't expand on the FutureTask class here; this article is long enough already, and for now we only need to know how to use it.
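Since knowing how to use it is all we need, here is a small usage sketch of my own. FutureTaskDemo and its method names are made up for illustration; FutureTask and Executors.callable are real JDK classes and methods. To keep it simple, the task is run on the calling thread instead of in a pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class FutureTaskDemo {
    // Wraps a Runnable plus a fixed result, the same pair that submit(Runnable, T) receives,
    // runs it on the calling thread, and returns what the Future reports.
    static String runWithResult(Runnable runnable, String result) {
        FutureTask<String> task = new FutureTask<String>(runnable, result);
        task.run(); // FutureTask is itself a Runnable
        return getQuietly(task);
    }

    // Makes the adaptation explicit: (Runnable, result) becomes a Callable first.
    static String viaCallable(Runnable runnable, String result) {
        Callable<String> callable = Executors.callable(runnable, result);
        FutureTask<String> task = new FutureTask<String>(callable);
        task.run();
        return getQuietly(task);
    }

    private static String getQuietly(FutureTask<String> task) {
        try {
            return task.get(); // returns immediately here because run() has completed
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithResult(() -> System.out.println("working"), "done"));
    }
}
```

Both helpers return the result that was passed in, which is all that "the second parameter is placed in the Future" means.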
Next, we’ll look at the abstract implementation of ExecutorService AbstractExecutorService.
AbstractExecutorService
The abstract class AbstractExecutorService implements the ExecutorService interface and provides several useful methods for its subclasses to call.
It implements the invokeAny and invokeAll methods, and its two newTaskFor methods are also useful: they wrap tasks as FutureTasks. The void execute(Runnable command) method defined in the top-level Executor interface does not wrap the task in a FutureTask, because it does not need to return a result.
When you need the result, use submit (which wraps the task in a FutureTask); when you don't, execute is enough.
Below I go through the source of this class line by line:
Tip: the invokeAny and invokeAll methods take up most of this class. You can skip them on a first read: they are probably used infrequently in practice, and nothing later in the article depends on them, so skipping them will not leave you lost.
public abstract class AbstractExecutorService implements ExecutorService {
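// RunnableFuture is used to obtain the execution result; we usually use its implementation, FutureTask.
// The two newTaskFor methods below wrap our tasks as FutureTasks to be submitted to the thread pool.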
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// Submit a task
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. Wrap the task as a FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. Hand it to the executor; the execute method is implemented by the concrete subclass.
// As mentioned earlier, FutureTask implements the Runnable interface indirectly.
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. Wrap the task as a FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. Hand it to the executor
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. Wrap the task as a FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. Hand it to the executor
execute(ftask);
return ftask;
}
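// Purpose: submit the tasks in the collection to the pool and return as soon as any one of them has completed.
// The second parameter, timed, says whether a timeout applies; the timeout itself is the third parameter.
// If timed is true and no task has returned a result before the timeout, a TimeoutException is thrown.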
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
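// Number of tasks
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
// ExecutorCompletionService is not a real executor; the argument this is the real one.
// It wraps the executor: when a task finishes, its result is saved in an internal completionQueue.
// That is probably why the class has Completion in its name.
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// Holds exception information: if this method gets no valid result, we can throw the last exception received
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Submit one task first; the remaining tasks are submitted one by one in the for loop below
futures.add(ecs.submit(it.next()));
// One task has been submitted, so decrement the task count
--ntasks;
// Number of tasks in flight (+1 on submission, -1 when a task finishes)
int active = 1;
for (;;) {
// As noted above, ecs has an internal completionQueue that holds completed results.
// BlockingQueue's poll method does not block; null means the queue is empty.
Future<T> f = ecs.poll();
// null means the first task submitted above has not finished yet.
// Submitting one task up front and checking here is also done for performance.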
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
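// Note that this is an else if, not an if. It means there are no tasks left to submit and active is 0,
// so every task has finished. At first I did not understand why a break is needed here,
// because I assumed that with active at 0 we must already have returned from f.get() below.
// 2018-02-23: thanks to reader newmicro for the comment:
// active == 0 here means all tasks have failed, so this is the exit of the for loop.
else if (active == 0)
break;
// Also an else if: no tasks left to submit, but a timeout is set, so check for timeout here
else if (timed) {
// poll with a wait
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
// If the wait timed out, throw TimeoutException and the whole method ends
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
// This is the else: no tasks left to submit, tasks in the pool not finished, and no timeout configured.
// take() blocks until an element is available, which means some task has finished.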
else
f = ecs.take();
}
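/*
* I find the section above hard to follow, so here is a short summary.
* 1. We are inside a for loop. Suppose no task finishes quickly:
* then every iteration enters the first branch and submits a task, until all tasks are submitted.
* 2. Once all tasks are submitted, if a timeout is set, the for loop is effectively
* just checking for the timeout.
* 3. If no timeout is set, there is nothing to check, so it blocks in ecs.take()
* waiting for the first result.
* 4. If all tasks fail, that is, every future has come back
* but f.get() threw an exception, the loop exits through the active == 0 branch (thanks to newmicro).
* Of course, this depends on the if branch below.
*/
// A task has finished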
if (f != null) {
--active;
try {
// Return the result; any exception is wrapped as an ExecutionException
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}// Note the extent of the for loop: it runs all the way to here
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// Before the method exits, cancel the other tasks
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
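// Execute all tasks and return their results.
// Before reading this method, consider that when we submit tasks ourselves we also want the pool to run all of them;
// the only difference is that we submit one task at a time, while here a whole collection is passed in.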
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// This part is simple
for (Callable<T> t : tasks) {
// Wrap as a FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// Submit the task
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
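// This call blocks until a value is available or an exception is thrown.
// A small detail: the signature of get declares InterruptedException,
// but it is not handled here; it propagates to the caller. It is thrown when the waiting thread is interrupted.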
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
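// Unlike other cases, where a List<Future> is returned before the results are ready,
// when this method returns, every task has really finished.
return futures;
} finally {
// Why is this needed? For the exception case mentioned above.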
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
// invokeAll with a timeout; let's spot the differences
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
Iterator<Future<T>> it = futures.iterator();
// After each submission, check whether the timeout has elapsed
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
// Timed out
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
// Call get with a timeout; nanos is the time remaining,
// because some of it has already been used up above
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
}
At this point we can see that the abstract class wraps some basic methods, but submit, invokeAny, invokeAll and the rest never actually start a thread to execute a task. They all just call the execute method. So the most important method, execute(Runnable command), has yet to be implemented, and for that we need the most important class of all: ThreadPoolExecutor.
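Even if you skipped the invokeAny and invokeAll source above, it helps to see how they behave from the caller's side. The following is a small sketch of my own (InvokeDemo, its two tasks and the fixed pool of size 2 are made up for the example): one task always fails, the other always succeeds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
class InvokeDemo {
    static List<Callable<String>> tasks() {
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        Callable<String> ok = () -> "ok";
        return Arrays.asList(failing, ok);
    }

    // invokeAny returns the result of a task that completed without throwing.
    static String firstSuccess() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool.invokeAny(tasks());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // invokeAll waits for every task; a failure only surfaces when get() is called.
    static List<String> allOutcomes() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> outcomes = new ArrayList<String>();
        try {
            for (Future<String> f : pool.invokeAll(tasks())) {
                try {
                    outcomes.add(f.get());
                } catch (ExecutionException e) {
                    outcomes.add("failed: " + e.getCause().getMessage());
                }
            }
            return outcomes;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccess()); // prints ok
        System.out.println(allOutcomes());  // prints [failed: boom, ok]
    }
}
```

This matches what the source shows: doInvokeAny keeps the failing task's exception aside and returns the first f.get() that succeeds, while invokeAll swallows ExecutionException internally and leaves it to the caller to inspect each Future.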
Given the length of this article, I suspect not many readers are still with me; blame fast-food culture. With every article I try to let readers pick up all the relevant knowledge in one place, so they tend to run long. If you have worked for some years you may share this feeling: reading 20 summaries is not as good as reading one long article that explains every point clearly. Sometimes less is more, and more is less.
ThreadPoolExecutor
ThreadPoolExecutor is a thread pool implementation in the JDK. This class implements all the methods needed for a thread pool, including task submission, thread management, monitoring, and so on.
We can extend it to add the business-specific features we need; for example, ScheduledThreadPoolExecutor, which implements scheduled tasks, inherits from ThreadPoolExecutor. That is not the focus of this article, though, so let's get on with the source code analysis.
First, let’s look at a few concepts and processes in thread pool implementation.
Let’s review a few ways to submit tasks:
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
One basic concept: in the submit methods, the parameter is of type Runnable (or Callable). This parameter is not what gets passed to new Thread(runnable).start(); it is not used to start a thread. It represents a task, and what the task does is defined in its run() method, or in the call() method of a Callable.
Beginners tend to get confused here because Runnable shows up all over the place, and one Runnable is often wrapped in another. Think of it as a Task interface that happens to have a run() method. (I suspect the author simply did not want to define yet another interface that Runnable could stand in for; Callable exists only because Runnable could not meet the need.)
Moving on, I drew a simple diagram to illustrate some of the main components in a thread pool:
Of course, the figure above does not consider whether the queue is bounded or not. What if the queue is full when the task is submitted? When is a new thread created? What if the thread pool is full when you submit a task? How do I turn off idle threads? We will address each of these questions below.
We often use the Executors utility class to construct a thread pool quickly. For beginners it is a very handy tool: the developer does not have to care about many details, only to know that a thread pool is needed and to supply the required parameters; the rest take the default values chosen by the author.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Regardless of the difference, they all lead to this constructor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // These parameters are all required
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
The constructor above lists essentially all the properties we care about most. Let's go through them one by one:
- corePoolSize: The number of core threads. Don't read too much into the name for now; just remember that this property exists.
- maximumPoolSize: The maximum number of threads the pool is allowed to create.
- workQueue: The task queue, an implementation of the BlockingQueue interface (ArrayBlockingQueue and LinkedBlockingQueue are commonly used).
- keepAliveTime: How long an idle thread is kept alive. A thread that has been idle longer than this can be shut down. Note that this does not apply to all threads: if the number of threads in the pool is less than or equal to corePoolSize, they are not shut down for being idle too long. That said, threads within the core count can also be reclaimed after calling allowCoreThreadTimeOut(true).
- threadFactory: Used to create threads. The default is usually fine, but a custom one lets us give threads more readable names, such as message-thread-1, message-thread-2, and so on.
- handler: The policy to apply when the pool is saturated and a new task is submitted. You can throw an exception, silently discard the task, or implement the RejectedExecutionHandler interface with your own logic; more on this later.
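To tie these parameters together, here is a sketch of calling the seven-argument constructor directly. Every concrete value in it (2 core threads, 4 maximum, 30 seconds, a queue of 100, the message-thread prefix, CallerRunsPolicy) is an assumption I picked for illustration, and NamedPoolDemo and namedFactory are hypothetical names, so treat it as a starting point and not a recommended configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class NamedPoolDemo {
    // A ThreadFactory that produces names such as message-thread-1, message-thread-2.
    static ThreadFactory namedFactory(final String prefix) {
        final AtomicInteger seq = new AtomicInteger();
        return new ThreadFactory() {
            public Thread newThread(Runnable r) {
                return new Thread(r, prefix + "-" + seq.incrementAndGet());
            }
        };
    }

    static ThreadPoolExecutor newMessagePool() {
        return new ThreadPoolExecutor(
                2,                                           // corePoolSize
                4,                                           // maximumPoolSize
                30L, TimeUnit.SECONDS,                       // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100),       // bounded workQueue
                namedFactory("message-thread"),              // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy());  // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newMessagePool();
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

CallerRunsPolicy is one of the rejection handlers shipped with ThreadPoolExecutor; it runs the rejected task on the submitting thread instead of throwing.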
In addition to the above attributes, let’s look at other important attributes.
Doug Lea uses a single 32-bit integer to store both the state of the thread pool and the number of threads currently in it. The top 3 bits store the pool state and the low 29 bits store the thread count. As we know, Java integers are uniformly encoded in two's complement. What follows is just some simple shifting and bitwise operations.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS is 29 (32 - 3): the top 3 bits hold the pool state,
// the low 29 bits hold the thread count
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// That is 29 ones, so the maximum number of threads is 2^29 - 1 = 536870911.
// Computers today will not get anywhere near this number in practice.
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// The pool state is stored in the top 3 bits
// 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Clear the low 29 bits of c to get the pool state
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Clear the top 3 bits of c to get the thread count
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
These are just simple bit operations on one integer. They appear again and again in the source below, so it is best to remember each method name and what it does; that saves flipping back and forth while reading.
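If the bit manipulation feels abstract, it can help to run it. The sketch below copies the constants and the three helper methods into a standalone class (CtlBitsDemo is my own name; the real fields are private inside ThreadPoolExecutor) and packs a state and a count into one int.

```java
// Standalone copy of the packing scheme, for experimentation only.
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // low 29 bits all set
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                        // RUNNING pool with 5 workers
        System.out.println(Integer.toBinaryString(c));    // top 3 bits are the state
        System.out.println(workerCountOf(c));             // prints 5
        System.out.println(runStateOf(c) == RUNNING);     // prints true
        System.out.println(c < SHUTDOWN);                 // prints true: this is isRunning(c)
    }
}
```

Because RUNNING has the sign bit set, any ctl value in the RUNNING state is negative, which is why isRunning can be a single comparison against SHUTDOWN.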
Next, let's look at the states of the thread pool and the transitions between them:
- RUNNING: The normal state: accepts new tasks and processes tasks in the wait queue
- SHUTDOWN: Does not accept new task submissions, but continues to process tasks in the wait queue
- STOP: Does not accept new task submissions, stops processing tasks in the wait queue, and interrupts the threads that are executing tasks
- TIDYING: All tasks have terminated and workerCount is 0. When the pool transitions to TIDYING, it runs the terminated() hook method.
- TERMINATED: The pool enters this state once the terminated() method has completed
RUNNING is defined as -1, SHUTDOWN as 0, and the others are greater than 0 (each value shifted into the top 3 bits). So once the state is 0 or above, tasks can no longer be submitted; if it is greater than 0, even the tasks that are running need to be interrupted.
Having read these descriptions, you can probably guess most of the state transitions. They are:
- RUNNING -> SHUTDOWN: Occurs when shutdown() is called; this is the most important transition
- (RUNNING or SHUTDOWN) -> STOP: Occurs when shutdownNow() is called; this should make the difference between shutdown() and shutdownNow() clear
- SHUTDOWN -> TIDYING: Occurs when both the task queue and the thread pool are empty
- STOP -> TIDYING: Occurs when the thread pool is empty
- TIDYING -> TERMINATED: As mentioned above, occurs when the terminated() method has finished
Remember the core ones, especially the first one and the second one.
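The states themselves are private, but the first transition can be observed from outside through isShutdown(), isTerminated() and the terminated() hook. The sketch below is my own (LifecycleDemo, the two latches and the event strings are all made up for the example); it keeps one task blocked so that the pool stays in SHUTDOWN long enough to look at it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class LifecycleDemo {
    static List<String> observe() {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                events.add("terminated() hook"); // called while the pool is TIDYING
            }
        };
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();             // keep the worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            events.add("isShutdown=" + pool.isShutdown());
            pool.shutdown();                     // RUNNING -> SHUTDOWN
            events.add("isShutdown=" + pool.isShutdown()
                    + ", isTerminated=" + pool.isTerminated());
            try {
                pool.execute(() -> { });
            } catch (RejectedExecutionException e) {
                events.add("new task rejected"); // SHUTDOWN accepts no new tasks
            }
            release.countDown();                 // let the running task finish
            pool.awaitTermination(5, TimeUnit.SECONDS);
            events.add("isTerminated=" + pool.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(events);
    }

    public static void main(String[] args) {
        for (String event : observe()) {
            System.out.println(event);
        }
    }
}
```

The running task is not interrupted by shutdown(), which is the visible difference from shutdownNow().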
We also need to look at an inner class, Worker, because Doug Lea wraps each thread in the pool as a Worker: as the name suggests, the one that does the work in the pool. So from here on, a task is a Runnable (called task or command in the source) and a thread is a Worker.
Worker also extends the abstract class AbstractQueuedSynchronizer. As an aside, AQS really is everywhere in the concurrency package, and it is easy to use: a small amount of code is enough to get the synchronization you need (readers interested in the AQS source can refer to my earlier articles).
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    // This is the real thread; the tasks are run by it
    final Thread thread;

    // Runnable is a task. Why firstTask? If the first task the thread should execute
    // is specified when the thread is created, it is stored here (the thread does not
    // run only this one task). It may also be null, in which case the thread starts up
    // and takes tasks from the task queue (BlockingQueue) itself (the getTask method).
    Runnable firstTask;

    // Number of tasks this thread has completed; volatile guarantees visibility
    volatile long completedTasks;

    // Worker has only this one constructor; firstTask may be null
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Call the ThreadFactory to create a new thread
        this.thread = getThreadFactory().newThread(this);
    }

    // This calls the runWorker method of the outer class
    public void run() {
        runWorker(this);
    }

    ... // The remaining methods use AQS to acquire this thread's exclusive lock
}
The previous part is long but simple. With that in mind, we can now look at the execute method of ThreadPoolExecutor. As stated in the previous source code analysis, all methods ultimately depend on the execute method:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // The integer described above: it holds the pool state and the thread count
    int c = ctl.get();

    // If the current thread count is below the core thread count, add a worker directly
    // to run the task: create a new thread with the current task as its firstTask
    if (workerCountOf(c) < corePoolSize) {
        // If the task was added successfully, we are done: the pool has accepted the task
        // and this method can return. The result of the execution will be wrapped
        // in the FutureTask when the time comes.
        // Returning false means the pool does not allow the task to be submitted.
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Reaching here means either the thread count is at least the core thread count,
    // or addWorker just failed.

    // If the pool is RUNNING, add the task to the task queue workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        /* Once the task is in the queue, do we need to start a new thread?
         * For thread counts in [0, corePoolSize), a new thread is started unconditionally.
         * If the thread count is already at least corePoolSize, the task is added
         * to the queue, and then we arrive here.
         */
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the queued task and run the rejection policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If the pool is still RUNNING and the thread count is 0, start a new thread.
        // This guards against the task having been queued while all threads have exited.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the workQueue is full, we enter this branch and
    // create a new worker, bounded by maximumPoolSize.
    // If that fails, the thread count has reached maximumPoolSize: run the rejection policy.
    else if (!addWorker(command, false))
        reject(command);
}
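A common misunderstanding about thread creation goes like this: if the number of threads is less than corePoolSize, create a thread; if it is between corePoolSize and maximumPoolSize, either create a thread or reuse an idle one, with keepAliveTime applying to threads in that range.
From the branches above we can see that this statement is wrong: once the thread count reaches corePoolSize, new tasks go into the queue first, and additional threads (up to maximumPoolSize) are created only when the queue is full.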
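The order "core threads, then queue, then extra threads, then rejection" is easy to verify with a deliberately tiny pool. In the sketch below, which is my own (SubmissionOrderDemo and the sizes core=1, max=2, queue capacity=1 are assumptions chosen to make each branch of execute fire once), every task blocks on a latch so that nothing completes while we are counting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class SubmissionOrderDemo {
    // Submits four blocking tasks and records pool size and queue length after each one.
    static String trace() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocked);
                sb.append("task").append(i)
                  .append(": threads=").append(pool.getPoolSize())
                  .append(" queued=").append(pool.getQueue().size()).append('\n');
            } catch (RejectedExecutionException e) {
                // default handler (AbortPolicy) throws when the pool is saturated
                sb.append("task").append(i).append(": rejected\n");
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: threads=1 queued=0
        // task2: threads=1 queued=1
        // task3: threads=2 queued=1
        // task4: rejected
    }
}
```

Note that the second task does not cause a second thread to be created even though maximumPoolSize is 2: it waits in the queue, and only the third task, which finds the queue full, triggers the extra thread.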
All of this takes a while to digest, so keep reading and come back to it a few times.
The addWorker(Runnable firstTask, boolean core) method is very important. Let's see how it creates a new thread:
// The first argument is the task the new thread should run first; it can be null, as mentioned earlier.
// The second argument selects the bound for thread creation: true means corePoolSize is used,
// so if the pool already has corePoolSize threads this request to create a thread is refused;
// false means maximumPoolSize is used instead.
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // If the pool has been shut down and any one of the following holds, no new worker is created:
        // 1. the pool state is greater than SHUTDOWN, i.e. STOP, TIDYING, or TERMINATED
        // 2. firstTask != null
        // 3. workQueue.isEmpty()
        // In SHUTDOWN, no new tasks may be submitted, but existing tasks continue to execute.
        // In states greater than SHUTDOWN, no new tasks may be submitted and running tasks are interrupted.
        // So if the pool is SHUTDOWN, firstTask is null, and workQueue is not empty,
        // creating a worker is still allowed.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // If the CAS succeeds, every check before thread creation has passed
            // and we can go on to create the thread.
            // If it fails, another thread is also trying to add a worker to the pool.
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Because of the concurrent update, re-read ctl
            c = ctl.get();
            // Normally a failed CAS just retries the inner loop. But if the pool state has changed,
            // for example another thread shut the pool down, go back to the outer loop.
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /*
     * At this point we consider it time to create a thread to execute the task.
     * The checks above have all passed; what happens afterwards is another matter.
     */

    // Has the worker been started?
    boolean workerStarted = false;
    // Has the worker been added to the workers HashSet?
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // Pass firstTask to the Worker constructor
        w = new Worker(firstTask);
        // The Worker constructor calls the ThreadFactory to create the new thread
        final Thread t = w.thread;
        if (t != null) {
            // The pool-wide lock. shutdown() needs it too, so the pool
            // cannot be shut down while we hold it.
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);

                // Less than SHUTDOWN means RUNNING, the normal case.
                // Equal to SHUTDOWN means no new tasks are accepted,
                // but tasks in the waiting queue are still executed.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The worker's thread must not have been started already
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Add to the workers HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest size workers has ever reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the worker was added, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread did not start, clean up: for example,
        // workerCount was incremented above, so decrement it again
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // Return whether the thread was started
    return workerStarted;
}
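The inner loop of addWorker is a classic CAS retry loop: read the count, check the bound, try to increment atomically, and re-read if another thread got there first. Here is a minimal sketch of just that pattern on a plain AtomicInteger. BoundedCounter, tryIncrement, and race are hypothetical names used only for illustration, and the sketch leaves out the run state that the real ctl field also carries.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the CAS retry pattern; not part of ThreadPoolExecutor.
final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Returns true if a slot was reserved, false if the limit has been reached.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;               // plays the role of "wc >= corePoolSize"
            if (count.compareAndSet(c, c + 1))
                return true;                // plays the role of "break retry"
            // The CAS lost a race with another thread: loop and re-read
        }
    }

    // Starts several threads that race for slots; returns how many reservations succeeded.
    static int race(int limit, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++)
                    if (counter.tryIncrement())
                        successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }
}
```

However the threads interleave, BoundedCounter.race(5, 8, 100) returns 5: the check and the increment are tied together by the CAS, so the bound is never overshot and no lock is needed.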
A quick look at addWorkerFailed:
// Remove the worker from workers (if present) and decrement workerCount
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        // Recheck for termination, in case the existence of this worker was holding up termination
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
Back to the main line of the analysis. Once the worker’s thread is started, its run method calls the runWorker method:
// The run() method of the Worker class
public void run() {
    runWorker(this);
}
Moving on to the runWorker method:
// This method is called after the worker thread starts. A while loop keeps fetching
// tasks from the waiting queue and executing them.
// As said before, a worker can be given a firstTask when it is created;
// that first task does not need to be fetched from the queue.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // The thread's first task (if any)
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop, calling getTask to fetch tasks
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If the pool state is greater than or equal to STOP, this thread should be interrupted too
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // A hook method, left for subclasses that need it
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // This is where the task finally gets executed
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // A plain Throwable cannot be rethrown here, so it is wrapped in an Error
                    thrown = x; throw new Error(x);
                } finally {
                    // Also a hook method; it receives the task and the exception,
                    // and is left for subclasses that need it
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear task so that getTask fetches the next one
                task = null;
                // Count the completed task
                w.completedTasks++;
                // Release the worker's exclusive lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Reaching this point means the thread has to be shut down, for one of two reasons:
        // 1. getTask returned null: there is nothing left for this worker to do
        // 2. an exception occurred while executing a task
        // In case 1, workerCount has already been decremented (in getTask).
        // In case 2, workerCount has not been adjusted yet; processWorkerExit takes care of it.
        // This method is not analyzed further here.
        processWorkerExit(w, completedAbruptly);
    }
}
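To make the two hook methods concrete, here is a small sketch of a subclass that overrides beforeExecute and afterExecute. CountingPool and its fields are hypothetical names of mine, not anything from the JDK. It uses execute() on purpose: with submit() the task is wrapped in a FutureTask, which captures the exception, so afterExecute would see thrown == null.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass used only to illustrate the two hooks.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();          // called by runWorker just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        if (thrown != null)
            failed.incrementAndGet();       // the task threw; thrown is what runWorker recorded
    }

    // Runs one normal task and one failing task; returns {started, failed}.
    static int[] demo() {
        CountingPool pool = new CountingPool(1);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }
}
```

CountingPool.demo() returns {2, 1}. The failing task also prints a stack trace to stderr, because the exception is rethrown out of runWorker and reaches the thread's default uncaught exception handler.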
Let’s take a look at how getTask() gets a task. This method is really well written: each line is simple, but put together they cover every scenario:
// This method has three possible outcomes:
// 1. Block until a task is available and return it. Threads within corePoolSize are
//    not reclaimed by default; they simply keep waiting for tasks.
// 2. Exit because of a timeout. keepAliveTime takes effect here: if there has been
//    no work for that long, the thread should be shut down.
// 3. Return null when one of the following happens:
//    - there are more than maximumPoolSize workers (setMaximumPoolSize was called)
//    - the pool is SHUTDOWN and the workQueue is empty
//    - the pool is in STOP or a later state
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Two cases:
        // 1. rs == SHUTDOWN && workQueue.isEmpty()
        // 2. rs >= STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrement workerCount via CAS
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?

        for (;;) {
            int wc = workerCountOf(c);
            // Core threads are allowed to time out, or the current thread count exceeds
            // corePoolSize: in either case this thread may be shut down on timeout
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // The break here skips the if (compareAndDecrementWorkerCount(c)) below.
            // Read the two ifs together: if wc > maximumPoolSize, or the last poll
            // timed out, return null.
            // Why would wc > maximumPoolSize ever happen? Because the developer called
            // setMaximumPoolSize() to shrink the pool, so the surplus workers have to go.
            // In other words, returning null means shutting this thread down.
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            // compareAndDecrementWorkerCount(c) failed: the worker count has changed
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        // Reaching here means wc <= maximumPoolSize and there was no timeout
        try {
            // Fetch a task from the workQueue
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // If this worker is interrupted, the response is to retry.
            // To see why it might be interrupted, look at setMaximumPoolSize:
            // if the developer makes maximumPoolSize smaller than the current number
            // of workers, the surplus threads have to be shut down. On re-entering
            // the for loop, some threads will therefore return null.
            timedOut = false;
        }
    }
}
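The timeout path in getTask can be observed from outside. Below is a rough sketch: a pool with two core threads, allowCoreThreadTimeOut(true) so that timed is true for core threads as well, and a 100 ms keepAliveTime. KeepAliveDemo is a made-up class name, the numbers are arbitrary, and the polling loop is just a simple way of waiting for the workers to time out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sizes and timeouts are arbitrary.
class KeepAliveDemo {
    // Returns {pool size while both workers are busy, pool size after idling}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);   // core threads now use the timed poll() too

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);
        pool.execute(blocker);
        int busy = pool.getPoolSize();       // both workers are holding a task
        release.countDown();

        // Wait, with a generous deadline, until the idle workers have timed out.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int idle = pool.getPoolSize();
        pool.shutdown();
        return new int[] { busy, idle };
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the two workers would block in workQueue.take() and the pool size would stay at 2 until shutdown.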
That basically covers the whole process. At this point the reader should go back to the execute(Runnable command) method and walk through its branches. I’ll paste the code here again:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // ctl packs the pool state and the worker count into a single int
    int c = ctl.get();

    // If the current number of threads is less than corePoolSize, add a worker
    // directly: create a new thread and give it this task as its firstTask
    if (workerCountOf(c) < corePoolSize) {
        // If the worker was added, we are done: the pool has accepted the task and
        // this method can return. The result of the execution will be wrapped in
        // the FutureTask.
        // A false return value means the pool did not allow the task to be added this way
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Reaching here means either the number of threads is greater than or equal to
    // corePoolSize, or addWorker just failed

    // If the pool is RUNNING, add the task to the workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        /* The task is now in the queue. Do we need to start a new thread?
         * Threads are created unconditionally while the count is in [0, corePoolSize);
         * once the count reaches corePoolSize, tasks go to the queue instead.
         * So the state is rechecked here.
         */
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the task that was just queued
        // and run the rejection policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If the pool is still RUNNING but the number of threads is 0, start a new thread.
        // This covers the case where the task was queued just as the last thread exited
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the workQueue is full, we end up in this branch
    // and create a new worker bounded by maximumPoolSize.
    // If that fails, the number of threads has reached maximumPoolSize: run the rejection policy
    else if (!addWorker(command, false))
        reject(command);
}
reject(command) is called in two places above. In both, the thread pool cannot accept the task, so the rejection policy has to be run. Next, let’s look at the rejection policies in ThreadPoolExecutor.
final void reject(Runnable command) {
    // Run the rejection policy
    handler.rejectedExecution(command, this);
}
The handler here is the parameter we pass in when constructing the thread pool; it is an instance of RejectedExecutionHandler.
ThreadPoolExecutor already defines four RejectedExecutionHandler implementations that we can use directly. Of course, we can also implement our own policy, but that is rarely necessary.
// As long as the thread pool has not been shut down, the thread that submitted the task runs it itself.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

// Whatever the situation, throw a RejectedExecutionException.
// This is the default policy, used when no handler is passed in while constructing the pool.
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

// Do nothing: the task is silently dropped.
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

// If the thread pool has not been shut down, discard the task at the head of the queue
// (the one that has waited longest), then submit this task again.
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
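The four policies are easiest to tell apart by saturating a tiny pool and watching what happens to the extra task. The sketch below uses one thread and a one-slot queue; RejectionDemo and the task names A, B, and C are made up for the illustration. Task A blocks the only worker, B fills the queue, and C is the task that gets handed to the policy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class used only for illustration.
class RejectionDemo {
    // Saturates a 1-thread pool with a 1-slot queue, then submits task "C".
    // Returns what happened, in the order it happened.
    static List<String> run(RejectedExecutionHandler handler) {
        List<String> ran = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);

        pool.execute(() -> {                  // A: occupies the only worker
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.add("A");
        });
        pool.execute(() -> ran.add("B"));     // B: fills the one-slot queue
        try {
            pool.execute(() -> ran.add("C")); // C: pool and queue are both full
        } catch (RejectedExecutionException e) {
            ran.add("C rejected");
        }

        release.countDown();                  // let A finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }
}
```

With AbortPolicy the result is [C rejected, A, B]. With CallerRunsPolicy it is [C, A, B], since C runs on the submitting thread while A is still blocked. With DiscardPolicy it is [A, B], and with DiscardOldestPolicy it is [A, C], because B is dropped from the queue to make room for C.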
That completes the analysis of the ThreadPoolExecutor source code. As source code goes, it is relatively easy; it just takes some patience to read through.
Executors
Executors is simply a utility class; all of its methods are static.
- Generate a fixed size thread pool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
The maximum number of threads is set equal to the number of core threads, and keepAliveTime is set to 0 (it is irrelevant here: even with a non-zero value, the thread pool does not reclaim threads within corePoolSize by default). The task queue is a LinkedBlockingQueue, which is unbounded.
Process analysis: At first, a worker is created for each task submitted. When the number of workers reaches nThreads, no new threads are created. Instead, the task is submitted to LinkedBlockingQueue, and the number of threads remains nThreads.
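The process described above can be checked with a quick sketch. FixedPoolDemo is a hypothetical name, and the cast to ThreadPoolExecutor leans on the implementation shown above (newFixedThreadPool returns a plain ThreadPoolExecutor), which the ExecutorService return type does not promise.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class used only for illustration.
class FixedPoolDemo {
    // Submits `tasks` blocking tasks and returns {pool size, number of queued tasks}.
    static int[] run(int nThreads, int tasks) {
        // Assumption: the returned object is a ThreadPoolExecutor, as in the source above.
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();        // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return result;
    }
}
```

FixedPoolDemo.run(2, 5) returns {2, 3}: the first two tasks each get a new worker, and the remaining three wait in the LinkedBlockingQueue.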
- Generate a fixed-size thread pool with only one thread. This one is even simpler: same as above, with the number of threads set to 1:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
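One detail in the code above is easy to miss: the ThreadPoolExecutor is wrapped in a FinalizableDelegatedExecutorService, so callers only get an ExecutorService and cannot cast it back to reconfigure it. A tiny sketch to confirm this follows; SingleThreadWrapperDemo and isPlainPool are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class used only for illustration.
class SingleThreadWrapperDemo {
    // Reports whether the executor is a bare ThreadPoolExecutor, then shuts it down.
    static boolean isPlainPool(ExecutorService es) {
        try {
            return es instanceof ThreadPoolExecutor;
        } finally {
            es.shutdown();
        }
    }
}
```

isPlainPool(Executors.newFixedThreadPool(1)) is true, while isPlainPool(Executors.newSingleThreadExecutor()) is false. So newFixedThreadPool(1) can later be resized through setCorePoolSize and setMaximumPoolSize after a cast, while the single-thread executor cannot.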
- Generate a thread pool that creates new threads as needed and reuses previously created threads when they are idle:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
The number of core threads is 0, the maximum number of threads is Integer.MAX_VALUE, keepAliveTime is 60 seconds, and the task queue is a SynchronousQueue.
This thread pool performs well when tasks complete relatively quickly. If a thread has been idle for 60 seconds without receiving a task, it is shut down and removed from the pool. So it does not matter if the pool sits idle for a long time: once all of its threads have been shut down, the pool does not consume any system resources.
Process analysis: I’ll paste the body of the execute method again to make this easier to follow. Since corePoolSize is 0, the first task skips the first branch and is offered directly to the SynchronousQueue. No worker is waiting to receive it yet, so offer must return false, and the last branch creates the first worker. For tasks submitted later, it depends on whether an idle thread is waiting to receive the task: if so, offer succeeds and execution enters the second if block; otherwise it goes to the last else if branch, just like the first task.
int c = ctl.get();
// corePoolSize is 0, so this branch is never entered
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// offer returns true only if an idle thread is ready to receive the task;
// otherwise it returns false
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);
SynchronousQueue is a special BlockingQueue that does not store any elements itself. It keeps a virtual queue (or stack) of waiting threads, and every operation, read or write, first looks at that queue. If the threads waiting there are in the same mode as the current operation, the current thread joins the queue and waits. If they are in the opposite mode, the pairing succeeds and the node at the head of the queue is taken to complete the hand-off. For more information, see my other BlockingQueue article.
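The hand-off behaviour is easy to try out directly. In this small sketch (SynchronousQueueDemo and its method names are made up), the first method shows why the first task submitted to a cached thread pool cannot be queued, and the second shows a successful pairing between a writer and a reader.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class used only for illustration.
class SynchronousQueueDemo {
    // offer() fails immediately when no thread is waiting in take() or poll().
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> q = new SynchronousQueue<String>();
        return q.offer("task");
    }

    // A blocking put() pairs with a reader; returns what the reader received.
    static String handOff() {
        SynchronousQueue<String> q = new SynchronousQueue<String>();
        Thread producer = new Thread(() -> {
            try {
                q.put("task");                    // blocks until a reader arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Whichever side arrives first waits for the other; then they pair up.
            return q.poll(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

offerWithoutConsumer() returns false, which is exactly what sends execute into its last branch, and handOff() returns "task".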
conclusion
I never like writing summaries: everything I wanted to say is already in the body, and a short summary cannot really make things clear. This summary is written for readers who are preparing for interviews, and for those who do not have time to read the whole article.
-
What are the key attributes of a Java thread pool?
corePoolSize, maximumPoolSize, workQueue, keepAliveTime, rejectedExecutionHandler
Threads between corePoolSize and maximumPoolSize are reclaimed when idle. Threads within corePoolSize can also be reclaimed after calling allowCoreThreadTimeOut(true).
workQueue holds tasks. When a task is added and the current number of threads has reached corePoolSize, the task is inserted into the queue, and the threads in the pool pull tasks from it.
keepAliveTime sets the idle time. If the number of threads exceeds corePoolSize and some threads have been idle for longer than this, they are shut down.
rejectedExecutionHandler handles the case where the thread pool cannot accept a task. Four policies are provided: throw a RejectedExecutionException, silently discard the task, run the task on the thread that submitted it, or drop the task that has waited longest in the queue and then resubmit the new one. The default is to throw the exception.
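To tie these attributes to actual code, here is a sketch of a pool built with every key parameter spelled out. PoolConfigDemo is a hypothetical name and the numbers are arbitrary placeholders, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all values are arbitrary.
class PoolConfigDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30L, TimeUnit.SECONDS,                      // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100),      // workQueue (bounded)
                Executors.defaultThreadFactory(),           // creates the worker threads
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejectedExecutionHandler
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue, maximumPoolSize and the rejection handler would almost never come into play.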
-
When does a thread pool create threads?
- If the current number of threads is less than corePoolSize, a new thread is created when a task is submitted, and that thread executes the task.
- If the current number of threads has reached corePoolSize, the submitted task is added to the queue, where it waits for a thread in the pool to fetch it.
- If the queue is full, a new thread is created to execute the task, as long as the number of threads in the pool does not exceed maximumPoolSize. If the pool has already reached maximumPoolSize, the rejection policy is executed.
* Note: If the queue is unbounded, the number of threads will not actually grow once corePoolSize is reached.
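The three steps above can be traced with a deliberately small pool. In this sketch corePoolSize is 1, maximumPoolSize is 2, and the queue holds a single task; CreationTimingDemo is a made-up class name. Every task blocks until the end, so the numbers recorded after each submission are stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class used only for illustration.
class CreationTimingDemo {
    // Submits four blocking tasks and records the pool state after each submission.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));   // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        List<String> trace = new ArrayList<String>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();                // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                trace.add("threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                trace.add("rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return trace;
    }
}
```

The trace is [threads=1 queued=0, threads=1 queued=1, threads=2 queued=1, rejected]. Note that the third task runs right away on the newly created second thread, ahead of the second task, which is still sitting in the queue.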
-
What is the difference between the thread pools built by Executors.newFixedThreadPool(…) and Executors.newCachedThreadPool()?
The details are too long to repeat here. Scroll up a bit; the Executors section has the full description.
-
What happens if an exception occurs during task execution?
If a task throws an exception, the thread executing it is shut down rather than going on to take other tasks. A new thread is then started to replace it.
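This replacement can be observed by counting how many threads the ThreadFactory is asked to create. The sketch below assumes tasks are handed in with execute(); with submit(), the FutureTask wrapper catches the exception and the worker thread survives. WorkerReplacementDemo is a hypothetical name, and the no-op uncaught exception handler is only there to keep the output quiet.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class used only for illustration.
class WorkerReplacementDemo {
    // Returns how many threads a single-thread pool needed to run two tasks
    // when the first task throws.
    static int run() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Swallow the failure instead of printing a stack trace.
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), factory);

        pool.execute(() -> { throw new IllegalStateException("task failed"); });
        pool.execute(() -> { });             // runs on the replacement thread
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }
}
```

WorkerReplacementDemo.run() returns 2: the first worker dies with the exception, and a second thread is created to run the remaining task.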
-
When is the rejection policy executed?
- The number of workers has reached corePoolSize (so the task has to go into the task queue), the task is enqueued successfully, and the thread pool is shut down at that same moment. If the shutdown did not take the task out of the queue, execute removes it and runs the rejection policy. This is a real edge case, with enqueueing and shutdown happening at the same time; the reader should look closely at how the execute method reaches the first reject(command).
- The number of workers is greater than or equal to corePoolSize, so execute tries to add the task to the task queue, but the queue is full and the task cannot be added. It then tries to create a new worker; if the number of threads has already reached maximumPoolSize, the rejection policy is executed.
Because this article is already long, I have not covered how execution results are obtained or how the thread pool is shut down. I’ll leave those to the reader.
This article is a bit long. If you find anything wrong or need to add something, please feel free to mention it. Thank you.
(End)