This is the 14th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge

Preface

The previous article introduced the basic principles and usage scenarios of thread pools. This article analyzes thread pools from the source code perspective.

1. Source code analysis

In the HotSpot VM's threading model, Java threads (java.lang.Thread) are mapped one-to-one to native operating system threads. When a Java thread starts, a native operating system thread is created; when the Java thread terminates, the operating system thread is reclaimed. The operating system schedules all threads and assigns them to the available CPUs.

Java multithreaded programs typically decompose an application into several tasks and then use a user-level scheduler (the Executor framework) to map these tasks onto a fixed number of threads.

1.1 Overall framework

The top-level interface is Executor, which declares a single method: void execute(Runnable command);

ExecutorService extends Executor with more functionality. There are two ways to submit a task to a thread pool: execute and submit. The biggest difference between the two is that submit returns a Future (backed by a FutureTask) through which the result of the task can be obtained.

AbstractExecutorService implements many of the methods of ExecutorService.
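To make the difference concrete, here is a minimal sketch. The class name SubmitVsExecuteDemo and the pool built with Executors.newFixedThreadPool are my own choices for illustration, not something taken from the source being analyzed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: contrasts execute (no return value) with submit (returns a Future).
class SubmitVsExecuteDemo {
    /** Returns {value written by the execute task, value obtained from the submit Future}. */
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            AtomicInteger sideEffect = new AtomicInteger();
            // execute returns void, so a result can only travel through a side effect
            pool.execute(() -> sideEffect.set(1));
            // submit wraps the Callable and returns a Future; get() blocks until the result is ready
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = pool.submit(task);
            int result = future.get(5, TimeUnit.SECONDS);
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { sideEffect.get(), result };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("execute side effect = " + r[0] + ", submit result = " + r[1]);
    }
}
```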

The Executor framework consists of three main components:

  1. Tasks: the interface a task must implement, either Runnable or Callable.
  2. Execution of tasks: the core interface Executor and the ExecutorService interface that extends it. Two key classes in the Executor framework implement ExecutorService: ThreadPoolExecutor and ScheduledThreadPoolExecutor.
  3. The result of an asynchronous computation: the Future interface and the FutureTask class that implements it.
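The three components can be seen side by side in a small sketch. FutureTaskDemo and the same-thread Executor below are assumptions made for illustration; a real program would normally hand the FutureTask to a thread pool instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: one task, one executor, one asynchronous result.
class FutureTaskDemo {
    static String run() {
        // 1. The task: a Callable that produces a value
        Callable<String> task = () -> "done";
        // 3. The result holder: FutureTask is both a Runnable and a Future
        FutureTask<String> future = new FutureTask<>(task);
        // 2. The execution: Executor has a single method, so a method reference is enough.
        //    This toy executor simply runs the command on the calling thread.
        Executor sameThread = Runnable::run;
        sameThread.execute(future);
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```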

1.2 ThreadPoolExecutor

The previous article covered the constructors of ThreadPoolExecutor, the lifecycle state transitions of a thread pool, and its core member variables and methods. So I start the analysis from the entry point, ThreadPoolExecutor.execute().

1.2.1 execute method

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // ctl packs the thread pool state and the number of worker threads into one int
    int c = ctl.get();
    // 1. If the number of threads is smaller than corePoolSize, try to start a core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // addWorker failed (the pool state or worker count changed); read ctl again
        c = ctl.get();
    }
    // 2. If the thread pool is in the RUNNING state, try to add the task to the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the thread pool is no longer RUNNING, remove the queued task and apply the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker is left, start one without a first task so the queue gets drained
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. The queue is full (or the pool is not RUNNING): try to add a non-core thread.
    //    addWorker fails if the number of threads has reached maximumPoolSize
    else if (!addWorker(command, false))
        reject(command);
}
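The three branches of execute can be observed from the outside with a deliberately tiny pool. This is only a sketch: the class name ExecuteFlowDemo and the sizes (one core thread, at most two threads, a queue of capacity one) are chosen for illustration, and the default AbortPolicy is assumed as the rejection policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: submits four blocking tasks and logs what the pool did with each.
class ExecuteFlowDemo {
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free during the demo
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 goes to the queue, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected because both the queue and maximumPoolSize are exhausted.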

The main flow matches the flowchart in the previous article, and the logic is clear. Next, let's look at the addWorker method.

1.2.2 addWorker method

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Thread pool state
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // Return false (no worker is added) when:
        // 1. The state is STOP, TIDYING, or TERMINATED.
        // 2. The state is SHUTDOWN and firstTask != null: new tasks are not accepted.
        // 3. The state is SHUTDOWN, firstTask == null, and the queue is empty: nothing is left to run.
        // A worker is still added in SHUTDOWN when firstTask == null and the queue is
        // not empty, because the queued tasks still need to be executed.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // core selects the limit to check against: corePoolSize or maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS the worker count + 1; on success, leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // Whether the worker thread has been started
    boolean workerStarted = false;
    // Whether the worker has been stored in the workers HashSet
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Construct a Worker; its thread comes from the ThreadFactory
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // The ThreadFactory created a thread successfully
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Acquire the global lock of the thread pool
            mainLock.lock();
            try {
                // Re-check the state while holding the lock
                int rs = runStateOf(ctl.get());
                // Proceed if the pool is RUNNING, or SHUTDOWN with no first task
                // (the worker only drains the queue)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the workers HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of threads the pool has ever had
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Mark the worker as added
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the worker was added successfully, start its thread to execute the task
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Failure handling
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
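The helpers used in the listing above (runStateOf, workerCountOf, CAPACITY, and the CAS on ctl) can be modeled in isolation. The sketch below follows the bit layout as I understand it from the JDK 8 source: the high 3 bits hold the state and the low 29 bits hold the worker count. The class CtlSketch and the method tryReserveWorker are hypothetical, and the state check is simplified to "RUNNING only".

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the ctl field: state in the high 3 bits, worker count in the low 29 bits.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /**
     * Simplified version of the retry loops in addWorker: reserve one worker
     * slot with CAS. Assumption: only a RUNNING pool accepts workers here; the
     * real method also allows SHUTDOWN with a non-empty queue.
     */
    static boolean tryReserveWorker(AtomicInteger ctl, int limit) {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) >= SHUTDOWN)
                return false;
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= limit)
                return false;
            if (ctl.compareAndSet(c, c + 1))
                return true;
            // CAS lost a race with another thread: re-read ctl and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        System.out.println(tryReserveWorker(ctl, 2)); // first slot
        System.out.println(tryReserveWorker(ctl, 2)); // second slot
        System.out.println(tryReserveWorker(ctl, 2)); // limit reached
        System.out.println("workers=" + workerCountOf(ctl.get()));
    }
}
```

Because the state occupies the high bits, the states order numerically (RUNNING < SHUTDOWN < STOP), which is why the listings can compare them with < and >=.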

1.2.3 addWorkerFailed method

Next, let's look at the failure-handling method addWorkerFailed.

It does three main things:

  1. Removes the worker from workers
  2. Decrements the worker count by 1
  3. Calls tryTerminate to check whether the thread pool can terminate
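One way to trigger this failure path from the outside is a ThreadFactory that returns null, since addWorker only starts a worker whose thread is non-null. The sketch below is an illustration under that assumption; AddWorkerFailedDemo is a hypothetical class, and the rollback of the worker count is inferred from the fact that a later submission can still start the single allowed thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: makes worker creation fail, then lets it succeed.
class AddWorkerFailedDemo {
    /** Returns {pool size after failure, queued tasks after failure, pool size after recovery}. */
    static int[] run() {
        // A factory that cannot create threads; addWorker then ends in addWorkerFailed
        ThreadFactory failing = r -> null;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), failing);
        try {
            pool.execute(() -> { });
            int sizeAfterFailure = pool.getPoolSize();
            int queuedAfterFailure = pool.getQueue().size();

            // With a working factory the next submission starts the one allowed
            // thread, which is only possible if the failed attempt gave its slot back
            pool.setThreadFactory(Executors.defaultThreadFactory());
            pool.execute(() -> { });
            int sizeAfterRecovery = pool.getPoolSize();
            return new int[] { sizeAfterFailure, queuedAfterFailure, sizeAfterRecovery };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("after failure: threads=" + r[0] + " queued=" + r[1]
                + "; after recovery: threads=" + r[2]);
    }
}
```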

1.2.4 Worker inner class

Worker is an inner class of ThreadPoolExecutor. Each thread in the Java thread pool is wrapped in a Worker, which represents one worker thread of the pool.

Looking at the class diagram of Worker, we can see that it extends AQS (AbstractQueuedSynchronizer) and implements the Runnable interface.
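Why a worker would be both a lock and a Runnable is easier to see in a stripped-down model. The sketch below is not the JDK's Worker; WorkerLockSketch is a hypothetical class that keeps only the idea of a non-reentrant exclusive lock built on AQS state (0 = idle, 1 = busy), held while a task runs.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical simplified worker: a Runnable that is also a non-reentrant lock.
class WorkerLockSketch extends AbstractQueuedSynchronizer implements Runnable {
    private final Runnable task;

    WorkerLockSketch(Runnable task) { this.task = task; }

    // Non-reentrant: acquisition succeeds only when the state is 0, even for the owner
    @Override protected boolean tryAcquire(int unused) { return compareAndSetState(0, 1); }
    @Override protected boolean tryRelease(int unused) { setState(0); return true; }
    @Override protected boolean isHeldExclusively() { return getState() != 0; }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    @Override public void run() {
        lock(); // busy while the task runs
        try {
            task.run();
        } finally {
            unlock(); // idle again
        }
    }

    /** Returns {tryLock() result while a task is running, tryLock() result once idle}. */
    static boolean[] demo() {
        boolean[] seen = new boolean[2];
        WorkerLockSketch[] self = new WorkerLockSketch[1];
        self[0] = new WorkerLockSketch(() -> seen[0] = self[0].tryLock());
        self[0].run();
        seen[1] = self[0].tryLock();
        return seen;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("tryLock while busy: " + r[0] + ", tryLock when idle: " + r[1]);
    }
}
```

A failed tryLock therefore means "this worker is running a task", which is how code that wants to interrupt only idle workers can tell the two apart.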

Its methods are familiar: the AQS lock methods and the run method of Runnable, which simply delegates to runWorker. Let's look at the runWorker method.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // A Worker is created with AQS state -1; unlock() sets the state to 0,
    // so the worker may be interrupted from now on
    w.unlock(); // allow interrupts
    // Whether the worker exits because of an exception thrown while running a task
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            // Holding the worker lock marks this worker as busy. Methods that interrupt
            // idle workers (tryTerminate(), shutdown(), etc.) must first succeed in
            // w.tryLock(), so they cannot interrupt a worker that is running a task
            w.lock();
            // If the pool is stopping, ensure the thread is interrupted;
            // otherwise, ensure it is not (Thread.interrupted() clears the flag)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method, implemented by subclasses
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook method, implemented by subclasses
                    afterExecute(task, thrown);
                }
            } finally {
                // Set task to null and prepare to take the next task
                task = null;
                // Number of tasks completed by this worker + 1
                w.completedTasks++;
                // Release the exclusive lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Reached when getTask returned null, or when an exception occurred while running a task
        processWorkerExit(w, completedAbruptly);
    }
}
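The two hook methods called around task.run() can be overridden in a subclass. Here is a minimal sketch that records the order of the calls; HookDemo and the event strings are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a pool subclass that logs beforeExecute / afterExecute.
class HookDemo {
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override protected void beforeExecute(Thread t, Runnable r) {
                events.add("beforeExecute");
            }
            @Override protected void afterExecute(Runnable r, Throwable thrown) {
                // thrown is null when the task completed normally
                events.add("afterExecute thrown=" + thrown);
            }
        };
        pool.execute(() -> events.add("task.run"));
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```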

1.2.5 getTask method

How does a worker fetch a task from the task queue?

This method fetches a task from the queue; returning null tells the worker thread that it must exit. There are three possible outcomes:

  1. Block until a task is obtained
  2. Wait at most keepAliveTime for a task. This applies when the number of threads is greater than corePoolSize (or allowCoreThreadTimeOut is set); a thread that times out must be closed
  3. Return null, meaning the thread must be closed, in these cases:
    1. The number of workers in the thread pool is greater than maximumPoolSize (after a call to setMaximumPoolSize)
    2. The thread pool is in the STOP state, in which tasks in the queue are no longer executed
    3. The thread pool is in the SHUTDOWN state and the task queue is empty

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS the worker count - 1
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // The checks above passed, so fetch a task
        try {
            // If timed is true, call poll with a timeout; otherwise call take and block until a task arrives.
            // timed is true when core threads may time out or the pool has more than corePoolSize threads
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
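The timed branch can be observed by letting even the core thread time out. This is a sketch under assumed settings: KeepAliveDemo is a hypothetical class, the 50 ms keep-alive is arbitrary, and the loop simply polls getPoolSize for up to five seconds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: an idle worker exits once the timed poll in getTask returns null.
class KeepAliveDemo {
    /** Returns {pool size while the task runs, pool size after the worker timed out}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Makes "timed" true even when the worker count is not above corePoolSize
        pool.allowCoreThreadTimeOut(true);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            int whileBusy = pool.getPoolSize();
            release.countDown();
            // Wait for the idle worker to time out in workQueue.poll(keepAliveTime, ...)
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return new int[] { whileBusy, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("threads while busy=" + r[0] + ", after idle timeout=" + r[1]);
    }
}
```

Without allowCoreThreadTimeOut(true), the single core thread would block in take() and the pool size would stay at 1.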

1.2.6 processWorkerExit method

This method is called at the end of runWorker() to clean up the exiting Worker.

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add the tasks completed by this worker to the pool's total
        completedTaskCount += w.completedTasks;
        // Remove the worker from the thread pool
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Check whether the pool can move to the TERMINATED state
    tryTerminate();

    int c = ctl.get();
    // If the thread pool state is RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // The worker exited normally, not because of an exception in a task
        if (!completedAbruptly) {
            // If core threads may time out, the minimum number of workers is 0
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // But at least one worker is needed while the queue is not empty
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // The worker terminated because of an exception, or the pool still has
        // tasks to execute but too few threads: call addWorker to create a new thread
        addWorker(null, false);
    }
}
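The replacement of a worker that died from an exception can be checked by comparing thread identities. The sketch below is illustrative: WorkerReplacementDemo is a hypothetical class, and the custom ThreadFactory exists only to silence the uncaught-exception stack trace.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a task submitted with execute throws, killing its worker thread.
class WorkerReplacementDemo {
    /** Returns true if the task after the failing one ran on a different thread. */
    static boolean run() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep stderr clean
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), quiet);
        Thread[] ranOn = new Thread[2];
        // The exception propagates out of runWorker, so completedAbruptly stays true
        pool.execute(() -> {
            ranOn[0] = Thread.currentThread();
            throw new IllegalStateException("boom");
        });
        // This task must be picked up by a thread other than the one that died
        pool.execute(() -> ranOn[1] = Thread.currentThread());
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn[0] != null && ranOn[1] != null && ranOn[0] != ranOn[1];
    }

    public static void main(String[] args) {
        System.out.println("second task ran on a new thread: " + run());
    }
}
```

Note that this applies to tasks passed to execute. A task passed to submit is wrapped in a FutureTask, which captures the exception, so the worker thread survives.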

1.2.7 tryTerminate method

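This method moves the thread pool toward the TERMINATED state. Once the pool is shut down and has no workers and no queued tasks, the state is changed to TIDYING, the terminated() hook is executed, and the state then goes from TIDYING to TERMINATED.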

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // Nothing to do if the pool is RUNNING, is already TIDYING or TERMINATED,
        // or is SHUTDOWN with tasks still in the queue
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        // If the number of workers is not 0, the pool cannot terminate yet;
        // interrupt one idle worker so the shutdown signal propagates
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS the thread pool state to TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Hook method
                    terminated();
                } finally {
                    // After terminated() has run, the thread pool state is finally set to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
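The TIDYING window can be observed from inside the terminated() hook, using the public isTerminating() and isTerminated() methods. A minimal sketch, with TerminatedHookDemo as a hypothetical class name:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: samples the pool state while the terminated() hook is running.
class TerminatedHookDemo {
    /** Returns {isTerminating() in the hook, isTerminated() in the hook, isTerminated() afterwards}. */
    static boolean[] run() {
        boolean[] seen = new boolean[3];
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override protected void terminated() {
                // The hook runs after the CAS to TIDYING and before TERMINATED is set
                seen[0] = isTerminating();
                seen[1] = isTerminated();
            }
        };
        pool.execute(() -> { });
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen[2] = pool.isTerminated();
        return seen;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("in hook: terminating=" + r[0] + " terminated=" + r[1]
                + "; afterwards: terminated=" + r[2]);
    }
}
```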

1.2.8 interruptIdleWorkers method

1.2.9 shutdown method

Shuts down the thread pool: no new tasks are accepted, tasks that were already submitted still run, and only idle threads (those not executing a task) are interrupted.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Thread pool state -> SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt only the idle workers
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

1.2.10 shutdownNow method

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Thread pool state -> STOP
        advanceRunState(STOP);
        // Interrupt all worker threads directly
        interruptWorkers();
        // Remove the tasks that never started from the queue so they can be returned
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

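In the code, the key difference between shutdown and shutdownNow is how threads are interrupted: shutdown interrupts only idle workers and lets queued tasks finish, while shutdownNow interrupts all workers and drains the queue, returning the tasks that never started.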
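The difference shows up clearly with one running task and one queued task. The sketch below is an illustration only; ShutdownComparisonDemo and its latches are hypothetical names, and the pool has a single thread so that the second task is guaranteed to sit in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: compares shutdown() and shutdownNow() on the same workload.
class ShutdownComparisonDemo {
    /** Returns {tasks returned unstarted, interrupted running tasks, tasks completed normally}. */
    static int[] run(boolean now) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger interrupted = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        // Task 1 occupies the only thread until it is released or interrupted
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                interrupted.incrementAndGet();
            }
        });
        // Task 2 waits in the queue behind task 1
        pool.execute(completed::incrementAndGet);
        try {
            started.await();
            int unstarted = 0;
            if (now) {
                // STOP: interrupts the busy worker and hands back the queued task
                unstarted = pool.shutdownNow().size();
            } else {
                // SHUTDOWN: the busy worker is left alone and the queue is still processed
                pool.shutdown();
                release.countDown();
            }
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not terminate in time");
            return new int[] { unstarted, interrupted.get(), completed.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] a = run(false);
        int[] b = run(true);
        System.out.println("shutdown:    unstarted=" + a[0] + " interrupted=" + a[1] + " completed=" + a[2]);
        System.out.println("shutdownNow: unstarted=" + b[0] + " interrupted=" + b[1] + " completed=" + b[2]);
    }
}
```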
