First of all, I would like to say that The Thread pool in Java is really complicated. I have always felt that the implementation of creating several threads without exiting has led to the source code of ThreadPoolExecutor. One of the nastiest aspects of learning source code is the state transitions that are at the heart of ThreadPoolExecutor. It took nearly a week to get a rough idea of how ThreadPoolExecutor works and write it down.

How important thread pools are

Threads are a concept a programmer must be involved with, but thread creation and switching are expensive. So, do we have a good solution to reuse threads? This brings us to the concept of thread pools. There are three obvious benefits to using thread pools properly:

  1. Reduce resource consumption: Reduce thread creation and destruction costs by reusing already created threads
  2. Improved response time: Tasks can be executed immediately upon arrival without waiting for a thread to be created.
  3. Improved thread manageability: Thread pools can be managed, allocated, tuned, and monitored uniformly.

Java multithreaded pool support — ThreadPoolExecutor

Java thread pool support is implemented primarily through ThreadPoolExecutor, which is important because ThreadPoolExecutor is the basis for various ExecutorService thread pool strategies. To understand the various thread pool policies, you must first understand ThreadPoolExecutor.

1. Implementation principle

Let’s start with a thread pool flowchart:

  • Step1. Call the execute thread of ThreadPoolExecutor and check the CorePool first. If the number of threads in the CorePool is smaller than the size of the CorePoolSize thread, the new thread will execute the task.
  • Step2. If the current thread in CorePool is greater than or equal to CorePoolSize, add the thread to BlockingQueue.
  • Step3. If you cannot join BlockingQueue, create a thread to execute the task if it is smaller than MaxPoolSize.
  • Step4. If the number of threads is greater than or equal to MaxPoolSize, execute the reject policy.

2. Create a thread pool

Thread pools can be created using the ThreadPoolExecutor constructor:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use forholding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code  execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentExceptionif one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Copy the code

To explain the above parameters:

  1. CorePoolSize Size of the core thread pool
  2. MaximumPoolSize Maximum capacity of the thread pool
  3. KeepAliveTime Duration of a thread when the thread pool is idle
  4. TimeUnit TimeUnit
  5. ThreadFactory specifies the ThreadFactory
  6. BlockingQueue Task queue
  7. RejectedExecutionHandler Rejectpolicy of the thread

3. Thread submission

The constructor of ThreadPoolExecutor is shown above, but it only does some initialization of parameters. Once the ThreadPoolExecutor is initialized, it can submit a thread task. The main submission methods are Execute and submit. Execute, submit will be analyzed in a future blog post.

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn* If the current number of threads is smaller than the core thread pool, a new Worker will be created based on the existing thread running as the first Worker. AddWorker automatically checks the pool state and the number of workers. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back The enqueuing if * stopped, or start a new thread if there are none. * The enqueuing if * stopped, or start a new thread if there are none. If we cannot queue task, then we try to add a new * thread. If it fails, We know we are shut down or saturated * and so reject the task. Task */ int c = ctl.get(); // Create a new Worker if the number of current workers is smaller than the core thread pool size. if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! IsRunning (recheck) && remove(command))// Recheck prevents the pool state from mutating, reject threads, and prevent new threads from being added to the workQueue. Else if (workerCountOf(recheck) == 0)// addWorker on both operations, but if workqueue. offer is 0, then no Worker will execute new tasks. So add worker. addWorker(null, false); } // If the workQueue is full, the thread pool's maxnum may not be reached yet, so try adding a Worker else if (! addWorker(command, false)) reject(command); // If the number of workers reaches the upper limit, reject the thread}Copy the code

A few concepts need to be clarified here:

  • The difference between Worker and Task is that a Worker is a thread in the current thread pool, while a Task is runnable, but it is not really executed. It is just called by the Worker to run. The implementation of this part will be seen later.
  • The difference between maximumPoolSize and corePoolSize is important. MaximumPoolSize is the maximum capacity of the thread pool, i.e. the maximum number of workers the thread pool can hold. CorePoolSize is the size of the core thread pool. When corePoolSize is full and workQueue is full (ArrayBolckQueue can be full), new workers are allowed to process tasks in the workQueue. However, it cannot exceed maximumPoolSize. Threads that exceed corePoolSize are terminated after an idle timeout.

