The transition diagram for the life cycle of ThreadPoolExecutor is shown below.

Shutdown method

The source code of the shutdown() method is very simple:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

The checkShutdownAccess() method handles security permission checks, and onShutdown() is an empty hook method in ThreadPoolExecutor. The calls that matter are advanceRunState(SHUTDOWN) and interruptIdleWorkers().

The ctl field is an AtomicInteger that packs two pieces of information: the run state of the thread pool (runState) and the number of valid threads in the pool (workerCount). The runState is stored in the high 3 bits and the workerCount in the low 29 bits, so the two values do not interfere with each other. Storing both values in one variable avoids inconsistencies when a decision depends on both of them, and both can be read and updated together atomically without holding a lock.
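
The packing described above can be sketched in a few lines. This is only an illustration under the stated layout (3 high bits for the state, 29 low bits for the count); the class name CtlPacking is hypothetical, and the helper bodies are written to match the described layout rather than copied from the JDK.

```java
class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // mask with the low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;       // one example run state

    // Combine a run state and a worker count into a single int.
    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }

    // Keep only the high 3 bits.
    static int runStateOf(int c) { return c & ~CAPACITY; }

    // Keep only the low 29 bits.
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);      // true
        System.out.println(workerCountOf(c));              // 5

        // Adding a worker only touches the low bits; the state is unchanged.
        int grown = c + 1;
        System.out.println(runStateOf(grown) == RUNNING);  // true
        System.out.println(workerCountOf(grown));          // 6
    }
}
```

Because both values live in one int, a single compareAndSet can change the state while preserving the count, which is what advanceRunState relies on.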

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

The source code also defines the following constants:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

Each state corresponds to the top 3 bits respectively:

RUNNING 111
SHUTDOWN 000
STOP 001
TIDYING 010
TERMINATED 011
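
As a quick check of the table above, the top three bits can be printed directly from the shift expressions. The class name RunStateBits and the helper topBits are hypothetical names used only for this sketch.

```java
class RunStateBits {
    static final int COUNT_BITS = Integer.SIZE - 3;

    // Returns the top three bits of a run-state constant as a 3-character string.
    static String topBits(int runState) {
        String bits = Integer.toBinaryString(runState >>> COUNT_BITS);
        return "000".substring(bits.length()) + bits; // left-pad with zeros
    }

    public static void main(String[] args) {
        String[] names = { "RUNNING", "SHUTDOWN", "STOP", "TIDYING", "TERMINATED" };
        int[] states = {
            -1 << COUNT_BITS, 0 << COUNT_BITS, 1 << COUNT_BITS,
             2 << COUNT_BITS, 3 << COUNT_BITS
        };
        for (int i = 0; i < names.length; i++) {
            System.out.println(names[i] + " " + topBits(states[i]));
        }
    }
}
```

Since RUNNING has the sign bit set, it is negative, so as signed ints the states are ordered RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED. That ordering is what lets a check such as runStateAtLeast be a plain numeric comparison.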

So the advanceRunState method basically sets the run state stored in ctl to targetState, which is SHUTDOWN when it is called from the shutdown method.

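Here, the condition if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) ensures that only one thread actually moves the thread pool to targetState: once the state is at least targetState, every other caller leaves the loop without changing it, and the state never moves backwards.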
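
The effect of that condition can be tried out in a standalone sketch. The class RunStateAdvance is hypothetical, and unlike the JDK method its advance returns a boolean so the demo can show which call actually changed the state; that return value is an addition made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RunStateAdvance {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    // Starts as RUNNING with 3 workers.
    final AtomicInteger ctl = new AtomicInteger(RUNNING | 3);

    // Returns true only for the call whose CAS changed the state (illustrative addition).
    boolean advance(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (c >= targetState) {
                return false; // already at or past the target: nothing to do
            }
            if (ctl.compareAndSet(c, targetState | (c & CAPACITY))) {
                return true;  // state replaced, worker count preserved
            }
            // CAS lost a race with another update: re-read and retry
        }
    }

    public static void main(String[] args) {
        RunStateAdvance pool = new RunStateAdvance();
        System.out.println(pool.advance(SHUTDOWN)); // true
        System.out.println(pool.advance(SHUTDOWN)); // false
        System.out.println(pool.advance(STOP));     // true
        System.out.println(pool.advance(SHUTDOWN)); // false, the state never goes back
        System.out.println(pool.ctl.get() & CAPACITY); // 3
    }
}
```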

The interruptIdleWorkers method is as follows:

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

It delegates to interruptIdleWorkers(boolean onlyOne):

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

The workers field is a HashSet. For each worker in the set, if the worker's thread has not already been interrupted (!t.isInterrupted()), the method calls the worker's tryLock() method to check whether the worker is currently running a task. How this works is best explained together with the Worker's workflow.

As shown in the figure, when we submit a task to the thread pool by calling threadPoolExecutor.execute(Runnable runnable), the task is eventually executed inside the runWorker method on a separate worker thread.
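
That the task runs on a worker thread rather than on the caller can be observed directly. The following is a small sketch; the class ExecuteOnWorker and the method workerThreadName are hypothetical names, and the pool parameters are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOnWorker {
    // Submits one task and reports the name of the thread that ran it.
    static String workerThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            CompletableFuture<String> name = new CompletableFuture<>();
            // The lambda body executes inside runWorker on the pool's thread.
            pool.execute(() -> name.complete(Thread.currentThread().getName()));
            return name.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```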

The code for the runWorker method is as follows:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

As you can see, the source comment next to w.unlock() says that the call is there to allow interrupts. Why does unlocking allow interrupts? The Worker class extends AbstractQueuedSynchronizer, and its unlock method sets the AQS state variable directly to 0 and returns true. A state of 0 therefore means that the Worker's thread may be interrupted.

The unlock logic of the Worker class:

public void unlock() { release(1); }

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

On the other hand, in the while loop of the runWorker method, once the worker thread has obtained a task, it first calls the lock method of the Worker class. The lock method tries to change state from 0 to 1; if it succeeds, the Worker's thread is now running a task.

public void lock() { acquire(1); }

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

So if you look back at the interruptIdleWorkers() method, you will see this check:

private void interruptIdleWorkers(boolean onlyOne) {
    if (!t.isInterrupted() && w.tryLock())
    ......
}

This is how the pool determines whether a Worker thread is idle: tryLock() succeeds only when state is 0, that is, when the worker is not in the middle of running a task.
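
The idle check can be reproduced in isolation with a tiny non-reentrant lock built on AbstractQueuedSynchronizer. This is a sketch in the style of the Worker code quoted above, not the Worker class itself; IdleCheckSketch and WorkerLock are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class IdleCheckSketch {
    // state 0 = idle, state 1 = running a task (non-reentrant on purpose).
    static final class WorkerLock extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    public static void main(String[] args) {
        WorkerLock w = new WorkerLock();

        w.lock();                          // the worker picked up a task
        System.out.println(w.tryLock());   // false: busy, an idle-only interrupt skips it
        w.unlock();                        // the task finished

        System.out.println(w.tryLock());   // true: idle, safe to interrupt
        w.unlock();
    }
}
```

The lock is deliberately non-reentrant: tryLock fails while the lock is held, regardless of which thread asks, so the result reflects only whether a task is in progress.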

ShutdownNow method

The source code is as follows:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

As you can see, like the shutdown method, the shutdownNow method first checks security permissions and then uses advanceRunState to change the state, this time to STOP.

Unlike the shutdown method, however, it calls interruptWorkers to interrupt all started worker threads directly, rather than only the idle ones as shutdown does.
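
The difference between the two interruption strategies can be observed from outside the pool. In the sketch below, a single task blocks on a latch while the pool is shut down, and the method reports whether that running task saw an interrupt. The class and method names are hypothetical, and the latches exist only to make the timing deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownVersusShutdownNow {
    static boolean runningTaskWasInterrupted(boolean useShutdownNow) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                release.await();           // simulates a blocking task
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        try {
            started.await();               // the worker is now busy inside runWorker
            if (useShutdownNow) {
                pool.shutdownNow();        // interrupts the busy worker
            } else {
                pool.shutdown();           // skips the busy worker
                release.countDown();       // so the task has to be released by hand
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(runningTaskWasInterrupted(false)); // false
        System.out.println(runningTaskWasInterrupted(true));  // true
    }
}
```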

The logic of interruptWorkers is as follows:

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

As can be seen in the Worker constructor, the initial value of state is -1. The comment there also says that interrupts are inhibited until the worker thread enters runWorker.

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

When can it be interrupted? As follows:

The runWorker(Worker w) method starts by calling unlock. As explained earlier, unlock calls tryRelease, which sets state to 0:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    ..................
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

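Because interruptIfStarted only requires state >= 0, the interruptWorkers method interrupts every worker that has been started, whether it is idle (state 0) or running a task (state 1).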
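
The role of the initial state of -1 can be shown with a stripped-down synchronizer. This is a sketch under the assumption that only the state transitions matter; InterruptGateSketch and Gate are hypothetical names, and no real thread is interrupted here.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class InterruptGateSketch {
    static final class Gate extends AbstractQueuedSynchronizer {
        Gate() { setState(-1); }            // as in the Worker constructor

        @Override
        protected boolean tryAcquire(int unused) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int unused) {
            setState(0);
            return true;
        }

        boolean tryLock()      { return tryAcquire(1); }
        void unlock()          { release(1); }

        // The same guard that interruptIfStarted applies before interrupting.
        boolean startedGuard() { return getState() >= 0; }
    }

    public static void main(String[] args) {
        Gate g = new Gate();
        System.out.println(g.startedGuard()); // false: not started, shutdownNow skips it
        System.out.println(g.tryLock());      // false: CAS from 0 fails, shutdown skips it too

        g.unlock();                           // what runWorker does first
        System.out.println(g.startedGuard()); // true: interrupts are now allowed
        System.out.println(g.tryLock());      // true: the worker counts as idle
    }
}
```

So a worker that has been constructed but has not yet reached runWorker is invisible to both interruption paths until its first unlock.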

It then drains the blocking queue with some simple code:

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}
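
The second loop in drainQueue matters for queues where drainTo may leave elements behind. One such case is java.util.concurrent.DelayQueue, whose drainTo transfers only elements whose delay has expired. The sketch below uses a plain DelayQueue as a stand-in for the pool's work queue; DrainQueueSketch, DelayedTask and the helper methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DrainQueueSketch {
    // A task that does not become available for an hour.
    static final class DelayedTask implements Runnable, Delayed {
        private final long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(1);

        @Override public void run() { }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static DelayQueue<DelayedTask> queueWithTwoPendingTasks() {
        DelayQueue<DelayedTask> q = new DelayQueue<>();
        q.add(new DelayedTask());
        q.add(new DelayedTask());
        return q;
    }

    // Same shape as drainQueue(): drainTo first, then remove leftovers one by one.
    static List<DelayedTask> drain(DelayQueue<DelayedTask> q) {
        List<DelayedTask> taken = new ArrayList<>();
        q.drainTo(taken);
        if (!q.isEmpty()) {
            for (DelayedTask t : q.toArray(new DelayedTask[0])) {
                if (q.remove(t)) {
                    taken.add(t);
                }
            }
        }
        return taken;
    }

    static int plainDrainToCount() {
        return queueWithTwoPendingTasks().drainTo(new ArrayList<DelayedTask>());
    }

    static int fallbackDrainCount() {
        return drain(queueWithTwoPendingTasks()).size();
    }

    public static void main(String[] args) {
        System.out.println(plainDrainToCount());  // 0: nothing has expired yet
        System.out.println(fallbackDrainCount()); // 2: leftovers removed one by one
    }
}
```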
