Introduction
Thread pool: a thread usage pattern. Too many threads introduce scheduling overhead, which hurts cache locality and overall performance. A thread pool maintains multiple threads that wait for a supervisor to assign tasks that can be executed concurrently. This avoids the cost of creating and destroying threads for short-lived tasks. A thread pool not only keeps the processor cores fully utilized but also prevents over-scheduling. The number of threads available should depend on the available concurrent processors, processor cores, memory, network sockets, and so on. For example, a common rule of thumb sets the number of threads to the number of CPUs + 2, because too many threads cause additional thread-switching overhead.
Thread pools in Java are implemented by the ThreadPoolExecutor class. This article walks through the source code of this class to analyze how it creates and manages threads and how it schedules and executes background tasks.
Let’s take a look at the thread pool class diagram:
The main purpose of the image above is to give you an idea of the relationship between the thread pool related classes, at least to make them look familiar, so that you won’t be afraid to see them later.
Executor Framework interface
The Executor framework is a framework for invoking, scheduling, executing, and controlling asynchronous tasks based on a set of execution policies. The purpose is to provide a mechanism for separating task submission from how the task works.
Below is the class diagram of ThreadPoolExecutor. Executors is a utility class whose static factory methods return different thread pool instances according to the user’s choice.
As the diagram shows, ThreadPoolExecutor is the core of the thread pool.
There are three Executor interfaces in J.U.C (java.util.concurrent):
- Executor: a simple interface for running new tasks;
- ExecutorService: extends Executor and adds methods for managing the lifecycle of the executor and of tasks;
- ScheduledExecutorService: extends ExecutorService and adds support for delayed and periodic task execution.
These interfaces reveal some design ideas: the name of each interface matches its job exactly. Executor has only one method, yet it is not folded into another interface. This reflects the single responsibility principle.
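As a rough illustration of how the three interfaces layer on one another, the sketch below drives one scheduler through each of them. The class name ExecutorInterfacesDemo and the method demo are made up for this example; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: one object viewed through the three interfaces.
final class ExecutorInterfacesDemo {

    /** Runs one task per interface and returns the sum of the two results. */
    static int demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService service = scheduler; // a ScheduledExecutorService is an ExecutorService
        Executor executor = service;         // ... which in turn is an Executor
        try {
            CountDownLatch ran = new CountDownLatch(1);
            // Executor: submit a Runnable, nothing comes back.
            executor.execute(ran::countDown);
            // ExecutorService: submit a Callable and get a Future.
            Future<Integer> future = service.submit(() -> 40);
            // ScheduledExecutorService: run a Callable after a delay.
            ScheduledFuture<Integer> delayed =
                scheduler.schedule(() -> 2, 10, TimeUnit.MILLISECONDS);
            ran.await();
            return future.get() + delayed.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown(); // lifecycle method added by ExecutorService
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each interface adds exactly one concern on top of the previous one: running, then lifecycle and results, then timing.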
ThreadPoolExecutor analysis
Before going into the details of the ThreadPoolExecutor execution logic, look at the following flowchart:
This diagram summarizes the entire operation of ThreadPoolExecutor. The core logic of the source code can be summed up as follows:
- Creating threads: how threads are created, how their number is controlled, and how they are kept alive and destroyed;
- Adding tasks: how a task is handled after it is added, whether it runs immediately or is stored first;
- Executing tasks: how a thread obtains a task, and what happens when a task fails.
Let’s dive into the source code to get an insight into the design of ThreadPoolExecutor.
The constructor
Let’s start with constructors:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // workQueue, threadFactory, and handler must not be null;
        // otherwise a NullPointerException is thrown
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
- corePoolSize: the number of core threads. When a task is submitted and the number of threads in the pool has not reached corePoolSize, a new thread is created to execute the task, even if other threads in the pool are idle. Once the number of threads has reached corePoolSize, no more core threads are created. If prestartCoreThread() or prestartAllCoreThreads() is called, one core thread or all core threads, respectively, are created and started in advance. If corePoolSize == 0, the threads in the pool are destroyed once the tasks complete and no new requests arrive. If corePoolSize > 0, the core threads are not destroyed even after their tasks complete. corePoolSize can be read as the number of idle threads the pool may retain.
- maximumPoolSize: the maximum number of threads allowed in the pool. If the blocking queue is full and the current number of threads is below maximumPoolSize, a new thread is created to execute the task. maximumPoolSize must be greater than or equal to 1. If maximumPoolSize == corePoolSize, the pool has a fixed size. The hard upper bound on the thread count is CAPACITY.
- keepAliveTime: the maximum idle time of a thread. When a thread has been idle for keepAliveTime, it is destroyed, until only corePoolSize threads remain, which avoids wasting memory and handle resources. By default, keepAliveTime takes effect only when the number of threads in the pool is greater than corePoolSize. However, when allowCoreThreadTimeOut of ThreadPoolExecutor is set to true, core threads are also reclaimed when they time out.
- unit: the time unit of keepAliveTime.
- workQueue: the task queue. When the number of threads has reached corePoolSize, newly submitted tasks are placed in this BlockingQueue. You can use ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, or PriorityBlockingQueue.
- threadFactory: the factory that creates threads. You can supply a thread factory to give each created thread a more meaningful name, which makes concurrency problems easier to trace when they occur.
- handler: the object that implements the rejection policy. When the blocking queue is full and the number of threads has reached maximumPoolSize, the pool is saturated and needs a policy for the tasks it cannot accept. The built-in policies are:
  - AbortPolicy: rejects the submitted task and throws RejectedExecutionException;
  - CallerRunsPolicy: runs the task in the caller’s thread;
  - DiscardPolicy: silently discards the task;
  - DiscardOldestPolicy: discards the oldest task in the blocking queue (the one at its head) and then retries executing the current task.
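To make the seven constructor parameters concrete, here is a minimal sketch of building a pool by hand. The sizes, the queue capacity, the "demo-worker" prefix, and the names PoolConstructionDemo and namedFactory are all illustrative assumptions, not recommendations from the source.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; all concrete numbers are placeholders.
final class PoolConstructionDemo {

    /** A thread factory that names threads "<prefix>-1", "<prefix>-2", ... */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
            2,                                          // corePoolSize
            4,                                          // maximumPoolSize
            30L, TimeUnit.SECONDS,                      // keepAliveTime + unit
            new ArrayBlockingQueue<>(8),                // bounded workQueue
            namedFactory("demo-worker"),                // threadFactory
            new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    /** Returns the name of the thread that ran the first submitted task. */
    static String firstWorkerName() {
        ThreadPoolExecutor pool = newPool();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName());
    }
}
```

A bounded queue combined with CallerRunsPolicy is one way to get back-pressure: when the pool is saturated, the submitting thread runs the task itself instead of receiving an exception.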
Attribute definitions
After we look at the constructor, we can take a look at the variables in the class to help us understand the logic of how to run the code. Here are some important variables:
    // ctl records the pool state (high 3 bits) and the number of threads (low 29 bits).
    // Initially the state is RUNNING and the number of threads is 0.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Number of bits used for the thread count: 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum number of threads (low 29 bits): 00011111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // (high 3 bits): 11100000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // (high 3 bits): 00000000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // (high 3 bits): 00100000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    // (high 3 bits): 01000000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // (high 3 bits): 01100000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Extract the run state
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Extract the number of threads
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // Compute a new ctl value from the run state and the number of threads
    private static int ctlOf(int rs, int wc) { return rs | wc; }
Some of these operations need explanation.
- Integer.SIZE: the number of bits in an int. In Java this is always 32, regardless of platform.
- (1 << COUNT_BITS) - 1: shifting 1 left by COUNT_BITS places produces a single 1 bit followed by COUNT_BITS zeros; subtracting 1 turns those low COUNT_BITS bits into 1s.
- -1 << COUNT_BITS: the sign-magnitude form of -1 is 10000000 00000000 00000000 00000001, its ones' complement is 11111111 11111111 11111111 11111110, and its two's complement (ones' complement + 1) is 11111111 11111111 11111111 11111111. Shifting that left by 29 bits gives 11100000 00000000 00000000 00000000, which is a negative number in decimal.
- ~CAPACITY: bitwise NOT, so the highest three bits are 1 and the low 29 bits are 0.
Summary: packing the thread count and the run state into one int with bit operations reduces the number of variables and the memory footprint. In ascending numeric order, the five states are: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
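The packing scheme can be tried outside the JDK. The sketch below copies the constants and the three helper methods from the listing above into a standalone class (CtlPackingDemo is a made-up name) and packs a state together with a thread count.

```java
// Standalone copy of the ctl helpers, for experimentation only.
final class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** True if the five states are in strictly ascending numeric order. */
    static boolean statesAscending() {
        return RUNNING < SHUTDOWN && SHUTDOWN < STOP
            && STOP < TIDYING && TIDYING < TERMINATED;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5); // RUNNING with 5 worker threads
        System.out.println(Integer.toBinaryString(ctl));
        System.out.println(runStateOf(ctl) == RUNNING);
        System.out.println(workerCountOf(ctl));
        System.out.println(statesAscending());
    }
}
```

Because the state lives in the high bits, comparisons such as rs >= SHUTDOWN in the JDK source are plain integer comparisons on the masked value.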
Thread pool state
Thread pool status meaning:
- RUNNING: accepts new tasks and processes tasks in the blocking queue;
- SHUTDOWN: rejects new tasks but still processes tasks in the blocking queue;
- STOP: rejects new tasks, abandons the tasks in the blocking queue, and interrupts tasks in progress;
- TIDYING: all tasks have finished (including those in the blocking queue), the number of active threads is 0, and the terminated() method is about to be called;
- TERMINATED: the final state, entered once the terminated() call has completed.
Thread pool state transitions:
- RUNNING -> SHUTDOWN: an explicit call to shutdown(), or an implicit one through finalize(), which calls shutdown();
- (RUNNING or SHUTDOWN) -> STOP: an explicit call to shutdownNow();
- SHUTDOWN -> TIDYING: when both the thread pool and the task queue are empty;
- STOP -> TIDYING: when the thread pool is empty;
- TIDYING -> TERMINATED: when the terminated() hook method has completed.
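The internal state constants are private, but the transitions can be observed through the public isShutdown(), isTerminated(), and awaitTermination() methods. The following sketch (PoolLifecycleDemo and its helper names are hypothetical) walks a single-thread pool from RUNNING through SHUTDOWN to TERMINATED.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes state transitions via the public API.
final class PoolLifecycleDemo {

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(() -> awaitQuietly(release));   // keeps one worker alive
            log.append(pool.isShutdown()).append(' ');   // RUNNING: not shut down
            pool.shutdown();                             // RUNNING -> SHUTDOWN
            log.append(pool.isShutdown()).append(' ');   // now shut down
            log.append(pool.isTerminated()).append(' '); // a worker still exists
            release.countDown();                         // let the task finish
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            log.append(done && pool.isTerminated());     // TIDYING -> TERMINATED happened
            return log.toString();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // false true false true
    }
}
```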
Sign-magnitude, ones' complement, two's complement
1. Sign-magnitude (original code): the sign bit followed by the absolute value, that is, the first bit is the sign and the remaining bits are the value. For example, in 8-bit binary:
[+1] sign-magnitude = 0000 0001
[-1] sign-magnitude = 1000 0001
The first bit is the sign bit; it is 1 for a negative number.
2. Ones' complement (inverse code): the ones' complement of a positive number is the number itself; for a negative number, keep the sign bit of its sign-magnitude form and invert the other bits.
[+1] = [0000 0001] sign-magnitude = [0000 0001] ones' complement
[-1] = [1000 0001] sign-magnitude = [1111 1110] ones' complement
3. Two's complement: the two's complement of a positive number is the number itself; for a negative number, keep the sign bit of its sign-magnitude form, invert the other bits, and finally add 1 (that is, the ones' complement + 1).
[+1] = [0000 0001] sign-magnitude = [0000 0001] ones' complement = [0000 0001] two's complement
[-1] = [1000 0001] sign-magnitude = [1111 1110] ones' complement = [1111 1111] two's complement
4. Conclusion: given the sign-magnitude form of a number, for a positive number the ones' complement and two's complement are the number itself; for a negative number the ones' complement keeps the sign bit and inverts the remaining bits, and the two's complement is the ones' complement + 1.
5. Left shift: when a value is shifted left or right, it is first converted to its two's complement form, shifted, and then converted back to sign-magnitude form.
Left shift: discard the high bits and fill the low bits with 0
[+1] = [0000 0001] sign-magnitude = [0000 0001] two's complement
[0000 0001] two's complement << 1 = [0000 0010] two's complement = [0000 0010] sign-magnitude = [+2]
[-1] = [1000 0001] sign-magnitude = [1111 1111] two's complement
[1111 1111] two's complement << 1 = [1111 1110] two's complement = [1000 0010] sign-magnitude = [-2]
Again, the two's complement of a negative number is its ones' complement + 1, and the ones' complement of a negative number is its two's complement - 1.
6. Right shift: the low bits are discarded and the high bits are filled with the sign bit
[+127] = [0111 1111] sign-magnitude = [0111 1111] two's complement
[0111 1111] two's complement >> 1 = [0011 1111] two's complement = [0011 1111] sign-magnitude = [+63]
[-127] = [1111 1111] sign-magnitude = [1000 0001] two's complement
[1000 0001] two's complement >> 1 = [1100 0000] two's complement = [1100 0000] sign-magnitude = [-64]
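The shift examples can be checked directly in Java, which stores integers in two's complement. In the sketch below, ShiftDemo and bits8 are made-up names; bits8 simply prints the low 8 bits of an int so the output lines up with the 8-bit examples.

```java
// Hypothetical demo class for the shift rules above.
final class ShiftDemo {

    /** Low 8 bits of x as a zero-padded binary string. */
    static String bits8(int x) {
        String s = Integer.toBinaryString(x & 0xFF);
        return "00000000".substring(s.length()) + s;
    }

    public static void main(String[] args) {
        // Left shift: high bits dropped, low bits filled with 0.
        System.out.println(bits8(1) + " << 1 = " + bits8(1 << 1) + " (" + (1 << 1) + ")");
        System.out.println(bits8(-1) + " << 1 = " + bits8(-1 << 1) + " (" + (-1 << 1) + ")");
        // Arithmetic right shift: low bits dropped, sign bit copied in.
        System.out.println(bits8(127) + " >> 1 = " + bits8(127 >> 1) + " (" + (127 >> 1) + ")");
        System.out.println(bits8(-127) + " >> 1 = " + bits8(-127 >> 1) + " (" + (-127 >> 1) + ")");
        // Two's complement negation: invert all bits, then add 1.
        System.out.println(~1 + 1);
        // The RUNNING constant from ThreadPoolExecutor: -1 << 29.
        System.out.println(Integer.toBinaryString(-1 << 29));
    }
}
```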
execute method analysis
After a thread pool is created with ThreadPoolExecutor, tasks are submitted to it and executed. The execute method:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // Read ctl, which holds the run state and the thread count
        int c = ctl.get();
        // Fewer threads than corePoolSize: try to add a core thread for this task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            // addWorker failed; re-read ctl
            c = ctl.get();
        }
        // Only when the pool is RUNNING is the task offered to the queue
        if (isRunning(c) && workQueue.offer(command)) {
            // Double-check
            int recheck = ctl.get();
            // If the pool is no longer RUNNING, remove the newly queued task and reject it
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If there are no worker threads, add one without a first task
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // The core pool and the queue are full; try to create a new thread.
        // If addWorker returns false, reject the task
        else if (!addWorker(command, false))
            reject(command);
    }
The execution logic of the execute method is as follows:
- If fewer than corePoolSize threads are running, a new thread is created to run the new task;
- If the number of running threads is equal to or greater than corePoolSize, the submitted task is placed in the blocking queue workQueue;
- If workQueue is full, a new thread is created to run the task, as long as the thread count stays below maximumPoolSize;
- If the thread count would exceed maximumPoolSize, the task is handed to the saturation policy RejectedExecutionHandler.
Note that addWorker(null, false) creates a thread without passing in a task, because the task has already been added to workQueue, so the worker takes it directly from workQueue when it runs. addWorker(null, false) is executed when workerCountOf(recheck) == 0 to guarantee that, while the pool is RUNNING, at least one thread exists to run the queued task.
Note that the pool handles tasks in three stages: core threads (corePoolSize), then the blocking queue workQueue, then additional threads up to maximumPoolSize. This staged design also appears in other frameworks.
Note the difference between tasks and threads: tasks wait in workQueue, while threads live in the pool, and their number is bounded by CAPACITY.
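The three stages can be observed from outside with a deliberately tiny pool. In this sketch the sizes (1 core thread, 2 maximum, queue capacity 1) and the names ExecuteFlowDemo and snapshot are assumptions chosen so that every stage is reached with four submissions; the default AbortPolicy handles the fourth.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: core thread -> queue -> extra thread -> rejection.
final class ExecuteFlowDemo {

    private static String snapshot(ThreadPoolExecutor pool) {
        return pool.getPoolSize() + "/" + pool.getQueue().size();
    }

    /** Returns "threads/queued" after each accepted task, then the fate of the fourth. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker); // below corePoolSize: new core thread
            log.append(snapshot(pool)).append(' ');
            pool.execute(blocker); // core thread busy: task is queued
            log.append(snapshot(pool)).append(' ');
            pool.execute(blocker); // queue full: new non-core thread
            log.append(snapshot(pool)).append(' ');
            try {
                pool.execute(blocker); // queue full and maximumPoolSize reached
                log.append("accepted");
            } catch (RejectedExecutionException e) {
                log.append("rejected");
            }
            return log.toString();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1/0 1/1 2/1 rejected
    }
}
```

Note the order: the second task is queued rather than given a new thread, even though maximumPoolSize has not been reached. Extra threads appear only once the queue is full.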
addWorker method analysis
The main job of addWorker is to create a new thread in the thread pool and start it. The firstTask parameter specifies the first task the new thread runs. The core parameter selects the limit checked before adding the thread: true means the number of active threads must be less than corePoolSize, false means it must be less than maximumPoolSize.
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // Get the run state
            int rs = runStateOf(c);

            /*
             * If rs >= SHUTDOWN, no new tasks are accepted. In that case
             * return false unless all three of the following hold:
             * 1. rs == SHUTDOWN: the pool is shut down but may still process
             *    the tasks already in the blocking queue
             * 2. firstTask is null
             * 3. the blocking queue is not empty
             *
             * Consider rs == SHUTDOWN first: no newly submitted task is
             * accepted, so return false if firstTask is not null.
             * Then, if firstTask is null and workQueue is also empty,
             * return false, because there are no more tasks in the queue
             * and no new thread is needed.
             */
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // Get the number of threads
                int wc = workerCountOf(c);
                // If wc exceeds CAPACITY, the maximum of the low 29 bits of ctl
                // (29 ones in binary), return false.
                // core is the second argument of addWorker: true compares against
                // corePoolSize, false compares against maximumPoolSize.
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Try to increase workerCount; on success, leave the outer for loop
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // Increasing workerCount failed; re-read ctl
                c = ctl.get();
                // If the run state is no longer rs, the state has changed:
                // go back to the outer for loop and start over
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create a Worker from firstTask
            w = new Worker(firstTask);
            // Each Worker object creates a Thread
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // rs < SHUTDOWN means RUNNING.
                    // If rs is RUNNING, or rs is SHUTDOWN and firstTask is null,
                    // add the thread to the pool. No new tasks are accepted during
                    // SHUTDOWN, but the tasks in workQueue are still executed.
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // workers is a HashSet
                        workers.add(w);
                        int s = workers.size();
                        // largestPoolSize records the largest number of threads the pool has had
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Start the thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
The following points need to be noted:
- The pool state is rechecked after the lock is acquired, because another thread may have changed it before this method obtained the lock, for example by calling shutdown. If the worker is added successfully, its thread is started.
- t.start() ends up calling the run method of the Worker class, because Worker implements the Runnable interface and the Worker instance is passed to t when the thread is created. See the constructor of the Worker class.
- wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize): each time addWorker is called to add a thread, it checks whether the current thread count has reached CAPACITY and then whether it has reached corePoolSize or maximumPoolSize. The thread count is thus ultimately bounded by CAPACITY.
Inner class Worker analysis
The analysis above mentioned a Worker class, which may be unclear to readers who are not very familiar with the source code. Let’s take a look at the source code of Worker:
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // Create a thread that runs this Worker
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            // CAS state from 0 to 1; returns false if the lock is already held
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
First, Worker extends AbstractQueuedSynchronizer (AQS) and implements the Runnable interface, so a Worker is itself a task that a thread can run. Its constructor initializes two fields, the Runnable firstTask and the Thread thread. The class essentially wraps the task passed in and adds lock acquisition logic (inherited from AQS). For details, please refer to the article: Through ReentrantLock analysis of the implementation principle of AQS
Worker inherits from AQS and uses it to implement an exclusive lock. Why not use ReentrantLock? As the tryAcquire method shows, the Worker lock is not reentrant, whereas ReentrantLock is:
- Once the lock method obtains the exclusive lock, the current thread is executing a task;
- A thread that is executing a task should not be interrupted;
- A thread that does not hold the exclusive lock is idle and not processing a task, so it can be interrupted;
- When shutdown or tryTerminate runs, the pool calls interruptIdleWorkers to interrupt idle threads, and interruptIdleWorkers uses tryLock to determine whether each thread in the pool is idle;
- The lock is non-reentrant because a task must not be able to reacquire it when the task calls a pool control method such as setCorePoolSize. With a reentrant lock such as ReentrantLock, a task that calls setCorePoolSize with a smaller size, which needs to interrupt some threads, could acquire its own worker's lock again and interrupt the running thread.
Therefore, Worker inherits from AQS so that the pool can determine whether a thread is idle and may be interrupted.
In addition, the constructor executes setState(-1), which sets the state variable to -1. Why? The default state in AQS is 0, but a Worker that has just been created and has not yet run a task should not be interrupted. Look at the tryAcquire method:
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
tryAcquire succeeds only when state is 0, so setState(-1) prevents the thread from being interrupted before it starts executing tasks. For the same reason, the runWorker method first calls the Worker object’s unlock method, which sets state to 0 and allows interrupts from then on.
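The difference between the Worker lock and ReentrantLock can be reproduced with a few lines of AQS code. The sketch below is not the JDK's Worker; Mutex is a hypothetical stand-in that copies only Worker's tryAcquire and tryRelease logic and takes the initial state as a constructor argument so the setState(-1) trick can be tried as well.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class contrasting a non-reentrant AQS lock with ReentrantLock.
final class NonReentrantLockDemo {

    /** Minimal non-reentrant mutex modelled on Worker's lock methods. */
    static final class Mutex extends AbstractQueuedSynchronizer {
        Mutex(int initialState) {
            setState(initialState);
        }

        @Override
        protected boolean tryAcquire(int unused) {
            // Succeeds only when state is exactly 0, even for the owning thread.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** Second tryLock by the same thread on the Mutex. */
    static boolean mutexReacquire() {
        Mutex m = new Mutex(0);
        m.tryLock();
        boolean second = m.tryLock(); // the holder cannot acquire again
        m.unlock();
        return second;
    }

    /** Second tryLock by the same thread on a ReentrantLock. */
    static boolean reentrantReacquire() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        boolean second = lock.tryLock(); // the holder may acquire again
        if (second) lock.unlock();
        lock.unlock();
        return second;
    }

    /** With state -1 the lock cannot be taken until unlock() resets it to 0. */
    static boolean[] inhibitedUntilUnlock() {
        Mutex m = new Mutex(-1);
        boolean before = m.tryLock(); // state is -1, CAS from 0 fails
        m.unlock();                   // sets state to 0
        boolean after = m.tryLock();  // now succeeds
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        System.out.println(mutexReacquire());     // false
        System.out.println(reentrantReacquire()); // true
        boolean[] r = inhibitedUntilUnlock();
        System.out.println(r[0] + " " + r[1]);    // false true
    }
}
```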
runWorker method analysis
As mentioned earlier, the run method of the inner class Worker calls runWorker of the outer class. Let’s look at the logic of runWorker.
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // Take the first task
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Set state to 0 to allow interrupts
        w.unlock();
        // Whether the loop was left because of an exception
        boolean completedAbruptly = true;
        try {
            // If task is null, fetch a task via getTask
            while (task != null || (task = getTask()) != null) {
                // Lock
                w.lock();
                // If the pool state is at least STOP, make sure the thread is interrupted;
                // otherwise make sure it is not interrupted
                // (Thread.interrupted() clears the interrupt flag).
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Hook that runs before the task executes
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook that runs after the task executes
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
To summarize the execution of the runWorker method:
- The while loop keeps fetching tasks from the blocking queue via the getTask() method;
- If the thread pool is stopping, make sure the current thread is interrupted; otherwise make sure it is not interrupted;
- Call task.run() to execute the task;
- If task is null, break out of the loop and execute the processWorkerExit method;
- When runWorker returns, the run method of the Worker returns too, and the thread ends.
The beforeExecute and afterExecute methods are empty in the ThreadPoolExecutor class and are left for subclasses to implement.
The completedAbruptly variable indicates whether an exception occurred while executing a task; the processWorkerExit method checks its value.
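As an illustration of the two hooks, a subclass can count how often each one fires. This is only a sketch: HookedPoolDemo, CountingPool, and the counter fields are hypothetical names, and the single-thread configuration is chosen just to keep the example small.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a pool that counts hook invocations.
final class HookedPoolDemo {

    static final class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicInteger after = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();

        CountingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            before.incrementAndGet(); // runs in the worker thread, before task.run()
        }

        @Override
        protected void afterExecute(Runnable r, Throwable thrown) {
            super.afterExecute(r, thrown);
            after.incrementAndGet();  // runs in the worker thread, after task.run()
            if (thrown != null) failed.incrementAndGet(); // the task threw
        }
    }

    /** Runs two trivial tasks and returns {before, after, failed}. */
    static int[] demo() {
        CountingPool pool = new CountingPool();
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate");
            return new int[] { pool.before.get(), pool.after.get(), pool.failed.get() };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Because afterExecute sits in a finally block in runWorker, it also runs when the task throws, with the exception passed as its second argument.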
getTask method analysis
The getTask method fetches a task from the blocking queue as follows:
    private Runnable getTask() {
        // timedOut records whether the last attempt to fetch a task
        // from the blocking queue timed out
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /*
             * If the pool state is rs >= SHUTDOWN (not RUNNING), check:
             * 1. rs >= STOP, whether the pool is stopping;
             * 2. whether the blocking queue is empty.
             * If either holds, workerCount is reduced by 1 and null is returned,
             * because no more tasks may be added to the blocking queue once
             * the pool state is SHUTDOWN or above.
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // timed decides whether timeout control is required.
            // allowCoreThreadTimeOut is false by default: core threads do not time out.
            // wc > corePoolSize: the pool has more threads than core threads,
            // and those extra threads need timeout control.
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /*
             * wc > maximumPoolSize can happen because setMaximumPoolSize
             * may be executed at the same time.
             * timed && timedOut: this fetch is subject to a timeout and the
             * previous fetch from the blocking queue timed out.
             * If either holds, and the number of threads is greater than 1 or
             * the blocking queue is empty, try to decrement workerCount by 1;
             * if the decrement fails, retry.
             * If wc == 1, the current thread is the only thread in the pool.
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                /*
                 * If timed is true, poll the blocking queue with a timeout:
                 * null is returned if no task arrives within keepAliveTime.
                 * Otherwise use take, which blocks while the queue is empty.
                 */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // r == null means the poll timed out
                timedOut = true;
            } catch (InterruptedException retry) {
                // The thread was interrupted while fetching a task:
                // set timedOut to false and retry
                timedOut = false;
            }
        }
    }
By this point you will have noticed two checks that recur throughout ThreadPoolExecutor:
- Check the current run state and act on it; if the pool has stopped, many operations are unnecessary;
- Check the current number of threads in the pool and compare it with corePoolSize and maximumPoolSize before deciding what to do next.
In getTask, the first if checks whether the pool is no longer RUNNING and, if so, whether rs >= STOP (the pool is stopping) or the blocking queue is empty. In that case workerCount is reduced by 1 and null is returned, because this thread is no longer needed to fetch tasks. The thread is removed later in processWorkerExit.
The second if controls the number of effective threads in the pool. In execute, threads beyond corePoolSize (up to maximumPoolSize) are added only when workQueue is full. If such a thread later times out while fetching a task (timedOut is true), workQueue has been empty for keepAliveTime, so the pool has more threads than it needs, and the threads beyond corePoolSize can be destroyed to keep the count at corePoolSize.
When is the thread destroyed? When the runWorker method finishes, the thread ends and the JVM reclaims it.
When the getTask method returns null, runWorker breaks out of its while loop and then executes the processWorkerExit method.
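The effect of the timed poll can be watched from outside: a pool that grew beyond corePoolSize shrinks back once its extra threads have been idle for keepAliveTime. The sketch below assumes a 100 ms keepAliveTime and a SynchronousQueue so that every submission beyond the first forces a new thread; KeepAliveDemo and its polling loop are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: the pool grows to 3 threads and shrinks back to 1.
final class KeepAliveDemo {

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {peak pool size, pool size after the idle threads timed out}. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> awaitQuietly(release)); // each needs its own thread
            }
            int peak = pool.getPoolSize();
            release.countDown(); // all three workers become idle
            // Wait (at most 5 s) for the two non-core threads to time out in getTask.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] { peak, pool.getPoolSize() };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " -> " + r[1]);
    }
}
```

The remaining thread is the one that finds wc == corePoolSize, so timed is false for it and it blocks in workQueue.take() instead of polling.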
processWorkerExit method
Let’s look at the logic of the processWorkerExit method:
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // If completedAbruptly is true, an exception occurred during execution
        // and workerCount must be reduced by 1 here.
        // If the thread exited without an exception, workerCount was already
        // reduced by 1 in getTask() and no further decrement is needed.
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Accumulate the number of completed tasks
            completedTaskCount += w.completedTasks;
            // Remove the worker from workers, i.e. remove the thread from the pool
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // Decide whether to terminate the thread pool based on its state
        tryTerminate();

        int c = ctl.get();
        /*
         * When the pool is RUNNING or SHUTDOWN:
         * if the worker ended abnormally, add a replacement worker directly;
         * if allowCoreThreadTimeOut is true and the queue has tasks, keep at least one worker;
         * if allowCoreThreadTimeOut is false, keep workerCount at no less than corePoolSize.
         */
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
At this point, the worker thread ends once processWorkerExit has finished executing; this is the whole lifetime of a worker thread. Two points are worth noting:
- Consider when this method is called: when the worker has no more tasks to run. At that point the pool may need to terminate, so tryTerminate is called.
- If the pool state is below STOP, the method checks whether the number of threads still meets the required minimum and, if not, creates another thread.
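The replacement logic can be seen in a small experiment: a task submitted with execute throws, its worker dies, and a later task still runs, on a different thread. Everything named here (WorkerReplacementDemo, the quiet thread factory that swallows the uncaught exception to keep the output clean) is an assumption made for the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: a worker killed by an exception is replaced.
final class WorkerReplacementDemo {

    /** Returns true if the second task ran on a different thread than the first. */
    static boolean demo() {
        // Swallow the uncaught exception so the demo prints no stack trace.
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quiet);
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                first.set(Thread.currentThread());
                throw new IllegalStateException("boom"); // kills this worker
            });
            pool.execute(() -> {
                second.set(Thread.currentThread());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("second task did not run");
            return first.get() != second.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```

This applies to execute. Tasks passed to submit are wrapped in a FutureTask, which captures the exception, so the worker does not exit abruptly in that case.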
tryTerminate method
The tryTerminate method decides whether to terminate the thread pool based on its state, as follows:
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /*
             * Return immediately if the pool is in one of the following states:
             * 1. RUNNING, because the pool is still in service;
             * 2. TIDYING or TERMINATED, because no threads are running in the pool;
             * 3. SHUTDOWN with a non-empty queue, because the tasks in workQueue
             *    must still be executed.
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // If the number of threads is not zero, interrupt one idle worker thread and return
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Try to set the state to TIDYING; on success call terminated()
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // terminated() does nothing by default; subclasses may override it
                        terminated();
                    } finally {
                        // Set the state to TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
interruptIdleWorkers(boolean onlyOne) interrupts at most one idle thread when onlyOne is true (as with the ONLY_ONE constant used above) and all idle threads when it is false. When is a thread idle?
A worker thread is idle in two situations: it has finished its current task and released its Worker lock, or it is blocked in workQueue.take() inside getTask, where it stays until a task arrives or the thread is interrupted.
That is why every worker thread calls tryTerminate as it exits: if other workers remain, one idle worker is interrupted, which prevents workers from staying blocked on an empty queue while the pool is trying to terminate.
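The TIDYING to TERMINATED transition can be watched through the terminated() hook. The sketch below is illustrative only: TerminatedHookDemo and ObservablePool are hypothetical names, and the AtomicBoolean flag is simply one assumed way of recording that the hook ran.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {

    /** Hypothetical subclass that records when terminated() runs. */
    static class ObservablePool extends ThreadPoolExecutor {
        final AtomicBoolean hookCalled = new AtomicBoolean(false);

        ObservablePool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            // Called from tryTerminate after the state has been set to TIDYING
            // and before it is set to TERMINATED.
            hookCalled.set(true);
            super.terminated();
        }
    }

    /** Returns true if the pool terminated and the hook was invoked. */
    static boolean hookRunsOnTermination() throws InterruptedException {
        ObservablePool pool = new ObservablePool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { /* trivial task */ });
        }
        pool.shutdown();
        // awaitTermination is signalled by termination.signalAll() in tryTerminate.
        boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        return finished && pool.isTerminated() && pool.hookCalled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("hook ran: " + hookRunsOnTermination());
    }
}
```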
shutdown method
The shutdown method switches the thread pool to the SHUTDOWN state, calls interruptIdleWorkers to request the interruption of all idle workers, and finally calls tryTerminate to try to terminate the pool.
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Security check: verify the caller is allowed to shut down the pool
        checkShutdownAccess();
        // Switch the state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the thread pool
    tryTerminate();
}
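From the caller's side, the SHUTDOWN state means that new submissions are refused while tasks already in the queue still run. The following sketch illustrates this under stated assumptions: ShutdownDrainDemo is a hypothetical class name, and the pool uses the default rejection policy, which throws RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDrainDemo {

    /** Returns {number of tasks completed, 1 if the late submission was rejected else 0}. */
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);

        // The first task occupies the single worker, so the next two stay queued.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        });
        pool.execute(completed::incrementAndGet);
        pool.execute(completed::incrementAndGet);

        pool.shutdown(); // state becomes SHUTDOWN; queued tasks are kept

        int rejected = 0;
        try {
            pool.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = 1; // new work is not accepted after shutdown()
        }

        gate.countDown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { completed.get(), rejected };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run();
        System.out.println("completed=" + result[0] + " rejected=" + result[1]);
    }
}
```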
Consider the following question: in the runWorker method, the Worker object w is locked while a task is executing. Why does each worker thread take its lock for the duration of the task?
Here’s a closer look:
- In the getTask method, if the thread pool is in the SHUTDOWN state and workQueue is empty, null is returned so that the worker thread exits. The pool enters the SHUTDOWN state when the shutdown method is called.
- The shutdown method calls interruptIdleWorkers to interrupt idle threads. interruptIdleWorkers holds mainLock while it iterates over workers and checks, one by one, whether each worker thread is idle. The getTask method, however, does not acquire mainLock.
- In getTask, if the pool state is RUNNING and the blocking queue is empty, the worker calls workQueue.take() and blocks.
- Suppose getTask has just seen the state RUNNING when the shutdown method changes it to SHUTDOWN. Without an interrupt, the worker thread would go on to call workQueue.take() and block forever: no new tasks may be added to workQueue in the SHUTDOWN state, so the worker would never be destroyed and the thread pool could never be closed.
- This shows that there is a race condition between the shutdown method and the getTask method (at the point where a task is fetched from the queue).
- Solving this problem requires thread interruption, which is why the interruptIdleWorkers method is used. If the thread is interrupted before or during the call to workQueue.take(), take() throws InterruptedException, and getTask catches it and re-checks the pool state instead of staying blocked.
- Before interrupting a worker thread, however, the pool must determine whether that thread is idle. A worker that is running a task should not be interrupted.
- interruptIdleWorkers uses tryLock to determine whether a worker thread is running a task. If tryLock returns true, the worker is not executing a task, and only then is it interrupted.
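The effect of the tryLock check described in the list above can be seen in a small experiment. This is a sketch under stated assumptions rather than anything from the JDK source: ShutdownSparesBusyWorkerDemo is a hypothetical name, and Thread.sleep stands in for real work because it would throw InterruptedException if the worker were interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownSparesBusyWorkerDemo {

    /** Returns true if the pool terminated and the running task was never interrupted. */
    static boolean busyWorkerFinishedUninterrupted() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Start both workers up front; one of them stays idle in workQueue.take().
        pool.prestartAllCoreThreads();

        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(200); // would throw if shutdown() interrupted this worker
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        started.await();  // the task is running, so its Worker lock is held
        pool.shutdown();  // interrupts only workers whose tryLock succeeds
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return terminated && !interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("busy worker spared: " + busyWorkerFinishedUninterrupted());
    }
}
```

The idle worker is woken by the interrupt and exits, while the busy one finishes its task first, so the pool still terminates.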
Let’s examine the interruptIdleWorkers method.
interruptIdleWorkers method
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
interruptIdleWorkers iterates over all worker threads in workers and interrupts a thread only if tryLock on its Worker succeeds, which means the worker is not running a task.
Why must mainLock be held? Because workers is a HashSet, which is not thread-safe.
shutdownNow method
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // Interrupt all worker threads, whether idle or not
        interruptWorkers();
        // Remove the tasks that have not been executed from the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
The shutdownNow method is similar to the shutdown method except that:
- Set the state to STOP.
- Interrupt all worker threads, whether idle or not;
- Remove the unexecuted tasks from the blocking queue and return them.
After shutdownNow has done this, it calls the tryTerminate method analyzed earlier, which moves the thread pool to the TERMINATED state once no worker threads remain.
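The three differences listed above can be checked with a short experiment. The sketch below is illustrative: ShutdownNowDemo and its Result holder are hypothetical names, and a long Thread.sleep is assumed as a stand-in for a task that responds to interruption.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {

    /** Hypothetical holder for what the experiment observed. */
    static final class Result {
        final int pendingReturned;
        final boolean runningTaskInterrupted;
        final boolean terminated;

        Result(int pendingReturned, boolean runningTaskInterrupted, boolean terminated) {
            this.pendingReturned = pendingReturned;
            this.runningTaskInterrupted = runningTaskInterrupted;
            this.terminated = terminated;
        }
    }

    static Result run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        // This task occupies the only worker until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        // These two tasks wait in the queue and never get to run.
        pool.execute(() -> { });
        pool.execute(() -> { });

        started.await();
        // Sets the state to STOP, interrupts every worker, and drains the queue.
        List<Runnable> pending = pool.shutdownNow();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new Result(pending.size(), interrupted.get(), terminated);
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = run();
        System.out.println("pending=" + r.pendingReturned
                + " interrupted=" + r.runningTaskInterrupted
                + " terminated=" + r.terminated);
    }
}
```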
Thread pool monitoring
The thread pool can be monitored through the values it exposes. The following methods are useful when monitoring a pool:
- getTaskCount: the approximate total number of tasks in the thread pool, both executed and not yet executed;
- getCompletedTaskCount: the number of tasks the thread pool has completed. The value is less than or equal to the task count;
- getLargestPoolSize: the largest number of threads that have ever been in the pool at the same time. If this value equals maximumPoolSize, the pool has been full at some point;
- getPoolSize: the current number of threads in the thread pool;
- getActiveCount: the number of threads in the pool that are currently executing tasks.
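As a rough illustration of these getters, the sketch below runs a few trivial tasks and then prints one line of statistics. PoolStatsDemo, describe, and runToCompletion are hypothetical names chosen for this example, and the pool sizes are arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStatsDemo {

    /** Formats the monitoring values of a pool as a single line. */
    static String describe(ThreadPoolExecutor pool) {
        return String.format(
                "taskCount=%d completed=%d largest=%d poolSize=%d active=%d",
                pool.getTaskCount(), pool.getCompletedTaskCount(),
                pool.getLargestPoolSize(), pool.getPoolSize(), pool.getActiveCount());
    }

    /** Runs the given number of trivial tasks and returns the terminated pool. */
    static ThreadPoolExecutor runToCompletion(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { /* trivial task */ });
        }
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate in time");
        }
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describe(runToCompletion(10)));
    }
}
```

In a long-running service these values would more likely be sampled periodically while the pool is live. Note that with an unbounded LinkedBlockingQueue the pool never grows beyond corePoolSize, so getLargestPoolSize stays at or below 2 here.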
The ThreadPoolExecutor class also provides several hook methods with empty default implementations, such as beforeExecute, afterExecute, and terminated, that can be used to monitor the thread pool. A subclass of ThreadPoolExecutor can override them to run extra logic before or after each task, for example to measure how long tasks take to execute.
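A minimal sketch of such an extension follows. TimingPool and its accessor methods are hypothetical names; the sketch relies on beforeExecute and afterExecute being invoked on the worker thread that runs the task, so a ThreadLocal is assumed to be a sufficient place to keep the start time.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class TimingPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicInteger timedTasks = new AtomicInteger();

    TimingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // remember when this task started
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            timedTasks.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    int timedTaskCount() {
        return timedTasks.get();
    }

    long totalTaskNanos() {
        return totalNanos.get();
    }

    /** Runs the given number of short tasks and returns the terminated pool. */
    static TimingPool runTasks(int tasks) throws InterruptedException {
        TimingPool pool = new TimingPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { /* trivial task */ });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        TimingPool pool = runTasks(5);
        System.out.println(pool.timedTaskCount() + " tasks, "
                + pool.totalTaskNanos() + " ns in total");
    }
}
```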
That’s it for ThreadPoolExecutor.