Background on thread pools
Thread pools are a popular concurrency framework; almost any program that needs to execute tasks asynchronously or concurrently can use one. The benefits of using a thread pool are:
- Lower resource consumption: already-created threads are reused, which reduces the cost of creating and destroying threads;
- Faster response: a task can start executing as soon as it arrives, without waiting for a thread to be created;
- Better manageability: the pool can allocate, tune, and monitor threads uniformly.
Difficulties and highlights of thread pools
Let's look at the core mechanism a thread pool uses to maintain and reclaim its running threads.
The premise and introduction of thread pools
In the JDK, the general-purpose thread pool is ThreadPoolExecutor, and most people already have a rough idea of how it executes tasks. That process is fairly straightforward, so I won't go into detail here; I have covered it in previous articles.
How do thread pools recycle threads?
runWorker(Worker w)
Basic flow of a worker thread
- Once a Worker's thread starts, it enters runWorker(Worker w).
- Inside is a while loop that checks whether there is a task; if there is, it runs the task.
- If no task can be obtained, or the task throws an exception, the loop exits and processWorkerExit(w, completedAbruptly) is called, which removes the worker.
How a worker gets its tasks
A worker gets tasks from two places. The first is firstTask, the task it runs on its first pass through the loop, which is used only once. After that, tasks come from the getTask() method.
getTask() is the key: ignoring exceptions, when getTask() returns null, the loop exits and the thread terminates.
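The loop just described can be made concrete with a deliberately tiny worker. This is a hypothetical sketch, not ThreadPoolExecutor's Worker. The class name MiniWorker is illustrative, and so is the rule that "no task within 100 ms" means the worker should exit. The worker runs its firstTask once, then keeps calling its own getTask(), and its thread ends as soon as getTask() returns null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified worker: NOT ThreadPoolExecutor's Worker.
public class MiniWorker implements Runnable {
    private final BlockingQueue<Runnable> queue;
    private Runnable firstTask;

    MiniWorker(Runnable firstTask, BlockingQueue<Runnable> queue) {
        this.firstTask = firstTask;
        this.queue = queue;
    }

    // Returns the next task, or null to tell the worker to exit.
    // Assumption: "no task within 100 ms" means the worker is no longer needed.
    private Runnable getTask() {
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null; // simplification: an interrupt also ends the worker
        }
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null; // firstTask is used only once
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } finally {
                task = null; // force the next iteration to call getTask()
            }
        }
        // Loop exited: ThreadPoolExecutor would call processWorkerExit here.
    }

    // Runs one firstTask plus three queued tasks and returns how many ran.
    static int demo() {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            q.add(ran::incrementAndGet);
        }
        Thread t = new Thread(new MiniWorker(ran::incrementAndGet, q));
        t.start();
        try {
            t.join(); // returns once getTask() yields null and run() ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + demo()); // prints "tasks run: 4"
    }
}
```

The real getTask() replaces the fixed timeout with the keepAliveTime, run-state and worker-count checks shown in the JDK source.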
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        // Condition 1: the pool is shutting down and there is nothing left to run.
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Condition 2: this worker is surplus or timed out, and it is safe to remove it.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // timed: wait at most keepAliveTime; otherwise block until a task arrives
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
The key is where getTask() returns null.
There are two cases in which it does:
- Condition 1: the pool state is STOP, TIDYING, or TERMINATED, or it is SHUTDOWN and the task queue is empty.
if (runStateAtLeast(c, SHUTDOWN)
    && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
}
- Condition 2: either the worker count exceeds maximumPoolSize, or this worker is eligible for timeout (timed) and its last poll() timed out. In addition, either other worker threads remain (wc > 1) or the task queue is empty.
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}
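The two exit conditions can be restated as pure boolean functions, which makes them easy to check case by case. This is a hypothetical sketch. The RunState enum and the method names are illustrative, and plain parameters replace the bit-packed ctl field. The boolean logic mirrors the two if statements in getTask().

```java
// Hypothetical restatement of getTask()'s two "return null" conditions.
public class ExitConditions {
    // Same ordering as ThreadPoolExecutor's run states.
    enum RunState { RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED }

    // Condition 1: stopping, or shut down with nothing left to drain.
    static boolean condition1(RunState rs, boolean queueEmpty) {
        return rs.compareTo(RunState.SHUTDOWN) >= 0
            && (rs.compareTo(RunState.STOP) >= 0 || queueEmpty);
    }

    // Condition 2: this worker is surplus (too many workers, or it timed out while
    // eligible for culling), and removing it still leaves someone to drain the queue.
    static boolean condition2(int wc, int corePoolSize, int maximumPoolSize,
                              boolean allowCoreThreadTimeOut, boolean timedOut,
                              boolean queueEmpty) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        return (wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || queueEmpty);
    }

    public static void main(String[] args) {
        System.out.println(condition1(RunState.RUNNING, true));     // false: RUNNING never exits here
        System.out.println(condition1(RunState.SHUTDOWN, false));   // false: keep draining the queue
        System.out.println(condition1(RunState.SHUTDOWN, true));    // true
        System.out.println(condition2(6, 4, 8, false, true, true)); // true: surplus, timed out
        System.out.println(condition2(4, 4, 8, false, true, true)); // false: timed is false at core size
    }
}
```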
Thread pools reclaim worker threads
Scenario: all tasks have completed while the pool is RUNNING, and shutdown() has not been called.
In this scenario, the worker count shrinks back to corePoolSize (if it never exceeded corePoolSize, there is nothing to reclaim).
Example analysis
- For example, take a pool with 4 core threads and a maximum of 8 threads.
- As tasks arrive, it first creates up to 4 worker threads; once the task queue fills up, it adds more worker threads, up to 8.
- Once the work runs out, it shrinks back to 4 worker threads. This assumes the default allowCoreThreadTimeOut = false, meaning core threads do not time out; if it is true, all worker threads can eventually be destroyed.
- Condition 1 (the pool state is STOP, TIDYING, or TERMINATED, or SHUTDOWN with an empty queue) is always false here, because the pool stays RUNNING. In this scenario, condition 1 can be ignored.
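The 4/8 example can be observed through ThreadPoolExecutor's public API. This is a sketch under stated assumptions. The queue capacity (4), the task duration (200 ms), the 100 ms keepAliveTime and the 1 s idle wait are arbitrary values chosen so that surplus threads time out quickly. Twelve tasks are enough to fill the 4 core threads, the 4 queue slots and 4 extra threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShrinkToCore {
    // Returns {peak pool size, pool size after the pool has been idle for a while}.
    static int[] demo(boolean allowCoreThreadTimeOut) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4, 8,                                  // corePoolSize, maximumPoolSize
            100, TimeUnit.MILLISECONDS,            // keepAliveTime (assumed value)
            new ArrayBlockingQueue<>(4));          // small bounded queue so it fills up
        pool.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        try {
            // 12 tasks = 4 core threads + 4 queued + 4 extra threads (up to max 8).
            for (int i = 0; i < 12; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(200);         // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int peak = pool.getPoolSize();
            Thread.sleep(1000);                    // tasks finish, idle threads time out
            return new int[] { peak, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] keepCore = demo(false);
        System.out.println("peak=" + keepCore[0] + " idle=" + keepCore[1]);
        int[] noCore = demo(true);
        System.out.println("peak=" + noCore[0] + " idle=" + noCore[1]);
    }
}
```

On an unloaded machine, the first call should report a peak of 8 and 4 threads after idling. The second call, with allowCoreThreadTimeOut(true), should report 0 threads after idling. Because the check depends on timing, a heavily loaded machine may need a longer idle wait.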
Now let's look at what a thread does when it cannot get a task.
- A task can be taken from the queue in two ways: a timed wait (poll() with keepAliveTime) or an indefinite block (take()). The timed variable decides which: timed is true when the current number of threads exceeds corePoolSize, and false otherwise (assuming allowCoreThreadTimeOut is false).
Now consider timed == true. The thread waits in poll() for at most keepAliveTime. keepAliveTime is a required constructor argument; when it is 0 (as with Executors.newFixedThreadPool), poll() returns essentially immediately. If the thread wakes from the timed wait without a task, timedOut becomes true and it enters the next loop iteration.
- At condition 1, the pool is RUNNING, so the thread does not enter that block.
- At condition 2, timed && timedOut is true and the task queue is empty, so the condition holds. The thread decrements the worker count with CAS. If the CAS succeeds, it returns null; otherwise it goes back and re-checks from condition 1.
Note that several threads may pass condition 2 at the same time. Could that reduce the worker count below corePoolSize?
- For example, suppose there are only 5 threads. Two of them wake up at the same time, both pass condition 2, and both decrement the count. Only 3 threads would remain, fewer than the 4 expected.
- In fact, this does not happen. compareAndDecrementWorkerCount(c) is a CAS against the value c read at the top of the loop. If another thread has changed the count in the meantime, the CAS fails and the thread continues into the next iteration, where the checks are evaluated again.
- On that retry, the thread whose CAS failed sees that the count is now at corePoolSize, so timed is false. The thread is not destroyed; it blocks in workQueue.take() instead.
- This also shows that although the pool has a core size, threads themselves are not marked as core or non-core. The threads created first are not necessarily the core ones, and those created after the core size was reached are not necessarily non-core. Which threads remain in the end is effectively arbitrary.
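The CAS guard can be isolated in a small hypothetical sketch. The class, the plain AtomicInteger counter and the tryCull method are illustrative, not JDK code; the real pool packs the count into its ctl field and also checks the other parts of condition 2. Several threads race to remove themselves while the count is above corePoolSize. Each decrement is a compareAndSet against the value just read, so a thread that loses the race re-reads the count and gives up once the count reaches the core size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CasCulling {
    // Hypothetical: a surplus worker tries to leave, but never below corePoolSize.
    static boolean tryCull(AtomicInteger workerCount, int corePoolSize) {
        for (;;) {
            int wc = workerCount.get();
            if (wc <= corePoolSize) {
                return false;               // like timed == false: stay and block in take()
            }
            if (workerCount.compareAndSet(wc, wc - 1)) {
                return true;                // like getTask() returning null
            }
            // CAS failed: another thread changed the count, so re-check (the "continue")
        }
    }

    // Releases `threads` racers at once against a count of `start`; returns the final count.
    static int race(int start, int corePoolSize, int threads) {
        AtomicInteger workerCount = new AtomicInteger(start);
        CountDownLatch go = new CountDownLatch(1);
        List<Thread> racers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    go.await();             // all racers start together
                } catch (InterruptedException e) {
                    return;
                }
                tryCull(workerCount, corePoolSize);
            });
            racers.add(t);
            t.start();
        }
        go.countDown();
        for (Thread t : racers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return workerCount.get();
    }

    public static void main(String[] args) {
        System.out.println(race(5, 4, 2)); // the article's case: prints 4, never 3
        System.out.println(race(8, 4, 8)); // prints 4
    }
}
```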
shutdown
- Scenario: shutdown() is called and all tasks have completed
- In this scenario, all worker threads, core and non-core alike, are destroyed.
- When shutdown() is called, it sends an interrupt to every idle worker thread.
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
interruptIdleWorkers() in turn calls the following method with onlyOne = false.
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // Only interrupt workers that are not already interrupted and not running a task
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
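The effect of the w.tryLock() check can be reproduced in isolation. This hypothetical sketch makes two substitutions. A ReentrantLock stands in for the Worker's own lock, which in the JDK is a non-reentrant AQS-based lock. An empty LinkedBlockingQueue plays the work queue. While the worker holds its lock ("running a task"), interruptIfIdle skips it. Once the worker releases the lock and blocks in take(), the same call succeeds and take() throws InterruptedException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class IdleInterrupt {
    // Stand-in for the Worker's lock: held while a task runs.
    static final ReentrantLock workerLock = new ReentrantLock();

    // Mirrors the check in interruptIdleWorkers: interrupt only if the lock is free.
    static boolean interruptIfIdle(Thread t) {
        if (!t.isInterrupted() && workerLock.tryLock()) {
            try {
                t.interrupt();
                return true;
            } finally {
                workerLock.unlock();
            }
        }
        return false;
    }

    // Returns {skippedWhileBusy, interruptedInTake}.
    static boolean[] demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // stays empty
        CountDownLatch busy = new CountDownLatch(1);
        CountDownLatch finishTask = new CountDownLatch(1);
        AtomicBoolean interruptedInTake = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            workerLock.lock();                     // like w.lock() before task.run()
            try {
                busy.countDown();
                finishTask.await();                // the "task" runs until released
            } catch (InterruptedException e) {
                return;                            // not expected: busy workers are skipped
            } finally {
                workerLock.unlock();               // like w.unlock() after the task
            }
            try {
                queue.take();                      // idle: blocked waiting for work
            } catch (InterruptedException e) {
                interruptedInTake.set(true);
            }
        });
        try {
            worker.start();
            busy.await();
            boolean skipped = !interruptIfIdle(worker); // lock held, so no interrupt
            finishTask.countDown();
            while (!interruptIfIdle(worker)) {          // retry until the worker is idle
                Thread.sleep(10);
            }
            worker.join();
            return new boolean[] { skipped, interruptedInTake.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        // prints "skipped while busy: true, interrupted in take: true"
        System.out.println("skipped while busy: " + r[0] + ", interrupted in take: " + r[1]);
    }
}
```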
- As you can see, before sending the interrupt, the method checks that the thread has not already been interrupted and tries to acquire the worker's exclusive lock with w.tryLock().
- A worker holds its own lock while it runs a task. When the interrupt is sent, a worker waiting for a task in getTask() is interrupted. A worker that is running a task fails the tryLock check, is skipped, and keeps running its current task.
- After finishing its task, the worker goes back into getTask().
- So we only need to look at how getTask() handles the interrupt.
A worker in getTask() is in one of two situations. For reference, here is runWorker(), which calls getTask() in its loop and holds the worker's lock (w.lock()) while running each task:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
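- All tasks are complete and the thread is blocked waiting for a task.
- The interrupt wakes it: poll() or take() throws InterruptedException, which getTask() catches, and the thread enters the next loop iteration.
- It reaches condition 1, which now holds because the state is SHUTDOWN and the queue is empty. It decrements the worker count and returns null, and runWorker() then leaves its loop and the thread terminates.
- decrementWorkerCount() cannot fail. It is an unconditional atomic decrement, and in JDK 8 it retries its CAS in a spin loop until it succeeds. So the count is guaranteed to drop by 1.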
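The "all threads blocked" case can be checked through the public API. This sketch uses arbitrary sizes: 4 core threads, started up front with prestartAllCoreThreads() so that they sit idle in take() with no task ever submitted. After shutdown(), the idle workers are woken, each one leaves getTask() via condition 1, and the pool reaches TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownIdlePool {
    // Returns true if an idle pool terminates promptly after shutdown().
    static boolean demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        int started = pool.prestartAllCoreThreads(); // 4 idle workers waiting for tasks
        pool.shutdown();                              // interrupts idle workers
        try {
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return started == 4 && terminated && pool.getPoolSize() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + demo()); // prints "terminated: true"
    }
}
```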
Tasks have not all finished yet
After shutdown() is called, the pool does not terminate until the remaining queued tasks have run, so some threads may still be working.
This breaks down into two phases.
Phase 1: plenty of tasks remain, and every worker thread can get one
Let's see how a thread behaves after it receives the interrupt.
- Suppose thread A is fetching a task in getTask(). If A is interrupted, poll() or take() throws InterruptedException while acquiring the task.
- getTask() catches the exception and loops again. As long as the queue is not empty, condition 1 does not hold, so A can keep taking tasks.
workQueue is a BlockingQueue. Common implementations such as LinkedBlockingQueue and ArrayBlockingQueue respond to interrupts by calling lockInterruptibly().
That method in turn calls AQS's acquireInterruptibly(int arg).
acquireInterruptibly(int arg) calls Thread.interrupted() on entry. parkAndCheckInterrupt() calls it again when a blocked thread is woken by an interrupt.
Thread.interrupted() returns the thread's interrupt status and clears it. The thread is therefore no longer marked as interrupted, so fetching a task again does not throw.
For a thread that is fetching tasks, the interrupt costs only one wasted loop iteration. This is a side effect of interrupting the thread, but it does not affect the pool's overall behavior.
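The claim that the interrupt is used up after one exception can be checked directly. The following is a minimal sketch with illustrative class and method names. It sets the current thread's interrupt flag and calls take() on an empty LinkedBlockingQueue, which throws InterruptedException. It then confirms that the flag has been cleared and that a second take() returns normally. This is why a worker in Phase 1 loses only one loop iteration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptCleared {
    // Returns {threwInterrupted, flagAfterCatch, secondTakeResult}.
    static Object[] demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        boolean threw = false;
        Thread.currentThread().interrupt();      // simulate the interrupt from shutdown()
        try {
            queue.take();                        // throws at once: an interrupt is pending
        } catch (InterruptedException e) {
            threw = true;                        // getTask() catches this and loops again
        }
        boolean flagAfter = Thread.currentThread().isInterrupted(); // cleared by the throw
        queue.offer("next task");
        String next;
        try {
            next = queue.take();                 // no pending interrupt: returns normally
        } catch (InterruptedException e) {
            next = null;
        }
        return new Object[] { threw, flagAfter, next };
    }

    public static void main(String[] args) {
        Object[] r = demo();
        // prints "threw=true flagAfter=false next=next task"
        System.out.println("threw=" + r[0] + " flagAfter=" + r[1] + " next=" + r[2]);
    }
}
```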
Phase 2: the tasks are almost finished
At this point, the tasks are nearly done. For example, with four worker threads and only two tasks left, two threads may get a task while the other two block.
No lock is held before a task is fetched. Could all the threads pass the checks and go to workQueue for a task just as the queue becomes empty, leaving every thread blocked? shutdown() has already sent its interrupts, so nothing would interrupt these threads again; they would stay blocked and never be reclaimed.
Suppose four worker threads A, B, C, and D pass condition 1 and condition 2 at the same time and reach the point where they fetch a task. Since they passed condition 1 while the pool was SHUTDOWN, the work queue was not empty. So at least one task remains, and at least one thread gets a task.
Suppose A and B get tasks, while C and D block.
A and B then proceed as follows:
- After finishing its task, each calls getTask() again. Condition 1 now holds, so getTask() returns null and the thread is ready to be reclaimed.
- processWorkerExit(Worker w, boolean completedAbruptly) reclaims the thread.
Is reclaiming just a matter of letting the thread die? Look at processWorkerExit(Worker w, boolean completedAbruptly).
Besides workers.remove(w), which removes the worker, it also calls tryTerminate().
In tryTerminate(), none of the conditions in the first check applies (the pool is SHUTDOWN and the queue is empty), so it does not return early. At the second check, worker threads still exist, so it interrupts one arbitrary idle thread.
The catch is that interrupting an idle thread does not necessarily reach a blocked one. If A and B exit at the same time, could A interrupt B and B interrupt A, so that neither of them wakes the blocked threads C and D?
Suppose A reaches this point. A has already been removed from the worker set workers, because processWorkerExit(Worker w, boolean completedAbruptly) removes it before calling tryTerminate(). So if A interrupts B, then when B gets here, it cannot find A in workers.
In other words, exiting threads cannot interrupt each other in a cycle. Once I have left the set and interrupted you, you cannot interrupt me, because I am no longer in the set; you can only interrupt someone else. So even if N threads exit at the same time, at least the last one to exit will interrupt one of the remaining blocked threads.
When either of the blocked threads C or D is woken by the interrupt, it goes through the same steps as A and B. This repeats until every blocked thread has been interrupted, woken, and reclaimed.
This is why tryTerminate() only needs to interrupt a single idle thread: it calls interruptIdleWorkers(ONLY_ONE), where ONLY_ONE is true.
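The propagation argument can be simulated with plain threads. This is a hypothetical sketch, not JDK code. Five threads block in take() on an empty queue and are registered in a concurrent set that stands in for workers. Only one thread is interrupted from outside, modelling the point after shutdown() has finished sending its interrupts. Each thread that wakes first removes itself from the set and then interrupts one remaining member, like processWorkerExit followed by tryTerminate's interruptIdleWorkers(ONLY_ONE). A thread always leaves the set before choosing whom to interrupt, so a later exiter can never pick it. The wake-ups therefore keep moving toward threads that are still blocked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class ChainedInterrupt {
    // Returns how many threads are still alive after the chain finishes.
    static int demo(int n) {
        Set<Thread> workers = ConcurrentHashMap.newKeySet();         // stand-in for `workers`
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // stays empty
        List<Thread> all = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    queue.take();                       // idle worker, blocked
                } catch (InterruptedException e) {
                    // woken by an interrupt: fall through and exit
                }
                workers.remove(Thread.currentThread()); // like workers.remove(w)
                for (Thread other : workers) {          // like interruptIdleWorkers(ONLY_ONE)
                    other.interrupt();
                    break;
                }
            });
            workers.add(t);
            all.add(t);
        }
        all.forEach(Thread::start);
        all.get(0).interrupt();                         // the single external wake-up
        int alive = 0;
        for (Thread t : all) {
            try {
                t.join(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (t.isAlive()) {
                alive++;
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("still blocked: " + demo(5)); // prints "still blocked: 0"
    }
}
```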
Conclusion
ThreadPoolExecutor reclaims a worker thread when getTask() returns null for it: the worker leaves its run loop and the thread is reclaimed.
There are two scenarios.
- Scenario: all tasks complete while the pool is RUNNING, without shutdown
  - If the number of threads is greater than corePoolSize, a thread waits for a task with a timeout (poll()). When the wait times out, it uses CAS to decrement the worker count. If the CAS succeeds, getTask() returns null and the thread is reclaimed.
  - Otherwise, the thread enters the next loop iteration. Once the number of worker threads is at or below corePoolSize, a thread can block indefinitely in take() (assuming allowCoreThreadTimeOut is false).
- Scenario: shutdown is called and all tasks complete
  - When shutdown() sends an interrupt to the idle threads, there are two possibilities.
    - All threads are blocked: the interrupt wakes them. Each thread loops again, satisfies condition 1, returns null, and is reclaimed.
    - Tasks have not all finished yet: at least one thread is still reclaimed normally. processWorkerExit(Worker w, boolean completedAbruptly) calls tryTerminate(), which interrupts one idle thread. In this way, all blocked threads are eventually woken one by one and reclaimed.