“This article has participated in the good article call order activity, click to see: back end, big front end double track submission, 20,000 yuan prize pool for you to challenge!”

A thread pool that cannot be wound

Just looking at the English semantics of ThreadPoolExecutor, you can see that this is a class related to thread pools.

Anyone who has ever done development knows about thread pools and can tell you more or less about them. In spite of this, the author would like to take the trouble to give you a deeper memory 😀😀

Thread pooling is a pooling technique. There are many similar pooling techniques in Java. The common ones are:

  • Database connection Pool
  • Redis connection pool
  • The HTTP connection pool
  • Memory pool
  • The thread pool

The role of pooling technology: some reusable things (such as connections, threads) into an initialized pool, facilitate unified resource management. Benefits:

  1. Avoids the overhead of repeated creation, destruction, and scheduling, and improves performance
  2. Ensure full utilization of the kernel and prevent excessive scheduling
  3. Customize parameters to achieve the best effect

ThreadPoolExecutor knowledge

Method for creating a thread pool in Java

Is not recommended

Create the following thread pool using the static method of the Executors class

  • FixedThreadPool
  • ScheduledThreadPool (Perform periodic tasks)
  • WorkStealingPool (generate the number of threads based on the number of cpus in the current computer)
  • CachedThreadPool (with caching)
  • SingleThreadPool (single thread)

recommended

Create a thread pool using ThreadPoolExecutor

    // Give the thread a name with business meaning
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%s").build();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            5.// Number of core threads in the thread pool
            10.// The maximum number of threads in the thread pool. No more threads will be added to the thread pool after the maximum number is reached
            1000.// Maximum lifetime of idle threads in a thread pool that exceeds the number of corePoolSize
            TimeUnit.MILLISECONDS,  // The unit of time is milliseconds
            new LinkedBlockingQueue<>(50),  // The worker thread waits for the queue
            threadFactory,  // Custom thread factory
            new ThreadPoolExecutor.AbortPolicy());  // Reject policy when thread pool is full
Copy the code

why

Let’s take a look at alibaba’s “Java Development Manual” in what?

Let’s see how to write the source code?

The SingleThreadPool implementation is also a thread pool created using the ThreadPoolExecutor constructor. The reason it is not recommended is that it uses a LinkedBlockingQueue as a wait queue for worker threads, which is an unbounded buffer wait queue. The default constructor for this queue defines a length of integer.max_value

FixedThreadPool similarly

CachedThreadPool uses SynchronousQueue, an unbounded unbuffered wait queue, and its maximum number of threads is integer.max_value

ScheduledThreadPool is an unbounded blocking queue that uses DelayedWorkQueue. The maximum number of threads is integer.max_value

In contrast, with ThreadPoolExecutor, we can specify bounded/unbounded blocking queues and specify an initial length.

ThreadPoolExecutor source analysis

Thread pool life cycle

Graph LR RUNNING -- "shutdown()" --> shutdown -- "block queue empty, ShutdownNow () --> STOP -- "terminated TIDYING TIDYING -- "terminated() --> terminated TIDYING TIDYING -- "terminated()" --> TERMINATED
Thread pool status State definition
RUNNING The initial state of the thread pool after it has been created. It can accept new submitted tasks and also process tasks in the blocking queue
SHUTDOWN Closed state, no longer accepting new submitted tasks, but can continue processing tasks that have entered the blocking queue
STOP Interrupts a thread that is processing a task, can no longer accept new tasks, and does not continue processing tasks in the queue
TIDYING All tasks have been terminated and the workerCount(valid number of worker threads) is 0
TERMINATED The thread pool terminates completely

Tips: Never confuse the thread pool state with the thread state. Make up a thread state diagram on the web

Tips: When a thread calls start(), it may not execute immediately in the JVM. It may wait until the operating system allocates resources, when it is in the READY state, and when it gets resources, when it enters the RUNNING state, it will actually execute.

Rejection policies

  • CallerRunsPolicy (executed in the current thread)

  • AbortPolicy (direct selling RejectedExecutionException)

  • DiscardPolicy (discards threads directly)

  • DiscardOldestPolicy (discard a thread that has not been processed for the longest time and retry)

