Official account: Java Xiaokaxiu, website: Javaxks.com

Author: kingsleylam, link: cnblogs.com/kingsleylam/p/11241625.html

ThreadPoolExecutor (ThreadPoolExecutor) thread pool (ThreadPoolExecutor) thread pool (ThreadPoolExecutor)

However, I’m interested in how thread pools recycle worker threads, so let’s do a quick analysis to understand thread pools better.

The following uses JDK1.8 as an example

1. runWorker(Worker w)

Once the Worker thread is started, the runWorker(Worker w) method is entered.

Inside is a while loop, which determines whether the task is empty. If not, it executes the task. If the task is not available or an exception occurs, exit the loop and execute processWorkerExit(w, completedAbruptly); Remove the worker thread in this method.

The getTask() method is used to fetch tasks from the getTask() method. The firstTask is the task that the worker thread executes on its first run. GetTask () seems to be the key. In the case of exceptions, returning null means exiting the loop and terminating the thread. The next step is to see when getTask() returns NULL.

(The space is limited, and the steps of executing the task are omitted.)

GetTask () returns null

There are two cases in which null is returned, as shown in the red box.

In the first case, the thread pool state is already STOP, TIDYING, TERMINATED, or SHUTDOWN and the task is TERMINATED as empty.

In the second case, the number of worker threads has exceeded the maximum number or the current worker thread has timed out, and there are other worker threads or the task queue is empty. This is a little hard to understand, but remember it, and we’ll use it later.

Condition 1 and Condition 2 refer to the judgment conditions of the two cases respectively.

3. Analyze the thread pool to reclaim worker threads in different scenarios

3.1 Scenario where shutdown() is not called and all tasks are completed in RUNNING state

In this scenario, the number of worker threads is reduced to the number of core threads (if there are no more, there is no need to recycle).

For example, a thread pool has 4 core threads and 8 maximum threads. You start with 4 worker threads, and when the task fills up, you have to increase the number of worker threads to 8. When the thread is out of work, it will revert to a state of four worker threads (depending on the value of allowCoreThreadTimeOut, which is discussed here as the default value false, i.e. the core thread does not time out). If true, worker threads can be destroyed altogether.

The thread pool state is already STOP, TIDYING, TERMINATED, or SHUTDOWN and the task is TERMINATED. Since the thread pool is always RUNNING, this judgment is always false. In this scenario, condition 1 can be ignored.

Let’s look at how threads run when they can’t fetch a task.

Step1. There are two ways to fetch a task from the task queue. Timeout waiting can still block forever. The timed variable is the determining factor. Timed is true if the current number of threads is greater than the number of core threads, otherwise false (as stated above, only allowCoreThreadTimeOut is discussed here if false). Obviously, we’re talking about the timed is true case. KeepAliveTime is generally not set. The default value is 0, so it is basically considered not blocking and returns the result of the fetch task immediately.

After the thread timed out and waited to wake up, it was found that the task could not be fetched, timeOut changed to true, and the next loop was entered.

Step2. Come to the judgment of condition 1, the thread pool keeps RUNNING and does not enter the code block.

Step3. Come to the judgment of condition 2, then the task queue is empty and the condition is true. CAS reduces the number of threads.

Note here that it is possible for multiple threads to pass condition 2 at the same time, does that reduce the number of post-threads and reduce the number of core threads as expected?

For example, the current number of threads has only 5, and two threads wake up at the same time. By judgment of condition 2 and reducing the number at the same time, the remaining number of threads is only 3, which is inconsistent with the expectation.

Actually, no. In order to prevent this kind of situation, compareAndDecrementWorkerCount (c) using CAS method, if the CAS fails the continue, into the next round of cycle, to judgment.

If the timed thread is false, the failed thread is not destroyed and can be permanently blocked (workqueue.take ()).

It took me a long time to figure this out, thinking about how to ensure that the number of core threads can be recycled without locking. It was CAS.

It can also be seen from this that although there is a number of core threads, threads do not distinguish between core and non-core. The threads created first are not core, and those created after the number of core threads is non-core. Which threads are retained in the end is completely random.

3.2 Scenario where shutdown() is called and all tasks are completed

In this scenario, all worker threads, whether core or non-core, are destroyed.

After the shutdown() call, an interrupt signal is sent to all idle worker threads.

Finally, pass false and call the following method.

As you can see, before the interrupt signal is issued, it determines whether the interrupt has occurred and obtains the exclusive lock of the worker thread.

When the interrupt signal is emitted, the worker thread is either in getTask() preparing to acquire a task, or is executing a task, and it will not be emitted until it has finished executing the current task, because the worker thread also locks the task while it is executing. The worker thread completes the task and goes back to getTask().

So we just have to look at how getTask() handles interrupt exceptions.

3.2.1 The task has been completed and the thread is blocking and waiting.

Very simply, interrupt the signal to wake it up and start the next cycle. Condition 1 is reached, the condition is met, the number of worker threads is reduced, and null is returned, terminating the thread from the outer layer.

DecrementWorkerCount () here is a spin, so it’s definitely going to decrease by 1.