Core method: addWorker Worker addition and Task acquisition and termination are implemented in this method, that is, this method contains a lot of things. The addWorker method mentions the concept of Status, which is the core concept of the thread pool.

/** * CTL is an atomic weight and contains two fields, one is workerCount and the other is runState * workerCount. RunState specifies The current thread pool state. The main pool control state, CTL, is an atomicintegerpacking * two conceptual fields * workerCount, indicating the effective number of threads * runState, How was it that afflicted whether running, shutting down etc. * * First, the workerCount occupies an atomicintegerSo, the workerCount upper limit is (2^29)-1. * In order to pack them into one int, welimit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * forexample when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported As the current size of the workers set. * * runState indicates the RUNNING life cycle of the entire thread pool. The values are as follows: * 1. * 2. SHUTDOWN: No new threads are added, but threads in the queue are processed. * 3.STOP does not add new threads and does not process threads in the queue. * 4. All threads terminated in TIDYING are in queue and workerCount is 0, so the TIDYING * 5. Terminated () method is terminated. TERMINATED * The runState provides The main lifecyle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     * 状态的转化主要是:
     * RUNNING -> SHUTDOWN(调用shutdown())
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP(调用shutdownNow())
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING(queue和pool均empty)
     *    When both queue and pool are empty
     * STOP -> TIDYING(pool empty,此时queue已经为empty)
     *    When pool is empty
     * TIDYING -> TERMINATED(调用terminated())
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
Copy the code

Here is the code for the status:

// CTL is used to ensure the state of the current thread pool and the number of current threads. Ps: the lower 29 bits represent the thread pool capacity, and the higher 3 bits represent the thread status. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); Private static final int COUNT_BITS = integer. SIZE - 3; Private static final int CAPACITY = (1 << COUNT_BITS) -1; private static final int CAPACITY = (1 << COUNT_BITS) -1; // Several states, represented by three digits of Integer // runState is storedinthe high-order bits //111 private static final int RUNNING = -1 << COUNT_BITS; //000 private static final int SHUTDOWN = 0 << COUNT_BITS; //001 private static final int STOP = 1 << COUNT_BITS; //010 private static final int TIDYING = 2 << COUNT_BITS; //011 private static final int TERMINATED = 3 << COUNT_BITS; Private static int runStateOf(int c) {// Packing and unpacking CTL private static int runStateOf(int c) {returnc & ~CAPACITY; Private static int workerCountOf(int c) {private static int workerCountOf(int c) {returnc & CAPACITY; } private static int ctlOf(int rs, int wc) {return rs | wc; }
Copy the code

Let’s paste the addWorker method:

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked, which requires a
     * backout of workerCount, and a recheck for termination, in case* the existence of this worker was holding up termination. * * @param firstTask the task the new thread should run first  (or * nullif none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only ifnecessary. /** * rs! = Shutdown | | fistTask! = null | | workCount. If the current state of the thread pool > isEmpty * SHUTDOWN so refused to Worker add if = SHUTDOWN * so not new at this time the Task is not null, If any type of Worker cannot be added when WorkCount is empty, * if it is not empty, the Worker whose task is null can be added to increase the consumption Worker */if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if(runStateOf(c) ! = rs)continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        Worker w = new Worker(firstTask);
        Thread t = w.thread;

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if// shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); /** * rs! =SHUTDOWN ||firstTask! If rs>SHUTDOWN is set to Terminate, then Terminate */ is set to SHUTDOWN and firstTask is not set to nullif (t == null ||
                (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null))) {
                decrementWorkerCount();
                tryTerminate();
                return false;
            }

            workers.add(w);

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }

        t.start();
        // It is possible (but unlikely) for a thread to have been
        // added to workers, but not yet started, during transition to
        // STOP, which could result in a rare missed interrupt,
        // because Thread.interrupt is not guaranteed to have any effect
        // on a non-yet-started Thread (see Thread#interrupt).// All running workers are terminated when stopped or interruptedif (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();
        return true;
    }
