Preface

This is the fourth article in the thread pool series, and the most important one: the thread pool workflow. It is also a source code analysis, so let's go into the source code with questions in mind.

Series

  • Thread pool series – (1) Background
  • Thread pool series – (2) Thread pool status
  • Thread pool series – (3) Rejection policy
  • Thread pool series – (4) Workflow
  • Thread pool series – (5) shutdown && shutdownNow
  • Thread pool series – (6) Submit

The flow chart

Let's start with a flowchart of the thread pool to get an overall impression, then analyze it together with the source code.

Is the thread pool running?

The most important, core piece of thread pool code is the execute method, shown below.

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // =============================
        // Step 1: the number of threads is smaller than the number of core threads
        // =============================
        // ctl holds both the state and the number of threads of the thread pool
        int c = ctl.get();
        // Fewer workers than corePoolSize: try to start a new core worker for this task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // =============================
        // Step 2: add to the queue
        // =============================
        // Only enqueue while the thread pool is running
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // If the pool is no longer running, remove the task from the queue and reject it
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If no worker is left, start one without a first task so the queued task gets run
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // =============================
        // Step 3: try to add a non-core worker
        // =============================
        else if (!addWorker(command, false))
            reject(command);
    }

Looking at the source code, isn't the first step to check whether the thread count is smaller than the number of core threads? Why does the flowchart check the thread pool state first?

That is one way to read it. A more accurate reading is that the first question being answered is whether the thread pool is still running.

Inside addWorker, the pool state is checked right at the beginning, and the pool is no longer usable once shutdownNow or shutdown has been called. A breakpoint shows that when execute is called after shutdownNow or shutdown, the value of runStateOf is 1610612736.

[Figure: breakpoint in addWorker, runStateOf = 1610612736]
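To see what 1610612736 stands for, the value can be decoded by hand. The sketch below copies the state constants and the two helper methods as they appear in the JDK 8 ThreadPoolExecutor source; they are private there, so the class name CtlDecode and the helper stateName are only illustrative assumptions.

```java
public class CtlDecode {
    // Copies of the private constants in java.util.concurrent.ThreadPoolExecutor (JDK 8).
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits hold the worker count

    // The run state lives in the top three bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)    { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Hypothetical helper, not part of the JDK: maps a ctl value to a readable state name.
    static String stateName(int c) {
        int rs = runStateOf(c);
        if (rs == RUNNING)    return "RUNNING";
        if (rs == SHUTDOWN)   return "SHUTDOWN";
        if (rs == STOP)       return "STOP";
        if (rs == TIDYING)    return "TIDYING";
        if (rs == TERMINATED) return "TERMINATED";
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        int observed = 1610612736; // the value seen at the breakpoint
        System.out.println(stateName(observed) + ", workers=" + workerCountOf(observed));
        // prints "TERMINATED, workers=0"
    }
}
```

Since 3 << 29 is exactly 1610612736, the observed value is the TERMINATED state with a worker count of zero.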

The top three bits of this value encode the state, which here is TERMINATED, so addWorker returns false. With the following diagram, let's walk through the logic in order. When execute is called:

  • ① The thread count is smaller than the number of core threads.

  • ② addWorker cannot add the worker and returns false.

  • ③ The first branch ends without returning.

  • ④ Check whether the thread pool is running. It is not, so this branch is skipped.

  • ⑤ Try addWorker again. It fails again.

  • ⑥ Go to the thread pool rejection policy.

So the initial check really asks whether the thread pool is running. If it is not, the rejection policy is applied. That answers the question above: why the flow determines the thread pool state first.
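This behavior is easy to reproduce. The following is a minimal sketch, assuming the default rejection policy (AbortPolicy); the class name RejectAfterShutdown and the pool parameters are chosen only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectAfterShutdown {
    // Returns true if execute() is rejected once the pool has been shut down.
    static boolean rejectedAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.shutdown(); // the pool is no longer RUNNING from here on
        try {
            // workerCount (0) < corePoolSize (1), but addWorker fails because of the pool state,
            // the isRunning check fails, the second addWorker fails, and reject() is called.
            pool.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            return true; // AbortPolicy, the default handler, throws this exception
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + rejectedAfterShutdown()); // prints "rejected = true"
    }
}
```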

The author prepared a flowchart of the process described above. It is shown below so that you can follow the process clearly.

With this question settled, take another look at the flowchart at the top and see whether the thread pool workflow is clearer now.
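The three stages of execute (core worker, queue, non-core worker) and the final rejection can also be observed from the outside. The sketch below is only an illustration under assumed parameters: corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, and tasks that block on a latch so that no worker becomes free in between. The class name ExecuteFlowDemo is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, 1 if task 4 was rejected else 0}.
    static int[] run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int[] result = new int[4];

        pool.execute(blocker);              // task 1: workerCount < corePoolSize, new core worker
        result[0] = pool.getPoolSize();

        pool.execute(blocker);              // task 2: core is full, the task goes into the queue
        result[1] = pool.getQueue().size();

        pool.execute(blocker);              // task 3: queue is full, new non-core worker
        result[2] = pool.getPoolSize();

        try {
            pool.execute(blocker);          // task 4: queue full and maximumPoolSize reached
        } catch (RejectedExecutionException e) {
            result[3] = 1;
        }

        release.countDown();                // let all tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]); // prints "1 1 2 1"
    }
}
```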

Add tasks

As analyzed earlier, execute calls addWorker when the thread count is smaller than the number of core threads. How does addWorker add a task?

Let's look at its source code first. There is quite a lot of it, so the author splits it into two parts for analysis.

  • The for loop
  • The try block

Top half: the for loop

Let’s look at the for loop.

  • ① We have already analyzed this part. It is not triggered while the thread pool is running.

  • ② Look at this part first. There are two for (;;) loops in the screenshot, and ② is the second, inner loop (red box ②). It does the following:

    • Check whether the number of threads has reached the upper limit CAPACITY, which is (1 << 29) - 1 = 536870911.

    • Otherwise, check whether it has reached the number of core threads or the maximum number of threads. Here the number of core threads is used, because we pass true.

    • Then call compareAndIncrementWorkerCount to try to update the previously read thread count with a CAS.
      • true means the CAS succeeded, and the code breaks out of the outermost loop.
      • false means the CAS failed, and the for loop is executed again to retry.

The top half only updates the thread count recorded by the thread pool. No task has actually been added yet.
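The "check the limit, then CAS, then retry" pattern of the top half can be sketched in isolation. This is not the JDK code: the sketch uses a plain AtomicInteger instead of the packed ctl field, and the names CasIncrementSketch, tryReserveSlot, and reserveConcurrently are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasIncrementSketch {
    private final AtomicInteger workerCount = new AtomicInteger();
    private final int limit; // stands in for corePoolSize or maximumPoolSize

    CasIncrementSketch(int limit) {
        this.limit = limit;
    }

    // Reserves one worker slot, or returns false if the limit has been reached.
    boolean tryReserveSlot() {
        for (;;) {
            int current = workerCount.get();
            if (current >= limit)
                return false;                         // limit reached, give up
            if (workerCount.compareAndSet(current, current + 1))
                return true;                          // CAS succeeded, leave the loop
            // CAS failed: another thread changed the count, so re-read it and retry
        }
    }

    // Lets several threads compete for the slots and counts the successful reservations.
    static int reserveConcurrently(int threads, int limit) throws InterruptedException {
        CasIncrementSketch counter = new CasIncrementSketch(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (counter.tryReserveSlot())
                    successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts)
            t.join();
        return successes.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(reserveConcurrently(8, 3)); // prints 3
    }
}
```

However many threads compete, only as many reservations succeed as the limit allows, which is why the pool can safely create the worker afterwards.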

Bottom half: the try block

Now for the second half, the try block.

There are five steps here.

  • Step 1: wrap the task in a Worker.

  • Step 2: lock.

  • Step 3: add the task.

  • Step 4: unlock.

  • Step 5: run the task.

In step 1, the task is wrapped in a Worker. A Worker bundles a task with a thread. As shown in the figure below, a new thread is created through the thread factory, which ties the task and the thread together.

Take a look at what Worker inherits from: it extends AbstractQueuedSynchronizer and, notably, implements Runnable.
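The idea of a worker that is itself the Runnable of its own thread can be shown with a stripped-down sketch. MiniWorker is a hypothetical stand-in, not the JDK class: it leaves out the locking inherited from AbstractQueuedSynchronizer and runs only its first task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class WorkerSketch {
    // Hypothetical, simplified stand-in for ThreadPoolExecutor.Worker.
    static final class MiniWorker implements Runnable {
        final Thread thread;
        Runnable firstTask;

        MiniWorker(Runnable firstTask, ThreadFactory factory) {
            this.firstTask = firstTask;
            // The worker passes itself to the factory, so thread.start() ends up in run() below.
            this.thread = factory.newThread(this);
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            if (task != null)
                task.run(); // the real Worker delegates to runWorker(this) instead
        }
    }

    // Starts one worker and returns what its task wrote.
    static String runOnce() throws InterruptedException {
        StringBuilder log = new StringBuilder();
        MiniWorker w = new MiniWorker(() -> log.append("task ran"),
                Executors.defaultThreadFactory());
        w.thread.start();
        w.thread.join(); // join makes the task's write visible to this thread
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce()); // prints "task ran"
    }
}
```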

Now take a look at step 3, adding the task.

try {
    // Check thread pool status
    int rs = runStateOf(ctl.get());
    // Proceed if the thread pool is running,
    // or if it is SHUTDOWN and there is no first task
    if (rs < SHUTDOWN ||
        (rs == SHUTDOWN && firstTask == null)) {
        // Check whether the thread is already alive
        if (t.isAlive()) // precheck that t is startable
            // Throw an exception
            throw new IllegalThreadStateException();
        // Add the worker to workers (a HashSet)
        workers.add(w);
    }
}
// (excerpt: the finally block that releases the lock is not shown)

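Let's look at the two conditions.

  • The thread pool is running.

  • The thread pool is SHUTDOWN and firstTask == null.

The first one is easy to understand. The key is the second condition. The author searched the source code and found that addWorker(null, true/false) is called in many places, and the second condition exists for those calls. In the SHUTDOWN state the pool accepts no new tasks but still drains the queue, so a worker without a first task may still be added.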

Next, let's look at the check on whether the thread is alive.

isAlive() returning true means the thread has been started and has not yet terminated. In other words, the thread is in one of the states RUNNABLE, TIMED_WAITING, BLOCKED, or WAITING. In every one of these cases the thread is already live.

But the thread pool has only just created this thread and has not started it yet, so it should not be alive at this point. If it is, something else has already started it. Hence the comment: precheck that t is startable.
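The meaning of isAlive() can be checked with a plain thread, without any thread pool. The following small sketch (class name IsAliveDemo is an assumption) also shows why an already started thread is a problem: calling start() a second time throws IllegalThreadStateException.

```java
import java.util.concurrent.CountDownLatch;

public class IsAliveDemo {
    // Returns {alive before start, alive while blocked, second start rejected, alive after join}.
    static boolean[] observe() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                release.await(); // the thread waits here, so it cannot terminate early
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        boolean beforeStart = t.isAlive();   // not started yet
        t.start();
        boolean whileBlocked = t.isAlive();  // started and not terminated, even though it only waits

        boolean secondStartRejected = false;
        try {
            t.start();                       // a thread can be started only once
        } catch (IllegalThreadStateException e) {
            secondStartRejected = true;
        }

        release.countDown();
        t.join();
        boolean afterJoin = t.isAlive();     // terminated
        return new boolean[] {beforeStart, whileBlocked, secondStartRejected, afterJoin};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
        // prints "false true true false"
    }
}
```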

Perform a task

This part is a little easier to understand. Every Worker is added to workers (a HashSet).

That is not the main point, though. Focus on t.start().

We all know that calling start() on a thread eventually executes a run method somewhere. The thread pool hides this inside Worker: Worker implements Runnable and its run method. When the thread is started, run is invoked, which in turn calls runWorker(this).

At this point we know that the task is actually executed in runWorker. The core code is just one line, task.run(), which executes the task.

final void runWorker(Worker w) {
    // Get the current thread
    Thread wt = Thread.currentThread();
    // Get the worker's first task
    Runnable task = w.firstTask;
    // Clear worker.firstTask and release the lock
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop while there is a first task or getTask() returns a task
        while (task != null || (task = getTask()) != null) {
            // Lock the worker while it runs a task
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //
            // In other words:
            // 1. If the pool state is >= STOP, make sure the thread is interrupted
            // 2. Otherwise, Thread.interrupted() clears a pending interrupt, and the state is rechecked
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // Interrupt the current thread
                wt.interrupt();
            try {
                // Empty hook method that subclasses may override
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 👇 Execute the run method of the Runnable task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // The task is finished: clear it, increment completedTasks, and release the lock
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The worker exits
        processWorkerExit(w, completedAbruptly);
    }
}

Look at the line while (task != null || (task = getTask()) != null). The loop continues if either of two conditions holds:

  • The task is not null.
  • getTask() returns a task.
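The shape of this loop, "run the first task, then keep pulling from the queue until nothing is left", can be sketched without the rest of the pool. In this simplified, single-threaded sketch queue.poll() stands in for getTask(): it returns null immediately on an empty queue, whereas the real getTask() may block. The names WorkerLoopSketch, runLoop, and demo are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkerLoopSketch {
    // Runs firstTask (if any), then drains the queue. Returns the number of tasks run.
    static int runLoop(Runnable firstTask, BlockingQueue<Runnable> queue) {
        int completed = 0;
        Runnable task = firstTask;
        // Same two conditions as in runWorker: a task in hand, or one fetched from the queue.
        while (task != null || (task = queue.poll()) != null) {
            try {
                task.run();
            } finally {
                task = null;   // forces the next iteration to fetch from the queue
                completed++;
            }
        }
        return completed;
    }

    // Records the order in which the tasks are executed.
    static List<String> demo() {
        List<String> order = new ArrayList<>();
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> order.add("queued-1"));
        queue.add(() -> order.add("queued-2"));
        runLoop(() -> order.add("first"), queue);
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[first, queued-1, queued-2]"
    }
}
```

Setting task to null in the finally block is what makes the second condition take over after the first task has been run.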

Now focus on the getTask() method. It simply takes a task from the queue.

 private Runnable getTask() {
        // Flag whether a timeout occurred
        boolean timedOut = false; // Did the last poll() time out?
        // Infinite loop
        for (;;) {
            // Get the thread pool state
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // The state must be at least SHUTDOWN, and in addition
            // either at least STOP or the task queue is empty
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // Decrement workerCount
                decrementWorkerCount();
                // Return null, indicating that there are no more tasks
                return null;
            }

            // Get workerCount
            int wc = workerCountOf(c);

            // Determine whether to fetch the task with a timeout:
            // 1. allowCoreThreadTimeOut is true (set with ThreadPoolExecutor#allowCoreThreadTimeOut), or
            // 2. workerCount is greater than corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // If workerCount is greater than maximumPoolSize, or the timed fetch timed out,
            // and workerCount > 1 or the task queue is empty, this worker should exit.
            //
            // If workerCount is greater than maximumPoolSize, the timeout flag does not matter
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // Try to decrement workerCount with a CAS.
                // Return null on success to indicate no more tasks; on failure, continue the loop
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // Fetch from the task queue, with or without a timeout:
                // the poll method returns null if no task arrives within the specified time,
                // the take method blocks until a task is available
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // If the obtained task is not null, return the task
                if (r != null)
                    return r;
                // Mark the timeout
                timedOut = true;
            } catch (InterruptedException retry) {
                // Reset the timeout flag if the wait was interrupted
                timedOut = false;
            }
        }
    }
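The effect of the timed flag in getTask() can be observed from the outside: with allowCoreThreadTimeOut enabled, an idle core worker uses poll with keepAliveTime and then exits, while by default it blocks in take() and stays. The sketch below is an illustration under assumed values (one core thread, a keepAliveTime of 50 ms, generous wait times); the class name KeepAliveDemo is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    // Runs one trivial task, waits for the worker to go idle, and returns the pool size.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout); // controls "timed" for core workers
        pool.execute(() -> { });

        // Wait until the worker has exited, or until the deadline if it never does.
        long waitMillis = allowCoreTimeout ? 5000 : 300;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
            Thread.sleep(20);

        int size = pool.getPoolSize();
        pool.shutdown(); // interrupts the idle worker so that the JVM can exit
        return size;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("with core timeout:    " + poolSizeAfterIdle(true));  // 0
        System.out.println("without core timeout: " + poolSizeAfterIdle(false)); // 1
    }
}
```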