3.2.2 The task has not been completely executed

After shutdown() is called, the pool is not finished until the unfinished tasks are completed. So it’s possible that the thread is still working.

Again, there are two stages

Phase 1 has more tasks, and all worker threads can get them

We are not talking about thread exit yet, so we can skip it and just analyze how the thread behaves after receiving an interrupt signal.

Let’s say we have thread A, which is getting the task from getTask(). When A is interrupted, either poll() or take() throws an interrupt exception when the task is fetched. The exception is caught, the next loop is reentered, and the task can continue as long as the queue is not empty.

Poll () or workqueue.take () if thread A is interrupted and calls workqueue.poll () or workqueue.take (), it will not raise an exception. Can I still retrieve tasks normally?

This depends on the implementation of workQueue. WorkQueue is the BlockingQueue type. For example, the common LinkedBlockingQueue and ArrayBlockingQueue, lockInterruptibly() is called in response to interrupts. This method in turn calls the AQS acquireInterruptibly(int ARg).

AcquireInterruptibly (int arg), thread.interrupted () is used either on the entry to determine the interrupt exception or when the parkAndCheckInterrupt() method blocks and is woken up by the interrupt to determine the interrupt exception. This method returns the interrupted state of the thread and resets the interrupted state! That is, the thread is no longer interrupted, so that no error will be reported when the task is fetched again.

Therefore, this is just a waste of a loop for the thread that is preparing to fetch the task, which is probably a side effect of the thread interruption, but of course does not affect the overall operation.

At this point, BlockingQueue resets the interrupt status. How on earth is this a brilliant design? Doug Lea Orz.

Phase 2 is just about finished

At this point, the task is nearing completion. For example, if there are four worker threads and only two tasks are left, two threads may acquire the task and two threads may block.

Because there was no lock before the task was acquired, would it happen that all the threads passed the check and went to the workQueue to get the task, just as the task queue was empty and all the threads were blocked? Because shutdown() is done, it can no longer interrupt the thread, so the thread remains blocked and cannot be reclaimed.

That’s not going to happen.

Suppose there are four worker threads, A,B,C, and D, which pass the judgment of condition 1 and condition 2 at the same time and arrive at the place to fetch the task. Then, there is at least one more task in the work queue, and at least one thread can fetch the task. Recommended: Java interview exercises treasure book

Suppose A and B get the task, and C and D block.

A, B The next steps are:

Step1. After the task is completed, getTask() again, condition 1 is met, null is returned, and the thread is ready to be reclaimed.

Step2. processWorkerExit(Worker w, Boolean completedAbruptly) reclaim the thread.

Is recycling as simple as killing threads? Take a look at the method for processWorkerExit(Worker W, Boolean completedAbruptly).

You can see that tryTerminate() is called in addition to worker.remove (w) to remove the line.

If none of the first criteria is met, skip. Second condition, the worker thread still exists, then a random idle thread is interrupted.

The problem is that interrupting an idle thread does not necessarily interrupt the blocking thread. If A and B exit at the same time, is it possible that A interrupts B, B interrupts A, and AB interrupt each other so that there are no threads to interrupt and wake up the blocking thread?

The answer, again, is overthinking…

Assuming that A can get to this point, A has been removed from the Worker thread set workers (processWorkerExit(Worker w, Boolean completedAbruptly) has been removed before tryTerminate()). Then A interrupts B, and B interrupts here, so A will not be found in workers.

In other words, the exiting threads can’t interrupt each other, so if I quit the set and interrupt you, you can’t interrupt me, because I’ve already quit the set, you can only interrupt others. So, even if N threads exit at the same time, at least at the end, there will be one thread that will interrupt the remaining blocking threads.

Like dominoes, the break signal is spread.

After any of the blocking threads C and D are interrupted and woken up, the action of Step1 will be repeated again and again until all the blocking threads are interrupted and woken up.

This is why, in tryTerminate(), false is passed and you only need to interrupt any idle thread.

Thinking of this, I feel great respect for Doug Lea (Cantonese) again. It’s so beautifully designed.

4. To summarize

ThreadPoolExecutor collects worker threads, and a thread getTask() returns null, which is recycled.

There are two scenarios.

1. In the RUNNING state, all tasks are completed without shutdown()

If the number of threads is greater than corePoolSize, the thread is blocked due to timeout. After the timeout wakes up, the CAS reduces the number of working threads. If the CAS succeeds, null is returned and the thread is reclaimed. Otherwise enter the next loop. When the number of worker threads is less than or equal to corePoolSize, it can always block.

2. Call shutdown(), and all tasks are completed

When shutdown() signals an interrupt to all threads, there are two possibilities.

2.1) All threads are blocking

Interrupt wake up, enter the loop, both meet the first if judgment condition, both return null, all threads are recycled.

2.2) The task is not fully executed

At least one thread will be reclaimed. The processWorkerExit(Worker W, Boolean completedAbruptly) method calls tryTerminate(), signaling an interrupt to any idle thread. All blocked threads will eventually be woken up one by one and recycled.