Copy the code

AddWorker first checks the thread pool state:

 int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if// check whether the current thread pool is shutdown. If the current thread pool is shutdown, the thread pool will not join. =SHUTDOWN || first! = null | | workQueue. IsEmpty ()) / / if the rs for SHUTDOWN, the state is the STOP, TIDYING or TERMINATED, so at this time to reject the request / / if the state of SHUTDOWN, and introduced into a single thread that is not null, // If the state is SHUTDOWN and there are no more tasks in the queue, reject itif (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
Copy the code

Thread pool state: thread pool state: thread pool state

  1. If it’s runing, skip if.
  2. If rs>=SHUTDOWN and not equal to SHUTDOWN, then no new thread is accepted.
  3. If rs>=SHUTDOWN, equal to SHUTDOWN, and first! If first==null, then new threads are rejected. If first==null, then new threads may consume threads in Queue. If it isEmpty, then the queue isEmpty and there is no need to add consuming threads. If the queue is not Empty, then run the Worker adding first=null.

You can see some strategies here

  • First of all, when RS >SHUTDOWN, reject any increase of threads, because STOP will terminate all threads and remove all threads to be executed in the Queue, so there is no need to add Worker with first= NULL
  • Second, in the SHUTDOWN state, you cannot add first! At the same time, even if first=null, workers are not allowed to be added to Queue when the Queue is Empty. Workers added under SHUTDOWN are mainly used to consume tasks in Queue.

