ThreadPoolExecutor is practically synonymous with thread-pool concurrency in Java, and most multithreaded business code is built on top of it. Plenty of articles on the Internet already cover it, but working through the source yourself leaves a deeper understanding than reading someone else's summary. Since I have recently been systematically reviewing my Java knowledge, this article analyzes how this multithreading framework works.
ThreadPoolExecutor constructor and member variables
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
Interview questions usually go no further than the roles of the seven parameters of this constructor:
- corePoolSize: the number of core threads the pool keeps alive even when they are idle, unless allowCoreThreadTimeOut is set to true
- maximumPoolSize: the maximum number of threads allowed in the pool
- keepAliveTime: how long an idle thread waits for new work before it terminates (applies to threads beyond corePoolSize, and to core threads too when allowCoreThreadTimeOut is true)
- unit: the time unit of keepAliveTime
- workQueue: the task queue; it holds the Runnable tasks submitted through the pool's execute method
- threadFactory: the factory used to create threads
- handler: the RejectedExecutionHandler, the rejection policy applied when the number of threads has reached its limit and the queue is full
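To make the seven parameters concrete, here is a minimal sketch that passes each one explicitly. The class name PoolConstructionDemo and the chosen values (2, 4, 30 seconds, a queue of 100) are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sizes below are arbitrary example values.
class PoolConstructionDemo {
    // Builds a pool with all seven constructor arguments spelled out.
    static ThreadPoolExecutor buildPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<Runnable>(100),  // workQueue (bounded)
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = buildPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
        pool.shutdown();
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue the pool never grows past corePoolSize, so maximumPoolSize and the handler would rarely come into play.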
Member variables
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
A favorite interview question is what the ctl variable represents. The high 3 bits of ctl hold the state of the thread pool and the low 29 bits hold the number of worker threads; the two are combined with the | operator into the single ctl value. The maximum number of threads the pool can count is therefore 2^29 - 1.
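The packing can be tried out in isolation. The sketch below re-declares the constants in a hypothetical class CtlPackingDemo, together with three small helpers named after the private helpers in ThreadPoolExecutor (ctlOf, runStateOf, workerCountOf); they are standalone copies written for illustration, since the originals are not accessible from outside the class.

```java
// Hypothetical standalone copy of the ctl bit layout, for illustration only.
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set
    static final int RUNNING    = -1 << COUNT_BITS;      // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high bits 000
    static final int STOP       =  1 << COUNT_BITS;      // high bits 001

    // Pack a run state and a worker count into one int.
    static int ctlOf(int rs, int wc)  { return rs | wc; }
    // Keep only the high 3 bits.
    static int runStateOf(int c)      { return c & ~CAPACITY; }
    // Keep only the low 29 bits.
    static int workerCountOf(int c)   { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));   // 32 binary digits
        System.out.println(runStateOf(c) == RUNNING);    // true
        System.out.println(workerCountOf(c));            // 5
        System.out.println(CAPACITY);                    // 536870911
    }
}
```

Because RUNNING is the only negative state and the states increase in the order RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED, the pool can compare states with plain integer comparisons such as rs >= SHUTDOWN.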
Thread pool state
- RUNNING (-1 << 29): the pool accepts new tasks and processes queued tasks
- SHUTDOWN (0 << 29): the pool does not accept new tasks but still processes the tasks in the queue
- STOP (1 << 29): the pool does not accept new tasks, does not process the tasks in the queue, and interrupts the tasks that are currently executing
- TIDYING (2 << 29): all tasks have terminated and the worker count is 0; the thread that moves the pool into TIDYING runs the terminated() hook
- TERMINATED (3 << 29): the terminated() hook has completed
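State transitions of the thread pool
- RUNNING -> SHUTDOWN: shutdown() is called
- RUNNING or SHUTDOWN -> STOP: shutdownNow() is called
- SHUTDOWN -> TIDYING: both the queue and the pool are empty
- STOP -> TIDYING: the pool is empty
- TIDYING -> TERMINATED: the terminated() hook has completed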
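The state constants are private, but part of the lifecycle can be observed through public methods. The following sketch (the class LifecycleDemo and its observe method are hypothetical names) records isShutdown() before and after shutdown() and waits for termination.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: observes pool state through the public API only.
class LifecycleDemo {
    // Returns {isShutdown while running, isShutdown after shutdown(), terminated in time}.
    static boolean[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        boolean[] seen = new boolean[3];
        seen[0] = pool.isShutdown();   // pool is RUNNING, so false
        pool.execute(() -> { });       // a trivial task
        pool.shutdown();               // no new tasks; queued tasks still run
        seen[1] = pool.isShutdown();   // true from now on
        try {
            // true once the pool has reached TERMINATED within the timeout
            seen[2] = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = observe();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2]);
    }
}
```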
Task submission and execution flow
Business code normally hands a task to the thread pool through the execute or submit method. The difference is that submit returns a Future while execute returns void. submit involves returning results from asynchronous threads and will be analyzed separately; here we focus on execute, whose source below shows the whole execution flow of the thread pool.
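As a quick illustration of the difference between the two entry points, here is a small sketch. The class name SubmitVsExecuteDemo and the computed value are made up for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo contrasting execute (no result) with submit (Future result).
class SubmitVsExecuteDemo {
    static int compute() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // execute: fire and forget, nothing comes back to the caller
            pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
            // submit: the Callable's return value is delivered through the Future
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compute());
    }
}
```

One practical consequence: an exception thrown inside a submitted task is captured in the Future and only surfaces when get() is called, while an exception thrown by a task passed to execute propagates out of the worker thread.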
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
The task execution flow of the thread pool is as follows. I believe the general process is clear to most readers:
- When the number of Worker threads < corePoolSize, a new core thread is created to run the task
- When the number of Worker threads >= corePoolSize, the task is added to the task queue
- When the queue is full and the number of threads < maximumPoolSize, a new non-core thread is created to run the task
- When the queue is full and the number of threads has reached maximumPoolSize, the rejection policy is executed
The actual source code also contains a small detail that is easily overlooked:
- After a task has been queued, the pool state and the worker count are checked again: if the pool is no longer running, the task is removed and rejected; if no worker is left, a new one is started
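The four outcomes of execute can be reproduced with a deliberately tiny pool. The sketch below assumes a pool with corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and submits four tasks that all block on a latch; the class name ExecuteFlowDemo and these sizes are illustrative choices.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one task per branch of execute().
class ExecuteFlowDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected else 0}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                 // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);               // below corePoolSize: new core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);               // core thread busy: task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);               // queue full: new non-core thread
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);           // queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                     // default AbortPolicy throws
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = run();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2] + " " + seen[3]);
    }
}
```

Note that the third task starts running before the second one, even though it was submitted later: the second task waits in the queue while the third becomes the first task of the new non-core thread.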
The process of adding worker threads to the thread pool
Worker threads are added to the thread pool mainly by the addWorker method. Since the code is long, I have put the comments in the code.
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // The pool is at least SHUTDOWN (not running). Return false unless
        // the state is exactly SHUTDOWN, the first task is null and the
        // task queue is not empty.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||                           // the pool is running
                    (rs == SHUTDOWN && firstTask == null)) {   // or SHUTDOWN with no first task
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the workers set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        // Record the largest pool size reached
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
The main steps of adding a worker thread are:
- Check the run state of the pool and whether the queue is empty. Because the pool is used by many threads, shutdown or another method that closes the pool may be called at any moment, so the state has to be checked again before each step. Once the pool is at least SHUTDOWN, a worker is added only when the state is exactly SHUTDOWN, firstTask is null, and the task queue is not empty, so that the queued tasks can still be processed.
- If the worker count has reached CAPACITY, or corePoolSize (for a core thread) or maximumPoolSize (for a non-core thread), return false. Otherwise increase the worker count by 1 via CAS. If the CAS fails, re-read ctl: if the run state differs from the one read at the start of the loop, restart the outer loop; otherwise retry the CAS.
- Create the Worker object, whose constructor argument Runnable is the first task to execute. Acquire the reentrant lock mainLock and check the pool state again. Add the Worker to the workers Set; if the size of the set exceeds largestPoolSize, record it in largestPoolSize; then set workerAdded to true. After the lock has been released in the finally block, if workerAdded is true, call the start method of the Worker's thread to start the worker.
If the worker was not started, addWorkerFailed removes the Worker from the workers Set and reduces the worker count of the pool by 1.
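The count-then-CAS loop in the second step is a general pattern for a lock-free counter with an upper bound. The sketch below isolates it in a hypothetical class BoundedCasCounter; it is a simplified analogue of the inner loop, without the run-state check that addWorker also performs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified analogue of the inner loop of addWorker:
// increment a shared counter with CAS, but never beyond a limit.
class BoundedCasCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCasCounter(int limit) {
        this.limit = limit;
    }

    // Returns false when the limit is reached, true once the CAS succeeds.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // like wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                   // like compareAndIncrementWorkerCount(c)
            // CAS lost a race with another thread: re-read and try again
        }
    }

    int get() {
        return count.get();
    }

    public static void main(String[] args) {
        BoundedCasCounter counter = new BoundedCasCounter(2);
        System.out.println(counter.tryIncrement()); // true
        System.out.println(counter.tryIncrement()); // true
        System.out.println(counter.tryIncrement()); // false
    }
}
```

Reserving the slot with CAS before the Worker is created is what lets addWorker check the limits without holding mainLock; the lock is only taken later, for the workers set.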
Execution process of the Worker thread
First, look at the member variables and the constructor of the Worker class. The code below shows that Worker implements the Runnable interface. In the previous section, addWorker started the Worker by calling start on its thread; once the operating system schedules that thread, it executes the Worker's run method.
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // The initial state is -1, which means the worker cannot be interrupted yet
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
As the code below shows, Worker's run method calls runWorker directly, passing in the Worker itself. runWorker loops: it takes the first task if there is one, otherwise fetches a task from the task queue with getTask, locks the Worker, and executes the task. When getTask returns null, the loop ends and the exit of the Worker is processed.
/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // The thread the Worker is running in
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Set the state to 0, which allows the worker to be interrupted
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop: run the first task, then keep fetching tasks from the queue
        while (task != null || (task = getTask()) != null) {
            // Acquire the lock of the current Worker
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||      // pool is STOP, TIDYING or TERMINATED
                 (Thread.interrupted() &&                 // check and clear the interrupt flag
                  runStateAtLeast(ctl.get(), STOP))) &&   // check the pool state again
                !wt.isInterrupted())                      // not interrupted yet
                wt.interrupt();
            try {
                // Hook invoked before the business task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the business task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook invoked after the business task has run
                    afterExecute(task, thrown);
                }
            } finally {
                // Reset task to null so the while loop fetches the next task
                task = null;
                w.completedTasks++;
                // Release the lock and process the next task
                w.unlock();
            }
        }
        // Reached only when the loop ended normally, without an exception from a task
        completedAbruptly = false;
    } finally {
        // Process the exit of the Worker
        processWorkerExit(w, completedAbruptly);
    }
}
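beforeExecute and afterExecute are protected methods with empty default bodies, so a subclass can use them for logging or metrics. Below is a minimal sketch; the subclass name HookedPool and its counters are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts how often the two hooks are invoked.
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    HookedPool() {
        super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();     // runs in the worker thread, before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet();    // runs in the worker thread, even if the task threw
    }

    // Runs three trivial tasks and returns {started, finished}.
    static int[] runThreeTasks() {
        HookedPool pool = new HookedPool();
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runThreeTasks();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```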
getTask fetches the next task from the task queue. If it returns null, the runWorker loop exits, processWorkerExit runs in the finally block, and the Worker thread exits. getTask returns null in two cases:
- The pool state is at least SHUTDOWN and, in addition, either the state is at least STOP or the task queue is empty. The worker count of the pool is reduced by 1 and null is returned.
- The worker count is greater than maximumPoolSize, or the Worker is subject to timeout (allowCoreThreadTimeOut is true or the worker count is greater than corePoolSize) and its last poll timed out; and, in addition, the worker count is greater than 1 or the task queue is empty. The worker count is reduced by 1 via CAS and null is returned.
private Runnable getTask() {
    // Timeout flag
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // The pool is at least SHUTDOWN, and it is either at least STOP
        // or the task queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Reduce the worker count by 1
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // True if allowCoreThreadTimeOut is set or the worker count exceeds
        // corePoolSize: a Worker that times out then has to exit
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // The worker count exceeds maximumPoolSize, or the last poll timed out,
        // and the worker count is greater than 1 or the task queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Reduce the worker count by 1 via CAS
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // A Worker subject to timeout polls for at most keepAliveTime
            // (from the pool constructor); otherwise it blocks in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
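The timed branch of getTask is what makes idle threads disappear. The sketch below (class name KeepAliveDemo and the 50 ms keep-alive are illustrative assumptions) enables allowCoreThreadTimeOut on a single-thread pool and polls getPoolSize() until the idle core thread has exited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: an idle core thread exits once poll(keepAliveTime) times out.
class KeepAliveDemo {
    // Returns true if the pool shrank to zero threads within five seconds.
    static boolean coreThreadTimesOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);   // requires keepAliveTime > 0
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);       // starts the single core thread
        try {
            done.await();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // The worker now sits in workQueue.poll(50 ms); wait for it to give up.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(coreThreadTimesOut());
    }
}
```

Without allowCoreThreadTimeOut(true), timed would be false for this pool (the worker count never exceeds corePoolSize), the worker would block in take(), and the pool size would stay at 1.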
Exit processing of a Worker:
1. From the analysis above, completedAbruptly indicates whether the Worker is exiting because of an exception during task execution. If it is, the worker count has not been adjusted yet, so it is reduced by 1 here.
2. Acquire mainLock, the global lock of the thread pool (as opposed to the Worker's own lock, which is held while a task executes), add the Worker's completedTasks to completedTaskCount, and remove the Worker from the workers set.
3. Call tryTerminate to check whether the thread pool can terminate.
4. If the pool state is below STOP, decide whether a replacement Worker has to be added: always after an abrupt exit, otherwise only when the worker count has dropped below the required minimum.
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // The Worker exited because of an exception: reduce the worker count
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    // Acquire the global lock
    mainLock.lock();
    try {
        // Add the Worker's completed tasks to the pool total
        completedTaskCount += w.completedTasks;
        // Remove the Worker from the workers set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Try to terminate the thread pool
    tryTerminate();

    int c = ctl.get();
    // The pool is RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // With allowCoreThreadTimeOut set, min = 0: no resident thread is required
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Add a replacement Worker
        addWorker(null, false);
    }
}
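The replacement branch can be observed by letting a task passed to execute throw. In the sketch below, the class WorkerReplacementDemo and its counting ThreadFactory are hypothetical; the factory counts how many threads the pool asks for and installs a silent uncaught-exception handler so that the demo prints no stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a worker killed by an exception is replaced by a new thread.
class WorkerReplacementDemo {
    // Returns how many threads the pool created to run one failing and one normal task.
    static int threadsCreatedAfterFailure() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep the demo quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
        CountDownLatch secondDone = new CountDownLatch(1);
        try {
            // The exception escapes runWorker, so completedAbruptly stays true.
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            pool.execute(secondDone::countDown);
            if (!secondDone.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("second task did not run");
            return created.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsCreatedAfterFailure());
    }
}
```

Even though the pool is limited to a single thread, two threads are created in total: the first dies with the exception and a second one takes over the remaining task.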
tryTerminate handles the shutdown of the thread pool: once the pool is shutting down and no workers remain, it moves the pool to TIDYING and then to TERMINATED.
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||                      // the pool is RUNNING
            runStateAtLeast(c, TIDYING) ||       // the pool is at least TIDYING
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // SHUTDOWN but the queue is not empty
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // Interrupt one idle worker
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // Only the last thread gets here; it moves the pool
        // from TIDYING to TERMINATED
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Hook function
                    terminated();
                } finally {
                    // Set the pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up the threads waiting in awaitTermination
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
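terminated() is a protected hook with an empty default body. A subclass can override it to release resources once the pool has fully drained. The following sketch uses a hypothetical subclass TerminationHookDemo that counts how often the hook runs.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that records calls to the terminated() hook.
class TerminationHookDemo extends ThreadPoolExecutor {
    final AtomicInteger terminatedCalls = new AtomicInteger();

    TerminationHookDemo() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void terminated() {
        super.terminated();
        terminatedCalls.incrementAndGet();  // invoked while the pool is in TIDYING
    }

    // Shuts a pool down and returns how many times terminated() was called.
    static int shutDownAndCount() {
        TerminationHookDemo pool = new TerminationHookDemo();
        pool.execute(() -> { });
        pool.shutdown();
        try {
            // Returns only after the state is TERMINATED, i.e. after the hook has run.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.terminatedCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(shutDownAndCount());
    }
}
```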
Rejection policy: RejectedExecutionHandler
The rejection policy determines what happens when the thread pool cannot accept a task:
- AbortPolicy, the default policy, throws a RejectedExecutionException
- DiscardPolicy silently discards the task
- DiscardOldestPolicy discards the oldest task in the task queue and, if the thread pool has not been shut down, submits the new task to the pool again
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
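The effect of DiscardOldestPolicy is easy to see with a pool that can hold exactly one running and one queued task. In the sketch below, the class DiscardOldestDemo and the task labels A, B and C are made up for the example.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: the queued task B is dropped to make room for C.
class DiscardOldestDemo {
    // Returns the labels of the tasks that actually ran, in order.
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        List<String> ran = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {             // A occupies the only worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ran.add("A");
            });
            pool.execute(() -> ran.add("B")); // B fills the queue
            pool.execute(() -> ran.add("C")); // rejected: B is polled away, C is queued
            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A, C]
    }
}
```

B is lost without any notification to the code that submitted it, which is worth keeping in mind before choosing this policy.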
- CallerRunsPolicy runs the task directly in the caller's thread by invoking the run method of the Runnable, unless the thread pool has been shut down
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
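A short sketch can confirm that a rejected task really runs on the submitting thread. The class name CallerRunsDemo is hypothetical, and the pool is again sized so that the third task is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: with CallerRunsPolicy the rejected task runs in the caller.
class CallerRunsDemo {
    // Returns true if the rejected task ran on the thread that called execute().
    static boolean rejectedTaskRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> {                 // occupies the only worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });             // fills the queue
            // Rejected: execute() runs the task synchronously before returning.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedTaskRunsOnCaller());
    }
}
```

Because the caller is busy running the task itself, it cannot submit more work in the meantime, so this policy acts as a simple form of back-pressure.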
In summary, this article has analyzed in detail the state transitions of the thread pool, the creation of Worker threads, the way tasks in the task queue are executed, and the rejection policies.