Preface
This is lecture 4 of the thread pool series, and the most important one: the thread pool workflow. The article is also a source code analysis, so we go into the source with questions in mind to learn how thread pools work.
Series
- Thread pool series – (1) Background
- Thread pool series – (2) Thread pool status
- Thread pool series – (3) Rejection policy
- Thread pool series – (4) Workflow
- Thread pool series – (5) Shutdown && shutdownNow
- Thread pool series – (6) Submit
The flow chart
Let’s start with a flowchart of the thread pool to get an overall impression, then analyze it alongside the source code.
Is the thread pool running?
The most important, core piece of thread pool code is the execute method. See below 👇.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // =============================
    // The number of threads is smaller than the number of core threads
    // =============================
    // Get the variable that holds both the pool state and the number of threads
    int c = ctl.get();
    // Fewer threads than corePoolSize: try to start a new core thread
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // =============================
    // Add to queue
    // =============================
    // Only queue the task while the thread pool is running
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer running, remove the task from the queue and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker threads are left, start one to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // =============================
    // Could not queue the task: try to add a non-core thread
    // =============================
    else if (!addWorker(command, false))
        reject(command);
}
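The three branches of execute can be observed from the outside. The following is a minimal sketch, not part of the original article: the class name ExecuteFlowDemo and the pool sizes (1 core thread, a queue of capacity 1, at most 2 threads) are assumptions chosen so that each branch is hit exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class ExecuteFlowDemo {
    // Submits four blocking tasks and records what happened to each one.
    static List<String> run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free early.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: reject(command) throws
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Task 1 starts a core thread, task 2 is queued, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected because both the queue and the pool are full.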
Looking at the source code, isn’t the first step to check whether the worker count is smaller than the number of core threads? Why does the flowchart start by checking the thread pool state?
That is one way to read it, but a more accurate reading is that the first step checks whether the thread pool is still running.
Inside addWorker, the very first check fails once the thread pool has been made unusable by shutdown or shutdownNow. As a breakpoint shows, when execute is called after shutdown or shutdownNow, the value of runStateOf is 1610612736.
addWorker | 1610612736 |
---|---|
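The value 1610612736 can be decoded by hand. The sketch below copies the bit-packing constants and helper methods as they appear in the JDK 8 ThreadPoolExecutor source (they are private there, so they are duplicated here as an assumption about the JDK version in use). The class name CtlDecodeDemo and the stateName helper are hypothetical.

```java
// Hypothetical demo class: constants mirror the private fields of
// java.util.concurrent.ThreadPoolExecutor in JDK 8.
class CtlDecodeDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits: worker count

    // The run state lives in the top three bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)    { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Illustrative helper, not part of the JDK.
    static String stateName(int c) {
        int rs = runStateOf(c);
        if (rs == RUNNING)    return "RUNNING";
        if (rs == SHUTDOWN)   return "SHUTDOWN";
        if (rs == STOP)       return "STOP";
        if (rs == TIDYING)    return "TIDYING";
        if (rs == TERMINATED) return "TERMINATED";
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        int observed = 1610612736;
        // prints "TERMINATED workers=0"
        System.out.println(stateName(observed) + " workers=" + workerCountOf(observed));
    }
}
```

Since 3 << 29 equals 1610612736, the observed value is exactly TERMINATED with a worker count of zero.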
The run state is stored in the top three bits of ctl, and here those bits say TERMINATED, so addWorker returns false. With the diagram below 👇, let’s put the logic in order. execute proceeds as follows:
- ① The worker count is less than the number of core threads, so addWorker is called.
- ② The worker cannot be added.
- ③ addWorker ends and returns false.
- ④ Check whether the thread pool is running. It is not, so this branch is skipped.
- ⑤ Try to add a worker again. This fails as well.
- ⑥ Go to the thread pool rejection policy.
So the first effective check is whether the thread pool is running. If it is not, the rejection policy is applied. That answers the question above: why the thread pool state is checked first in the process.
The author prepared a flowchart of the process above. It is shown below to make the flow easy to follow.
With that question sorted out, look back at the flowchart at the top 👆. The thread pool process should now be easier to understand.
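The conclusion above is easy to confirm with a small experiment. This is a minimal sketch under assumptions: the class name RejectAfterShutdownDemo is hypothetical, and the pool uses the default AbortPolicy, which signals rejection by throwing RejectedExecutionException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class RejectAfterShutdownDemo {
    // Returns true if a task submitted after shutdown() is rejected.
    static boolean rejectedAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.shutdown(); // the pool is no longer RUNNING
        try {
            // The worker count (0) is below corePoolSize (1) and the queue is
            // empty and unbounded, yet the task is still refused.
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedAfterShutdown());
    }
}
```

Even though there is room for a core thread and room in the queue, the state check wins and the task goes to the rejection policy.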
Add tasks
As analyzed earlier, execute calls addWorker when the worker count is smaller than the number of core threads. How does addWorker add a task?
Let’s look at its source code first. There is quite a lot of it, so the author splits it into two parts for analysis.
- The for loop
- The try block
Top half: the for loop
Let’s look at the for loop.
- ① was analyzed above: this check is not triggered while the thread pool is running.
- ② There are two for(;;) loops in the screenshot. Look at the second, inner loop (red box ②):
  - Check whether the number of threads exceeds the upper limit CAPACITY, which is (1 << 29) - 1 = 536870911 👇.
  - Or whether it exceeds the number of core threads or the maximum number of threads, depending on the argument. Here we passed true 👇, so the number of core threads is used.
  - Then execute compareAndIncrementWorkerCount, which tries to update the previously read number of threads with CAS.
    - true means the CAS succeeded. Execution breaks out of the big loop, the outermost one.
    - false means the CAS did not succeed. The for loop runs again and retries.
The top half only updates the number of threads recorded in the thread pool. No task has actually been added yet.
Bottom half: the try block
Move on to the second half, the try block.
There are five steps.
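The retry pattern of the top half can be sketched in isolation. The following is a simplified illustration rather than the JDK code: WorkerCountCas, tryReserve and the limit of 3 are hypothetical names and values, and a plain AtomicInteger stands in for the worker-count bits of ctl.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class: reserves "worker slots" with a CAS retry loop.
class WorkerCountCas {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    WorkerCountCas(int limit) {
        this.limit = limit;
    }

    // Returns true if a slot was reserved, false if the limit is reached.
    boolean tryReserve() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;             // over the limit: give up
            if (count.compareAndSet(c, c + 1))
                return true;              // CAS succeeded: leave the loop
            // CAS failed because another thread changed the count: retry
        }
    }

    int get() {
        return count.get();
    }

    // Eight threads compete for three slots; returns how many succeeded.
    static int demo() throws InterruptedException {
        WorkerCountCas counter = new WorkerCountCas(3);
        AtomicInteger successes = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                if (counter.tryReserve())
                    successes.incrementAndGet();
            });
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        return successes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("reserved slots: " + demo());
    }
}
```

No lock is taken here. Only the count is reserved, which matches the observation that the top half records the thread without starting anything.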
- Step (1): the task is wrapped in a Worker
- Step (2): lock
- Step (3): add the task
- Step (4): unlock
- Step (5): perform the task
In step (1), the task is wrapped in a Worker. A Worker packages a task together with a thread. As shown in the figure below, a new thread is created through the thread factory, which ties the task and the thread together.
Take a look at what Worker inherits from. Notice that it implements Runnable.
Now take a look at step (3), adding the task.
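The idea of a Worker being both a holder of the first task and the Runnable of its own thread can be sketched as follows. MiniWorker is a hypothetical, stripped-down stand-in: the real Worker also extends AbstractQueuedSynchronizer for its lock and delegates to runWorker, neither of which is modelled here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified worker: pairs a first task with a thread.
class MiniWorker implements Runnable {
    final Thread thread;
    Runnable firstTask;

    MiniWorker(Runnable firstTask, ThreadFactory factory) {
        this.firstTask = firstTask;
        // The worker passes itself to the factory, so starting the thread
        // ends up calling this worker's run() method.
        this.thread = factory.newThread(this);
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null;
        if (task != null)
            task.run();
    }

    // Returns true if the task ran on the worker's own thread.
    static boolean demo() throws InterruptedException {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        MiniWorker w = new MiniWorker(
                () -> ranOn.set(Thread.currentThread()),
                Executors.defaultThreadFactory());
        w.thread.start();
        w.thread.join();
        return ranOn.get() == w.thread;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task ran on worker thread: " + demo());
    }
}
```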
try {
    // Check the thread pool state
    int rs = runStateOf(ctl.get());
    // Proceed if the pool is RUNNING, or if it is SHUTDOWN and the task is null
    if (rs < SHUTDOWN ||
        (rs == SHUTDOWN && firstTask == null)) {
        // Check whether the thread is already alive
        if (t.isAlive()) // precheck that t is startable
            // Throw an exception
            throw new IllegalThreadStateException();
        // Add the worker to workers (a HashSet)
        workers.add(w);
    }
}
Let’s look at its two conditions:
- The thread pool is running
- The thread pool is SHUTDOWN and firstTask == null
The first one is easy to understand. The key is the second condition. Going through the source code, the author found that addWorker(null, true/false) is called in many places, and this condition is what lets those calls through.
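One consequence of the SHUTDOWN state is visible from the outside: a pool that has been shut down accepts no new tasks but still finishes the ones already in its queue, which is why workers without a first task may still be needed. The sketch below illustrates that behaviour. The class name ShutdownDrainsQueueDemo and the pool configuration are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, for illustration only.
class ShutdownDrainsQueueDemo {
    // Returns true if a task queued before shutdown() still gets executed.
    static boolean queuedTaskStillRuns() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean();

        // First task occupies the only thread until released.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Second task has to wait in the queue.
        pool.execute(() -> queuedRan.set(true));

        pool.shutdown();          // state becomes SHUTDOWN
        release.countDown();      // let the first task finish
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return queuedRan.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("queued task ran: " + queuedTaskStillRuns());
    }
}
```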
Now let’s look at the check of whether the thread is alive.
First, isAlive() returning true means the thread has been started and has not yet terminated. In other words, the thread is in one of the states RUNNABLE, TIMED_WAITING, BLOCKED or WAITING. In every case, the thread is already at work.
But the thread pool has only just created this thread and has not started it yet, so it must not already be running. Hence the precheck that t is startable.
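The behaviour of isAlive() and start() that this precheck relies on can be seen with a plain thread. This is a minimal sketch with a hypothetical class name, independent of the thread pool itself.

```java
// Hypothetical demo class, for illustration only.
class ThreadStartPrecheckDemo {
    // Returns "aliveBeforeStart aliveAfterEnd secondStartFailed".
    static String demo() throws InterruptedException {
        Thread t = new Thread(() -> { });

        boolean aliveBeforeStart = t.isAlive(); // NEW: not alive yet
        t.start();
        t.join();
        boolean aliveAfterEnd = t.isAlive();    // TERMINATED: no longer alive

        boolean secondStartFailed = false;
        try {
            t.start();                          // a thread can be started only once
        } catch (IllegalThreadStateException e) {
            secondStartFailed = true;
        }
        return aliveBeforeStart + " " + aliveAfterEnd + " " + secondStartFailed;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "false false true"
        System.out.println(demo());
    }
}
```

A freshly created thread is not alive, so a thread that is already alive when addWorker inspects it cannot be one the pool is about to start for the first time.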
Perform a task
This part is a little easier to understand. The Worker has been added to workers (a HashSet).
That’s not the point, though. Focus on t.start() 👇.
We all know that calling a thread’s start() leads to a run method being executed somewhere. The thread pool hides this inside Worker. Worker implements Runnable and therefore has a run method. When the thread is started, that run method is called, and it in turn calls runWorker(this).
At this point we know that tasks are executed in runWorker. The core of it is a single line, task.run(), which executes the task.
final void runWorker(Worker w) {
    // Get the current thread
    Thread wt = Thread.currentThread();
    // Get the worker's first task
    Runnable task = w.firstTask;
    // Clear worker.firstTask and release the lock
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep looping while there is a first task or getTask() returns one
        while (task != null || (task = getTask()) != null) {
            // Lock
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //
            // In other words:
            // 1. If the pool state is >= STOP, the thread must be interrupted
            // 2. Otherwise Thread.interrupted() clears any pending interrupt,
            //    and the state is rechecked in case shutdownNow ran meanwhile
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // Interrupt the current thread
                wt.interrupt();
            try {
                // Hook method, empty by default
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 👇 Execute the run method of the Runnable
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // Task finished: clear it, increment completedTasks, release the lock
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The worker exits
        processWorkerExit(w, completedAbruptly);
    }
}
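The order in which runWorker calls beforeExecute, task.run() and afterExecute can be observed by overriding the two hook methods, which are protected and do nothing by default. The sketch below is an illustration with a hypothetical class name. It records events into a synchronized list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class HookOrderDemo {
    // Runs one task and returns the recorded sequence of events.
    static List<String> run() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                events.add("after");
            }
        };
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(events);
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [before, task, after]
        System.out.println(run());
    }
}
```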
Look at the condition while (task != null || (task = getTask()) != null). It has two parts:
- The task is not null
- getTask() returns a task
Again, focus on the getTask() method. It simply fetches a task from the queue.
private Runnable getTask() {
    // Flag: did the last poll() time out?
    boolean timedOut = false;
    // Infinite loop
    for (;;) {
        // Get the thread pool state
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // The worker should exit if the state is >= STOP,
        // or if the state is SHUTDOWN and the task queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrement workerCount
            decrementWorkerCount();
            // Return null, indicating that there are no more tasks
            return null;
        }
        // Get workerCount
        int wc = workerCountOf(c);
        // Determine whether to fetch the task with a timeout:
        // 1. allowCoreThreadTimeOut is true (set with ThreadPoolExecutor#allowCoreThreadTimeOut), or
        // 2. workerCount is greater than corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If workerCount is greater than maximumPoolSize, or the last timed fetch timed out,
        // and in addition workerCount > 1 or the task queue is empty, this worker may exit.
        // When workerCount is greater than maximumPoolSize, the timeout flag does not matter.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Try to decrement workerCount with CAS:
            // return null on success (no more tasks), continue the loop on failure
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // Fetch a task from the queue, with or without a timeout:
            // poll returns null if no task arrives within the specified time,
            // take blocks until a task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // If the obtained task is not null, return it
            if (r != null)
                return r;
            // Mark timeout
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting: reset the timeout flag and retry
            timedOut = false;
        }
    }
}
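The effect of the timed branch in getTask can be observed through the pool size. In the following sketch, allowCoreThreadTimeOut(true) makes even the single core thread fetch tasks with poll, so it exits once it has been idle for keepAliveTime. The class name IdleTimeoutDemo, the 50 ms keep-alive and the 5 second polling deadline are assumptions chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class IdleTimeoutDemo {
    // Returns {pool size while busy, pool size after the idle timeout}.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Requires a keep-alive time greater than zero.
        pool.allowCoreThreadTimeOut(true);

        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int busy = pool.getPoolSize(); // the worker is occupied by the task
        release.countDown();

        // Wait (up to 5 seconds) for the idle worker to time out in getTask().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
            Thread.sleep(10);
        int idle = pool.getPoolSize();

        pool.shutdown();
        return new int[] { busy, idle };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = run();
        System.out.println("busy=" + sizes[0] + " idle=" + sizes[1]);
    }
}
```

Without allowCoreThreadTimeOut(true), the same worker would block in take() and the pool size would stay at 1.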