The premise

ThreadPoolExecutor is the thread pool implementation in the JUC (java.util.concurrent) package. Concurrency master Doug Lea designed the top-level interface for submitting tasks to it, Executor, to have only one stateless execution method:

public interface Executor {

    void execute(Runnable command);
}    

ExecutorService adds a number of extension methods on top of Executor, and they are ultimately built on Executor#execute(). This article focuses on the implementation of ThreadPoolExecutor#execute(), analyzed in detail through its principles, its source code, and simplified examples. The source code of ThreadPoolExecutor is essentially unchanged from JDK 8 to JDK 11; JDK 11 was used when this article was written.

The principle of ThreadPoolExecutor

Internally, ThreadPoolExecutor uses the JUC synchronizer framework AbstractQueuedSynchronizer (commonly known as AQS), a large number of bit operations, and CAS operations. ThreadPoolExecutor provides fixed active threads (core threads), additional threads (threads created beyond the core thread count, up to the thread pool capacity, referred to here as non-core threads), a task queue, and a rejection policy.

JUC synchronizer framework

ThreadPoolExecutor uses the JUC synchronizer framework for four main purposes:

  • Global lock mainLock: a member property of the reentrant lock type ReentrantLock, used mainly to lock access to the set of worker threads (Worker) and the recording of statistics.
  • Condition variable termination: of type Condition, used mainly to block threads with a deadline in awaitTermination() while they wait for the pool to terminate.
  • Task queue workQueue: of type BlockingQueue<Runnable>, used to store tasks waiting to be executed.
  • Worker thread: of the inner class type Worker, the actual worker thread object in the thread pool.

I previously wrote a source code analysis of AQS: "JUC synchronizer framework AbstractQueuedSynchronizer source graphic analysis".

Core thread

To start, here is a simplified thread pool modeled on ThreadPoolExecutor that has only core threads, with the following simplifications:

  • The processing of abnormal task execution is not considered for the time being.
  • The task queue is unbounded.
  • The thread pool capacity is fixed to the number of core threads.
  • Ignore the rejection strategy for now.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CoreThreadPool implements Executor {

    private BlockingQueue<Runnable> workQueue;
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private int coreSize;
    private int threadCount = 0;

    public CoreThreadPool(int coreSize) {
        this.coreSize = coreSize;
        this.workQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public void execute(Runnable command) {
        // Not thread-safe: this simplified example assumes execute() is called from a single thread
        if (++threadCount <= coreSize) {
            // Fewer workers than coreSize: start a new worker with the task as its first task
            new Worker(command).start();
        } else {
            try {
                // Otherwise put the task into the unbounded queue
                workQueue.put(command);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    private class Worker extends Thread {
        private Runnable firstTask;

        public Worker(Runnable runnable) {
            super(String.format("Worker-%d", COUNTER.getAndIncrement()));
            this.firstTask = runnable;
        }

        @Override
        public void run() {
            Runnable task = this.firstTask;
            // Run the first task, then keep fetching tasks from the queue
            while (null != task || null != (task = getTask())) {
                try {
                    task.run();
                } finally {
                    task = null;
                }
            }
        }
    }

    private Runnable getTask() {
        try {
            // Blocks while the queue is empty
            return workQueue.take();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        CoreThreadPool pool = new CoreThreadPool(5);
        IntStream.range(0, 10)
                .forEach(i -> pool.execute(() ->
                        System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i))));
        Thread.sleep(Integer.MAX_VALUE);
    }
}

The result of one run is as follows (the order may differ between runs):

Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9

This thread pool is designed so that core threads are created lazily, and an idle thread blocks in the take() method of the task queue. ThreadPoolExecutor is implemented similarly. However, if core threads are allowed to time out (allowCoreThreadTimeOut is set to true), or the worker count exceeds corePoolSize, idle threads poll with BlockingQueue#poll(keepAliveTime, TimeUnit.NANOSECONDS) instead of blocking indefinitely.
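The difference between the blocking take() and the timed poll() can be sketched as follows. This is a minimal illustration, not the JDK code: the class TimedPollDemo and the helper getTask(queue, keepAliveNanos, timed) are hypothetical names, and the 50 ms timeout is an arbitrary value chosen for the demo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedPollDemo {

    // Hypothetical helper: fetch the next task; with timed == true, return null
    // once the queue has stayed empty for keepAliveNanos
    static Runnable getTask(BlockingQueue<Runnable> queue, long keepAliveNanos, boolean timed)
            throws InterruptedException {
        return timed ? queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS) : queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.put(() -> System.out.println("task executed"));
        long keepAliveNanos = TimeUnit.MILLISECONDS.toNanos(50);
        Runnable task;
        // The loop ends when the timed poll returns null, i.e. the worker has been idle too long
        while ((task = getTask(queue, keepAliveNanos, true)) != null) {
            task.run();
        }
        System.out.println("idle timeout reached, worker exits");
    }
}
```

With timed set to false the loop would never end on an empty queue, which is the behaviour of the simplified CoreThreadPool above.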

Other additional features

When building an instance of ThreadPoolExecutor, you need to define maximumPoolSize (the maximum number of threads in the thread pool) and corePoolSize (the number of core threads). When the task queue is a bounded blocking queue, the core threads are all in use, and the task queue is full, the pool tries to create up to maximumPoolSize - corePoolSize additional threads to execute newly submitted tasks. The two main additional features ThreadPoolExecutor implements here are:

  • Non-core threads are created to execute tasks under certain conditions, and their recycle period (the end of the thread's life cycle) is keepAliveTime: a non-core thread terminates if, the next time it tries to fetch a task from the task queue, it waits longer than keepAliveTime without getting one.
  • A rejection policy is provided, which is triggered when the core threads are full, the task queue is full, and the non-core threads are full.
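The creation and later reclamation of a non-core thread can be observed through the public API. The following is a minimal sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, keepAliveTime 100 ms); the class name NonCoreThreadDemo and the method observePoolSizes() are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NonCoreThreadDemo {

    // Returns {pool size while saturated, pool size after the idle timeout}
    static int[] observePoolSizes() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocked); // runs on the core thread
        pool.execute(blocked); // waits in the bounded queue
        pool.execute(blocked); // queue is full, so a non-core thread is created
        int saturated = pool.getPoolSize();
        release.countDown();
        // Wait (up to about 5 seconds) for the surplus idle thread to time out
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
            Thread.sleep(20);
        }
        int afterIdle = pool.getPoolSize();
        pool.shutdown();
        return new int[]{saturated, afterIdle};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = observePoolSizes();
        System.out.println("saturated=" + sizes[0] + ", afterIdle=" + sizes[1]);
    }
}
```

The pool grows to two threads only once the queue is full, and shrinks back to the core size after the surplus thread has been idle for longer than keepAliveTime.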

Source code analysis

We’ll examine the key properties of thread pools, then their state control, and finally the ThreadPoolExecutor#execute() method.

Key attributes

public class ThreadPoolExecutor extends AbstractExecutorService {

    // Control variables - store state and number of threads
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // The task queue must be a blocking queue
    private final BlockingQueue<Runnable> workQueue;

    // A collection of worker threads, which holds all (active) worker threads in the thread pool and can only be accessed if the global lock mainLock is held
    private final HashSet<Worker> workers = new HashSet<>();
    
    // Global lock
    private final ReentrantLock mainLock = new ReentrantLock();

    // Wait condition variable used by the awaitTermination method
    private final Condition termination = mainLock.newCondition();

    // Record the peak number of threads
    private int largestPoolSize;
    
    // Record the number of completed tasks
    private long completedTaskCount;
    
    // Thread factory, used to create new thread instances
    private volatile ThreadFactory threadFactory;

    // Reject execution handler, corresponding to different reject policies
    private volatile RejectedExecutionHandler handler;
    
    // The interval in which idle threads wait for a task, in nanoseconds
    private volatile long keepAliveTime;
    
    // Whether to allow the core thread to timeout. If true, keepAliveTime applies to the core thread
    private volatile boolean allowCoreThreadTimeOut;
    
    // Number of core threads
    private volatile int corePoolSize;

    // Thread pool capacity
    private volatile int maximumPoolSize;

    // omit other code
}    

Let’s look at the constructor with the longest argument list:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

You can customize the number of core threads, the capacity of the thread pool (maximum number of threads), the waiting period of idle threads, the task queue, the thread factory, and the rejection policy. The following is a brief analysis of the meanings and functions of each parameter:

  • corePoolSize: of type int, the number of core threads.
  • maximumPoolSize: of type int, the maximum number of threads, i.e. the capacity of the thread pool.
  • keepAliveTime: of type long, the time an idle thread waits for a task; it also determines the life cycle of a worker thread.
  • unit: of type TimeUnit, the time unit of the keepAliveTime parameter. Internally, keepAliveTime is eventually converted to nanoseconds.
  • workQueue: of type BlockingQueue<Runnable>, the wait queue or task queue.
  • threadFactory: of type ThreadFactory, the thread factory used to create worker threads (both core and non-core). The default is the built-in factory Executors.defaultThreadFactory(); a custom thread factory generally makes worker threads easier to track.
  • handler: of type RejectedExecutionHandler, more commonly known as the rejection policy. It is executed when the blocking queue is full, there are no free threads (both core and non-core), and tasks continue to be submitted. Four built-in rejection policy implementations are provided:
    • AbortPolicy: rejects directly, i.e. throws RejectedExecutionException without executing the task. This is the default rejection policy.
    • DiscardPolicy: discards the task, i.e. silently ignores the submitted task (in plain terms, an empty implementation).
    • DiscardOldestPolicy: discards the oldest task, i.e. removes the task at the head of the queue with poll() and then resubmits the current task through execute().
    • CallerRunsPolicy: the caller runs the task, i.e. the thread that called Executor#execute() directly calls the task's Runnable#run(). This policy is usually used when you don't want to lose a task, but in practice the intended asynchronous call degrades to a synchronous one.
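As an illustration of the last two constructor parameters, the sketch below combines a custom thread factory with CallerRunsPolicy. The class FactoryAndHandlerDemo, the helper namedFactory() and the "demo-worker" prefix are hypothetical names chosen for this example; the pool settings (one thread, a queue of capacity 1) are assumptions that force the third submission to be rejected.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FactoryAndHandlerDemo {

    // Hypothetical naming scheme that makes worker threads easy to recognise in logs
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    // Returns the names of the threads that ran the three tasks, in completion order
    static List<String> run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                namedFactory("demo-worker"), new ThreadPoolExecutor.CallerRunsPolicy());
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.add(Thread.currentThread().getName());
        };
        pool.execute(blocked); // taken by the single worker, which then blocks
        pool.execute(blocked); // queued
        // Pool and queue are full: CallerRunsPolicy runs this task in the calling thread
        pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The first recorded name is that of the submitting thread, which shows how the asynchronous call degrades to a synchronous one under CallerRunsPolicy.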

State control

State control mainly revolves around the atomic integer member variable ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Extract the run state from the ctl value
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// Extract the number of worker threads from the ctl value
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

// Compute the ctl value from the run state and the worker count with a bitwise OR
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// CAS operation: increase the worker count by 1
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// CAS operation: decrease the worker count by 1
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// Decrease the worker count by 1 unconditionally
private void decrementWorkerCount() {
    ctl.addAndGet(-1);
}

Next, the thread pool state variables. The number of bits reserved for the worker thread count is COUNT_BITS, whose value is Integer.SIZE - 3, that is, 29.

An int occupies 4 bytes, i.e. 32 bits, each holding 0 or 1. ThreadPoolExecutor uses one 32-bit int (wrapped in an AtomicInteger) to store both the number of worker threads and the thread pool state: the lower 29 bits store the worker count and the upper 3 bits store the pool state, so the pool can have at most 2^3 states. The maximum number of worker threads is 2^29 - 1, more than 500 million, a limit that is not a practical concern for now.

Next, the worker count mask COUNT_MASK, whose value is (1 << COUNT_BITS) - 1: shifting 1 left by 29 bits and subtracting 1 leaves the upper 3 bits at 0 and sets all of the lower 29 bits to 1, i.e. 000-11111111111111111111111111111.

Next, the thread pool state constants, taking the RUNNING state constant as an example:

// Two's complement of -1:          111-11111111111111111111111111111
// After shifting left by 29 bits:  111-00000000000000000000000000000
// The decimal value is -536870912
// The upper three bits 111 indicate that the thread pool is running
private static final int RUNNING = -1 << COUNT_BITS;

The control variable ctl is composed by a bitwise OR of the thread pool run state rs and the worker count wc:

// rs = RUNNING has the value: 111-00000000000000000000000000000
// wc has the value 0:         000-00000000000000000000000000000
// rs | wc gives:              111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { 
    return rs | wc; 
}

So how do we extract the upper three bits from ctl? The runStateOf() method in the source code above extracts the run state:

// Inverting COUNT_MASK (~COUNT_MASK) gives: 111-00000000000000000000000000000
// The bit layout of ctl is:                 xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// ANDing the two yields the upper 3 bits xxx (with the lower 29 bits cleared)
private static int runStateOf(int c){ 
    return c & ~COUNT_MASK; 
}

Similarly, extracting the lower 29 bits only requires a single AND of ctl with COUNT_MASK (000-11111111111111111111111111111).
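The packing and unpacking can be tried outside the JDK by copying the constants and the three helper methods into a standalone class. In the sketch below, the class name CtlBitsDemo and the formatting helper bits() are hypothetical; the constants and helpers mirror the source shown above.

```java
public class CtlBitsDemo {

    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~COUNT_MASK; }
    static int workerCountOf(int c) { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical helper: renders a 32-bit value as 3 state bits, a dash, and 29 count bits
    static String bits(int value) {
        String s = String.format("%32s", Integer.toBinaryString(value)).replace(' ', '0');
        return s.substring(0, 3) + "-" + s.substring(3);
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5); // RUNNING state with 5 worker threads
        System.out.println(bits(c));
        System.out.println(runStateOf(c) == RUNNING); // the state survives the round trip
        System.out.println(workerCountOf(c));         // so does the worker count
    }
}
```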

To summarize the thread pool's run state constants:

| State | Bitmap | Decimal value | Description |
|---|---|---|---|
| RUNNING | 111-00000000000000000000000000000 | -536870912 | Running state: accepts new tasks and executes tasks in the task queue |
| SHUTDOWN | 000-00000000000000000000000000000 | 0 | Shutdown state: accepts no new tasks, but executes tasks in the task queue |
| STOP | 001-00000000000000000000000000000 | 536870912 | Stopped state: accepts no new tasks, does not execute tasks in the task queue, and interrupts all tasks in execution |
| TIDYING | 010-00000000000000000000000000000 | 1073741824 | Tidying state: all tasks have terminated and the number of worker threads is 0; the thread that transitions the pool to this state calls the hook method terminated() |
| TERMINATED | 011-00000000000000000000000000000 | 1610612736 | Terminated state: the hook method terminated() has completed |

Here's a special trick. Since the run state is stored in the upper three bits, the state of a thread pool can be compared and determined directly from the decimal value (the lower 29 bits can even be ignored, so a ctl value can be compared directly with another ctl value or with a thread pool state constant):

RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)

Here are three methods that use this technique:

// Compare ctl with a state constant: true if the state is less than s
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// Compare ctl with a state constant: true if the state is greater than or equal to s
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// Compare ctl with the state constant SHUTDOWN to determine whether the state is RUNNING
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
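Why the lower 29 bits can be ignored in these comparisons can be checked with a small experiment: even with the largest possible worker count, a ctl value never reaches the value of the next state constant. The class name RunStateOrderDemo is hypothetical; the constants and methods are copied from the source above.

```java
public class RunStateOrderDemo {

    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    static boolean runStateLessThan(int c, int s) { return c < s; }
    static boolean runStateAtLeast(int c, int s) { return c >= s; }
    static boolean isRunning(int c) { return c < SHUTDOWN; }

    public static void main(String[] args) {
        // RUNNING with the maximum worker count is -1, still below SHUTDOWN (0)
        int runningFull = RUNNING | COUNT_MASK;
        // SHUTDOWN with the maximum worker count is STOP - 1, still below STOP
        int shutdownFull = SHUTDOWN | COUNT_MASK;
        System.out.println(isRunning(runningFull));
        System.out.println(!isRunning(shutdownFull));
        System.out.println(runStateLessThan(shutdownFull, STOP));
        System.out.println(runStateAtLeast(STOP, STOP));
    }
}
```

All four lines print true, because the state bits are more significant than any bit of the worker count.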

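Finally, the transition diagram for thread pool state. The transitions, as listed in the ThreadPoolExecutor source comments, are:

  • RUNNING -> SHUTDOWN: on a call to shutdown().
  • (RUNNING or SHUTDOWN) -> STOP: on a call to shutdownNow().
  • SHUTDOWN -> TIDYING: when both the task queue and the pool are empty.
  • STOP -> TIDYING: when the pool is empty.
  • TIDYING -> TERMINATED: when the hook method terminated() has completed.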

PS: Many intermediate variables in the thread pool source code have simple one- or two-letter names: for example, c is the ctl value, wc is the worker count, and rs is the run state.
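Part of the state machine can be observed through public methods, since isShutdown() reports whether the pool has left RUNNING and isTerminated() reports whether it has reached TERMINATED. The following is a minimal sketch with a hypothetical class name (LifecycleDemo) and arbitrary pool settings; the intermediate TIDYING state is too short-lived to be observed this way.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LifecycleDemo {

    // Returns {running before shutdown, shut down after shutdown(), awaitTermination result, isTerminated}
    static boolean[] observe() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> { });
        boolean runningBefore = !pool.isShutdown(); // state is RUNNING
        pool.shutdown();                            // RUNNING -> SHUTDOWN
        boolean shutdownAfter = pool.isShutdown();
        // Blocks on the termination condition until TERMINATED is reached or the timeout elapses
        boolean terminatedInTime = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[]{runningBefore, shutdownAfter, terminatedInTime, pool.isTerminated()};
    }

    public static void main(String[] args) throws InterruptedException {
        for (boolean b : observe()) {
            System.out.println(b);
        }
    }
}
```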

execute() method source code analysis

The source code of ThreadPoolExecutor#execute() is as follows:

// Execute the command, where the command object is an instance of Runnable
public void execute(Runnable command) {
    // The command (task) object must not be null
    if (command == null)
        throw new NullPointerException();
    // Get the value of ctl
    int c = ctl.get();
    // If the current number of worker threads is smaller than the number of core threads, create a new core thread to execute the incoming task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            // The new core thread was created successfully, so return
            return;
        // Creating the core thread failed, so refresh the temporary variable c that holds ctl
        c = ctl.get();
    }
    // Reaching here means no new core thread was created: the worker count is already >= corePoolSize, or addWorker() failed
    // Check whether the thread pool is running and try to add the task to the task queue with the non-blocking offer() (which returns false if the task cannot be added)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The task was queued successfully, so recheck the thread pool state
        // If the thread pool is no longer running, remove the task from the task queue
        if (!isRunning(recheck) && remove(command))
            // Removal succeeded: hand the task to the rejection policy
            reject(command);
        // Reaching the else-if branch below implies:
        // - the task has been added to the task queue, and
        // - either the thread pool is still in the RUNNING state,
        // - or the task could not be removed from the task queue (removal only fails if the task has already been taken for execution)
        // If the current worker count is 0, create a non-core thread with a null task object
        // That thread does not run a task immediately; it fetches tasks from the task queue
        // If the worker count is not 0, nothing more needs to be done: the task is already queued, and some worker will eventually pick it up
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Reaching here implies the task was not queued, typically with the worker count >= corePoolSize, because:
    // - the thread pool is not in the RUNNING state, or
    // - the thread pool is in the RUNNING state but the task queue is full
    // Try to create a non-core thread to execute the incoming task
    // If creating the non-core thread also fails, reject the task
    else if (!addWorker(command, false))
        // Hand the task to the rejection policy
        reject(command);
}

Here’s a quick look at the process:

  1. If the total number of current worker threads is less than corePoolSize, a core thread is created directly to execute the task (the task instance is passed in and used directly to construct the worker thread instance).
  2. If the number of current worker threads is greater than or equal to corePoolSize, the task is offered to the task queue. If it is queued successfully, the thread pool state is rechecked: if the pool is no longer running, the task is removed from the queue and rejected; otherwise, if the current number of worker threads is 0, a non-core thread is created with a null task object.
  3. If placing the task into the task queue fails (the task queue is full), an attempt is made to create a non-core thread, passing in the task instance to execute.
  4. If the non-core thread cannot be created, the task is rejected and the rejection policy is invoked to process it.
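The four steps can be traced by recording the pool size and queue size after each submission. This is a minimal sketch assuming corePoolSize 1, maximumPoolSize 2, a queue of capacity 1 and the default AbortPolicy; the class name ExecuteFlowDemo and the method trace() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowDemo {

    // Submits four blocking tasks and records what happened to each submission
    static List<String> trace() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> trace = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocked);
                trace.add("pool=" + pool.getPoolSize() + ",queue=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                trace.add("rejected"); // step 4: the default AbortPolicy throws
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return trace;
    }

    public static void main(String[] args) throws InterruptedException {
        trace().forEach(System.out::println);
    }
}
```

Under these settings the trace is pool=1,queue=0 (core thread created), pool=1,queue=1 (task queued), pool=2,queue=1 (non-core thread created) and finally rejected.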

Here's a bit of a puzzle: why does the method need to double-check the run state of the thread pool and, when the current worker thread count is 0, try to create a non-core thread with a null task object? The answer can be found in the API comments:

If a task is successfully added to the task queue, we still need to double-check whether we need to add another worker thread (since all alive worker threads may have terminated after the last check) or whether the thread pool was shutdown when the current method was executed. So we need to double check the state of the thread pool, remove the task from the queue if necessary, or create a new worker thread if no worker thread is available.

The task submission process from the caller’s point of view is as follows:

addWorker() method source code analysis

boolean addWorker(Runnable firstTask, boolean core): the first parameter can be used to pass in a task instance directly, and the second parameter indicates whether the worker thread to be created is a core thread. The method source code is as follows:

// Add a worker thread. Returns false if no new worker thread was created and started, true if the worker thread was created and started successfully
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // Note that this is an infinite loop - the outer loop
    for (int c = ctl.get();;) {
        // This condition is fairly complex. It is the AND (&&) of two parts:
        // 1. The thread pool state is at least SHUTDOWN, i.e. rs >= SHUTDOWN(0)
        // 2. The thread pool state is at least STOP, or the incoming task instance firstTask is not null, or the task queue is empty
        // In other words: once the pool is at least SHUTDOWN it accepts no new tasks, so no new thread is needed if the pool is at least STOP, the incoming task is not null, or the task queue is empty (no backlog)
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        // Note that this is also an infinite loop - the inner loop
        for (;;) {
            // Each round of the loop derives the worker count wc from c
            // If core is true (creating a core thread), return false when wc >= corePoolSize
            // If core is false (creating a non-core thread), return false when wc >= maximumPoolSize
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // If the CAS increment of the worker count wc succeeds, break out of the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // The CAS failed, so re-read ctl
            c = ctl.get();  // Re-read ctl
            // If the thread pool state has changed from RUNNING to at least SHUTDOWN, jump back to the outer loop and re-evaluate the state
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // If the thread pool is still RUNNING, the CAS failed because of a concurrent update to the worker count, so retry the inner loop
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // Flags whether the worker thread started successfully
    boolean workerStarted = false;
    // Flags whether the worker thread was added successfully
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a Worker instance from the incoming task firstTask. The Worker constructor creates a new Thread object through the thread factory, so Thread t = w.thread can be used directly below
        // At this point the Worker instance exists, but it has not been added to the worker set and its thread has not been started
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // The global lock is required because statistics and a non-thread-safe collection are modified below
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // The thread pool state may have changed (for example to SHUTDOWN) before the lock was acquired, so it is checked again:
                // 1. If the thread pool is still RUNNING, only check that the thread is not already alive, then add the Worker to the worker set so it can be started
                // 2. If the thread pool state is less than STOP (i.e. RUNNING or SHUTDOWN) and the incoming task instance firstTask is null, the Worker is also added to the worker set and started
                // For case 2, in other words: if the thread pool is SHUTDOWN and firstTask is not null, the new Worker is neither added to the worker set nor started
                // In that case the Worker instance created above is never started; it is a temporary object without strong references and can be reclaimed in a later garbage collection
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the created Worker instance to the worker set
                    workers.add(w);
                    int s = workers.size();
                    // Try to update the historical peak number of worker threads, i.e. the peak size of the pool
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Mark the worker as added; Thread#start() is called after the lock is released
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the Worker was added successfully, start the real thread by calling Thread#start() on the thread instance t held by the Worker
            if (workerAdded) {
                t.start();
                // Mark the thread as started successfully
                workerStarted = true;
            }
        }
    } finally {
        // The thread failed to start, so the corresponding Worker must be removed from the worker set
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// Handles a failure to add a Worker
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove it from the worker set
        if (w != null)
            workers.remove(w);
        // Decrease the worker count wc by 1
        decrementWorkerCount();
        // Try to terminate the thread pool, depending on its state
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

Doug Lea is a big fan of complex conditional judgments, and he doesn’t like curly braces for single-line complex judgments. Code like the following is common in many of his libraries:

if (runStateAtLeast(c, SHUTDOWN)
    && (runStateAtLeast(c, STOP)
        || firstTask != null
        || workQueue.isEmpty()))
    return false;
// ...
// The code can be split as follows
boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN);     // rs >= SHUTDOWN(0)
boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty();
if (atLeastShutdown && atLeastStop) {
    return false;
}

In the analysis above, note that when a Worker instance is created, its constructor creates a Java Thread instance through the ThreadFactory. Only afterwards does addWorker() check whether the Worker instance should be added to the worker set workers and whether the Thread instance held by the Worker should be started. The Worker really starts to operate only once its Thread instance is started; otherwise it is just a useless temporary object. The Worker itself also implements the Runnable interface, so it can be regarded as a Runnable adapter.

Worker thread internal class Worker source code analysis

Each worker thread in the thread pool is wrapped in an instance of the inner class Worker. Worker extends AbstractQueuedSynchronizer (AQS) and implements the Runnable interface:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    // The thread instance created by the ThreadFactory, or null if the ThreadFactory fails to create a thread
    final Thread thread;
    // The Runnable task instance passed in
    Runnable firstTask;
    // The total number of tasks completed by this thread
    volatile long completedTasks;

    // The only constructor; the incoming task instance firstTask may be null
    Worker(Runnable firstTask) {
        // Disallow thread interruption until the runWorker() method is executed
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Create the thread instance with the ThreadFactory. Note that the Worker instance itself is the Runnable used to create the new thread
        this.thread = getThreadFactory().newThread(this);
    }

    // Delegates to the outer runWorker() method. Note that runWorker() is a method of the thread pool, not of Worker
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    // Whether the lock is held: any non-zero state (1 for locked, -1 for the initial not-yet-started state) counts as held, 0 means unlocked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // Try to acquire the lock: CAS the state from 0 to 1 and, on success, set the exclusive owner to the current thread
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Try to release the lock: set state to 0 directly, without checking the argument passed in
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Lock
    public void lock()        { acquire(1); }

    // Try to lock
    public boolean tryLock()  { return tryAcquire(1); }

    // Unlock
    public void unlock()      { release(1); }

    // Whether the lock is held
    public boolean isLocked() { return isHeldExclusively(); }

    // Interrupt the thread if it has started (state >= 0) and its interrupt flag is not already set
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
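The locking part of Worker can be isolated into a small AQS subclass to see how the state values -1, 0 and 1 behave. The sketch below is not the JDK class: WorkerLockDemo and SimpleLock are hypothetical names, and only the lock-related methods are reproduced.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class WorkerLockDemo {

    // Hypothetical stand-in for the locking part of Worker
    static final class SimpleLock extends AbstractQueuedSynchronizer {
        SimpleLock() {
            setState(-1); // neither 0 nor 1: the lock cannot be acquired until the first unlock
        }

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
        int state()       { return getState(); }
    }

    public static void main(String[] args) {
        SimpleLock lock = new SimpleLock();
        System.out.println(lock.tryLock()); // false: the state is still -1
        lock.unlock();                      // state becomes 0, like w.unlock() at the start of runWorker()
        System.out.println(lock.tryLock()); // true: CAS from 0 to 1 succeeds
        System.out.println(lock.tryLock()); // false: the lock is not reentrant
    }
}
```

Unlike ReentrantLock, a second tryLock() by the owning thread fails, because tryAcquire() only ever attempts the CAS from 0 to 1.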

The logic in Worker's constructor is very important: the Worker instance itself is passed to the ThreadFactory as the Runnable of the thread being created. Since Worker implements Runnable, the thread can execute it as a task. Once Thread#start() is called on the thread instance held by Worker w, Worker#run() is executed at the appropriate time. In simplified form:

// the addWorker() method
Worker worker = createWorker();
// passed through the thread pool construction
ThreadFactory threadFactory = getThreadFactory();
// Worker constructor
Thread thread = threadFactory.newThread(worker);
// start in the addWorker() method
thread.start();

Worker extends AQS and uses its exclusive mode. One trick is that the Worker constructor calls setState(-1). Right after construction the thread has not been started, so it must not be interrupted yet. Worker#interruptIfStarted() only interrupts a thread whose state is >= 0, so a state of -1 suppresses interrupts until runWorker() calls unlock() and resets the state to 0. The two overridden AQS methods, tryAcquire() and tryRelease(), ignore their arguments: the former simply performs CAS(0, 1) and the latter simply calls setState(0). Next comes the core method, ThreadPoolExecutor#runWorker():

final void runWorker(Worker w) {
    // Get the current thread, which is actually the same thread instance held by the Worker
    Thread wt = Thread.currentThread();
    // Get the initialized task object held by the Worker, stored in the temporary variable task
    Runnable task = w.firstTask;
    // Set the initialization task object passed in the Worker to null
    w.firstTask = null;
    // Since the state in AQS is set to -1 when Worker is initialized, an unlock is required to update the state to 0 to allow thread interruption
    w.unlock(); // allow interrupts
    // Logs whether the thread terminated due to a user exception. The default is true
    boolean completedAbruptly = true;
    try {
        // Run the initial task if it is not null; otherwise pull a task from the task queue (the pulled task is stored in the temporary variable task)
        // getTask() blocks, with or without a timeout, on the task queue; once it returns null the loop ends and the worker thread terminates
        while (task != null || (task = getTask()) != null) {
            // Lock the Worker: acquire the AQS resource by CAS-ing state from 0 to 1
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the thread pool is stopping (state at least STOP), make sure the current worker thread is interrupted
            // Otherwise, make sure the current worker thread is not interrupted
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method, before the task is executed
                beforeExecute(wt, task);
                try {
                    task.run();
                    // Hook method, after task execution (normal case)
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // Hook method, after task execution (exception case)
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // Clear the temporary variable. This is important, otherwise the while loop would run the same task forever
                task = null;
                // Increment the number of tasks completed by this Worker
                w.completedTasks++;
                // Unlock the Worker, which essentially sets state back to 0
                w.unlock();
            }
        }
        // Reaching here means getTask() returned null and the thread exits normally
        completedAbruptly = false;
    } finally {
        // Handle thread exit; completedAbruptly == true means the thread exits abnormally because of a user exception
        processWorkerExit(w, completedAbruptly);
    }
}

The code that determines the interrupt status of the current worker thread is dissected here:

if ((runStateAtLeast(ctl.get(), STOP) ||
     (Thread.interrupted() &&
      runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
    wt.interrupt();

// Let's simplify the logic
// Whether the thread pool state is at least STOP, i.e. rs >= STOP(1)
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// Read and clear the interrupted status of the current thread, then recheck whether the thread pool state is at least STOP
// (the original expression short-circuits, so this part only runs when atLeastStop is false)
boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if ((atLeastStop || interruptedAndAtLeastStop) && !wt.isInterrupted()) {
    wt.interrupt();
}

Thread.interrupted() returns the interrupted status of the current thread and clears it. Clearing is needed because the worker may carry a stale interrupt that must not leak into the next task while the pool is not stopping. The recheck that follows handles a race with shutdownNow(): shutdownNow() moves the pool to STOP and then walks all Workers to interrupt their threads, and this can happen between the first state check and the call to Thread.interrupted(), in which case the interrupt delivered by shutdownNow() is the one that gets cleared. Rechecking the state detects this and the worker interrupts itself again, so the interrupt logic inside the Worker (this if block) covers what shutdownNow() alone cannot guarantee. Note also that in the STOP state all newly submitted tasks are rejected, tasks still in the task queue are not executed, and all worker threads are interrupted. In other words, even if a Runnable has already been pulled in the first half of runWorker(), its thread may be interrupted before Runnable#run() is called. Finally, the !wt.isInterrupted() check means the worker does not interrupt itself a second time when interruptWorkers(), called from shutdownNow(), has already set the flag.
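
The difference between Thread.interrupted() and Thread#isInterrupted() is easy to check on the current thread. The sketch below is only an illustration of that difference; InterruptFlagDemo and clearAndObserve() are hypothetical names and nothing here comes from ThreadPoolExecutor itself.

```java
final class InterruptFlagDemo {

    /** Returns {value reported by Thread.interrupted(), flag observed afterwards}. */
    static boolean[] clearAndObserve() {
        Thread.currentThread().interrupt();                      // simulate a stale interrupt
        boolean seen = Thread.interrupted();                     // reads the flag AND clears it
        boolean after = Thread.currentThread().isInterrupted();  // reads the flag only
        return new boolean[] {seen, after};
    }

    public static void main(String[] args) {
        boolean[] result = clearAndObserve();
        System.out.println(result[0] + " " + result[1]); // prints "true false"
    }
}
```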

To summarize the core flow of the runWorker() method above:

  1. The Worker first performs an unlock, which lifts its non-interruptible state.
  2. A while loop calls getTask() to fetch tasks from the task queue (on the first iteration the task may instead be the firstTask passed in from outside).
  3. If the thread pool has moved to the STOP state, the worker thread must be interrupted; otherwise the worker thread must not be in the interrupted state.
  4. The task's Runnable#run() method is executed. The hook methods beforeExecute() and afterExecute() are called before and after it, whether the task completes normally or throws.
  5. Leaving the while loop means that runWorker() ends, and with it the worker thread's lifecycle (the end of Worker#run()); processWorkerExit() is then called to handle the aftermath of the worker thread's exit.

Let’s look at the getTask() method for getting the task from the task queue and the processWorkerExit() method for handling the subsequent work of thread exit.

getTask method source analysis

The getTask() method is a way for a worker thread to get a task object in the task queue in a while loop:

private Runnable getTask() {
    // Records whether the last pull from the queue timed out
    boolean timedOut = false; // Did the last poll() time out?
    // Note that this is an infinite loop
    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        // First if: the thread pool state is at least SHUTDOWN, i.e. rs >= SHUTDOWN(0), and one of the following holds:
        // 1. The thread pool state is at least STOP(1), i.e. the pool is stopping, usually because shutdownNow() was called
        // 2. The task queue is empty
        // In that case the worker count is decremented by 1 and null is returned
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // Reaching here, the pool is RUNNING, or SHUTDOWN with tasks still in the queue
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // The timed variable decides whether the task is pulled with poll(), the blocking call with a timeout, so that an idle worker can time out
        // 1. allowCoreThreadTimeOut defaults to false; if set to true, core threads also pull tasks with the timed poll()
        // 2. When the worker count exceeds corePoolSize, non-core threads exist in the pool and workers pull tasks with the timed poll()
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Second if:
        // 1. wc > maximumPoolSize: the worker count exceeds maximumPoolSize, which means the pool capacity was reduced via setMaximumPoolSize()
        // 2. timed && timedOut: timeouts apply to this worker and the poll() in the previous iteration returned null
        // If either holds, and the worker count is greater than 1 or the task queue is empty, CAS-decrement the worker count and return null
        // If the CAS fails, retry in the next iteration
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If timed is true, pull with poll() and a timeout; null is returned if no task arrives within keepAliveTime
            // If timed is false, take() blocks until the next task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Important: return only if the task is not null; otherwise go on to the next iteration
            if (r != null)
                return r;
            // Reaching here means workQueue.poll() timed out and returned null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

This method contains two sizeable if blocks. The first one decrements the worker count and returns null directly in either of these cases:

  1. The thread pool state is SHUTDOWN (usually because shutdown() was called) and the task queue is empty.
  2. The thread pool state is at least STOP.

For the second if, the logic is a bit complicated, let’s break it down:

// The total number of worker threads is greater than maximumPoolSize, indicating that the thread pool capacity is reduced by setMaximumPoolSize()
boolean b1 = wc > maximumPoolSize;
// Allows the thread to time out while the previous round pulls null from the task queue using the poll() method
boolean b2 = timed && timedOut;
// The total number of worker threads is greater than 1
boolean b3 = wc > 1;
// The task queue is empty
boolean b4 = workQueue.isEmpty();
boolean r = (b1 || b2) && (b3 || b4);
if (r) {
    if (compareAndDecrementWorkerCount(c)) {
        return null;
    } else {
        continue;
    }
}
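
The decision made by the second if can also be written as a pure function, which makes it easy to try out combinations by hand. This is only a sketch: GetTaskDecision and shouldExit() are hypothetical names, and the real method reads these values from the pool's fields instead of taking parameters.

```java
final class GetTaskDecision {

    /**
     * Mirrors the condition of the second if in getTask():
     * returns true when the worker should try to decrement the count and exit.
     */
    static boolean shouldExit(int wc, int corePoolSize, int maximumPoolSize,
                              boolean allowCoreThreadTimeOut, boolean timedOut,
                              boolean queueEmpty) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        return (wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || queueEmpty);
    }

    public static void main(String[] args) {
        // Idle non-core worker (3 workers, core 2) whose last poll() timed out: exits
        System.out.println(shouldExit(3, 2, 4, false, true, true));  // true
        // Core worker with default settings: never subject to the timeout
        System.out.println(shouldExit(2, 2, 4, false, true, true));  // false
        // Last remaining worker while tasks are still queued: must stay
        System.out.println(shouldExit(1, 0, 4, true, true, false));  // false
        // Pool was shrunk below the current worker count: exits even without a timeout
        System.out.println(shouldExit(5, 2, 4, false, false, false)); // true
    }
}
```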

This logic mostly concerns non-core threads. In execute(), when the worker count has reached corePoolSize but is below maximumPoolSize and the task queue is full, a non-core thread is added via addWorker(task, false). The logic here is effectively the reverse of addWorker(task, false): it reduces the number of non-core threads so that the worker count drifts back toward corePoolSize. If a non-core thread's poll() returned null in the previous iteration, timed && timedOut is true in this one, so getTask() is likely to return null, which makes ThreadPoolExecutor#runWorker() leave its while loop. processWorkerExit() then performs the follow-up work, and the Worker belonging to that thread becomes unreachable and is left for the JVM to garbage-collect. When allowCoreThreadTimeOut is set to true, the same end-of-life logic applies to core threads as well. The meaning of keepAliveTime can therefore be summarized as follows:

  • When core threads are allowed to time out, i.e. allowCoreThreadTimeOut is set to true, keepAliveTime is the lifetime of any idle worker thread.
  • By default core threads are not allowed to time out, and keepAliveTime is the lifetime of idle non-core threads.

In some specific scenarios, a properly configured keepAliveTime can make better use of worker thread resources in the thread pool.
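
The effect of keepAliveTime together with allowCoreThreadTimeOut can be observed from outside the pool. The following is a rough sketch under assumed values: KeepAliveDemo, awaitPoolSize() and coreThreadsTimeOut() are hypothetical names, and the 100 ms keep-alive and 5 s polling deadline are arbitrary choices for the demonstration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {

    /** Polls getPoolSize() until it equals target or timeoutMillis has elapsed. */
    static boolean awaitPoolSize(ThreadPoolExecutor pool, int target, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (System.nanoTime() < deadline) {
            if (pool.getPoolSize() == target) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return pool.getPoolSize() == target;
    }

    /** Returns true if an idle pool shrinks to zero workers once core timeout is allowed. */
    static boolean coreThreadsTimeOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // keepAliveTime now applies to core threads too
        try {
            pool.execute(() -> { });       // each call starts one core worker
            pool.execute(() -> { });
            return awaitPoolSize(pool, 0, 5_000);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool drained to 0 workers: " + coreThreadsTimeOut());
    }
}
```

Without the allowCoreThreadTimeOut(true) call the two core workers would block in take() and the pool size would stay at 2.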

processWorkerExit method source code analysis

The processWorkerExit() method does the cleanup and bookkeeping for a terminating Worker. Because processWorkerExit() is called from the finally block of runWorker(), the worker thread has not truly terminated until processWorkerExit() returns.

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // A user exception terminated the thread abruptly, so the worker count has to be decremented here
    // If no exception was thrown, getTask() returned null to make the thread leave the while loop of runWorker() normally; in that case getTask() has already decremented the worker count
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // The number of completed tasks globally plus the number of completed tasks in the terminating Worker
        completedTaskCount += w.completedTasks;
        // Remove the terminating Worker from the Worker thread set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
     
    // See the analysis in the next section to determine whether thread pool terminate processing is required based on the current thread pool state
    tryTerminate();

    int c = ctl.get();
    // If the thread pool state is less than STOP, i.e. RUNNING or SHUTDOWN:
    // 1. If the thread terminated because of a user exception, simply add a new non-core thread
    // 2. Otherwise add a new non-core thread only if the worker count has fallen below the minimum (at least one worker is kept while the queue is not empty, even if core threads may time out)
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // The minimum is 0 if core threads are allowed to time out, otherwise corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If the minimum is 0 and the task queue is not empty, raise the minimum to 1
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // If the worker count is at least the minimum, return without adding a new non-core thread
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

The last part of the code checks the thread pool state. If the pool is RUNNING or SHUTDOWN and the current worker thread terminated by throwing a user exception, a new non-core thread is created. If the worker thread terminated normally, without a user exception, it is handled as follows:

  • allowCoreThreadTimeOut is true: core threads may time out, so if the task queue is not empty and no worker is left, a non-core thread is created to keep at least one worker thread in the pool.
  • allowCoreThreadTimeOut is false: if the worker count is at least corePoolSize, the method returns directly; otherwise a non-core thread is created, which keeps the worker count close to corePoolSize.

When processWorkerExit() completes, it means that the life cycle of the worker thread has ended.
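
The replacement of a worker that died from a user exception can be made visible with thread names. The sketch below makes a few assumptions: WorkerReplacementDemo and the "worker-N" naming scheme are hypothetical, the pool is limited to a single thread so that the second task can only run on the replacement, and the uncaught exception handler is silenced purely to keep the output clean.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerReplacementDemo {

    /** Returns the names of the threads that ran the failing task and the following task. */
    static List<String> run() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + seq.incrementAndGet());
            // Swallow the deliberate failure instead of printing a stack trace
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                throw new IllegalStateException("deliberate failure");
            });
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("second task did not run in time");
            }
            return names;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // the two names differ: the first worker was replaced
    }
}
```

Note that this only happens with execute(). A task passed to submit() is wrapped in a FutureTask, which captures the exception, so the worker survives.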

tryTerminate method source analysis

The tryTerminate() method is called when each worker thread terminates:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Check the thread pool state and return directly in any of these three cases:
        // 1. The thread pool is RUNNING
        // 2. The thread pool state is at least TIDYING, meaning another thread is already completing (or has completed) the steps below
        // 3. The thread pool state is less than STOP (i.e. SHUTDOWN at this point) and the task queue is not empty
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && !workQueue.isEmpty()))
            return;
        // If the worker count is not zero, interrupt at most one idle worker thread and return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS the thread pool state to TIDYING. If it succeeds, the hook method terminated() is executed
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // Finally update the thread pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up all threads waiting on the termination condition; its awaitNanos() is called in awaitTermination()
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

// Interrupt idle worker threads. When onlyOne is true, only the first worker in the set is examined (and interrupted if it is idle)
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // Interrupt the thread only if it is not yet interrupted and the Worker lock can be acquired
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // Leave the loop after the first worker in the set
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

The puzzle here is the second if code logic of the tryTerminate() method: if the number of worker threads is not zero, the first idle worker thread in the set of worker threads is interrupted. There is a paragraph in the method API comment that reads:

If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate.

That is, when the conditions for terminating the thread pool are met but the worker count is not zero, one idle worker thread is interrupted so that the shutdown signal keeps propagating.

The shutdown() method, examined below, calls interruptIdleWorkers() to interrupt all idle threads. A thread that is busy executing a task at that moment is not interrupted. If it happens to be a core thread, it may block in take() on the task queue in its next loop iteration, and without further intervention it could stay blocked there forever even after the thread pool has been shut down. To avoid this, every worker thread that exits tries to interrupt one idle thread in the worker set, which ensures that all idle threads eventually exit normally.

The interruptIdleWorkers() method calls tryLock() on each Worker and interrupts its thread only if that returns true. As seen in runWorker(), every time a worker obtains a non-null task it first calls Worker#lock(). A thread that is executing a task therefore cannot be interrupted through this path, which guarantees that any worker thread interrupted here is idle.

shutdown method source code analysis

The thread pool can be shut down in several related ways. Start with shutdown():

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Verify permissions and determine security policies
        checkShutdownAccess();
        // Set the SHUTDOWN state
        advanceRunState(SHUTDOWN);
        // Interrupt all idle worker threads
        interruptIdleWorkers();
        // Hook method
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool: the state moves to TIDYING, the terminated() hook is called, and afterwards the state becomes TERMINATED
    tryTerminate();
}

// Advance the run state
private void advanceRunState(int targetState) {
    // assert targetState == SHUTDOWN || targetState == STOP;
    for (;;) {
        int c = ctl.get();
        // Leave the loop once the thread pool state is already at least targetState, or the CAS to targetState succeeds
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// Interrupt all idle worker threads
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

Now look at the shutdownNow() method:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Verify permissions and determine security policies
        checkShutdownAccess();
        // Set the STOP state
        advanceRunState(STOP);
        // Interrupt all worker threads
        interruptWorkers();
        // Empty the work queue and fetch all unexecuted tasks
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool: the state moves to TIDYING, the terminated() hook is called, and afterwards the state becomes TERMINATED
    tryTerminate();
    return tasks;
}

// Iterate over all worker threads and interrupt each one whose state is >= 0
private void interruptWorkers() {
    // assert mainLock.isHeldByCurrentThread();
    for (Worker w : workers)
        w.interruptIfStarted();
}

ShutdownNow () method will be the first state of the thread pool more to STOP interrupt all Worker threads (AbstractQueuedSynchronizer instances of Worker of the state value greater than 0, which is including the Worker is a mission and free Worker), It then iterates through the task queue, retrieves (removes) all the tasks stored in a list and returns.

Finally look at the awaitTermination() method:

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    // Convert timeout to nanoseconds
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Wait in a loop until the thread pool state is TERMINATED; each wait lasts at most the remaining nanos nanoseconds
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            if (nanos <= 0L)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}

AwaitTermination () is not shutdown(), but its processing logic is to ensure that the calling thread blocks until tryTerminate() has successfully updated the thread pool state to TERMINATED. You can use scenarios where you need to sense the end of a thread pool.

It is worth noting that the shutdown() method only interrupts idle worker threads. If the worker thread is executing the task object Runnable#run(), the worker thread in this case will not interrupt, but will terminate the worker thread as determined by the thread pool state while waiting for the next round of getTask().
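
The behaviour of shutdown() described here can be checked from the outside. The sketch below is an illustration under assumptions: ShutdownDemo is a hypothetical name, a CountDownLatch stands in for work in progress, and the 200 ms and 5 s timeouts are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDemo {

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        AtomicInteger completed = new AtomicInteger();
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();               // stands in for work in progress
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    interrupted.set(true);
                }
            });
            pool.execute(completed::incrementAndGet); // waits in the queue
            started.await();

            pool.shutdown();                       // only idle workers are interrupted
            boolean rejected = false;
            try {
                pool.execute(() -> { });           // new tasks are refused after shutdown()
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            // The busy worker keeps running, so the pool cannot terminate yet
            boolean stillRunning = !pool.awaitTermination(200, TimeUnit.MILLISECONDS);
            release.countDown();                   // let the running task finish
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "rejected=" + rejected + " stillRunning=" + stillRunning
                    + " interrupted=" + interrupted.get()
                    + " completed=" + completed.get() + " terminated=" + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The queued task still runs after shutdown(), which is why completed reaches 2 before the pool terminates.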

Understand the reentrant lock mainLock member variable

private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

mainLock is used by many methods of ThreadPoolExecutor.

To summarize the usage scenarios of mainLock:

Method | Main role
--- | ---
tryTerminate | Guarantees the TIDYING -> TERMINATED state transition, the terminated() hook callback, and the condition-variable wake-up
interruptIdleWorkers | Serializes worker thread interrupts and avoids "interrupt storms"
addWorker | Protects the worker set from concurrent additions and protects metric updates
processWorkerExit | Protects metric updates
shutdown, shutdownNow and awaitTermination | See the analysis below
getPoolSize, getActiveCount, getLargestPoolSize, getTaskCount and getCompletedTaskCount | Protects metric reads; these metrics are generally computed from attributes of the Worker set

Here’s how thread pools can be closed relatively gracefully with reentrant locks and condition variables. First look at the shutdown() method:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

Apart from tryTerminate(), everything in shutdown() runs while holding the lock. This keeps the worker set stable during the permission check, serializes the state change, and serializes the interruption of idle worker threads, avoiding an "interrupt storm" (without the lock, concurrent calls to shutdown() would interrupt the worker threads repeatedly).

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue(); // <-- additional step
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

The shutdownNow() method actually locks for the same purpose as shutdown(), but with an additional step: export the list of remaining task instances in the task queue. The awaitTermination() method uses the previously mentioned conditional variable termination:

// A Condition can only be awaited while holding its lock, just as wait() requires the synchronized monitor
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Loop until the state becomes TERMINATED or the wait time runs out
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            if (nanos <= 0L)
                return false;
            // <-- block the calling thread until the remaining time elapses or it is signalled once the pool is TERMINATED
            nanos = termination.awaitNanos(nanos);
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}

The core function of awaitTermination() is to make the calling thread block until either the given time has elapsed or the thread pool state has become TERMINATED. The caller passes an acceptable wait time; the wait ends early if shutdown() or shutdownNow() is invoked from another thread and the state reaches TERMINATED. The method returns a boolean: true means the thread pool reached TERMINATED within the wait time (or already was TERMINATED), false means the wait time ran out while the thread pool state was still not TERMINATED.
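
A common way to combine these three methods is the two-phase shutdown idiom described in the ExecutorService Javadoc: call shutdown(), wait, and fall back to shutdownNow() if the pool has not terminated in time. The helper below is a sketch of that idiom; PoolCloser and shutdownAndAwait() are hypothetical names, and the choice to reuse the same timeout for both phases is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PoolCloser {

    /** Returns true if the pool reached TERMINATED within the given timeouts. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                  // phase 1: stop accepting tasks
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow();                           // phase 2: interrupt running tasks
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();                           // the caller was interrupted while waiting
            Thread.currentThread().interrupt();           // preserve the interrupted status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
    }
}
```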

How a worker thread in a thread pool can exit gracefully, without losing the task it is executing, corrupting task state, or corrupting data held by the task, is a topic worth exploring and will be examined in a future article.

reject method source code analysis

The reject(Runnable command) method is simple:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

It invokes the RejectedExecutionHandler instance held by the thread pool, passing the task instance and the current thread pool instance to the callback.
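
A custom handler makes the callback easy to observe. In the sketch below, RejectDemo and countRejections() are hypothetical names, and the pool is deliberately sized with one thread and a queue capacity of one so that the third task has nowhere to go.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RejectDemo {

    /** Submits three blocking tasks to a pool that can hold two and counts the rejections. */
    static int countRejections() {
        AtomicInteger rejected = new AtomicInteger();
        // Called by reject(command) with the task and the pool itself
        RejectedExecutionHandler handler = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // becomes the first task of the only worker
            pool.execute(blocker); // fills the queue
            pool.execute(blocker); // queue full and pool at maximum size: rejected
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + countRejections()); // 1
    }
}
```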

Hook method analysis

As of JDK11, ThreadPoolExecutor provides the following hook methods:

  • beforeExecute(Thread t, Runnable r): called back before the task's Runnable#run() is executed.
  • afterExecute(Runnable r, Throwable t): called back after the task's Runnable#run() is executed, whether it completed normally or threw.
  • terminated(): called back when the thread pool is shutting down and its state has successfully changed to TIDYING; afterwards the state is updated to TERMINATED.
  • onShutdown(): called back while shutdown() executes; the API comments mention that it is mainly provided for ScheduledThreadPoolExecutor.

onShutdown() is package-private (it has no access modifier), while the other three methods are protected. They can be overridden when needed, for example to add monitoring or to trigger specific actions at specific moments.
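
The order in which the three protected hooks fire can be recorded by overriding them. This is a minimal sketch: HookDemo and the event strings are hypothetical, and a single-threaded pool with one task is used so that the order is unambiguous.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HookDemo {

    /** Runs one task and returns the recorded sequence of hook events. */
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("beforeExecute");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                events.add("afterExecute:" + (t == null ? "ok" : "failed"));
            }

            @Override
            protected void terminated() {
                super.terminated();
                events.add("terminated");
            }
        };
        try {
            pool.execute(() -> events.add("run"));
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return events;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [beforeExecute, run, afterExecute:ok, terminated]
    }
}
```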

Other methods

The thread pool also provides many methods for statistics, resizing, pre-starting threads and so on. Their source code is not complicated, so it is not analyzed in detail here.

Core thread related:

  • getCorePoolSize(): Gets the number of core threads.
  • setCorePoolSize(): Resets the number of core threads in the thread pool.
  • prestartCoreThread(): Prestarts a core thread if and only if the number of worker threads is smaller than the number of core threads.
  • prestartAllCoreThreads(): Prestarts all core threads.

Thread pool capacity related:

  • getMaximumPoolSize(): Gets the thread pool capacity.
  • setMaximumPoolSize(): Resets the maximum capacity of the thread pool.

Thread lifetime related:

  • setKeepAliveTime(): Sets the lifetime of idle worker threads.
  • getKeepAliveTime(): gets the lifetime of idle worker threads.

Other monitoring statistics related methods:

  • getTaskCount(): Gets an approximation of the total number of tasks that have ever been scheduled for execution.
  • getCompletedTaskCount(): Gets an approximation of the total number of tasks that have completed execution.
  • getLargestPoolSize(): Gets the largest number of threads that have ever been in the pool at the same time.
  • getActiveCount(): Gets an approximation of the number of active threads (worker threads executing a task).
  • getPoolSize(): Gets the size of the worker set (the current number of worker threads in the pool).

Methods related to task queue operations:

  • purge(): Removes from the task queue all tasks that are Futures and have already been cancelled.
  • remove(): Removes the specified task from the task queue.
  • BlockingQueue<Runnable> getQueue(): Gets a reference to the task queue.

Setting some of these attributes can affect the state of the thread pool or add or remove worker threads. For example, changing the number of core threads may directly increase or decrease the number of Workers. Take ThreadPoolExecutor#setCorePoolSize() as an example:

// Set the number of core threads
public void setCorePoolSize(int corePoolSize) {
    // The input value must not be negative or greater than maximumPoolSize
    if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    // delta = the new number of core threads minus the existing number of core threads
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    // If the current worker count exceeds the new number of core threads, interrupt all idle worker threads
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // The new number of core threads is larger than the old one
        // k = min(delta, number of tasks in the queue): add at most k core worker threads
        // If the task queue is empty, k == 0 and the condition k-- > 0 fails, so the additional workers are created lazily when new tasks are submitted
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            // Stop as soon as the task queue becomes empty
            if (workQueue.isEmpty())
                break;
        }
    }
}

The API comment on the else if (delta > 0) branch explains: we don't really know how many new worker threads are "needed". As a heuristic, enough new workers are pre-started (up to the new core size) to handle the tasks currently in the queue, but no more are created once the queue becomes empty.
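
The pre-start heuristic can be watched through getPoolSize(). The following is a sketch under assumptions: CorePoolResizeDemo is a hypothetical name, and the tasks block on a latch so that the queue cannot drain while the core size is being changed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CorePoolResizeDemo {

    /** Returns {pool size before the resize, pool size right after it}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocker);     // 1 task runs on the core worker, 3 wait in the queue
            }
            int before = pool.getPoolSize();
            pool.setCorePoolSize(3);       // delta = 2, queue size = 3, so k = 2 workers are pre-started
            int after = pool.getPoolSize();
            return new int[] {before, after};
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println(sizes[0] + " -> " + sizes[1]); // 1 -> 3
    }
}
```

If the queue had been empty when setCorePoolSize(3) was called, the pool size would have stayed at 1 and the extra core workers would only appear as new tasks are submitted.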

Summary

This article spent considerable space analyzing, almost line by line, the implementation of execute(), the core method of the JUC thread pool ThreadPoolExecutor. This method is the cornerstone on which the rest of the thread pool architecture builds asynchronous callbacks and time-based task scheduling. Two follow-up articles will analyze in detail the source code of the ExecutorService extensions and of the scheduling thread pool ScheduledThreadPoolExecutor; they are expected to take about two to three weeks.