IsRunning (c) && workQueue. Offer (command) New threads are not allowed to enter the thread pool when the state of the thread pool is >=SHUTDOWN.

            for(;;) { int wc = workerCountOf(c); // If the current number exceeds CAPACITY, or exceeds corePoolSize and maximumPoolSize (depending on core)if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false; //CAS tries to increase the number of threads, and if it fails, a race is proved, then retry.if (compareAndIncrementWorkerCount(c))
                    breakretry; c = ctl.get(); // re-read CTL // Checks the running status of the current thread poolif(runStateOf(c) ! = rs)continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
Copy the code

This code makes a compatibility, mainly when the corePoolSize or maximumPoolSize limit is not reached, then allows to add threads, CAS increases the number of workers after breaking out of the loop. Next, instantiate the Worker. Instantiating the Worker is actually critical, as we’ll see later. Mainlock. lock(); mainlock. lock(); mainlock. lock(); Then re-check the thread pool state, and if the state is incorrect, reduce the number of workers. Why tryTerminate () is not clear. If the status is normal, then add Worker to workers. Finally:

  if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();
Copy the code

It is clearly stated in the note that in order to timely interrupt the Worker, the thread cannot respond to the interruption because the thread is not started. If the status changes to STOP at this time, the thread cannot be interrupted. This is used to interrupt threads. Let’s look at Worker’s methods:

 /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
Copy the code

Here you can see that Worker is a wrapper around firstTask, and Worker itself is Runnable, which seems really annoying. Build a thread for the Worker itself via ThreadFactory. Because Worker is Runnable, there is a run method. T.start () is called to execute the run method:

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
Copy the code

Call the runWorker:

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, whileCoping with a number of issues: * 1 The Worker may still execute an initial task -- firstTask. * But sometimes the initialized task (which can be null) is not needed, as long as the pool is running, the task is * fetched from the queue via getTask. If null is returned, the worker exits. * The other is that the external throws an exception causing the worker to exit. * 1. We may start out with an initial task,in which case we
     * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, Which * usually leads processWorkerExit to replace this thread. * * * 2 Locks the worker to prevent other pools from interrupting the worker before running any task. * clearInterruptsForTaskRun guarantee in addition to the thread pool stop, 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and * clearInterruptsForTaskRun called to ensure that unless pool is * stopping, this thread does not have its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4.  Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to * afterExecute. We separately handle RuntimeException, Error * (both of which the specs guarantee that we trap) and arbitrary * Throwables. Because we cannot rethrow Throwables within * Runnable.run, we wrap them within Errors on the way out (to the * thread's UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, whichAlso cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will bein effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; Boolean completedAbruptly = true; GetTask () while (task!) getTask() while (task! = null || (task = getTask()) ! = null) { w.lock(); / acquire/woker lock to prevent thread is another thread interrupts clearInterruptsForTaskRun (); BeforeExecute (w.hash, task); // Clear all interrupt flags try {beforeExecute(w.hash, task); Thrown = null; Thrown = null; thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); }} finally {task = null;}} finally {task = null; // the running task is null w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally {// processWorkerExit(w, completedAbruptly); }}Copy the code

Execute’s Task is “wrapped”, and the thread starts by internally calling the Task’s run method. All the rest of the core focuses on the getTask() method:

/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out  waitingfor a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit.in which case* workerCount is decreed * * * getting threads in a queue */ private RunnablegetTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if// If the state is >stop, the workQueue is not processed and the number of workers is reduced, so null is returned. If the state is shutdown and the workQueue is empty, the number of workers is reduced and null is returnedif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if(runStateOf(c) ! = rs)continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if(r ! = null)return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false; }}}Copy the code

Boolean timedOut = false; Boolean timed; Boolean timed; It mainly indicates whether the current Worker needs to exit due to timeout. When WC > corePoolSize needs to reduce the number of idle workers, timed is true, but when WC <= corePoolSize, the number of core threads cannot be reduced and timed is false. TimedOut is initially false, and if timed is true then poll is used to fetch the thread. If returned normally, the fetched task is returned. If the timeout occurs, the worker is idle and the worker exceeds corePoolSize, which needs to be deleted. Return to r = null. Then timedOut = true. Wc <= maximumPoolSize &&! (timedOut && timed), the number of workers is reduced and null is returned, resulting in worker exit. If the number of threads <= corePoolSize, then workqueue.take () will be called and will block until the thread is fetched or interrupted.

This completes the analysis of thread execution

About terminating thread pools

In my opinion, if you want to understand thread pools, you have to understand transitions between states, and the termination mechanism of thread pools is a good way to understand transitions. There are two main methods for shutting down a thread pool: shutdown() and shutdownNow(): Start with the shutdown() method:

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // Check whether the target thread checkShutdownAccess() can be operated; // Set the state of the thread pool to SHUTDOWN, after which no new Task advanceRunState(SHUTDOWN) is added to the thread pool. // interruptIdleWorkers(); onShutdown(); // hookforScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // go to Terminate tryTerminate(); }Copy the code

Shutdown does several things: 1. Check to see if the target thread can operate. 2. Switch the thread pool state to SHUTDOWN. 3. Interrupt all idle threads

This raises the question, what is an idle thread? This is followed by looking at interruptIdleWorkers.

private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // The intent here is very simple, traversing workers to do interrupt processing for all workers. // w.trylock () locks the Worker, which ensures that the Worker running the Task will not be interrupted. try {for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break; } } finally { mainLock.unlock(); }}Copy the code

The main purpose here is to interrupt the worker, but the lock needs to be obtained before the interruption, which means that the running worker cannot be interrupted. But the above code has w.t. Lock(), so there is no interruption until the lock is acquired, and the shutdown Interrupt simply interrupts all idle workers fetching tasks from the workQueue while the Worker is unlocked.

            while(task ! = null || (task = getTask()) ! = null) { w.lock(); / acquire/woker lock to prevent thread is another thread interrupts clearInterruptsForTaskRun (); BeforeExecute (w.hash, task); // Clear all interrupt flags try {beforeExecute(w.hash, task); Thrown = null; Thrown = null; thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); }} finally {task = null;}} finally {task = null; // the running task is null w.completedTasks++; w.unlock(); }}Copy the code

In runWorker, after each Worker getTask succeeds, the lock of the Worker will be acquired and run, that is to say, the running Worker will not be interrupted. Because core threads typically block fetching tasks when idle, only interruptions can cause them to exit. These blocking workers are idle threads (non-core threads, of course, and also idle threads that block). In the getTask method:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if// If the state is >stop, the workQueue is not processed and the number of workers is reduced, so null is returned. If the state is shutdown and the workQueue is empty, the number of workers is reduced and null is returnedif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for(;;) {//allowCoreThreadTimeOu specifies whether corethreads will time out.trueFor timeout,falseIt doesn't time out. The default isfalse
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if(runStateOf(c) ! = rs)continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if(r ! = null)return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false; }}}Copy the code