AbortPolicy is used by default when no specified rejection policy is displayed

ThreadPoolExecutor class diagram

IDEA Diagrams show UML class Diagrams, and the inheritance relationship is clear

The ThreadPoolExecutor class has many methods. The core of the ThreadPoolExecutor class is a method to construct a thread pool and a method to execute a thread task.

Task execution mechanism

  • By performingexecutemethods

This method, which returns no value, is a method of ThreadPoolExecutor and is passed an object of type Runnable

  • By performingsubmitmethods

This method returns a Future object. It is an AbstractExecutorService method inherited from ThreadPoolExecutor. Its internal implementation also calls the execute method of the interface Executor class. The implementation of this method is still the Execute method of ThreadPoolExecutor

Execute () Execute the flowchart

Execute () source code interpretation

    // Use the CTL variable of AtomicInteger. The first three bits record the status of the thread pool, and the last 29 bits record the number of threads
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // The range of Integer is [-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29, which is used to assist the left shift operation
    private static final int COUNT_BITS = Integer.SIZE - 3;
    Three bits are used to store the thread pool's running state, and the remaining bits represent the pool's capacity
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // The thread pool state is stored as a constant value in three bits
    private static final int RUNNING    = -1 << COUNT_BITS; // The thread pool accepts new tasks and processes tasks in the blocking queue
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // The thread pool does not accept new tasks, but processes tasks in the blocking queue
    private static final int STOP       =  1 << COUNT_BITS; // The thread pool does not accept new tasks, does not process tasks in the blocking queue, and interrupts ongoing tasks
    private static final int TIDYING    =  2 << COUNT_BITS; // If all tasks are completed and the number of worker threads is zero, the terminated method will be called
    private static final int TERMINATED =  3 << COUNT_BITS; // The final state is the state after executing the terminated() method

    // CTL variables related to the sealing and unpacking methods
    private static int runStateOf(int c)     { return c & ~CAPACITY; } // Get the thread pool running status
    private static int workerCountOf(int c)  { return c & CAPACITY; } // Get the number of threads running in the thread pool
    private static int ctlOf(int rs, int wc) { return rs | wc; } // Obtain the CTL object
