(On a mobile phone, the source code is easier to read in landscape mode.)

Note: The Java source analysis section is based on the Java 8 version unless otherwise noted.

Note: The thread pool source section refers to the ThreadPoolExecutor class unless otherwise specified.

Introduction

After learning about the architecture, construction, and life cycle of thread pools in Java, we will now look at how an ordinary task is actually executed in a thread pool.

Before studying this chapter, it is recommended to read the two earlier chapters of the "Deadknock Java Thread Series" in which Tong Ge writes a thread pool by hand. They make this chapter easier to understand, and the code there is shorter and relatively easy to follow.

Questions

(1) How is an ordinary task executed in the thread pool?

(2) Where is the task actually executed?

(3) What are the main methods of the thread pool?

(4) How can you step through a thread pool in Debug mode?

Use case

We create a thread pool with a core number of 5, a maximum number of 10, an idle time of 1 second, a queue length of 5, and a rejection policy that prints one sentence.

What happens if you run 20 tasks with it?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest01 {
    public static void main(String[] args) {
        // Create a thread pool: core size 5, maximum size 10, idle time 1 second, queue length 5
        ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(currentThreadName() + ", discard task");
                    }
                });
        // Submit 20 tasks; each task prints its own num
        for (int i = 0; i < 20; i++) {
            // Copy i so that the lambda captures an effectively final variable
            int num = i;
            threadPool.execute(() -> {
                try {
                    System.out.println(currentThreadName() + ", " + num + " running, " + System.currentTimeMillis());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}

We will not explain the constructor's seven parameters in detail here. If you are interested, see the chapter "Deadknock Java Thread Series: Thread Pool In-depth Analysis - Constructor".

Let’s take a look at the results of a run:

pool-1-thread-1, 0 running, 1572678434411
pool-1-thread-3, 2 running, 1572678434411
pool-1-thread-2, 1 running, 1572678434411
pool-1-thread-4, 3 running, 1572678434411
pool-1-thread-5, 4 running, 1572678434411
pool-1-thread-6, 10 running, 1572678434412
pool-1-thread-7, 11 running, 1572678434412
pool-1-thread-8, 12 running, 1572678434412
main, discard task
main, discard task
main, discard task
main, discard task
main, discard task
pool-1-thread-9, 13 running, 1572678434412
pool-1-thread-10, 14 running, 1572678434412
pool-1-thread-3, 5 running, 1572678436411
pool-1-thread-1, 6 running, 1572678436411
pool-1-thread-6, 7 running, 1572678436412
pool-1-thread-2, 8 running, 1572678436412
pool-1-thread-7, 9 running, 1572678436412

Note the order in which the num values are printed: first 0~4, then 10~14, and finally 5~9. They are not printed in submission order. Why?

Let’s debug it step by step.

The execute() method

The execute() method is one of the core methods for thread pools to submit tasks.

// Submit a task; the task is not necessarily executed immediately
public void execute(Runnable command) {
    // The task cannot be null
    if (command == null)
        throw new NullPointerException();
    // Control variable (high 3 bits store the state, low 29 bits store the number of worker threads)
    int c = ctl.get();
    // 1. If the number of worker threads is less than the core size, create a core thread
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // Creation failed, re-read the control variable
        c = ctl.get();
    }
    // 2. Core size reached: if the pool is running, try to put the task into the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check the thread pool state again. If it is not running, remove the task and execute the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker thread is left, create one without a first task
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. Failed to queue the task, try to create a non-core worker thread
    else if (!addWorker(command, false))
        // 4. Creating a non-core thread failed too, execute the rejection policy
        reject(command);
}

We won’t go into detail about thread pool states here, but if you’re interested, check out the chapter on Thread Pool Life Cycle in the Deadknock Java Thread Series.
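The comment on the control variable ("high 3 bits store the state, low 29 bits store the number of worker threads") can be made concrete with a small sketch. The constant and helper names below are modeled on the private members of JDK 8's ThreadPoolExecutor, but the class CtlSketch itself is hypothetical and exists only for illustration.

```java
class CtlSketch {
    // High 3 bits hold the run state, low 29 bits hold the worker count.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // Run states live in the high bits; RUNNING is the only negative one.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    // Unpack the two fields from one int.
    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Pack a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) { return rs | wc; }

    static boolean isRunning(int c) { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));           // 5
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(isRunning(ctlOf(STOP, 5)));  // false
    }
}
```

Packing both values into one int is what lets execute() read the state and the worker count with a single ctl.get().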

The process for submitting a task is as follows:

(1) If the number of worker threads is less than the core size, create a core thread;

(2) Once the core size is reached, the task enters the task queue;

(3) If the task queue is full, create a non-core thread;

(4) If the maximum size is reached, execute the rejection policy.

In short, there are three thresholds: the core size, the task queue, and the maximum size. Seen that way, the flow is easier to remember.
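The three thresholds can be observed from the outside with a small experiment. The sketch below is not from the original article: the class ThresholdDemo and its parameters are hypothetical, and every task blocks on a latch so that no worker becomes free while tasks are being submitted. After each submit it records "pool size/queue size/rejected count".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThresholdDemo {
    // Submits `tasks` blocking tasks and records "poolSize/queueSize/rejected" after each one.
    static List<String> run(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                (r, executor) -> rejected.incrementAndGet());
        List<String> trace = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            trace.add(pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected.get());
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace;
    }

    public static void main(String[] args) {
        run(2, 4, 2, 7).forEach(System.out::println);
    }
}
```

With core size 2, maximum size 4 and queue capacity 2, the seven lines read 1/0/0, 2/0/0, 2/1/0, 2/2/0, 3/2/0, 4/2/0, 4/2/1: core threads first, then the queue, then non-core threads, then rejection.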

The flowchart is roughly as follows:

We know how tasks flow, but where are they executed? Keep reading.

The addWorker() method

This method creates a worker thread and starts it, after performing various checks on the thread pool state, the number of workers, and so on.

private boolean addWorker(Runnable firstTask, boolean core) {
    // Check whether a new worker thread may be created
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check the thread pool state
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
        // Check the number of worker threads
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the count by 1 and jump out of the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // The checks passed and the worker count has been incremented; now create the thread
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a worker thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-check the thread pool state while holding the lock
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the set of workers
                    workers.add(w);
                    // Track the largest pool size ever reached (only accessed under mainLock)
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Mark the worker as added
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Added successfully, start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Thread start failed, execute the failure method (worker count minus 1, tryTerminate(), etc.)
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
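The first half of addWorker() is a compare-and-set retry loop: read the count, give up if the limit is reached, otherwise try to claim a slot and retry if another thread got there first. A minimal sketch of that pattern, using a plain AtomicInteger instead of the packed ctl value (the class WorkerCountSketch and its methods are hypothetical, not JDK code):

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerCountSketch {
    private final AtomicInteger count = new AtomicInteger();

    // Claims one slot unless `limit` is already reached; retries when the CAS loses a race.
    boolean tryIncrement(int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;
            if (count.compareAndSet(c, c + 1))
                return true;
            // CAS failed because another thread changed the count; loop and re-read
        }
    }

    // Lets several threads compete for slots and returns how many claims succeeded.
    static int race(int threads, int attemptsPerThread, int limit) {
        WorkerCountSketch counter = new WorkerCountSketch();
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (counter.tryIncrement(limit))
                        successes.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8, 1000, 100)); // 100
    }
}
```

However the threads interleave, exactly `limit` claims succeed, which is why the pool can enforce corePoolSize and maximumPoolSize without taking a lock at this stage.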

addWorker() is not actually where the task is executed. As the code shows, the thread is held inside the Worker class, so let's follow it into that class.

The Worker inner class

The Worker inner class can be seen as a wrapper around a worker thread. When we say "worker thread" we usually mean a Worker, but strictly speaking it is the Thread instance that the Worker maintains.

// Worker inherits from AQS, so it has the properties of a lock
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // The thread that really does the work
    final Thread thread;
    // The first task, passed in through the constructor
    Runnable firstTask;
    // Number of completed tasks
    volatile long completedTasks;
    // Constructor
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Create a thread with the thread factory, passing this Worker as its Runnable
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        // Call ThreadPoolExecutor's runWorker() method
        runWorker(this);
    }
    // Lock-related methods omitted
}

ThreadPoolExecutor’s runWorker() method is called when the Worker Thread starts.
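The wrapping idea can be shown in isolation. The sketch below is a simplified, hypothetical MiniWorker (no AQS, no pool state): it creates its thread from a ThreadFactory with itself as the Runnable, runs the first task, then drains a queue. Unlike the real worker it exits as soon as the queue is empty instead of blocking.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

class MiniWorker implements Runnable {
    final Thread thread;                          // the thread that really does the work
    private Runnable firstTask;                   // task handed over at construction time
    private final BlockingQueue<Runnable> queue;  // shared task queue

    MiniWorker(Runnable firstTask, BlockingQueue<Runnable> queue, ThreadFactory factory) {
        this.firstTask = firstTask;
        this.queue = queue;
        this.thread = factory.newThread(this);    // the worker itself is the thread's Runnable
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null;
        // Simplification: stop when the queue is empty rather than waiting for more work
        while (task != null || (task = queue.poll()) != null) {
            task.run();
            task = null;
        }
    }

    // Runs one first task plus two queued tasks on a single worker and returns the order.
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> log.add("queued-1"));
        queue.add(() -> log.add("queued-2"));
        MiniWorker worker = new MiniWorker(() -> log.add("first"), queue, Executors.defaultThreadFactory());
        worker.thread.start();
        try {
            worker.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, queued-1, queued-2]
    }
}
```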

The runWorker() method

The runWorker() method is where the actual task is performed.

final void runWorker(Worker w) {
    // The worker thread
    Thread wt = Thread.currentThread();
    // The first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Release the initial lock state so that the thread can be interrupted
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task if there is one, then keep fetching tasks with getTask()
        // until it returns null
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // Check the thread pool state: interrupt the thread if the pool is stopping
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method, for subclasses to implement
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Where the task is really executed
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook method, for subclasses to implement
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear the task and count it as completed
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The worker thread is exiting
        processWorkerExit(w, completedAbruptly);
    }
}

This method is relatively simple. Leaving aside the state checks and the locking, it comes down to this: if there is a first task, run it first, then keep fetching tasks from the task queue via getTask() and running them.
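The two hook methods called around task.run() are protected methods of ThreadPoolExecutor that subclasses may override. A minimal sketch, assuming a single-threaded pool so that the order is easy to read (the class HookedPool and its log field are hypothetical):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    final List<String> log = new CopyOnWriteArrayList<>();

    HookedPool() {
        // One worker thread, so tasks run strictly one after another
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        log.add("after");
    }

    // Runs two tasks and returns the recorded order of hooks and tasks.
    static List<String> demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.log.add("task-0"));
        pool.execute(() -> pool.log.add("task-1"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [before, task-0, after, before, task-1, after]
    }
}
```

Both hooks run on the worker thread, inside the same loop iteration as the task they surround.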

The getTask() method

This method fetches tasks from the queue. It also contains the handling of thread pool state, idle timeout, and so on.

private Runnable getTask() {
    // Whether the last poll() timed out
    boolean timedOut = false;
    // Spin
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // In the SHUTDOWN state the tasks in the queue are finished until the queue is empty
        // In the STOP state the worker exits immediately
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // If core threads are allowed to time out, every thread may time out
        // If the worker count exceeds the core size, timing out is allowed as well
        // The timeout here is really the timeout for fetching a task
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Timeout check (also contains some fault tolerance)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Timed out: decrement the worker count and return null
            if (compareAndDecrementWorkerCount(c))
                return null;
            // Failed to decrement the worker count, retry
            continue;
        }
        try {
            // Fetch a task
            // By default poll() is used only when the worker count exceeds the core size
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Got a task, return it
            if (r != null)
                return r;
            // Timed out: set the flag; the next iteration returns null in the timeout check above
            timedOut = true;
        } catch (InterruptedException retry) {
            // shutdown() and shutdownNow() interrupt the thread
            // Go back to the first if in the for loop, which decides from the state whether to return null
            timedOut = false;
        }
    }
}

Note that whether BlockingQueue's poll(timeout, unit) or take() is used depends on the number of worker threads.

The poll(timeout, unit) method returns null when it times out; if timeout <= 0 and the queue is empty, it returns null immediately.

The take() method blocks until a task is fetched or an InterruptedException is thrown.

Therefore, if keepAliveTime is set to 0, a non-core thread that finds the task queue empty gets no task and ends its life cycle immediately.
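The difference between the two queue methods can be checked directly against a BlockingQueue. In the sketch below, the class FetchSketch and its helper names are hypothetical; fetch() only mirrors the choice made in getTask(), and the calls to poll() and take() are the standard java.util.concurrent.BlockingQueue methods.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FetchSketch {
    // Timed workers poll with a timeout; the others block in take().
    static Runnable fetch(BlockingQueue<Runnable> queue, boolean timed, long keepAliveNanos)
            throws InterruptedException {
        return timed ? queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS) : queue.take();
    }

    // True if a timed fetch on an empty queue comes back with null.
    static boolean pollReturnsNullOnEmptyQueue(long keepAliveNanos) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        try {
            return fetch(queue, true, keepAliveNanos) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // True if a thread waiting in take() on an empty queue is released by an interrupt.
    static boolean takeIsInterruptible() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        CountDownLatch aboutToTake = new CountDownLatch(1);
        boolean[] interrupted = new boolean[1];
        Thread t = new Thread(() -> {
            aboutToTake.countDown();
            try {
                fetch(queue, false, 0);
            } catch (InterruptedException e) {
                interrupted[0] = true;
            }
        });
        t.start();
        try {
            aboutToTake.await();
            t.interrupt();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println(pollReturnsNullOnEmptyQueue(0));  // true, without waiting
        System.out.println(pollReturnsNullOnEmptyQueue(TimeUnit.MILLISECONDS.toNanos(50))); // true, after the timeout
        System.out.println(takeIsInterruptible());           // true
    }
}
```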

By default, core threads are not allowed to time out, but they can be set to do so by using the following method.

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}
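Both branches of allowCoreThreadTimeOut() can be seen from the outside. The sketch below is an illustration with hypothetical names (CoreTimeoutDemo and its methods); the 50 ms keep-alive and the 5 second upper bound on waiting are arbitrary values chosen for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // True if enabling core-thread timeout with keepAliveTime == 0 is rejected.
    static boolean rejectsZeroKeepAlive() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // Returns the pool size observed once idle core threads have had time to expire.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 50, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> { });
        pool.execute(() -> { });
        // Wait (for at most 5 seconds) until the idle threads have timed out
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(rejectsZeroKeepAlive());
        System.out.println(poolSizeAfterIdle());
    }
}
```

Once core threads may time out, even a pool whose core size is 2 is expected to shrink to 0 threads when it has been idle for longer than keepAliveTime.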

At this point, the execution flow of tasks in the thread pool ends.

Back to the opening question

Why were the num values printed as 0~4, then 10~14, and finally 5~9?

Parameters of the thread pool: core size 5, maximum size 10, task queue length 5.

A: When the first five tasks were submitted, the core size had not yet been reached, so new core threads were created to run them;

When the middle five tasks were submitted, the core size had been reached, so they were put into the queue first;

When the next five tasks were submitted, the core size had been reached and the queue was full, so new non-core threads were created to run them;

When the last five tasks were submitted, the thread pool was full, so the rejection policy was executed.

Conclusion

This chapter used an example to walk through how an ordinary task is executed in a thread pool and analyzed the pool's main methods.

(1) execute(): the method for submitting tasks. Based on the core size, the task queue capacity and the maximum size, it distinguishes four cases to decide where a task should go;

(2) addWorker(): the method for adding worker threads. The Worker inner class wraps a Thread instance and maintains the execution of the worker thread;

(3) runWorker(): where tasks are really executed. The first task runs first, then tasks are fetched continuously from the task queue and run;

(4) getTask(): where tasks are really fetched from the queue. By default, the choice between the queue's poll() and take() depends on how the number of worker threads compares with the core size. The keepAliveTime parameter is also used here.

Easter eggs

What is the difference between core threads and non-core threads?

A: There is really no difference. corePoolSize is mainly used to decide where a task should go; the two kinds of thread behave identically while executing tasks. A thread that was created as a core thread may well be the one that exits when keepAliveTime expires.

What is the significance of Worker’s inheritance from AQS?

Earlier we saw the definition of the Worker inner class. It inherits from AQS, so it naturally has the properties of a lock. What is this lock used for? Does it have anything to do with task execution?

A lock implies that Worker is accessed from more than one thread. Its lock() method is used only in runWorker(), but tryLock() is used in interruptIdleWorkers().

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

As its name says, interruptIdleWorkers() interrupts idle threads: it only interrupts threads that are blocked in BlockingQueue's poll() or take(), not tasks that are currently running.
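The lock part that was omitted from the Worker listing is a small non-reentrant mutex built on AQS: state 0 means unlocked and state 1 means locked. The sketch below follows that idea; SimpleLock is a hypothetical stand-alone class, not the JDK's Worker, and it leaves out Worker's initial state of -1.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLock extends AbstractQueuedSynchronizer {
    // State 0 = unlocked, state 1 = locked. Not reentrant.
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    public void lock() { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock() { release(1); }

    // Returns { tryLock() result while the lock is held, tryLock() result after release }.
    static boolean[] demo() {
        SimpleLock lock = new SimpleLock();
        lock.lock();                       // like a worker that is running a task
        boolean whileHeld = lock.tryLock();
        lock.unlock();                     // task finished, the worker is idle again
        boolean afterRelease = lock.tryLock();
        lock.unlock();
        return new boolean[] { whileHeld, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // false true
    }
}
```

Because the lock is not reentrant, tryLock() fails whenever the lock is held, so a failed tryLock() is a reliable sign that the worker is in the middle of a task.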

In general, interruptIdleWorkers() is not called from a worker thread but from another thread, such as the main thread. Remember the shutdown() and shutdownNow() methods covered in "Deadknock Java Thread Series: Thread Pool In-depth Analysis - Life Cycle"?

shutdown() calls interruptIdleWorkers(), which first tries to acquire the worker's lock with tryLock() and interrupts the thread only if that succeeds. If the lock cannot be acquired, the thread is not interrupted, because failing to get the lock means the worker is running a task.

shutdownNow() interrupts threads forcibly, without calling tryLock(), so calling shutdownNow() may interrupt a task that is in progress.

Therefore, Worker inherits from AQS in order to use its locking ability, mainly to make sure that shutdown() does not interrupt a thread that is executing a task.
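The difference between the two shutdown methods can be observed with one long-running task. In the sketch below, the class ShutdownDemo and its method are hypothetical, and the 300 ms sleep is an arbitrary stand-in for real work; the pool is shut down only after the task has signalled that it is running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownDemo {
    // Runs one sleeping task, shuts the pool down while the task is running,
    // and reports whether the task saw an interrupt.
    static boolean taskInterrupted(boolean useShutdownNow) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await(); // the worker now holds its lock and is running the task
            if (useShutdownNow)
                pool.shutdownNow();
            else
                pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(taskInterrupted(false)); // false: shutdown() leaves the running task alone
        System.out.println(taskInterrupted(true));  // true: shutdownNow() interrupts it
    }
}
```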

Follow my WeChat official account "Tong Elder brother read source code" for more articles in the source code series, and explore the ocean of source code with Tong Ge.