
I covered how to use a thread pool and what its parameters mean in an earlier article. If the basic usage and concepts are not yet clear, read that article first. Here is a flow chart of thread pool operation to give a quick picture of how a thread pool works.

We will analyze the thread pool source code mainly through ThreadPoolExecutor, working step by step through its core parts.

01

Attribute resolution

```java
// ctl packs two values into one int.
// High 3 bits: the run state of the thread pool.
// Low 29 bits (everything except the high 3 bits): the number of worker threads.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of bits used for the worker count: 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum worker count that the low 29 bits can represent
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// New tasks are accepted and tasks in the queue are executed.
private static final int RUNNING = -1 << COUNT_BITS;
// No new tasks are accepted, but tasks in the queue are still executed.
private static final int SHUTDOWN = 0 << COUNT_BITS;
// No new tasks are accepted, queued tasks are not executed, running tasks are interrupted.
private static final int STOP = 1 << COUNT_BITS;
// All tasks have terminated and the worker count is 0. The thread that moves the pool
// into this state runs the terminated() hook method; only one thread runs it.
private static final int TIDYING = 2 << COUNT_BITS;
// The terminated() hook method has completed.
private static final int TERMINATED = 3 << COUNT_BITS;

// Task queue
private final BlockingQueue<Runnable> workQueue;
// Global lock of the pool. It is held when workers are added or removed
// and when the run state of the pool is changed.
private final ReentrantLock mainLock = new ReentrantLock();
// Where the workers (and through them the threads) are actually stored.
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
// Largest number of threads the pool has ever held.
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
// Rejection policy
private volatile RejectedExecutionHandler handler;
// Idle timeout. With allowCoreThreadTimeOut == false, threads up to the core size are kept alive;
// with allowCoreThreadTimeOut == true, idle core threads are reclaimed as well.
private volatile long keepAliveTime;
// Controls whether threads within the core size can be reclaimed: true yes, false no.
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
```
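To make the bit layout of ctl concrete, here is a small standalone sketch. The constants and the three helper names (ctlOf, runStateOf, workerCountOf) mirror the ones that appear in the source excerpts in this article; the class CtlLayout itself is only an illustration and is not part of the JDK.

```java
class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    // Pack a run state and a worker count into one int.
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    // Keep only the high 3 bits.
    static int runStateOf(int ctl) {
        return ctl & ~CAPACITY;
    }

    // Keep only the low 29 bits.
    static int workerCountOf(int ctl) {
        return ctl & CAPACITY;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(ctl) == RUNNING); // true
        System.out.println(workerCountOf(ctl));         // 5
        // The states are ordered numerically, which is why the source can
        // write checks such as "rs >= SHUTDOWN".
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```

Because RUNNING is the only negative value, a single comparison against SHUTDOWN is enough to tell whether the pool is still running.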

The attributes that describe the thread pool's state are the heart of the source code, as illustrated here in a diagram.

  1. Running state: The thread pool is operating normally. It accepts new tasks and executes queued ones.

  2. Shutdown state: The thread pool no longer accepts new tasks, but tasks that are already running or waiting in the queue are still completed.

  3. Stop state: When the shutdownNow method is called, the thread pool enters the stop state. No new tasks are accepted, queued tasks are not executed, and running tasks are interrupted.

  4. Tidying state: All tasks have terminated and the number of worker threads is 0. The terminated() hook method is about to run.

  5. Terminated state: The terminated() hook method has completed.

Tasks are handed to the thread pool through the submit and execute methods.

The difference between submit and execute is that submit returns a Future object, through which the execution result can be obtained, while execute returns nothing.
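The difference between the shutdown and stop states can be observed from the outside. The following is a minimal sketch, assuming a pool with a single thread so that the extra tasks must wait in the queue; the class ShutdownDemo and its method names are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // shutdown(): the running task and the two queued tasks are all completed.
    static int ranAfterShutdown() {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        try {
            pool.execute(() -> { awaitQuietly(release); ran.incrementAndGet(); });
            pool.execute(ran::incrementAndGet);
            pool.execute(ran::incrementAndGet);
            pool.shutdown();       // no new tasks, but the queue is still drained
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    // shutdownNow(): the running task is interrupted, queued tasks are handed back.
    static int pendingAfterShutdownNow() {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        try {
            pool.execute(() -> { started.countDown(); awaitQuietly(never); });
            pool.execute(() -> { });
            pool.execute(() -> { });
            started.await();       // make sure the first task is really running
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(ranAfterShutdown());        // 3
        System.out.println(pendingAfterShutdownNow()); // 2
    }
}
```

Note that shutdownNow only interrupts the running task. A task that ignores interruption keeps running until it finishes by itself.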
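A short sketch of that difference from the caller's side. The class SubmitVsExecute is hypothetical; the latch and the AtomicInteger in the execute variant are one possible way to get a value out and are not something the thread pool provides.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitVsExecute {
    // submit: the Future carries the result back to the caller.
    static int viaSubmit() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // execute: nothing is returned, so the task has to publish its result itself.
    static int viaExecute() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger box = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> { box.set(6 * 7); done.countDown(); });
            done.await();
            return box.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaSubmit());  // 42
        System.out.println(viaExecute()); // 42
    }
}
```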

02

The execute method

Let's start with the execute method, which is at the heart of the thread pool, because submit calls execute underneath.

Before we talk about the execute method, let’s look at ThreadPoolExecutor’s static inner class, the Worker class.

```java
// Worker uses the exclusive mode of AQS. Two attributes matter: state and exclusiveOwnerThread.
// state: 0 means not held, > 0 means held, < 0 is the initial state, in which the lock cannot be acquired.
// exclusiveOwnerThread: the thread that holds the exclusive lock.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    // The thread wrapped by this worker.
    final Thread thread;
    // If firstTask is not null, it is executed first when the worker (its internal thread) starts.
    // After firstTask has finished, the worker fetches the next task from the queue.
    Runnable firstTask;
    // Number of tasks completed by this worker.
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     *
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // Put the AQS state into its initial value, in which the lock cannot be acquired.
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // (rest of the class omitted)
}
```

A worker thread in the thread pool is represented by a Worker. The real working thread is a member variable of the Worker, and the Worker itself is both a Runnable and a synchronizer. The Worker takes tasks out of the work queue and executes them, and its lock state shows whether it is currently running a task.

Next, let's look at the source code of the execute method to see how a Worker is created and how tasks are run through it.
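To show what "Worker is a synchronizer" means in practice, here is a minimal sketch of a non-reentrant lock built on AbstractQueuedSynchronizer that starts in state -1, as Worker does. The class WorkerLikeLock is invented for this example and only imitates the locking part of Worker under that assumption; it is not the JDK code.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class WorkerLikeLock extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    WorkerLikeLock() {
        setState(-1); // initial state: the lock cannot be acquired yet
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only the transition 0 -> 1 succeeds, so the lock is not reentrant
        // and cannot be taken while the state is still -1.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    boolean tryLock() { return tryAcquire(1); }

    void unlock() { release(1); }

    // True for both the initial state (-1) and the held state (1).
    boolean isLocked() { return getState() != 0; }

    static boolean[] demo() {
        WorkerLikeLock lock = new WorkerLikeLock();
        boolean beforeStart = lock.tryLock(); // false: state is still -1
        lock.unlock();                        // state becomes 0, like w.unlock() in runWorker
        boolean first = lock.tryLock();       // true: 0 -> 1
        boolean again = lock.tryLock();       // false: not reentrant
        lock.unlock();
        return new boolean[] {beforeStart, first, again};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [false, true, false]
    }
}
```

The non-reentrant design is what lets other code use tryLock as a test for "is this worker busy with a task right now".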

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // addWorker creates the thread: a Worker object is created with command as its firstTask.
        // core == true limits the count by corePoolSize, false limits it by maximumPoolSize.
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Reaching this point means either the core size has been reached or addWorker failed.
    // If the pool is running, try to put the task into the queue.
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // !isRunning(recheck) means the pool state was changed by another thread after the task
        // was queued, for example by shutdown() or shutdownNow().
        // remove succeeds: no worker has consumed the task yet, so it is rejected.
        // remove fails: a worker has already taken the task.
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // Make sure at least one worker exists to process the queued task.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Two cases reach this branch:
    // 1. offer failed because the queue is full. If the number of threads has not reached
    //    maximumPoolSize, a new worker is created to run command; otherwise addWorker fails.
    // 2. The pool is not in the running state. Since command != null, addWorker must return false.
    else if (!addWorker(command, false))
        reject(command);
}
```

The execution process of the execute method can be roughly divided into the following steps:

  1. If the number of worker threads is smaller than the core size, create a core thread.

  2. Once the core size is reached, put the task into the task queue.

  3. If the task queue is full, create a non-core thread.

  4. When the number of threads has reached the maximum, apply the rejection policy.

Reading this operation diagram together with the source code above should make the execution flow of execute clearer. Below we go into the details of each step.

If the number of worker threads is smaller than the core size, a new core thread is created through the addWorker method.
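The four steps above can be watched on a real pool. This is a small sketch assuming a core size of 1, a maximum size of 2, a queue with room for one task, and the AbortPolicy rejection policy; the class ExecuteFlowDemo and the trace format are made up for the example. Every task blocks on a latch so that no thread becomes free in the middle of the experiment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> trace = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    trace.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        // task1: threads=1 queued=0   (step 1: core thread created)
        // task2: threads=1 queued=1   (step 2: task queued)
        // task3: threads=2 queued=1   (step 3: non-core thread created)
        // task4: rejected             (step 4: rejection policy)
        trace().forEach(System.out::println);
    }
}
```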

03

The addWorker method

```java
// firstTask can be null, which means that after the worker is started it fetches tasks from
// the queue by itself. If it is not null, the worker executes firstTask first.
// core: true limits the thread count by corePoolSize, false limits it by maximumPoolSize.
private boolean addWorker(Runnable firstTask, boolean core) {
    // Outer spin: check whether the current state of the thread pool allows adding a worker.
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // Inner spin: obtain the token for creating a thread.
        for (;;) {
            int wc = workerCountOf(c);
            // If the current thread count has reached the limit, no thread can be created.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increase the thread count by 1 through CAS. Success means the token
            // for creating a thread has been obtained.
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // Check whether the state of the thread pool has changed.
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker.
        w = new Worker(firstTask);
        // Assign the thread of the newly created worker to t.
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Take the global lock. This may block until the lock is acquired.
            // Every operation that changes the set of workers must hold this lock.
            mainLock.lock();
            try {
                // Recheck the state while holding the lock.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the set of workers.
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // Release the lock.
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread.
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // The worker failed to start: undo the bookkeeping.
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
```

The addWorker method basically does two things.

Step 1: Determine whether a new Worker can be created.

Step 2: If it can, create the new Worker, add it to the set of workers, and start its thread.

The new Worker is added to workers, a HashSet that stores all Worker objects (it is not the task queue), and finally its thread is started. After the thread is started, the task is not executed by execute but through the run method of the Worker class.
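The inner spin of addWorker is a common pattern: read the counter, check the limit, and try a compare-and-set, retrying when another thread changed the counter in between. Here is a simplified sketch of just that pattern. The class WorkerSlotCounter and its methods are hypothetical; the real code keeps the count in the low bits of ctl and also re-checks the run state.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerSlotCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Returns true if a slot was reserved, false if the limit has been reached.
    boolean tryReserve(int limit) {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false;
            if (count.compareAndSet(current, current + 1))
                return true;
            // CAS failed because another thread changed the count: read again and retry.
        }
    }

    int current() {
        return count.get();
    }

    // Lets several threads compete for slots and reports how many succeeded.
    static int reserveConcurrently(int threads, int limit) {
        WorkerSlotCounter counter = new WorkerSlotCounter();
        AtomicInteger granted = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (counter.tryReserve(limit))
                    granted.incrementAndGet();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return granted.get();
    }

    public static void main(String[] args) {
        System.out.println(reserveConcurrently(8, 3)); // 3
    }
}
```

Reserving the slot before creating the Worker is what keeps the pool from ever exceeding its limit, even when many threads call execute at the same time.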

04

The runWorker method

After addWorker has started the thread, the task is run through the run method of the Worker class, which calls ThreadPoolExecutor's runWorker method.

```java
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // The current thread
    Thread wt = Thread.currentThread();
    // The first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If there is a first task, it is executed first.
        // As long as getTask() returns a task, this loop keeps running.
        while (task != null || (task = getTask()) != null) {
            // Lock the worker. shutdown() only interrupts workers whose lock it can acquire,
            // so a worker that holds its lock because it is running a task is skipped.
            w.lock();
            // Check the state of the thread pool.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // This is where the task is actually executed.
                    // task can be a FutureTask or any ordinary Runnable implementation.
                    // A Runnable or Callable handed in through submit() is wrapped in a FutureTask.
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
```

The runWorker method is the method that actually executes tasks. If there is a first task, it is executed first. After that, the getTask() method is used to fetch tasks from the task queue, one after another, and each is executed.
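The beforeExecute and afterExecute calls that wrap task.run() are protected hook methods, so a subclass can observe the loop from outside. The following sketch records the order of events for one task; the class HookOrderDemo and the event names are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookOrderDemo {
    static List<String> events() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                log.add("before");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                log.add("after");
            }

            @Override
            protected void terminated() {
                log.add("terminated"); // runs once, when the pool has become empty
            }
        };
        pool.execute(() -> log.add("run"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(events()); // [before, run, after, terminated]
    }
}
```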

05

The getTask method

```java
private Runnable getTask() {
    // Whether the last poll() timed out
    boolean timedOut = false; // Did the last poll() time out?

    // Spin
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // When the pool is in the SHUTDOWN state, the tasks in the queue are still finished
        // until the queue is empty. When the pool is in the STOP state (or later), exit directly.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // If core threads are allowed to time out, every thread may time out.
        // If the worker count is greater than the core size, timing out is also allowed.
        // Here "timeout" means timing out while waiting for a task.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch a task, with or without a timeout.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
```

Whether getTask calls the BlockingQueue's poll(timeout, unit) method or its take() method depends on the number of worker threads and on allowCoreThreadTimeOut: poll is used when core threads may time out or when the worker count is greater than the core size, otherwise take is used.

The poll(timeout, unit) method returns null if the timeout elapses, or if timeout <= 0 and the queue is empty.

The take() method blocks until a task is fetched or an InterruptedException is thrown.
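The effect of choosing poll or take shows up in how many threads survive an idle period. Below is a rough sketch, assuming a pool with one core thread and a keep-alive time of 50 ms; the class KeepAliveDemo is hypothetical, and because the check relies on real time it polls the pool size for up to two seconds instead of assuming an exact moment.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Runs one task, lets the pool sit idle, and returns the remaining pool size.
    static int poolSizeAfterIdle(boolean coreMayTimeOut) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // true: getTask uses poll(keepAliveTime), so the idle core thread gives up.
        // false: getTask uses take(), so the core thread waits indefinitely.
        pool.allowCoreThreadTimeOut(coreMayTimeOut);
        try {
            pool.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterIdle(true));  // expected 0: the core thread timed out
        System.out.println(poolSizeAfterIdle(false)); // expected 1: the core thread is still waiting
    }
}
```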

06

The submit method

```java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
```

The submit method accepts either a Runnable or a Callable and wraps it into a FutureTask with the newTaskFor method before handing it to execute. FutureTask will be explained in more detail in the next article. FutureTask does two main things: it wraps the task so that it can be executed, and it handles the result value by exposing a get method that returns the result of the execution. This get method blocks until the result is available.

Conclusion

I created a mind map that walks through the thread pool.
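What submit does can be reproduced by hand: wrap the task in a FutureTask, pass it to execute, and read the result with get. A minimal sketch, with a hypothetical class FutureTaskWrapDemo; it imitates the two submit overloads from the caller's side and does not call the protected newTaskFor method.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTaskWrapDemo {
    // Manual version of submit(Callable): wrap, execute, then block on get.
    static String wrapAndRun() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            FutureTask<String> ftask = new FutureTask<>(() -> "done");
            pool.execute(ftask); // a FutureTask is itself a Runnable
            return ftask.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // submit(Runnable): there is no value to return, so get yields null on completion.
    static Object runnableResult() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(() -> { });
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(wrapAndRun());     // done
        System.out.println(runnableResult()); // null
    }
}
```

The timed variant of get is used here only so that the example cannot hang; the plain get() blocks without a limit.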