Copy the code
public void execute(Runnable command) {
    if (command == null) // If the task is empty, the NPE is thrown
        throw new NullPointerException();
        
    int c = ctl.get(); // Get the current number of worker threads and the thread pool running status (32 bits in total, the first 3 bits are running status, the last 29 bits are running threads)
    if (workerCountOf(c) < corePoolSize) { // If the current number of worker threads is less than the number of core threads
        if (addWorker(command, true)) // Create a worker thread in addWorker and execute the task
            return;
        c = ctl.get();
    }
    
    // The number of core threads is full (number of worker threads > number of core threads)
    if (isRunning(c) && workQueue.offer(command)) { // If the current thread pool status is RUNNING and the task was successfully added to the blocking queue
        int recheck = ctl.get(); // Double check, because the thread pool may have been in SHUTDOWN state since the last check
        if (! isRunning(recheck) && remove(command)) // Delete the task from the queue if the current thread pool status is not RUNNING
            reject(command); // Execute the reject policy
        else if (workerCountOf(recheck) == 0) // If the workerCount in the thread pool is 0, add an addWorker to consume the work in the workQueue
            addWorker(null.false);
    }
    // The block queue is full
    else if(! addWorker(command,false)) // Try adding a worker thread to execute command
        // If the thread pool is in SHUTDOWN state or is saturated
        reject(command); // Execute the reject policy
}
Copy the code
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // Loop exit flag bit
    for (;;) { // Infinite loop
        int c = ctl.get();
        int rs = runStateOf(c); // Thread pool status

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && 
            ! (rs == SHUTDOWN && firstTask == null&&! workQueue.isEmpty())// Use a more intuitive conditional statement
            // (rs ! = SHUTDOWN || firstTask ! = null || workQueue.isEmpty())
           )
           // Return false;
           // The thread pool is in STOP, TIDYING, and TERMINATED state
           // (2) The thread pool status is SHUTDOWN, and the task to be executed is not empty
           // (3) The thread pool status is SHUTDOWN and the task queue is empty
            return false;

        // Cas spin to increase the number of threads
        for (;;) {
            int wc = workerCountOf(c); // The current number of worker threads
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) / / worker threads number > = thread pool capacity | | worker threads number > = (core number of threads | | maximum number of threads)
                return false;
            if (compareAndIncrementWorkerCount(c)) // Perform the CAS operation to add the number of threads
                break retry; // Add successfully, exit the outer loop
            // The CAS fails to be added
            c = ctl.get();  
            If the thread pool status changes, it will jump to the outer loop and retry to obtain the thread pool status again. If no, the inner loop will retry the CAS
            if(runStateOf(c) ! = rs)continue retry;
            // else CAS failed due to workerCount change; retry inner loop}}// To summarize the above CAS process:
    If the number of threads exceeds the limit, return false. If the number of threads exceeds the limit, return false
    // (2) If the CAS is successful, the double loop exits. If the CAS is not successful, the CAS fails. It depends on whether the current thread pool status has changed
    // (3) If yes, enter the outer loop to obtain the thread pool state again; if no, enter the inner loop again to continue cas

    // The number of threads is +1, but the cas is not executed
    boolean workerStarted = false; // The worker thread calls the start() method flag
    boolean workerAdded = false; // The worker thread is tagged
    Worker w = null;
    try {
        w = new Worker(firstTask); // Create a worker thread instance
        final Thread t = w.thread; // Get the thread instance held by the worker thread
        if(t ! =null) {
            final ReentrantLock mainLock = this.mainLock; // Use globally reentrant locks
            mainLock.lock(); // Lock to control concurrency
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // Get the current thread pool status

                // Thread pool status is RUNNING or (thread pool status is SHUTDOWN and there are no new tasks)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // Check whether the thread is active
                        throw new IllegalThreadStateException();
                    workers.add(w); // The thread is added to the HashSet that holds the worker thread. Workers are globally unique and are held by the mainLock
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; }}finally {
                mainLock.unlock(); // Release the lock in the finally block
            }
            if (workerAdded) { // Thread added successfully
                t.start(); // Call the thread's start() method
                workerStarted = true; }}}finally {
        if (! workerStarted) // If the thread fails to start, the addWorkerFailed method is executed
            addWorkerFailed(w);
    }
    return workerStarted;
}
Copy the code
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if(w ! =null)
            workers.remove(w); // If a thread fails to start, delete the previously added thread
        decrementWorkerCount(); // The number of worker threads in the CTL variable is -1
        tryTerminate(); // Try to transition the thread pool to TERMINATE state
    } finally{ mainLock.unlock(); }}Copy the code
final void tryTerminate(a) {
    for (;;) {
        int c = ctl.get();
        // The following cases will not enter the TERMINATED state:
        // (1) The thread pool is in the RUNNING state
        // (2) in the state of TIDYING or above
        // (3) SHUTDOWN and the work queue is not empty
        // (4) The number of active threads is not equal to 0
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if(workerCountOf(c) ! =0) { // The number of worker threads! = 0
            interruptIdleWorkers(ONLY_ONE); // Interrupt a thread that is waiting for a task
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Use CAS spin to determine until the current thread pool is in TIDYING state and the number of active threads is 0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // call thread terminated()
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // Set the thread pool status to TERMINATED and the number of worker threads to 0
                    termination.signalAll(); // Wake up all waiting threads by calling signalAll() of the Condition interface
                }
                return; }}finally {
            mainLock.unlock();
        }
        // else retry on failed CAS}}Copy the code

Worker source code interpretation

The Worker is an inner class of the ThreadPoolExecutor class, and only the most important constructors and run methods are covered here

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    // The thread that the worker is running on
    final Thread thread;
    
    // The initial task to run
    Runnable firstTask;
    
    // Task counters for each thread
    volatile long completedTasks;

    // The constructor
    Worker(Runnable firstTask) {
        setState(-1); // Disable interrupts before calling runWorker()
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // Create a thread using ThreadFactory
    }

    // Implement the Runnable interface's run method
    public void run(a) {
        runWorker(this); }...// Other methods are omitted
}
Copy the code