There are two stages of the Worker:

  • GetTask () is just entered, and state judgment has not been made.
  • Block the Worker on the poll or take.

When the ShutDown method is called, the state of the thread pool is first set as ShutDown. At this time, the worker in stage 1 will return null when entering the state judgment, and the worker will exit. Because getTask is unlocked, worker.interrupt can be called when shutdown occurs. At this point, the workQueue will be interrupted and exit, and the Loop will reach the status judgment, and the workQueue will be empty. Then an interrupt exception is thrown, causing a re-loop and the Worker exits while the thread pool status is checked. If the workQueue is not null, it will not exit. This is a bit of a puzzle because there is no logic to clear the interrupt flag bit, so the loop will continue until the workQueue exits at Empty. SHUTDOWN only clears some idle workers and denies new tasks. Threads in the workQueue continue to process. The main reason for this is that Works is of HashSet type, which is not thread safe. We can also see that addWorker also checks the thread pool state. Separate Worker add and interrupt logic. Next, we do the tryTerminate() operation, which is a transition to the subsequent state, said after shutdownNow. Here’s shutdownNow:

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
Copy the code

ShutdownNow and Shutdown code are similar, but the implementation is quite different. The first step is to set the thread pool state to STOP. As you can see from the previous code, there is some extra logic for SHUTDOWN, but there is almost all reject for >=STOP. STOP is also a stricter state than SHUTDOWN. At this point, no new Worker will join, and all workers who go to GetTask just after executing a thread will quit. InterruptWorkers is then called:

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for(Worker w : workers) { try { w.thread.interrupt(); } catch (SecurityException ignore) { } } } finally { mainLock.unlock(); }}Copy the code

As you can see, the purpose of this method is to interrupt all workers, not just idle threads as in shutdown. This is a STOP feature that interrupts all threads and stops tasks in the workQueue. So here’s the drainQueue:

   /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if(! q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {
                if(q.remove(r)) taskList.add(r); }}return taskList;
    }
Copy the code

Gets all tasks that are not executed and returns. This also represents the STOP feature: all new tasks are rejected, all threads are interrupted, and all threads in the WorkerQueue that are not executing are discarded. So the Pool is empty and the WorkerQueue is empty. This is followed by the transition to TIDYING and TERMINATED:

    /**
     * Transitions to TERMINATED state ifeither (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action  that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The  method is non-private to * allow access from ScheduledThreadPoolExecutor. */ final voidtryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if(workerCountOf(c) ! = 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE);return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return; } } finally { mainLock.unlock(); } / /else retry on failed CAS
        }
    }
Copy the code

There are several states that cannot be converted to TIDYING:

  1. RUNNING state
  2. TIDYING or TERMINATED
  3. SHUTDOWN state, but workQueue is not empty

The workQueue must be empty and the workerCount must be 0. 2. STOP is converted to TIDYING, requiring workerCount to be 0

If the conditions are TERMINATED (which are TERMINATED after a certain amount of time), CAS becomes TIDYING and TIDYING is an TERMINATED state.

ThreadPoolExecutor (ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor, ThreadPoolExecutor

Make progress together, learn and share

Welcome to pay attention to my public number [calm as code], massive Java related articles, learning materials will be updated in it, sorting out the data will be placed in it.

If you think it’s written well, click a “like” and add a follow! Point attention, do not get lost, continue to update!!