preface
In the last article, WE covered what a thread pool is and its basic use in Java Thread pools: Theory. (The original writing idea is to use the article, but after the netizen’s suggestion, I feel changed to theory will be more appropriate). This article will delve into the source code of ThreadPoolExecutor, mainly to introduce the implementation of the source code of ThreadPoolExecutor, to get a clearer understanding of ThreadPoolExecutor.
The source code for ThreadPoolExecutor is relatively easy to understand, with nothing particularly hard to read. The core source logic of ThreadPoolExecutor will be easy for readers who are not accustomed to reading source code.
The source code for this article is JDK 8, and the class is jdK1.5. The source code for ThreadPoolExecutor has not changed since 1.5.
Thread pool family
The thread pool inheritance structure in Java is shown below.
- The top-level interface Executor represents an Executor that has only one interface:
execute()
Is displayed, indicating that tasks can be executed - ExecutorService expands on the Executor base with additional execution methods, such as
submit()
shutdown()
And so on, representing a task execution service. - AbstarctExecutorService is an abstract class that implements some of the core ExecutorService methods, such as Submit
- ThreadPoolExecutor is the core class, the thread pool, which inherits the abstract class AbstarctExecutorService
- In addition, ScheduledExecutorService interface, which represents an executor service can be executed according to the specified time or period, internal definition such as
schedule()
And so on to perform tasks - ScheduledThreadPoolExecutor ScheduledExecutorService interface is achieved, and at the same time inherited in ThreadPoolExecutor, internal thread pool associated logic using the ThreadPoolExecutor, On this basis, the function features of delay and periodic execution are extended
ScheduledThreadPoolExecutor is relatively less. For delayed tasks, we are more familiar with Android schemes: Handler; Periodic tasks are used very rarely. The android background is very restrictive now, basically once you quit the application, the application process is easy to be killed by the system. ScheduledThreadPoolExecutor isn’t completely useless, of course, such as desktop widgets need to set up refreshed regularly, so he can come in handy.
Therefore, the source code for this article focuses on ThreadPoolExecutor. Before reading the source code, let’s take a look at the internal structure of ThreadPoolExecutor and its key roles.
The internal structure
Before reading the source code, let’s take a look at the entire source code structure of ThreadPoolExecutor to get a sense of what it’s all about. Let’s take a look at the internal structure of ThreadPoolExecutor:
- ThreadPoolExecutor has three key roles inside it: blocking queues, threads, and RejectExecutionHandler, whose roles are described in more detail in this article.
- In ThreadPoolExecutor, a thread corresponds to a worker object, a worker, very figuratively. There is an independent thread inside each worker, which will constantly block the queue to obtain tasks for execution, that is, call the blocking queue
poll
ortake
Methods, they’re different and we’ll talk about that later. If there are no more tasks in the queue, then it will block here. - A workQueue is a blocking queue where tasks are placed to wait for execution by workers when the core thread is full
- RejectExecutionHandler is itself an interface, and ThreadPoolExecutor has an interface object that can be used to invoke methods when a task cannot be executed. ThreadPoolExecutor provides four implementations of this interface that you can either use directly or inherit to implement your own custom logic. The RejectExecutionHandler object can be passed when constructing the thread pool.
- The core method of ThreadPoolExecutor is execute, which can choose a different execution scheme or reject execution depending on the situation.
Now that we know the internal structure of ThreadPoolExecutor, let’s begin reading the fucking code.
Source code analysis
Internal key attributes
There are many variables inside ThreadPoolExecutor, and the information they contain is very important.
The state of ThreadPoolExecutor and the number of threads are combined in the same int variable, similar to MeasureSpec in the View measure. His three bits represent the state of the thread pool, and the lower 29 bits represent the number of threads in the pool, as follows:
The AtomicInteger object can implement thread-safe modification using CAS, which contains thread pool state and thread count information
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29, the number of bytes (for int length 32) representing the number of threads
private static final int COUNT_BITS = Integer.SIZE - 3;
Ctl&COUNT _MASK specifies the state of the thread pool. The state of the thread pool can be obtained by ctl&COUNT_MASK
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
Copy the code
There are five thread pool states:
- Running: The thread pool is running after it is created
- Shutdown: the thread pool is shutdown after the shutdown method is called. This state stops receiving any tasks and automatically terminates the thread pool when the tasks in the blocking queue are completed
- Stop stop: The thread pool is stopped after the shutdownNow method is called. The difference with shutdown is that the thread pool in this state does not execute the remaining tasks in the queue
- Tidying: After the thread pool stops, enter the tidying state and execute
terminated()
Method, and enter the terminated state - Terminated: There are no threads executing tasks in the thread pool. The thread pool terminated completely.
In the source code, these states correspond to:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Copy the code
The above bit operation is not intuitive. After transformation, it is as follows:
private static final int RUNNING = 111 00000 00000000 00000000 00000000;
private static final int SHUTDOWN = 000 00000 00000000 00000000 00000000;
private static final int STOP = 001 00000 00000000 00000000 00000000;
private static final int TIDYING = 010 00000 00000000 00000000 00000000;
private static final int TERMINATED = 011 00000 00000000 00000000 00000000;
Copy the code
It can be seen that all the other states are positive except running, which is negative, and the further the state is, the higher the value is. Ctl&COUNT _MASK > SHUTDOWN to test whether the state is in stop, tidying, or terminated. There will be many such judgments in the subsequent source code, for example:
// The thread pool status is determined
if(runStateAtLeast(ctl,SHUTDOWN)) {
...
}
// Perform logic to determine the size of two numbers directly
private static boolean runStateAtLeast(int c, int s) {
returnc >= s; } ps: COUNT_MASK is not used here. Because the state is at a high level, the low value does not affect the judgment of the high level. Of course, if we want to determine equality, we still need to use the mask COUNT_MASK.Copy the code
Next are the three key role objects inside ThreadPoolExecutor:
// block the queue
private final BlockingQueue<Runnable> workQueue;
// Store the hashSet of the worker, where the worker is stored after it is created
private final HashSet<Worker> workers = new HashSet<>();
The default implementation of RejectedExecutionHandler is AbortPolicy
private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
Copy the code
Lock objects for internal use:
// Here are two locks. ThreadPoolExecutor does not use the Synchronize keyword internally to maintain synchronization
// Use Lock instead; Synchronize is a lock at the application layer, whereas Synchronize is a lock at the JVM layer
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
Copy the code
Finally, the internal configuration of some parameters, have been introduced before, the source posted to review:
// The maximum number of threads reached in the thread pool history
private int largestPoolSize;
// Number of tasks completed by the thread pool.
// This number is not updated in real time. When obtaining the number of tasks completed by the thread pool, the tasks completed by each worker need to be counted and added up
// When a worker is destroyed, the number of his tasks will be added to this data
private long completedTaskCount;
// Thread factory, used to create threads
private volatile ThreadFactory threadFactory;
// The amount of time stored by the idle thread
private volatile long keepAliveTime;
// Whether to allow core threads to be reclaimed
private volatile boolean allowCoreThreadTimeOut;
// The number of core threads
private volatile int corePoolSize;
// The total number of threads
private volatile int maximumPoolSize;
Copy the code
Really, Sir? Source code has not seen the soul, the whole out of such boring variables? Hey, don’t worry, source code analysis is coming. These variables will be used throughout the source process, so having an impression of them will make reading the source code much easier.
Key method: execute()
The main task of this method is to select the execution strategy of the task based on the current state of the thread pool. The core logic of this method is as follows:
-
When the number of threads does not reach the number of core threads, a core thread is created to execute the task
public void execute(Runnable command) { // Empty tasks cannot be passed in if (command == null) throw new NullPointerException(); // Get the CTL variable that combines the state with the number of threads int c = ctl.get(); // Determine if the number of core threads exceeds the limit, otherwise create a core thread to execute the task if (workerCountOf(c) < corePoolSize) { The addWorker method creates a worker, that is, a thread, with the argument true indicating that it is a core thread // If added successfully, return directly // Otherwise, it means that other workers have been added in the middle, resulting in exceeding the number of core threads; Or the thread pool is closed or something else // Go to the next step if (addWorker(command, true)) return; c = ctl.get(); }... }Copy the code
-
When the number of threads reaches the core number, new tasks are put into a wait queue to be executed
-
When the wait queue is full, a non-core thread is created to execute the task if the number of threads does not reach the upper limit of total threads
-
When the number of threads reaches the total thread limit, the new task is processed by the policy rejector and cannot be executed by the thread pool.
public void execute(Runnable command) {...// If the thread pool is still running, try adding tasks to the queue if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // Check again if the thread pool is closed, then remove the task from the queue // If the task is successfully removed, the task is rejected // The thread pool is closed during queue insertion if (! isRunning(recheck) && remove(command)) reject(command); // Otherwise check if the number of threads is 0 again, if so, create a non-main thread worker with no tasks // If the core thread is 0 and the task is specified as null, the worker will fetch the task from the queue to execute it The thread pool has at least one thread to execute tasks in the queue else if (workerCountOf(recheck) == 0) addWorker(null.false); } // If adding to the queue above fails, try creating a non-core thread to execute the task // If the creation fails, the task is rejected else if(! addWorker(command,false)) reject(command); } Copy the code
The source code is also designed to two key methods: addWorker to create a new worker, that is, to create a thread; Reject REJECT a task. The latter is easier so let’s look at it first.
Reject a task: reject()
// Reject the task and call rejectedExecutionHandler to handle it
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
Copy the code
There are four default implementation classes. Let’s look at them one by one:
-
AbortPolicy is the default implementation, throws a RejectedExecutionException exception:
public static class AbortPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from "+ e.toString()); }}Copy the code
-
The simplest DiscardPolicy is to do nothing and simply discard the task. (This is the irresponsible behavior of a very deceitful and womanizing man. We can’t copy him, so don’t use it.)
public static class DiscardPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}Copy the code
-
DiscardOldestPolicy removes a task from the queue header and then executes itself again.
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if(! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code
-
CallerRunsPolicy is the most powerful. He simply executes the run method on his own thread instead of relying on the thread pool.
public static class CallerRunsPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if(! e.isShutdown()) { r.run(); }}}Copy the code
The above four ThreadPoolExecutor classes are static inner classes that we can use directly when creating ThreadPoolExecutor. You can also inherit interfaces to implement your own logic. Which one to choose depends on the actual business requirements.
So let’s look at the method to create the worker.
Create worker: addWorker()
The purpose of the method is simple: create a worker. As mentioned above, a thread is created inside worker, and each worker represents a thread, which is very similar to Looper in Android. Looper’s loop() method keeps getting messages from MessageQueue, while Worker’s run() method keeps blocking the queue to get tasks, which we’ll talk about later.
The logic of the addWorker() method is generally divided into two parts:
-
Check whether the thread status and number meet the following conditions:
// The first argument is the first task to be executed by the created thread. It can be null, indicating that a thread is initialized // The second argument indicates whether it is a core thread private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // Remember when we talked about thread pool state control? // runStateAtLeast(c, SHUTDOWN) Indicates that the state is at least SHUTDOWN // If the thread pool is stopped or above, the worker will not be created // If the thread pool state is in shutdown state, if the queue is not empty or task! =null, the worker will also be created if(runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask ! =null || workQueue.isEmpty())) // Otherwise return false to deny worker creation return false; // CAS polling is used to increase the total number of threads by 1 for (;;) { // Check if the thread limit is exceeded // The thread is a core thread or a non-core thread if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // use CAS to increment the CTL variable, indicating worker+1 // If the CAS fails, a race has occurred, so try again if (compareAndIncrementWorkerCount(c)) // Success breaks out of the outermost loop break retry; // If the CTL is changed during this period, get the CTL and try again c = ctl.get(); // if the thread pool is shutdown, repeat the outermost loop to determine whether the state can create the worker if (runStateAtLeast(c, SHUTDOWN)) // Continue the outermost loop continueretry; }}// Create worker logic. }Copy the code
I wonder if readers are confused about the use of retry in source code, since it is rarely used. Its function is to mark a loop so that our inner loop can jump to any outer loop. Retry is only a name. Change it to repeat: or even a:. It is what it is: a circular mark.
-
Create the worker object and call the start() method of its internal thread to start the thread:
private boolean addWorker(Runnable firstTask, boolean core) {...boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // Create a new worker // A thread is created internally w = new Worker(firstTask); final Thread t = w.thread; if(t ! =null) { // Get the global lock and lock it final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // After the lock is acquired, the status needs to be checked again int c = ctl.get(); // Only the running state or shutDown&&task==null will be executed if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // If the thread is not newly created, an exception is thrown if(t.getState() ! = Thread.State.NEW)throw new IllegalThreadStateException(); // Add to workerSet workers.add(w); workerAdded = true; int s = workers.size(); // Tracks the maximum number of threads reached by the thread pool if(s > largestPoolSize) largestPoolSize = s; }}finally { / / releases the lock mainLock.unlock(); } // If added successfully, the thread is started if (workerAdded) { t.start(); workerStarted = true; }}}finally { // If the thread is not started, the worker fails to be added, and the thread pool may be closed during the process of adding if (! workerStarted) // Remove worker from workerSet addWorkerFailed(w); } return workerStarted; } Copy the code
After the previous two steps, if no exception occurs, the worker is successfully created. AddWorkerFailed (w) addWorkerFailed(W)
// Failed to add worker
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
/ / lock
mainLock.lock();
try {
if(w ! =null)
workers.remove(w);
// The total number of threads will be -1
decrementWorkerCount();
// Try to set the thread pool state to terminad
// it is possible that the thread pool was shutdown during the process of adding workers
// Set the state to terminad if no task is running
// This method will be discussed later
tryTerminate();
} finally{ mainLock.unlock(); }}Copy the code
So at this point, we’re done analyzing some of the call methods in the execute() method. Blocking queue-related methods are beyond the scope of this article and will not be expanded. So here’s another question: How does a worker work? There is a thread inside worker. When the thread starts, the run method of the runnable object that initializes the thread will be called. What is the runnable object? I went straight to the worker.
I’m a Worker
First we look at his construction method:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Copy the code
The source code is simple, set the incoming task to the internal variable firstTask, and then pass itself to the thread factory to create a thread. So when the thread starts, the Worker’s own run method is called, so we see the Worker’s run() method.
public void run(a) {
runWorker(this);
}
Copy the code
Worker is an inner class of ThreadPoolExecutor, where the ThreadPoolExecutor method is called directly: runWorker() to start execution. So what we’re going to do is look at this method.
Start worker: runWorker()
This method is the method that the worker executes, and it keeps executing until the thread is destroyed, similar to Handler’s Looper, which keeps going to the queue for messages to execute:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// Get the task set during worker initialization, which can be null. If null, only the thread is created
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
// The function of this parameter is explained later, and needs to be combined with other source code
boolean completedAbruptly = true;
try {
// If its task is not null, its task will be executed
// Otherwise getTask is called to the queue to get a task to execute
// The getTask will eventually call the queue's methods to get the task
// If the queue is empty, its fetch method will block, which will be blocked, more on that later
while(task ! =null|| (task = getTask()) ! =null) {
try{
// Execute the task. }finally {
// Set task to null when the task is completed
task = null;
// Total number of tasks +1
w.completedTasks++;
/ / releases the lockw.unlock(); }}// Set it to false, remember it first
completedAbruptly = false;
} finally {
// If the worker exits, subsequent cleanup needs to be performedprocessWorkerExit(w, completedAbruptly); }}Copy the code
As you can see, the overall framework of this method is relatively simple, and the core is that while (Task! = null || (task = getTask()) ! In this loop, if getTask() returns null, it is time for the thread to terminate, just like the Handler mechanism.
The above source code omits the specific execution of the task logic, his logic is also very simple: judge the state + run the task. Let’s take a look:
final void runWorker(Worker w) {
...;
try {
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If the thread pool is set to stop, ensure that the thread is interrupted
// If the thread pool is not stopped, ensure that the thread is not interrupted
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
// Callback method, which is an empty implementation
beforeExecute(wt, task);
try {
// Run the task
task.run();
// Callback method, also an empty implementation
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throwex; }}... } completedAbruptly =false;
} finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
Once a task is acquired, it executes the run method of that task and then goes back to retrieve a new task.
We’ll see a lot of empty implementation methods that are implemented by subclasses, sort of like the Activity lifecycle, and subclasses need to override those methods to do some work in specific cases. Of course, the general use is that you don’t need to override these methods. Next you need to see how getTask() gets the task.
GetTask: getTask()
The content of this method can be divided into two parts: determining the state of the current thread pool + blocking a task from the queue.
The first part is to determine the status of the current thread pool. If it is in the closed state, null is directly returned to terminate the worker; otherwise, it is necessary to determine whether the current thread times out or exceeds the maximum number of threads:
private Runnable getTask(a) {
boolean timedOut = false;
// CAS is used internally, so there needs to be a loop to keep trying
for (;;) {
int c = ctl.get();
// If shutdown is in the state and the queue is empty, or stop is in the state, return null
// This is consistent with the different behavior of different thread pool states discussed earlier
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// Let's make the total number of threads -1, remember that, we'll talk about that later
decrementWorkerCount();
return null;
}
// Get the current total number of threads
int wc = workerCountOf(c);
// Determine whether the thread can be destroyed if it is idle: the core thread is allowed to be null or the current thread exceeds the number of core threads
// There is no distinction between core and non-core threads, only the number of threads in the core range and non-core range
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// The number of threads has exceeded the maximum or timed out;
// It is possible that the user changed the data with setMaximumPoolSize to exceed the maximum number of threads
// You must also ensure that the current thread number is greater than 1 or that there are no more tasks in the queue
// This ensures that when a task exists, at least one thread must be executing it
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// Use CAS to try to make the current thread count -1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// Get the task logic. }}Copy the code
The second part is to take a task and execute it. Getting tasks uses the blocking queue method, which blocks if there are no tasks in the queue:
private Runnable getTask(a) {
boolean timedOut = false;
// CAS is used internally, so there needs to be a loop to keep trying
for (;;) {
// Determine the thread pool state logic.try {
// Get a task
// The poll method returns null if no object is retrieved after a specified time
The take method will wait until it gets a new object unless interrupted
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if(r ! =null)
return r;
// r==null, indicating timeout, loop again
timedOut = true;
} catch (InterruptedException retry) {
// Interrupt indicates that the thread pool is closed
timedOut = false; }}}Copy the code
The key here is the poll() and take() methods of the blocking queue, both of which go to the queue to get a task; However, the poll() method blocks for a specified time and returns, while the take() method blocks indefinitely. This corresponds to a thread with a lifetime and a core thread that will not be destroyed.
Also note that timedOut = true is assigned in this section. If the assignment is true and the loop is executed again, it is intercepted and returns false, as explained in part 1 of the logic. If the thread is interrupted while waiting, the pool is closed and the state logic is reworked.
So much for the logic of execution, let’s talk about the logic of thread pool closure and worker termination.
Worker exits work: processWorkerExit
The runWorker() method was introduced earlier. The main task of this method is to get the worker to move and queue for tasks. When null is returned when the task is obtained, it means that the worker can end, and the processWorkerExit() method will be called as follows:
final void runWorker(Worker w) {...try{... }finally {
// If the worker exits, subsequent cleanup needs to be performedprocessWorkerExit(w, completedAbruptly); }}Copy the code
ProcessWorkerExit () will clean up the aftermath of the worker exit. The specific contents are as follows:
- Remove the worker and try to set the thread pool state to terminated:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If the getTask method does not return null, then the total number of threads should be -1
// I've been trying to draw your attention to this parameter
DecrementWorkerCount () is executed in the getTask() method if you are exiting under normal circumstances
// If something special happens that suddenly ends, it is not by returning null at getTask
// Abruptly means sudden, so completedAbruptly is true, which is normally set to false in runWorker methods
// What does that mean? The user's task throws an exception, at which point the thread ends abruptly, without going through the getTask method
// The total number of threads is -1
if (completedAbruptly)
decrementWorkerCount();
// Get the lock and add the total number of completed tasks to remove the worker from the set
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// Try to set the thread pool state to terminated
// This method was mentioned earlier when addWorker failed, and will be expanded latertryTerminate(); . }Copy the code
- After removing the worker, if the thread pool has not been stopped, then at least one thread of the queue task must be executed:
private void processWorkerExit(Worker w, boolean completedAbruptly) {...int c = ctl.get();
// Stop and above do not need to perform the rest of the tasks
if (runStateLessThan(c, STOP)) {
// If the thread terminates suddenly, it must be recreated
// Otherwise determine whether to keep a thread
if(! completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0&&! workQueue.isEmpty()) min =1;
if (workerCountOf(c) >= min)
return;
}
// If the number of threads <= the number of core threads, or if the core thread can be destroyed, the number of threads ==0 and the queue is not empty
// Create a thread to execute the task
addWorker(null.false); }}Copy the code
The code looks a lot, but the logic is relatively simple. A method called tryTerminate() has been mentioned before but has not been expanded, so let’s introduce it.
Try to terminate the thread pool: tryTerminate()
This method appears anywhere it is possible to put the thread pool into a terminated state. If the worker fails to be added, the thread pool may be stopped and no worker is running. The thread pool can enter the terminated state. If the worker is terminated, it may be the last worker to be destroyed, and the thread pool needs to enter the terminated state.
The state of the thread pool is tested, and terminated if required. If the state is not terminated, it does nothing and returns directly.
- First check whether the current thread pool state is terminated. If the state is running or above tidying, state transitions are definitely not required. Because running needs to enter stop state first, tidying is already ready to enter terminated state. If you are in the shutdown state and the queue is not empty, then you need to complete the tasks in the queue, so state transitions are not suitable either:
final void tryTerminate(a) {
for (;;) {
int c = ctl.get();
// If the state is running or above tidying, it returns directly without changing the state
// If the queue is in the stop state and the queue is not empty, wait for the task in the queue to complete and return directly
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// The thread pool must be stopped
// The number of threads is not equal to zero, try to interrupt an idle worker thread
// Here he interrupts only one of the workerSet threads, and tryTerminate is called again when one of the threads stops
Workercount ==0 workerCount ==0 workerCount ==0 workerCount ==0
// This is the chain reaction.
if(workerCountOf(c) ! =0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// Set the state to terminated logic. }}Copy the code
- The thread pool must be ready to enter terminated state. The rest of the code is to set the thread pool state to terminated:
final void tryTerminate(a) {
for (;;) {
// The previous part of the logic.// Get the global lock first
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Try changing the thread pool state from Stop to tidying
// If the change fails, the state has been changed, then the outer loop will run another one
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// This method is an empty implementation and needs to be overridden by subclasses
terminated();
} finally {
// Finally set the state to terminated
ctl.set(ctlOf(TERMINATED, 0));
// Wake up all threads waiting to terminate the lock
termination.signalAll();
}
return; }}finally {
/ / releases the lock
mainLock.unlock();
}
// CAS failed to change the thread pool status}}Copy the code
When the thread pool is marked in terminated state, it is terminated completely.
That’s it. Congratulations, you have a good understanding of the source code for ThreadPoolExecutor. The remaining common API methods are submit(), shutdown(), and shutdownNow(), all of which, by the way, have very simple logic.
Shutdown the thread pool: shutdown/shutdownNow
There are two ways to turn off a thread pool:
- Shutdown: Sets the state of the thread pool to shutdown, and attempts to interrupt all idle threads, but waits for the queue to complete before terminating the thread pool.
- ShutdownNow: Sets the state of the thread pool to stop and attempts to stop all idle threads. It does not wait for tasks in the queue to complete.
Let’s take a look at each:
// Tasks in the queue will continue to be executed, but no new tasks will be added
public void shutdown(a) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// Set the state to shutdown
advanceRunState(SHUTDOWN);
// Try to interrupt all idle workers
interruptIdleWorkers();
/ / callback methods, this method is an empty, ScheduledThreadPoolExecutor rewrite the method
onShutdown();
} finally {
mainLock.unlock();
}
// Try to set the thread pool state to terminated
tryTerminate();
}
Copy the code
Take a look at another method shutdownNow:
// The remaining tasks in the queue will not be executed
// The remaining tasks are returned to the developer
public List<Runnable> shutdownNow(a) {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Check if the thread can be closed
checkShutdownAccess();
// Set the state to stop
advanceRunState(STOP);
// Try to interrupt all threads
interruptWorkers();
// Return the remaining tasks in the queue
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Copy the code
Finally, consider the alternative to execute() : submit.
Submit the task: submit()
Submit is an AbstractExecutorService method implemented by ThreadPoolExecutor.
// Runnable has no return value. Null is passed as the return parameter for creating FutureTask
publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// Runnable with parameter return values
// Finally construct a callable to execute, set the return value to result
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// Callable has its own return value
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Copy the code
The logic is pretty much the same: call the newTaskFor method to construct a Future object and return it. We see the newTaskFor method:
// Create a FutureTask to return
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
Copy the code
As you can see, this approach is simple: Construct and return a FutureTask, which is currently the only implementation class of the Future interface.
More specific content about Future will not be expanded, interested readers can go to know.
The last
That concludes the source code analysis for ThreadPoolExecutor. Finally, let’s review:
- The entire execution process of ThreadPoolExecutor starts with the execute method, which uses the appropriate execution scheme based on the specific situation
- Threads are encapsulated in worker objects, which, through runWorker methods, constantly call getTask methods to call poll or take methods of queues to get tasks
- When a worker needs to exit, the getTask method returns NULL to exit
- When the thread pool is closed, depending on the closing method, it waits for all threads to complete execution and then closes the thread pool.
The overall thread pool model is very similar to handler: a producer-consumer model. But unlike the Handler, ThreadPoolExecutor does not support delay task, which is implemented in the ScheduledThreadPoolExecutor; Handler uses the synchronize keyword for thread safety, while ThreadPoolExecutor uses Lock and other integral variables that use CAS for thread safety. Handler can’t reject a task, thread pools can; The Handler throws an exception that crashes the program directly, and the thread pool doesn’t wait.
With a better understanding of the thread pool’s internal source code, a more appropriate solution can be made based on the specific problem. ThreadPoolExecutor didn’t talk about some source code, as well as ScheduledThreadPoolExecutor, blocking queue of source code, interested readers can go to a deeper understanding, expand all about thread pool.
Full text here, the holiday liver article is not easy ah, if the article is helpful to you, ask a thumbGive it a thumbs up before you leave.
reference
- The Art of Concurrent Programming in Java: a must read for concurrent programming, the author is very thorough about some principles
- Java Core Technology Volume: This series of books focuses on the use of frameworks, not the fundamentals
- JavaGuide: javaGuide, a blog that summarizes Java knowledge well
- Concurrent Programming in Java: Use of thread pools: one of the best bloggers on blogosphere, writes articles that are easy to understand and in-depth
Full text here, the original is not easy, feel help can like collection comments forward. I have no talent, any ideas welcome to comment area exchange correction. If need to reprint please comment section or private communication.
And welcome to my blog: Portal