The Worker implements the Runable interface. When the start() method is called, the run method is actually executed

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // Get the thread instance in the worker thread used to execute the task
    w.firstTask = null;
    w.unlock(); // Set status to 0 to allow interrupts
    boolean completedAbruptly = true; // Unexpected thread termination flag
    try {
        // If the current task is not empty, the task is executed directly. Otherwise call getTask() to fetch a task from the task queue for execution
        while(task ! =null|| (task = getTask()) ! =null) {
            w.lock(); // Lock to ensure thread safety in the critical section below
            // If the status value is greater than or equal to STOP and the current thread has not been interrupted, interrupt the thread actively
            if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();// Interrupt the current thread
            try {
                beforeExecute(wt, task); // The callback before the task execution, empty implementation, can be customized in the subclass
                Throwable thrown = null;
                try {
                    task.run(); // Execute the thread's run method
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // Callback after task execution, empty implementation, can be customized in the subclass}}finally {
                task = null; // Set the loop variable task to null to indicate that processing is complete
                w.completedTasks++; // The current number of completed tasks +1
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally{ processWorkerExit(w, completedAbruptly); }}Copy the code

Gets a task from the task queue

private Runnable getTask(a) {
    boolean timedOut = false; // The timeOut variable indicates whether the idle time of the thread has expired
    // Infinite loop
    for (;;) {
        int c = ctl.get(); // Thread pool information
        int rs = runStateOf(c); // Current status of thread pool

        // If the thread pool status is >=SHUTDOWN and the task column is empty or the thread pool status is >=STOP, null is returned and the current worker is destroyed
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // The number of worker threads is -1
            return null;
        }

        int wc = workerCountOf(c); // Get the number of worker threads in the current thread pool

        // Indicates whether the current thread allows timeout destruction
        // Allow timeout destruction: when the thread pool allows core threads to time out or the number of worker threads > the number of core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // If (the current number of threads is greater than the maximum number of threads or (timeout destruction is allowed and idle time timeout occurs))
        // If the number of threads is greater than 1 or the blocking queue is empty
        // Reduces the worker count and returns NULL
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Take a task from the head of the task queue using poll or take (which blocks), depending on whether the thread allows a timeout
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if(r ! =null)
                return r; // Return the task taken from the queue
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; }}}Copy the code

To summarize the cases where getTask() returns null:

  1. The thread pool status is SHUTDOWN and the task queue is empty
  2. The thread pool status is STOP, TIDYING, or TERMINATED
  3. The number of threads in the thread pool exceeds the maximum number of threads
  4. Threads can be recycled by timeout while waiting for a new task to timeout

Worker thread exit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If completedAbruptly is true, an unhandled exception is thrown during task execution
    // So the worker count has not been properly reduced, so you need to reduce the worker count once
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add the number of completed tasks from the threads to be destroyed to the total number of completed tasks in the thread pool
        completedTaskCount += w.completedTasks;
        workers.remove(w); // Remove the worker thread from the worker thread collection
    } finally {
        mainLock.unlock();
    }

    // Try to terminate the thread pool
    tryTerminate();

    int c = ctl.get();
    // In the RUNNING or SHUTDOWN state
    if (runStateLessThan(c, STOP)) {
        // The worker is executing normally
        if(! completedAbruptly) {// The minimum number of threads is 0 if the core thread timeout is allowed, otherwise the minimum number of threads is equal to the number of core threads
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If the blocking queue is not empty, at least one thread must continue with the remaining tasks
            if (min == 0&&! workQueue.isEmpty()) min =1;
            // If the current number of threads meets the minimum number of threads, there is no need to create an alternative thread
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Create a new worker to replace the destroyed thread
        addWorker(null.false); }}Copy the code

The resources

The realization principle of Java thread pool and its practice in Meituan business