Overview
This article collects the notes I took while reading the ThreadPoolExecutor source code. It annotates the key points of the source in some detail and adds my own understanding of the thread pool mechanism. The goal is to answer the following questions:
- The thread pool has execute() and submit() methods. How does each of them work?
- How is a new thread created?
- How is the task performed?
- How are threads destroyed? How is the timeout mechanism implemented?
Let’s start with two important members of the thread pool:
ctl
An AtomicInteger. The high 3 bits store the thread pool state, and the low 29 bits store the current thread count. workerCountOf(c) returns the current number of threads, and runStateOf(c) returns the current thread pool state. The thread pool has the following states:
- RUNNING: Accepts new tasks and processes queued tasks.
- SHUTDOWN: Does not accept new tasks, but still processes queued tasks.
- STOP: Does not accept new tasks, does not process queued tasks, and interrupts tasks in progress.
- TIDYING: All tasks have terminated and the worker count is 0. The terminated() hook method is then run.
- TERMINATED: The terminated() method has completed.
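To make the bit layout concrete, here is a minimal sketch of how a state and a count can share one int. The helper names follow the ones mentioned above (runStateOf, workerCountOf); the class name CtlSketch is made up for this note, and the code should be read as an illustration of the packing idea, not as a copy of the pool's source.

```java
// Hypothetical class name; illustrates packing a run state and a worker count into one int.
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 low bits for the count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // mask for the low 29 bits

    // Run states live in the high 3 bits and are ordered numerically.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both parts

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 5
    }
}
```

Because the states are ordered numerically, a comparison such as rs >= SHUTDOWN is enough to ask "is the pool no longer running?", which is the style of check that appears in the code later in this article.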
Worker
This is the class that wraps a thread in the thread pool. One Worker represents one thread. The thread pool manages its Workers with a HashSet.
Note that Worker itself does not distinguish between core threads and non-core threads. "Core thread" is only a term in the conceptual model; the distinction is made by comparing the number of threads against the configured limits. Worker has the following features:
- It extends AQS (AbstractQueuedSynchronizer) and implements a very simple non-fair, non-reentrant lock.
- The constructor takes a Runnable, which is the first task to execute and can be null. The constructor also creates the new thread.
- It implements the Runnable interface and passes this when creating the new thread. Therefore, when the thread starts, it executes the Worker's own run method.
- The run method calls ThreadPoolExecutor's runWorker method, which is responsible for actually executing tasks.
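The lock part of this description can be sketched on its own. The following is a stripped-down stand-in, assuming only what the list above says (AQS-based, non-fair, non-reentrant); the class name NonReentrantLockSketch and its method set are my own choices for illustration, and the real Worker carries more state than this.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical name; a minimal non-reentrant lock built on AQS.
final class NonReentrantLockSketch extends AbstractQueuedSynchronizer {
    // State 0 means unlocked, 1 means locked. There is no hold count,
    // so a second acquire by the owning thread fails: no reentrancy.
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return getState() != 0; }

    public static void main(String[] args) {
        NonReentrantLockSketch lock = new NonReentrantLockSketch();
        System.out.println(lock.tryLock());  // true: the lock was free
        System.out.println(lock.tryLock());  // false: the same thread cannot re-enter
        lock.unlock();
        System.out.println(lock.isLocked()); // false
    }
}
```

As far as I understand it, the point of making the lock non-reentrant is that "locked" can then be read as "this worker is busy running a task", so a failed tryLock from another thread is a cheap way to tell that the worker is not idle.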
The execution mechanism of the submit() method
submit() returns a Future object, and we can call its get() method to obtain the result of the task. The code is simple: it wraps the Runnable in a FutureTask and, as you can see, ends up calling execute():
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
The FutureTask code is not reproduced here; briefly, it works like this:
- FutureTask implements the RunnableFuture interface, which extends Runnable. When the task is executed, FutureTask's run method is called; it runs the actual task code and sets the result.
- If the task is complete, get() returns the result directly; if not, get() blocks and waits for the result.
- When the result is stored by the set method, the waiting threads are unblocked and get() returns the result.
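The same flow can be reproduced by hand with public APIs only. The sketch below wraps a Callable in a FutureTask, passes it to execute(), and then blocks in get(); the class and method names (SubmitSketch, computeViaExecute) are hypothetical, and this is only an approximation of what submit() arranges internally.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical name; mimics submit() by calling execute() with a FutureTask.
final class SubmitSketch {
    static int computeViaExecute() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> job = () -> 6 * 7;
            FutureTask<Integer> ftask = new FutureTask<>(job);
            pool.execute(ftask); // the worker thread calls ftask.run()
            return ftask.get();  // blocks until run() has stored the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeViaExecute()); // 42
    }
}
```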
The execution mechanism of the execute() method
This mechanism is probably familiar to everyone, so here is a short recap:
- If the number of worker threads is smaller than the core pool size, a new core thread is created to execute the task.
- If the number of worker threads has reached the core pool size, the task is added to the wait queue.
- If the queue is full, a non-core thread is created to execute the task.
- If the number of worker threads has reached the maximum pool size, the task is rejected.
The specific code analysis is as follows:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // Fewer workers than the core pool size
    if (addWorker(command, true)) // Start a core thread and execute the task
        return;
    c = ctl.get(); // Re-read the value on failure
}
if (isRunning(c) && workQueue.offer(command)) { // The pool is running and the task was queued
    int recheck = ctl.get();
    if (!isRunning(recheck) && remove(command)) // Recheck in case the state changed; if so, dequeue and reject the task
        reject(command);
    else if (workerCountOf(recheck) == 0) // No threads left: create a non-core thread; with a null first task it fetches work from the queue
        addWorker(null, false);
}
else if (!addWorker(command, false)) // The queue is full: create a non-core thread to execute the task
    reject(command); // Failure means the thread count has reached the upper limit, so the task is rejected
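The four branches can be observed from the outside with a deliberately tiny pool. The sketch below assumes a core size of 1, a maximum size of 2, a queue capacity of 1, and the default rejection behavior of throwing RejectedExecutionException; the class name ExecutePathSketch and the log format are made up for this note. Every task blocks on a latch so that no thread becomes free while the four tasks are submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical name; records pool size and queue size after each execute() call.
final class ExecutePathSketch {
    static List<String> observe() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // Each task parks until the latch opens, keeping its thread busy.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

With these sizes, the first task starts a core thread, the second waits in the queue, the third starts a non-core thread because the queue is full, and the fourth is rejected.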
How is a new thread created?
The thread pool creates new threads with the addWorker method. The first parameter is the task to execute first; the thread runs that task and then fetches further tasks from the queue. The second parameter is a flag that says whether the thread counts as a core thread. It is not an attribute of the Worker itself and is only used to decide which limit (corePoolSize or maximumPoolSize) the thread count is checked against.
The method can be divided into two parts. The first part performs some preliminary checks and uses a CAS loop to increment the thread count by one. The code is as follows:
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // A rarely used syntax: a label that names the for loop. With nested for loops, it lets break and continue say whether they apply to the outer or the inner loop.
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // If firstTask is not null, this method is being used to add a task.
        // If firstTask is null, it is being used only to create a new thread.
        // The SHUTDOWN state does not accept new tasks but still processes
        // queued tasks. That is the logic of the second condition.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // Spin with CAS to increase the thread count until it succeeds
        for (;;) {
            int wc = workerCountOf(c);
            // Check whether the thread capacity would be exceeded
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Use CAS to increment the thread count by 1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // The state has not changed: retry the increment in the inner loop
        }
    }
    ...
}
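The inner loop is a standard "read, check, compare-and-set, retry" pattern. Below is a minimal sketch of that pattern on a plain AtomicInteger, assuming a simple upper limit in place of the pool's capacity checks; CasIncrementSketch, tryIncrement, and race are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical name; a bounded counter incremented with a CAS retry loop.
final class CasIncrementSketch {
    // Returns true if the counter was incremented, false if the limit was reached.
    static boolean tryIncrement(AtomicInteger count, int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;               // the limit check uses the value we read
            if (count.compareAndSet(c, c + 1))
                return true;                // nobody changed it in between
            // Another thread won the race: loop, re-read, and try again.
        }
    }

    // Lets several threads compete; the final value never exceeds the limit.
    static int race(int threads, int limit) {
        AtomicInteger count = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                while (tryIncrement(count, limit)) { /* keep going until full */ }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(race(4, 1000)); // 1000
    }
}
```

The check and the increment are only atomic together because the CAS fails whenever the value changed after it was read, which is why the limit can be enforced without a lock.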
The second part creates and starts the thread and adds the Worker to the HashSet. The code is straightforward and needs little comment; a ReentrantLock is used to ensure thread safety.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; // Only records the peak pool size; useful for testing and monitoring
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w); // Remove the Worker and reduce the thread count by 1
    }
    return workerStarted;
}
How is the task performed?
The thread is started in the addWorker method. When the new thread is created, the Worker passes itself in as the Runnable, so when the thread starts it executes the Worker's run method, which calls ThreadPoolExecutor's runWorker method to execute tasks. runWorker loops over the following logic:
- If firstTask is not null, run firstTask first and then clear it.
- If firstTask is null, call getTask() to fetch a task from the queue.
- Keep executing until getTask() returns null, then exit the while loop.
- Call processWorkerExit() to remove the Worker from the HashSet. At this point the thread has finished executing and is no longer referenced, so it can be reclaimed automatically.
The specific code analysis is as follows:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    // task is the Runnable we passed to execute(). If it is null, fetch a task from the queue and execute that.
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // This condition is convoluted. It implements the following logic:
            // 1. If the pool is stopping and the thread is not interrupted, interrupt the thread.
            // 2. If the pool is not stopping but the thread is interrupted, clear the
            //    interrupt status and re-check condition 1.
            // 3. If the pool is not stopping and the thread is not interrupted, do nothing.
            // Thread.interrupted() clears the interrupt status, which guarantees that a
            // thread in a pool that is not stopping runs its task without a pending interrupt.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // beforeExecute and afterExecute are empty methods left for subclasses to override
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The thread is finished. This method removes the Worker from the HashSet.
        // A terminated thread with no remaining references is reclaimed automatically.
        processWorkerExit(w, completedAbruptly);
    }
}
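Since beforeExecute and afterExecute are meant to be overridden, the order of calls in the loop above can be checked with a small subclass. This is only a sketch: HookSketch and runOnce are hypothetical names, and the event strings are arbitrary labels chosen for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical name; records the order in which the hooks and the task run.
final class HookSketch {
    static List<String> runOnce() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before"); // runs on the worker thread, before task.run()
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                events.add("after");  // runs on the worker thread, after task.run()
            }
        };
        pool.execute(() -> events.add("run"));
        pool.shutdown(); // already accepted tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // [before, run, after]
    }
}
```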
How are threads destroyed? How is the timeout mechanism implemented?
When getTask returns null inside runWorker, the thread finishes executing and is removed from the HashSet, after which it can be reclaimed by the system. The thread timeout mechanism is also implemented in getTask, using the poll and take methods of BlockingQueue:
- poll accepts a timeout. If the queue is empty, it blocks for up to that long and returns null if no element arrives in time.
- take blocks indefinitely when the queue is empty, until an element becomes available. It throws InterruptedException only if the waiting thread is interrupted.
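A quick way to see how the two calls behave on an empty queue is to try them on a LinkedBlockingQueue directly. In the sketch below, the class name PollVsTakeSketch, the method names, and the "task" string are made up for illustration; only poll, take, and offer are real BlockingQueue methods.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical name; contrasts timed poll() with blocking take() on an empty queue.
final class PollVsTakeSketch {
    // poll() waits at most timeoutMillis and then gives up with null.
    static String pollEmpty(long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // take() keeps waiting until a producer thread offers an element.
    static String takeAfterDelay(long delayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.offer("task");
        });
        producer.start();
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty(50));      // null
        System.out.println(takeAfterDelay(50)); // task
    }
}
```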
The timeout mechanism works as follows:
- When allowCoreThreadTimeOut is true, every thread can time out: all of them call poll and pass in keepAliveTime.
- When allowCoreThreadTimeOut is false and the number of worker threads is greater than the core pool size, the thread is treated as a non-core thread and calls poll.
- When allowCoreThreadTimeOut is false and the number of worker threads is less than or equal to the core pool size, the thread is treated as a core thread and calls take, which blocks while the queue is empty. If the thread is interrupted while waiting, the InterruptedException is caught and the next loop iteration begins. As long as the queue stays empty, the core thread keeps waiting here for a task to process.
The specific code is as follows:
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Core threads are allowed to time out, or there are more threads than the core pool size
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // timed and timedOut together control the timeout mechanism
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // poll blocks while the queue is empty and returns null once keepAliveTime has elapsed.
            // take blocks until a task is available.
            // When allowCoreThreadTimeOut is true, core and non-core threads all call poll.
            // When it is false, only threads beyond the core pool size use the timeout;
            // otherwise the current thread is treated as a core thread and calls take.
            // An interrupt during either call throws InterruptedException, which resets
            // timedOut and enters the next loop iteration.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
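The effect of the timed flag can also be observed from the outside. The sketch below assumes a pool with one core thread and a 50 ms keep-alive, runs a single no-op task, and then watches getPoolSize() for up to two seconds; CoreTimeoutSketch and poolSizeAfterIdle are hypothetical names, and the timings are arbitrary values chosen to keep the example short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical name; shows whether an idle core thread is kept or timed out.
final class CoreTimeoutSketch {
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { }); // forces the single core thread to be created
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            // Wait until the idle thread has exited or the deadline has passed.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterIdle(true));  // idle core thread is expected to exit
        System.out.println(poolSizeAfterIdle(false)); // idle core thread is expected to stay
    }
}
```

With allowCoreThreadTimeOut set to true, the idle thread should go through poll, time out, and leave the pool; with false, it should stay parked in take for the whole two seconds.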