The theoretical knowledge
Over the weekend it turned rainy and cold in Shanghai. With nothing better to do, I opened my laptop to read some source code, and thread pools came to mind. Much has been written about thread pools on the web, and the theory is largely the same everywhere.
But theory is only theory. It may get you through an interview, but it tends to fall apart once someone digs into the details. So I suggest exploring any piece of theory yourself: only after practicing it do you form your own understanding instead of rote memory, and that understanding is much harder to forget.
A thread pool is a common pooling technique. Like an HttpClient connection pool or a database connection pool, its purpose is to improve resource utilization and efficiency.
Without a thread pool, you achieve concurrency by creating threads yourself, usually by extending the Thread class or by implementing the Runnable or Callable interface. Threads are an expensive resource, and the CPU has to save and restore context every time it switches between them, so creating too many threads wastes resources and puts significant load on the CPU.
For convenience, JDK 1.5 and later provide several factory methods for creating a thread pool:
- Executors.newFixedThreadPool(nThreads): creates a fixed-size thread pool that caps the number of concurrent threads; tasks beyond that number wait in a queue.
- Executors.newCachedThreadPool(): creates a cacheable thread pool. Idle threads beyond what the workload needs are reclaimed, and a new thread is created when no idle thread is available for reuse.
- Executors.newSingleThreadExecutor(): creates a single-threaded pool that uses exactly one worker thread, so tasks run one at a time in submission (FIFO) order.
- Executors.newScheduledThreadPool(nThreads): creates a fixed-size thread pool that supports delayed and periodic task execution.
Although the JDK provides these by default, they offer little room for customization and often fall short of what we need. For example, what is wrong with a fixed pool created by newFixedThreadPool? Internally it uses a LinkedBlockingQueue whose capacity defaults to Integer.MAX_VALUE.
Once the core threads are all busy, tasks are queued until the queue is full. But the JVM may run out of memory (OOM) long before Integer.MAX_VALUE tasks are queued, because there is not enough space for that many tasks.
So, more often than not, we create custom thread pools with new ThreadPoolExecutor. In fact, if you look at the source, the four factory methods above build on ThreadPoolExecutor themselves; they just fill in fixed values for its parameters.
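To make the queue-capacity point concrete, here is a minimal sketch. The class and method names (FixedPoolQueueDemo, factoryQueueCapacity, boundedFixedPool) are made up for illustration, and the cast to ThreadPoolExecutor assumes the OpenJDK behavior in which newFixedThreadPool returns a plain ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolQueueDemo {

    // Remaining capacity of the queue behind Executors.newFixedThreadPool(n).
    static int factoryQueueCapacity(int nThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            // Assumption: the returned object is a plain ThreadPoolExecutor (true in OpenJDK).
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Same pool shape as newFixedThreadPool, but with an explicit queue bound.
    static ThreadPoolExecutor boundedFixedPool(int nThreads, int queueCapacity) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
    }

    public static void main(String[] args) {
        System.out.println("factory queue unbounded: "
                + (factoryQueueCapacity(3) == Integer.MAX_VALUE));
        ThreadPoolExecutor bounded = boundedFixedPool(3, 100);
        System.out.println("bounded queue capacity: " + bounded.getQueue().remainingCapacity());
        bounded.shutdown();
    }
}
```

The bounded variant trades the OOM risk for rejections once the queue fills up, so it should be paired with a deliberate rejection policy.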
The constructor for ThreadPoolExecutor looks like this:
ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler);
Let's take a look at the meaning and function of these core parameters:
- corePoolSize: the number of core threads in the pool.
- maximumPoolSize: the maximum number of threads in the pool.
- keepAliveTime and unit: how long an idle thread is kept alive before it is reclaimed.
- workQueue: the blocking queue that holds waiting tasks.
- threadFactory: the factory used to create new threads.
- handler: the saturation (rejection) policy applied when both the queue and the maximum pool size are exhausted.
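As an illustration of how the parameters fit together, the sketch below builds a pool with all seven arguments. The values, the "demo-pool" prefix, and the helper names (CustomPoolDemo, namedFactory, runOneTask) are assumptions chosen for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {

    // Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.incrementAndGet());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                30L, TimeUnit.SECONDS,               // keepAliveTime + unit
                new ArrayBlockingQueue<>(10),        // bounded workQueue
                namedFactory("demo-pool"),           // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    // Runs one task and reports the name of the thread that executed it.
    static String runOneTask() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOneTask());
    }
}
```

Naming threads through the factory makes thread dumps and logs much easier to read, which is one of the main reasons to pass a custom ThreadFactory.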
Through the configuration of these parameters, the entire thread pool workflow is as follows:
A few years ago, knowing the above was usually enough for a general technical interview. In the current climate, or when interviewing for a more senior position, it will not survive deeper questioning. For example, do you know: What states does a thread pool have internally? How does the pool determine that the core threads are full? Does the maximum number of threads include the core threads? Can a task still be executed once the number of threads in the pool has reached maximumPoolSize? The answers to these questions can only be found in the thread pool source code.
Hands-on simulation test
We create a custom thread pool, submit 10 tasks in a row from a for loop, and print information about the executing thread. The complete code looks like this:
public static void main(String[] args) {
    // corePoolSize = 3, maximumPoolSize = 6, keepAliveTime = 5 ms, queue capacity = 4
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            3, 6, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(4));
    for (int i = 0; i < 10; i++) {
        threadPoolExecutor.execute(() -> {
            System.out.println("Test thread pool:" + Thread.currentThread().getName() + "," + threadPoolExecutor.toString());
        });
    }
}
Here corePoolSize = 3, maximumPoolSize = 6, and the workQueue capacity is 4.
In the output you can see that a total of 6 threads were created to run the 10 tasks, which makes sense: c = 3 core threads took the first 3 tasks, the next 4 tasks were queued to wait for a free thread, and finally e = 3 additional threads were created for the remaining 3 tasks. The total number of threads created is c + e = 6 <= 6 (the maximum number of threads). Because these tasks finish almost instantly, the exact thread count can vary slightly between runs.
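The growth pattern described above can be observed deterministically if the tasks are held open until all ten have been submitted. The following is a sketch under that assumption; PoolGrowthDemo and submitTen are illustrative names, and the CountDownLatch exists only to keep the workers busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Submits 10 blocking tasks and records "poolSize/queueSize" after each submit.
    static String[] submitTen() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 6, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(4));
        CountDownLatch release = new CountDownLatch(1);
        String[] snapshots = new String[10];
        try {
            for (int i = 0; i < 10; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                snapshots[i] = pool.getPoolSize() + "/" + pool.getQueue().size();
            }
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
        return snapshots;
    }

    public static void main(String[] args) {
        String[] snapshots = submitTen();
        for (int i = 0; i < snapshots.length; i++) {
            System.out.println("after task " + (i + 1) + ": threads/queued = " + snapshots[i]);
        }
    }
}
```

The snapshots show three phases: the pool grows to 3 threads, then the queue fills to 4, and only then does the pool grow from 3 to 6.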
If we change the constructor arguments when creating the pool, for example
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        3, 5, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2));
and run the code above again, it fails with a RejectedExecutionException, thrown by the rejection policy.
With c = 3 core threads and a blocking queue of capacity 2, the core threads and the queue together absorb 5 tasks. Running the remaining 5 tasks would take e = 5 additional threads, for a total of c + e = 8. But maximumPoolSize = 5, so only 2 additional threads can be created, and the default rejection policy (AbortPolicy) throws an exception for the next task. In short, once the queue is full and the number of threads has reached maximumPoolSize, the pool accepts no more tasks and runs the rejection logic instead.
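The arithmetic above can be checked with a small sketch that keeps every task blocked, so that no thread frees up while tasks are being submitted. RejectionDemo and acceptedBeforeRejection are illustrative names; the pool settings mirror the example above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    // Returns how many of 10 blocking tasks were accepted before the first rejection.
    static int acceptedBeforeRejection() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 5L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < 10; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so the pool stays saturated
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Thrown by the default AbortPolicy once queue and pool are both full.
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 3 core threads + 2 queued tasks + 2 extra threads = 7 accepted tasks
        System.out.println("accepted before rejection: " + acceptedBeforeRejection());
    }
}
```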
Technology comes from life
People often brainstorm unexpected solutions when they encounter some difficulties in life, which are the crystallization of thought and wisdom. Many of our technological solutions also come from life.
I often wonder what I would do if I stopped being a programmer. Catering seems the most popular choice; after all, everyone has to eat.
A first restaurant can't be too big. One reason is capital; the other is the need to test the market. With demand uncertain, renting a small shop is the safer bet, because even a loss would not be too large.
So the shop is a few dozen square meters, serves spicy grilled fish, and has about 15 tables. Then there are the employees: apart from the cooks, mainly waiters. I can't hire 15 waiters, one per table; that would be wasteful. To improve resource utilization and control costs, I hire just five permanent waiters to greet customers and serve food in the dining hall, each responsible for three tables.
To my surprise, the grilled fish turns out to be very popular, and with good marketing the restaurant becomes an internet sensation. Business booms and every seat is taken every day. Space is limited, so customers who arrive later have to wait; we set up a waiting area where they take a number and queue for a table in an orderly way.
The staff stays the same: the five permanent waiters still do the main service work and also look after the waiting area. That area can't be too big either. It must not get in the way of normal movement inside the restaurant, and it must not spill outside, because customers standing in the road would be in danger. As word of mouth spreads, customers keep streaming in, and to raise turnover we also start a takeout service so that orders can be packed to go.
To avoid that dangerous situation and process orders faster, we hire additional temporary workers to handle the takeout orders and increase our capacity.
But more is not always better. Temporary workers have to be paid too, so costs must be controlled. What then? In the end we have to accept the pain: for orders beyond our capacity we adopt a rejection strategy, such as telling customers that today's quota is sold out and asking them to come back another day.
That is a real-life version of how a thread pool runs. The five permanent waiters are the core threads and the waiting area is the waiting queue. The queue cannot be unbounded, because that can lead to OOM. When the queue is full, the thread pool creates additional threads to handle tasks: the temporary workers in the example. The restaurant controls costs, so there is a cap on total staff and you can't hire too many temps: that is the maximum number of threads. When the number of temporary workers has reached the cap and the queue is full, a rejection policy temporarily turns away further requests.
Look at the source code
Talk is cheap. That is what the theory says, but is the source code really written that way? Let's look at the thread pool source, entering through threadPoolExecutor.execute(...). With the original comments removed, the method is shown below; thanks to good encapsulation it is only a few lines long.
public void execute(Runnable command) {
    // #1 Null check on the task
    if (command == null)
        throw new NullPointerException();
    // #2 Add a core thread to execute the task
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // #3 Queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Double check
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // #4 Add a non-core thread to execute the task, and run the rejection policy if that fails
    else if (!addWorker(command, false))
        reject(command);
}
If you ignore the details and look only at the big picture, the source above has four main steps. Leaving aside the null check in step 1, the remaining three steps are exactly the operating logic of the thread pool, i.e. the workflow described above.
- (1) Null check on the task.
- (2) Get the current number of worker threads. If it is less than the number of core threads, create a core thread to execute the task; otherwise go to #3.
- (3) If the thread pool is in the RUNNING state, queue the task. A double check follows, because a thread may have died since the last check, or the pool may have been shut down since the method was entered, so the state must be read again. If the pool has stopped, the task just added to the queue is removed again and turned away by the rejection policy; if the pool has no threads at all, a new thread is started to execute the task.
- (4) If the task cannot be queued (the queue is full or the pool is no longer RUNNING), try to create a new thread to execute it. If that fails, the pool is either shut down or saturated, and the task is turned away by the rejection policy.
There are two points to note about the double check:
1. Why double check the thread pool state?
In a multi-threaded environment the pool state can change at any moment. ctl.get() returns an atomic snapshot, but reading the state and then queuing the task is not one atomic operation, so the state may change right after it is read. The decision to add command to the workQueue is based on the earlier state. Without the double check, a command queued just as the pool left the RUNNING state (which is quite possible in a multi-threaded environment) might never be executed.
2. Why is the task null in addWorker(null, false)?
addWorker(null, false) only creates a new thread and passes no task, because the task was already added to the queue. This guards against the case where the pool is in the RUNNING state but has no threads left to process the queued task.
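One situation in which the addWorker(null, false) branch matters is a pool configured with corePoolSize = 0: step #2 never creates a thread, so the task is queued and a worker without a first task is started to drain the queue. The sketch below demonstrates only the observable effect (the task still runs); ZeroCoreDemo is an illustrative name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCoreDemo {

    // With corePoolSize = 0 the task is queued first; it must still be executed.
    static boolean runsWithZeroCoreThreads() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(done::countDown);
            // Wait for the queued task; a worker with no first task picks it up.
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task executed: " + runsWithZeroCoreThreads());
    }
}
```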
According to the specific steps of the above code, we can draw a detailed execution process, as shown in the figure below
The execute() source above is just over ten lines long and looks simple, mainly thanks to good encapsulation. Two points deserve a closer look: the thread pool state and the addWorker() method. Once you understand these two, you understand most of this part of the thread pool source.
Thread pool run state - runState
Threads have states, and so does a thread pool. The pool's run state provides the main lifecycle control and is maintained internally as the pool runs. There are five states: RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED.
The meanings of each status value and the operations that can be performed under the status value are as follows:
Run state | Description |
---|---|
RUNNING | Accepts new tasks and processes tasks in the blocking queue. |
SHUTDOWN | Accepts no new tasks but continues to process tasks in the blocking queue. |
STOP | Accepts no new tasks, does not process queued tasks, and interrupts tasks in progress. |
TIDYING | All tasks have terminated and workerCount is 0; the thread that moves the pool to TIDYING runs the terminated() hook method. |
TERMINATED | Entered once the terminated() call has completed. |
The lifecycle state flow is shown below:
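The internal state constants are private, but part of the lifecycle can be observed through public methods. The sketch below walks a pool from RUNNING through shutdown() to TERMINATED; LifecycleDemo and observe are illustrative names, and the intermediate TIDYING state is not directly observable this way.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {

    // Returns {acceptingBeforeShutdown, rejectedAfterShutdown, terminatedInTime}.
    static boolean[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> { });                      // RUNNING: task accepted
        boolean acceptingBefore = !pool.isShutdown();

        pool.shutdown();                              // RUNNING -> SHUTDOWN
        boolean rejectedAfter = false;
        try {
            pool.execute(() -> { });                  // new tasks are refused now
        } catch (RejectedExecutionException e) {
            rejectedAfter = true;
        }

        boolean terminated = false;
        try {
            // Queued work drains, then the pool ends up TERMINATED.
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {acceptingBefore, rejectedAfter, terminated && pool.isTerminated()};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("accepting before shutdown: " + r[0]);
        System.out.println("rejected after shutdown:   " + r[1]);
        System.out.println("terminated:                " + r[2]);
    }
}
```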
Most of the time we represent a status with a simple int value, for example a delete_flag column in a database where 0 means valid and 1 means deleted. In the thread pool source, the state is represented as follows:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
How can a single variable hold two values, the runState and the thread count? The 32 bits of the int are split: the high 3 bits store the runState and the low 29 bits store the number of worker threads.
Internally, the lifecycle state and the number of threads in the pool are extracted as follows:
// Get the thread pool state
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Get the number of threads
private static int workerCountOf(int c) { return c & CAPACITY; }
// Packing and unpacking ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
These bit operations extract the run state from the high 3 bits and the thread count from the low 29 bits. The details are not covered further here; interested readers can work through them.
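For readers who do want to work through the bit operations, the sketch below copies the constants and the three helper methods from the listings above into a standalone class (CtlBitsDemo, a made-up name) and adds a hypothetical bits() helper for printing all 32 bits.

```java
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // low 29 bits all set
    static final int RUNNING = -1 << COUNT_BITS;         // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;         // high bits 000
    static final int STOP = 1 << COUNT_BITS;             // high bits 001

    static int runStateOf(int c) { return c & ~CAPACITY; }   // keep the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; } // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }     // pack both into one int

    // Hypothetical helper: renders an int as a zero-padded 32-character bit string.
    static String bits(int v) {
        return String.format("%32s", Integer.toBinaryString(v)).replace(' ', '0');
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5); // RUNNING pool with 5 workers
        System.out.println("ctl      = " + bits(ctl));
        System.out.println("runState = " + bits(runStateOf(ctl)));
        System.out.println("workers  = " + workerCountOf(ctl));
    }
}
```

Because RUNNING is negative, SHUTDOWN is zero, and the later states are positive, comparisons such as rs >= SHUTDOWN in the source work as plain integer comparisons.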
Adding a worker thread - addWorker
Threads are added through the addWorker() method, which takes two parameters, Runnable firstTask and boolean core.
private boolean addWorker(Runnable firstTask, boolean core) { ... }
- Runnable firstTask is the first task the newly added thread should run.
- boolean core indicates whether the thread being added counts as a core thread (checked against corePoolSize) or a non-core thread (checked against maximumPoolSize).
Looking back at the execute() method above, addWorker() is called in three places: #2, #3, and #4.
- #2: when the number of worker threads is below the number of core threads, addWorker(command, true) adds a core thread to run the command task.
- #3: during the double check, if the pool is running normally but has no worker threads, a non-core thread with a null task is added. A worker with a null first task blocks on the task queue to fetch work once it runs, so this amounts to creating a new thread that simply has not been assigned a task yet.
- #4: when the queue is full, a non-core thread is added to run the current task, and the task is rejected if that fails.
That covers where addWorker() is called. Now let's look at what it does. The source is as follows:
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
This method is a bit long, so let's split it into two parts. Here is the first part:
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c); // Get the state of the thread pool
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        !(rs == SHUTDOWN &&
          firstTask == null &&
          !workQueue.isEmpty()))
        return false;
    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        // Try to increment workerCount by CAS
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get(); // Re-read ctl
        // If the thread pool state has changed, restart from the outermost loop
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
The first line is retry:. For those who have not seen this before, it is a label: it names the loop statement that follows it so that the loop can be referred to later.
In an ordinary for loop we use continue; or break; to skip an iteration of, or leave, the current loop. With several nested for loops, if we want to leave all of them, or restart an outer loop from inside an inner one, we mark the outer loop with a label such as retry: and refer to it by name.
In this code there are four places where control leaves the normal flow of the loop body: two return false; statements, one break retry;, and one continue retry;.
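Labeled break and continue are rare in everyday Java, so here is a small standalone sketch of both. It is unrelated to the thread pool itself; LabeledLoopDemo, findPair, and the label name search are made up for the example.

```java
class LabeledLoopDemo {

    // Finds the first pair (i, j) in 1..limit with i * j == target, or {-1, -1} if none exists.
    static int[] findPair(int target, int limit) {
        int[] found = {-1, -1};
        search:
        for (int i = 1; i <= limit; i++) {
            for (int j = 1; j <= limit; j++) {
                if (i * j > target) {
                    continue search; // products only grow from here: move on to the next i
                }
                if (i * j == target) {
                    found = new int[] {i, j};
                    break search;    // leave both loops at once
                }
            }
        }
        return found;
    }

    public static void main(String[] args) {
        int[] pair = findPair(12, 10);
        System.out.println("pair for 12: " + pair[0] + " x " + pair[1]);
    }
}
```

In addWorker() the label plays the same role: break retry; leaves both loops after a successful CAS, and continue retry; restarts the outer loop when the pool state has changed.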
Let's look at the first return false;. It sits in the outermost for loop:
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
    return false;
This condition checks both the thread pool state and the task queue. The logic is a little convoluted, but it can be rewritten as
rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
So return false, meaning the worker thread could not be added, is triggered in any of these cases:
- rs > SHUTDOWN: the pool is in the STOP, TIDYING, or TERMINATED state. Adding a worker thread fails and no new task is accepted.
- rs >= SHUTDOWN && firstTask != null: the pool is in the SHUTDOWN, STOP, TIDYING, or TERMINATED state and the worker's first task is not null. Adding a worker thread fails and no new task is accepted.
- rs >= SHUTDOWN && workQueue.isEmpty(): the pool is in the SHUTDOWN, STOP, TIDYING, or TERMINATED state and the blocking queue is empty. Adding a worker thread fails and no new task is accepted.
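The rewritten condition follows from De Morgan's law, and the equivalence can be checked by brute force. In the sketch below the two boolean parameters stand in for firstTask == null and workQueue.isEmpty(), and small integers stand in for the real shifted state constants; both substitutions are assumptions made to keep the example standalone.

```java
class ShutdownCheckDemo {
    // Stand-in value: only the ordering relative to SHUTDOWN matters for this check.
    static final int SHUTDOWN = 0;

    // The condition as written in addWorker().
    static boolean original(int rs, boolean firstTaskIsNull, boolean queueIsEmpty) {
        return rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTaskIsNull && !queueIsEmpty);
    }

    // The same condition after applying De Morgan's law to the negated group.
    static boolean rewritten(int rs, boolean firstTaskIsNull, boolean queueIsEmpty) {
        return rs >= SHUTDOWN && (rs != SHUTDOWN || !firstTaskIsNull || queueIsEmpty);
    }

    // Compares both forms for every combination of state and flags.
    static boolean equivalentEverywhere() {
        int[] states = {-1, 0, 1, 2, 3}; // RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED (stand-ins)
        boolean[] flags = {false, true};
        for (int rs : states) {
            for (boolean taskNull : flags) {
                for (boolean queueEmpty : flags) {
                    if (original(rs, taskNull, queueEmpty) != rewritten(rs, taskNull, queueEmpty)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("forms agree: " + equivalentEverywhere());
        // The only SHUTDOWN case that may still add a worker: no first task, queue not empty.
        System.out.println("SHUTDOWN, null task, non-empty queue refused: "
                + original(SHUTDOWN, true, false));
    }
}
```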
So the outermost for loop keeps checking the current thread pool state to decide whether a new worker may be added; only if the check passes does execution continue.
The second return false; is in the inner for loop. It checks the number of worker threads currently in the pool and returns false, meaning the worker thread could not be added, if any of the following limits has been reached:
- The number of worker threads has reached CAPACITY.
- A core worker thread is being added and the core pool size (corePoolSize) has been reached.
- A non-core worker thread is being added and the maximum pool size (maximumPoolSize) has been reached.
break retry; means that the attempt to increment workerCount by CAS succeeded, so both loops are exited and execution continues with the second part of the code. If the CAS fails, the pool state is read again: if it has changed, continue retry; restarts from the outermost for loop; otherwise the inner loop simply retries.
From the first part we can see that only break retry; leads on to the second part. So what does the second part do?
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    // Create the Worker object instance
    w = new Worker(firstTask);
    // Get the thread held by the Worker object
    final Thread t = w.thread;
    if (t != null) {
        // Acquire the exclusive reentrant lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            // Get the thread pool run state
            int rs = runStateOf(ctl.get());
            // rs < SHUTDOWN means the pool is RUNNING; otherwise the pool
            // must be SHUTDOWN and firstTask must be null
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                // Add the Worker instance to the pool's workers set
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                // Set the "worker added" flag to true
                workerAdded = true;
            }
        } finally {
            // Release the lock
            mainLock.unlock();
        }
        // If the worker was added to the pool, start its thread and set the "worker started" flag to true
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (!workerStarted)
        // Adding the thread failed
        addWorkerFailed(w);
}
return workerStarted;
The main purpose of this part is to start a thread, after a series of checks on whether a worker thread may be started. It consists of two nested try...finally blocks, which are easier to understand taken one at a time.
Let's look at the inner try...finally first. When the Thread in the Worker instance is not null, the exclusive ReentrantLock mainLock is acquired so that no other thread can modify the workers set at the same time.
try {
    // Get the thread pool run state
    int rs = runStateOf(ctl.get());
    if (rs < SHUTDOWN ||
        (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
        workerAdded = true;
    }
} finally {
    mainLock.unlock();
}
- First, the pool state is checked. Only when the pool is RUNNING, or is SHUTDOWN and the new worker's firstTask is null, can the worker instance be added to the pool, i.e. workers.add(w);.
- The largestPoolSize variable records the largest number of threads the pool has ever held.
- The workerAdded flag is set to true to indicate that the worker thread was added successfully.
- Whether or not the add succeeds, mainLock.unlock() must run in finally to release the lock.
The outer try...finally determines whether the worker thread started successfully. If the inner block succeeded, i.e. the worker was added to the pool and the workerAdded flag is true, the worker's thread is started with t.start() and the workerStarted flag is set to true to indicate a successful start.
if (workerAdded) {
    t.start();
    workerStarted = true;
}
If starting fails (workerStarted == false), the addWorkerFailed(w) method is executed in finally. This method amounts to a rollback.
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // Remove the worker instance from the thread pool
            workers.remove(w);
        // Decrement the worker count (workerCount) by CAS
        decrementWorkerCount();
        // Recheck for termination, in case this worker was holding it up
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
The Worker class
In addWorker() we saw w = new Worker(firstTask), Thread t = w.thread, workers.add(w), and t.start(). That raises a few questions:
- What is a Worker? How does it differ from a Thread?
- How does a thread get tasks after it starts? Where do the tasks come from?
- When the blocking queue is full, do the additionally created threads fetch tasks from the queue? If not, where do they get them?
- Do core threads stay in the pool forever? Are the additionally created non-core threads destroyed after completing their tasks?
Worker is an inner class of ThreadPoolExecutor whose main job is to maintain the interrupt control state of the thread running tasks. It implements the Runnable interface and extends AQS (AbstractQueuedSynchronizer). Implementing Runnable means the Worker itself is the task its thread runs; extending AQS gives it an exclusive lock.
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    // Initialize the AQS state to -1
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}
AQS is used directly, rather than ReentrantLock, because the Worker needs a non-reentrant lock that reflects the thread's current execution state:
- Once lock() has acquired the exclusive lock, the thread is executing a task.
- A thread that is executing a task should not be interrupted.
- If the thread does not hold the exclusive lock, it is idle, is not working on a task, and can be interrupted.
- When shutdown() or tryTerminate() runs, the pool calls interruptIdleWorkers() to interrupt idle threads. interruptIdleWorkers() uses tryLock() to determine whether a thread in the pool is idle; if it is, it can be safely reclaimed.
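To show how an exclusive, non-reentrant lock can be built on AQS, here is a minimal sketch in the spirit of the Worker lock. It is not the JDK's Worker code: SimpleMutex is an illustrative class, and unlike Worker it starts in state 0 rather than -1.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex extends AbstractQueuedSynchronizer {

    // State 0 = unlocked (idle), state 1 = locked (busy). No reentrancy.
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false; // already held, even if held by the calling thread
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock() { release(1); }
    public boolean isLocked() { return getState() != 0; }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();                               // "worker is running a task"
        System.out.println("tryLock while held: " + mutex.tryLock());
        mutex.unlock();                             // "worker is idle again"
        System.out.println("tryLock when idle:  " + mutex.tryLock());
        mutex.unlock();
    }
}
```

Because tryLock() fails while the lock is held, a caller in the role of interruptIdleWorkers() can use it as a cheap "is this worker idle?" test.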
The Worker class has one constructor, which takes the given first task (firstTask) and holds a thread. The thread is created through the ThreadFactory when the constructor is called and is used to execute tasks.
firstTask stores the first task passed in, which may be null. If it is non-null, the thread executes that task as soon as it starts; if it is null, the thread has no initial task and fetches its work from the blocking queue.
Running tasks - runWorker
Above we saw t.start(). Starting the thread runs the Worker's run() method:
public void run() {
    runWorker(this);
}
The run() method calls runWorker(), which contains the whole implementation:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
A lot of people get a headache when they see code like this. Looked at closely, though, its key structure is three nested try...catch...finally blocks. Let's look at each one separately, temporarily leaving out the inner blocks so that it reads much more cleanly. The first (outermost) one:
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// The AQS state is set to -1 when the Worker is initialized, so an unlock is needed to set it to 0 and allow the thread to be interrupted
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    // Loop while there is a task (the firstTask or one fetched from the queue)
    while (task != null || (task = getTask()) != null) {
        // Lock the Worker: the AQS acquire tries to CAS the state from 0 to 1
        w.lock();
        // If the pool is stopping, make sure the thread is interrupted;
        // if it is not stopping, make sure the thread is not interrupted.
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
        // The second try...catch...finally is omitted here
    }
    // getTask() returned null, so the loop ended and the thread exits normally
    completedAbruptly = false;
} finally {
    // Handle thread exit
    processWorkerExit(w, completedAbruptly);
}
The second try...catch...finally:
try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    // The third try...catch...finally is omitted here
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
The third try...catch...finally:
try {
    // Run the task
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}
beforeExecute and afterExecute appear in the code above; together with terminated, they are hook functions that subclasses can override to extend ThreadPoolExecutor, for example to add logging, timing, monitoring, or statistics collection.
- beforeExecute(): called before a task is executed
- afterExecute(): called after a task is executed
- terminated(): called when the thread pool has terminated
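As a sketch of how the hooks can be used, the subclass below counts task starts and completions and records the terminated() call. CountingPool, its fields, and runTasks are illustrative names; a real monitoring setup would likely record timings or publish metrics instead.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    volatile boolean terminatedCalled;

    CountingPool() {
        super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();  // runs in the worker thread, before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet(); // runs after task.run(), even if the task threw
    }

    @Override
    protected void terminated() {
        super.terminated();
        terminatedCalled = true;    // runs once, when the pool has fully shut down
    }

    // Runs n trivial tasks and returns {started, finished, terminatedCalled ? 1 : 0}.
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool();
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.started.get(), pool.finished.get(), pool.terminatedCalled ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] counts = runTasks(5);
        System.out.println("started=" + counts[0] + " finished=" + counts[1]
                + " terminated=" + (counts[2] == 1));
    }
}
```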
After breaking it down this way, there are really only two points to focus on: getTask() and task.run(). task.run() simply runs the task, so let's move on to how getTask() obtains one.
Getting a task - getTask
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. The pool state is STOP, TIDYING, or TERMINATED, or
        // 2. the pool is SHUTDOWN and the queue is empty.
        // If either condition holds, decrement the worker count and return null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // timed is true if core threads may time out or the worker count exceeds corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 1. The worker count exceeds maximumPoolSize, or timed == true && timedOut == true, and
        // 2. the worker count is greater than 1 or the queue is empty.
        // If both hold, decrement the worker count by CAS and return null; if the CAS fails, retry in the next loop iteration
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // If timed is true, poll() waits with a timeout and returns null if no task arrives within keepAliveTime
            // If timed is false, take() blocks until the next task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
One field in this method deserves attention: allowCoreThreadTimeOut. Its default value is false, and since Java 1.6 it can be set to true with threadPoolExecutor.allowCoreThreadTimeOut(true). As the name suggests, it allows core threads to time out and be destroyed.
By default the core threads are kept alive even when they are idle. When the field is set to true, the core threads in the pool are also destroyed once they have been idle for keepAliveTime.
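The effect of allowCoreThreadTimeOut can be observed through getPoolSize(). The sketch below uses a 50 ms keepAliveTime and a fixed 500 ms wait, both arbitrary values chosen for the demonstration; CoreTimeoutDemo and poolSizeAfterIdle are illustrative names. Note that allowCoreThreadTimeOut(true) requires a keepAliveTime greater than zero.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {

    // Starts 2 core threads, lets them sit idle, and returns the pool size afterwards.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            Thread.sleep(500); // idle for much longer than keepAliveTime
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout off: " + poolSizeAfterIdle(false) + " thread(s) left");
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true) + " thread(s) left");
    }
}
```

With the flag off, the idle core threads block in take() and stay in the pool; with the flag on, they use the timed poll() and exit once it times out.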
In closing
Working through this analysis shows how many details of the thread pool deserve attention. Reading the source cleared up a lot of confusion and taught me a good deal, which is why reading source code is so important.