1. Introduction

Before introducing thread pools, keep a few things in mind:

  • Creating and destroying threads has a cost, namely the time and computing resources that thread creation requires. If a web server creates a thread for each incoming request and most requests are lightweight, the cost of creating the thread is high compared with the cost of processing the request, which hurts overall performance.
  • Once there are enough threads to keep the CPU busy, additional threads are mostly idle after they are created. These extra threads not only occupy memory but may also incur other performance costs, because they compete with the other threads for CPU resources.
  • There is a limit on the number of threads that can be created; if this limit is exceeded, an OutOfMemoryError may be thrown.

What if something managed the thread lifecycle, reused existing threads, and decoupled task submission from task execution in a simple way? That is exactly what a thread pool does. To understand thread pools in Java, you must first understand the ThreadPoolExecutor class.

2. ThreadPoolExecutor explanation

Class diagram

The constructor

// Thread pool configuration. The volatile modifier ensures that changes to these fields are visible across threads.
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

private static final RuntimePermission shutdownPerm =
    new RuntimePermission("modifyThread");

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
            
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
            
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The constructor of the ThreadPoolExecutor class in the JDK contains seven parameters.

Parameter        Meaning
corePoolSize     Base size: the number of core threads in the thread pool.
maximumPoolSize  Maximum size: the maximum number of threads allowed in the thread pool.
keepAliveTime    Keep-alive time. A thread that has not executed a task for this long is marked as reclaimable, and the marked thread is terminated if the pool size exceeds the base size.
unit             Unit of keepAliveTime. One of seven values: DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS.
workQueue        Work queue: a blocking queue that holds tasks waiting to be executed.
threadFactory    Thread factory. The thread pool creates threads by calling the factory's Thread newThread(Runnable r) method.
handler          Saturation policy. When the blocking queue is full, the pool has reached its maximum number of threads, and no thread is idle, the saturation policy is applied to newly submitted tasks. (It is also applied when a task is submitted to an Executor that has been shut down.)

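The ThreadPoolExecutor class has four overloaded constructors. Every constructor requires the first five parameters in the table above; the last two (threadFactory and handler) are optional. When they are omitted, the default thread factory and the default AbortPolicy handler are used.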
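As a rough illustration of the seven-parameter constructor, the sketch below builds a small pool and reads the configuration back. The class name PoolConfigExample and all sizes (2, 4, 30 seconds, queue capacity 10) are assumptions chosen for the example, not values from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigExample {
    // Builds a small pool; every number below is an illustrative assumption.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                 // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(10),  // workQueue (bounded)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core = " + pool.getCorePoolSize());
        System.out.println("max = " + pool.getMaximumPoolSize());
        System.out.println("keepAlive = " + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
        pool.shutdown();
    }
}
```

A bounded queue is used here so that maximumPoolSize and the handler can actually come into play; with an unbounded queue the pool would never grow past corePoolSize.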

Thread Factories

Threads in the pool are created by calling ThreadFactory's Thread newThread(Runnable r) method. Below is the source code of the default thread factory in the Executors class.

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

As the code shows, the default thread factory creates non-daemon threads with priority Thread.NORM_PRIORITY. To customize thread creation, simply implement the Thread newThread(Runnable r) method of the ThreadFactory interface.
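A custom factory might look like the following minimal sketch. The class name NamedThreadFactory, the naming scheme, and the choice to create daemon threads are all assumptions made for illustration; only the ThreadFactory interface itself comes from the JDK.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;                                // hypothetical naming scheme
    private final AtomicInteger counter = new AtomicInteger(1); // per-factory thread number

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true); // unlike the default factory, this sketch creates daemon threads
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactory("worker");
        Thread t = factory.newThread(() -> { });
        System.out.println(t.getName() + " daemon=" + t.isDaemon()); // worker-1 daemon=true
    }
}
```

An instance can be passed as the threadFactory argument of any ThreadPoolExecutor constructor that accepts one.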

Saturation Policy (RejectedExecutionHandler)

The ThreadPoolExecutor class in the JDK provides four different RejectedExecutionHandler implementations:

  • AbortPolicy: the default saturation policy; throws the unchecked (runtime) exception RejectedExecutionException.
  • DiscardPolicy: silently discards the task without doing anything.
  • CallerRunsPolicy: runs the task in the caller's thread.
  • DiscardOldestPolicy: discards the task at the head of the blocking queue and resubmits the rejected task to the thread pool.

Similarly, you can customize the saturation policy by implementing the RejectedExecutionHandler interface.
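The sketch below shows one possible custom policy: it counts rejected tasks and drops them. The class name CountingRejectionHandler and the count-and-drop behavior are assumptions for illustration. The demo saturates a one-thread pool with a one-slot queue so that the third task is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingRejectionHandler implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Hypothetical policy: count the rejected task and drop it silently.
        rejected.incrementAndGet();
    }

    // Saturates a 1-thread pool with a 1-slot queue and returns the rejection count.
    static int demo() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // runs on the only thread
        pool.execute(blocker); // waits in the queue
        pool.execute(blocker); // queue full and pool at maximum: handed to the handler
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + demo()); // rejected = 1
    }
}
```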

Thread pool status and number of threads

// An atomic variable that packs the current run state and the number of threads in the pool
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;      // COUNT_BITS is 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // CAPACITY is the maximum number of threads

// Thread pool states
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Functions that pack and unpack the run state and the thread count
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

// Three functions that test the state of the thread pool
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// Increments the thread count by 1; returns true on success, false on failure
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// Decrements the thread count by 1; returns true on success, false on failure
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// Decrements the thread count by 1, retrying until it succeeds
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

The AtomicInteger variable ctl uses its high 3 bits to represent the current thread pool state and its low 29 bits to represent the current number of threads.

A Java thread pool can be in one of five states: RUNNING, SHUTDOWN, STOP, TIDYING, and TERMINATED. ThreadPoolExecutor represents them with five integer constants, each of which encodes its state in the high three bits:

  • RUNNING: high three bits 111. The thread pool accepts new tasks and processes tasks in the blocking queue.
  • SHUTDOWN: high three bits 000. The thread pool does not accept new tasks but still processes tasks in the blocking queue. Entered by calling void shutdown().
  • STOP: high three bits 001. The thread pool does not accept new tasks, does not process tasks in the blocking queue, and interrupts running tasks. Entered by calling List<Runnable> shutdownNow().
  • TIDYING: high three bits 010. Entered when workerCount (the current thread count) is 0, either after a shut-down pool has finished the tasks in the blocking queue or after the pool has been stopped. On entering this state the pool calls terminated() and then moves to the TERMINATED state.
  • TERMINATED: high three bits 011. The terminated() method has completed.
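To make the bit layout concrete, the standalone sketch below copies the constants and the pack/unpack functions from the source above into a hypothetical class named CtlLayout and prints the high three bits of each state. The helper highBits is an addition for this example and is not part of ThreadPoolExecutor.

```java
public class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical helper: the high three bits of a value as a 3-character binary string.
    static String highBits(int state) {
        String bits = Integer.toBinaryString(state >>> COUNT_BITS);
        return "000".substring(bits.length()) + bits;
    }

    public static void main(String[] args) {
        System.out.println("RUNNING    " + highBits(RUNNING));    // 111
        System.out.println("SHUTDOWN   " + highBits(SHUTDOWN));   // 000
        System.out.println("STOP       " + highBits(STOP));       // 001
        System.out.println("TIDYING    " + highBits(TIDYING));    // 010
        System.out.println("TERMINATED " + highBits(TERMINATED)); // 011

        int ctl = ctlOf(RUNNING, 5); // a running pool with 5 threads
        System.out.println(runStateOf(ctl) == RUNNING); // true
        System.out.println(workerCountOf(ctl));         // 5
    }
}
```

Because RUNNING is the only negative constant, the states are ordered RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, which is why plain integer comparisons such as c < SHUTDOWN work as state tests.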

Start the thread pool

When a ThreadPoolExecutor object is created, the pool contains no threads. Threads are normally created and started by calling void execute(Runnable command), but all core threads can be created and started in advance (through addWorker) by calling the following method:

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}
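The effect of prestarting can be observed with a short sketch. The class name PrestartExample and the pool size of 3 are assumptions for the example; getPoolSize() and prestartAllCoreThreads() are public ThreadPoolExecutor methods.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartExample {
    // Returns {pool size before prestart, number of threads started, pool size after}.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        int before = pool.getPoolSize();             // no threads exist yet
        int started = pool.prestartAllCoreThreads(); // creates idle core threads
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] { before, started, after };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [0, 3, 3]
    }
}
```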

Implementation process

void execute(Runnable command)

  1. Check whether the number of threads in the pool is smaller than the core pool size. If so, create a thread and start it; otherwise go to step 2.
  2. Check whether the task queue is full. If not, add the task to the blocking queue; if it is full, go to step 3.
  3. Check whether the number of threads in the pool is smaller than the maximum pool size. If so, create a thread and start it; otherwise apply the saturation policy.

public void execute(Runnable command) {
    // The task is null: throw a NullPointerException
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // Check whether the current thread count is smaller than the core pool size
    if (workerCountOf(c) < corePoolSize) {
        // If so, add a core thread (core = true) and start it to execute the task (see addWorker)
        if (addWorker(command, true))
            return;
        // Adding failed: re-read ctl
        c = ctl.get();
    }
    // If the pool is running, add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Recheck: if the pool is no longer running, remove the task just added
        // and apply the saturation policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // Otherwise (still running, or the removal failed) the task is in the blocking queue;
        // check whether the thread count is 0
        else if (workerCountOf(recheck) == 0)
            // If so, add a thread with no initial task and start it
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
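The three steps above can be observed from the outside with a deliberately tiny pool. In this sketch the class name ExecuteFlowExample and the sizes (core 1, maximum 2, queue capacity 1) are assumptions; every task blocks on a latch so that no thread becomes free while the tasks are submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowExample {
    // Returns "poolSize/queueSize/rejected" after submitting four blocking tasks.
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // step 1: below corePoolSize, a core thread is created
            pool.execute(blocker); // step 2: core thread busy, the task is queued
            pool.execute(blocker); // step 3: queue full, a non-core thread is created
            pool.execute(blocker); // queue full and pool at maximum: saturation policy
        } catch (RejectedExecutionException e) {
            rejected = true;       // the default AbortPolicy throws
        }
        String result = pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
        release.countDown();       // let the blocked tasks finish
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2/1/true
    }
}
```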

addWorker method

The boolean addWorker(Runnable firstTask, boolean core) method creates a Worker object and starts the thread held by that object (the Worker's thread field).

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // In principle no thread should be added when the pool is not running,
        // but it is allowed when all three of these conditions hold:
        //   1. the pool state is SHUTDOWN
        //   2. firstTask is null
        //   3. the blocking queue is not empty
        // A thread with no first task can then be added to help finish the queued tasks.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Return false if the thread count has reached the capacity or the
            // limit for the requested kind of thread (core or maximum)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // All checks passed: try to increment the thread count; on success leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Re-read ctl; if the run state changed, restart the outer loop
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Construct a Worker object; each Worker is bound to one thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // Proceed if the pool is RUNNING, or is SHUTDOWN and firstTask is null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // If the thread has already been started, throw an exception
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Add w to the workers set
                    workers.add(w);
                    // If the set is now larger than largestPoolSize, update it
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // The worker was added successfully
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // If the worker was added successfully, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If startup failed (t is null or an exception was thrown while adding), run addWorkerFailed
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The Worker class

The threads maintained by the thread pool are actually a set of Worker objects. A Worker wraps a thread, extends AbstractQueuedSynchronizer, implements the Runnable interface, and overrides the void run() method. For the reason it extends AbstractQueuedSynchronizer, see the runWorker method below.

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    // Number of tasks completed by the thread bound to this object
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // Inhibit interrupts: the thread may not be interrupted until it starts fetching tasks
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // The method executed when the thread starts
    public void run() {
        runWorker(this);
    }

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Acquire the lock (not reentrant)
    public void lock()        { acquire(1); }
    // Try to acquire the lock
    public boolean tryLock()  { return tryAcquire(1); }
    // Release the lock
    public void unlock()      { release(1); }
    // Test whether the lock is held exclusively
    public boolean isLocked() { return isHeldExclusively(); }

    // Interrupts the thread only if it has started executing, which is why the
    // constructor calls setState(-1). This method is called by shutdownNow().
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker method

As mentioned above, the Worker class extends AbstractQueuedSynchronizer so that the lock state can be used to distinguish idle threads from busy ones. While runWorker executes:

  • No lock is held while fetching a task (idle state, the thread may be interrupted)
  • The lock is held only while a task executes (the thread must not be interrupted)

Idle threads are interrupted when void tryTerminate() and void shutdown() are called, so any thread that is not executing a task may be interrupted.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Allow interrupts; this pairs with setState(-1) in the Worker constructor
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // Get a task and enter the loop
        while (task != null || (task = getTask()) != null) {
            // Lock, marking the thread as non-idle
            w.lock();
            // 1. If the pool state is at least STOP and the thread is not yet interrupted, interrupt it.
            // 2. Otherwise Thread.interrupted() checks and clears the interrupt status; if the pool
            //    state is at least STOP after that, the thread is interrupted again.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // beforeExecute(wt, task) is an empty method in ThreadPoolExecutor; subclasses can
                // override it to run code before the task executes
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // afterExecute(task, thrown) is also an empty method; subclasses can
                    // override it to run code after the task executes
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // Increment the number of tasks completed by this thread
                w.completedTasks++;
                // Release the lock
                w.unlock();
            }
        }
        // Reaching this point means the loop ended normally because no task could be
        // obtained, so completedAbruptly is false
        completedAbruptly = false;
    } finally {
        // Always executed
        processWorkerExit(w, completedAbruptly);
    }
}
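Since beforeExecute and afterExecute are left for subclasses, a subclass can use them to trace task execution. The sketch below is one such subclass; the class name HookedPool and its events list are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HookedPool extends ThreadPoolExecutor {
    // Hypothetical event log, synchronized because hooks run on worker threads.
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before"); // called by runWorker just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        events.add("after");  // called by runWorker in the finally block after task.run()
        super.afterExecute(r, t);
    }

    // Runs one task and returns the recorded order of events.
    static List<String> demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("run"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(pool.events);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [before, run, after]
    }
}
```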

getTask method

private Runnable getTask() {
    // Whether the last attempt to obtain a task timed out
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. If the pool state is at least STOP, the pool no longer processes queued tasks and
        //    reclaims all threads (busy or idle), so the thread count is decremented by 1 and
        //    null is returned.
        // 2. If the state is SHUTDOWN and the task queue is empty, no further task will arrive,
        //    so the thread count is decremented by 1 and null is returned.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // Whether the timeout mechanism applies: it does when core threads may time out
        // or when the current thread count exceeds the core pool size
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // If the thread count exceeds the maximum pool size, or the last fetch timed out with
        // the timeout mechanism enabled, this thread should in principle be reclaimed. However,
        // if it is the last thread in the pool and the task queue is not empty, it keeps running.
        // If there are other threads or the task queue is empty, it is reclaimed.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Try to decrement the thread count by 1: return null on success, restart the loop on
            // failure. decrementWorkerCount(), which cannot fail, is not used here because with
            // wc > 1 several threads could each observe more than one thread and all decrement,
            // leaving 0 threads and nobody to execute the tasks in the queue.
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 1. With the timeout mechanism enabled, poll() is used; it returns null if no task
            //    is obtained within keepAliveTime nanoseconds.
            // 2. Without it, take() is used; it blocks until a task is available.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // Being interrupted while waiting does not count as a timeout
            timedOut = false;
        }
    }
}
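The timeout path in getTask can be observed by enabling core thread timeout, which makes the timed flag true even for core threads. The sketch below is an illustration only; the class name KeepAliveExample, the 50 ms keep-alive time, and the 5 second polling deadline are assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveExample {
    // Returns the pool size observed once the idle worker has had time to expire.
    static int demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // core threads now fetch tasks with a timed poll()
        pool.execute(() -> { });           // creates the single worker thread

        // Wait (up to 5 seconds) for the idle worker to time out and leave the pool.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idle period = " + demo());
    }
}
```

Without allowCoreThreadTimeOut(true), the same worker would block in take() indefinitely and the pool size would stay at 1.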

processWorkerExit method

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // An abrupt exit did not go through getTask(), so the thread count must be decremented
    // here; on a normal exit getTask() has already decremented it
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    // Lock, because HashSet and completedTaskCount are not thread-safe
    mainLock.lock();
    try {
        // Add the number of tasks completed by this thread to the pool-wide completedTaskCount
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Try to move the thread pool to the terminated state
    tryTerminate();

    int c = ctl.get();
    // Continue only if the pool state is below STOP (RUNNING or SHUTDOWN)
    if (runStateLessThan(c, STOP)) {
        // If the thread left runWorker abruptly, skip this check and go straight to
        // adding a non-core thread with no initial task
        if (!completedAbruptly) {
            // These lines compute the minimum number of threads the pool needs right now
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // If the current thread count is at least min, return; otherwise fall through
            // and add a non-core thread with no initial task
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
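The replacement behavior described above can be seen by letting a task throw inside a single-thread pool: the worker thread dies, and a later task runs on a different thread. In this sketch the class name WorkerReplacementExample and the quiet thread factory (which only suppresses the stack trace of the simulated failure) are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerReplacementExample {
    // Returns true if the task submitted after a failing task ran on a different thread.
    static boolean demo() {
        ThreadFactory quietFactory = r -> {
            Thread t = new Thread(r);
            // Swallow the uncaught exception so the demo output stays clean.
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), quietFactory);
        AtomicReference<Thread> first = new AtomicReference<Thread>();
        AtomicReference<Thread> second = new AtomicReference<Thread>();
        CountDownLatch done = new CountDownLatch(1);

        pool.execute(() -> {
            first.set(Thread.currentThread());
            throw new IllegalStateException("simulated task failure"); // worker exits abruptly
        });
        pool.execute(() -> {
            second.set(Thread.currentThread()); // runs on a newly created worker thread
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return first.get() != null && second.get() != null && first.get() != second.get();
    }

    public static void main(String[] args) {
        System.out.println("ran on a different thread: " + demo()); // true
    }
}
```

Note that this applies to tasks passed to execute(); a task passed to submit() has its exception captured in the returned Future, so the worker does not exit.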

The following is the normal execution of a thread pool after submitting a task to it:

tryTerminate method

Termination

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // The terminated state cannot be reached if any of the following holds:
        //   1. the pool is running
        //   2. the pool state is already TIDYING or TERMINATED
        //   3. the pool is SHUTDOWN and the task queue is not empty
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // The terminated state cannot be reached while the thread count is not 0
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Try to set the pool state to TIDYING; on failure, restart the loop
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // terminated() is an empty method left for subclasses of ThreadPoolExecutor to override
                    terminated();
                } finally {
                    // Set the pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

Shutting down

A thread pool can be shut down by calling the void shutdown() method. After shutdown, the pool accepts no new tasks.

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Security policy check
        checkShutdownAccess();
        // Set the pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the thread pool
    tryTerminate();
}

Stopping

A thread pool in the RUNNING or SHUTDOWN state can be stopped by calling List<Runnable> shutdownNow(). A stopped pool accepts no new tasks, does not process the tasks in the blocking queue, and interrupts all of its threads.

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Security policy check
        checkShutdownAccess();
        // Set the pool state to STOP
        advanceRunState(STOP);
        // Interrupt all threads
        interruptWorkers();
        // Remove the unexecuted tasks from the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the thread pool
    tryTerminate();
    return tasks;
}
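A short sketch of stopping a pool: one task is running and two are queued when shutdownNow() is called, so the running task is interrupted and the two queued tasks are returned unexecuted. The class name ShutdownNowExample and the latch-based setup are assumptions for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowExample {
    // Returns "tasksReturned/runningTaskInterrupted/terminated".
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1); // never counted down
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                never.await();             // blocks until the worker is interrupted
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { });           // stays in the queue
        pool.execute(() -> { });           // stays in the queue

        List<Runnable> pending = Collections.emptyList();
        boolean terminated = false;
        try {
            started.await();                 // make sure the first task is running
            pending = pool.shutdownNow();    // STOP: interrupt workers, drain the queue
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pending.size() + "/" + interrupted.get() + "/" + terminated;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2/true/true
    }
}
```

With shutdown() instead, the two queued tasks would still be executed and the running task would not be interrupted, so the pool would only terminate once the first task finished on its own.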

3. Configure the thread pool

Executors provides four static factory methods to create thread pools with four different configurations:

  • newFixedThreadPool(int nThreads)

    Takes an int parameter nThreads and creates a thread pool whose core size and maximum size are both nThreads, using an unbounded LinkedBlockingQueue as the blocking queue. Unless core thread timeout is enabled, the threads it creates never time out.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor()

    Creates a thread pool with 1 core thread and a maximum of 1 thread, using an unbounded LinkedBlockingQueue; unless core thread timeout is enabled, the thread never times out. The single thread guarantees that tasks execute sequentially. If that thread terminates because of an exception, the processWorkerExit method decides at the end whether to create a new thread to carry on.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • newCachedThreadPool()

    Creates a thread pool with 0 core threads, a maximum of Integer.MAX_VALUE threads, and a keep-alive time of 60 seconds, so threads idle for more than 60 seconds are reclaimed. It uses a SynchronousQueue as the blocking queue. A SynchronousQueue stores no elements: an insert at one end succeeds only when a remove is waiting at the other end; otherwise the insert must wait.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • newScheduledThreadPool(int corePoolSize)

    Creates a thread pool with corePoolSize core threads that can run tasks after a delay or periodically. ScheduledThreadPoolExecutor extends ThreadPoolExecutor.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
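The configurations described above can be checked at run time. The sketch below casts the pools returned by newFixedThreadPool and newCachedThreadPool to ThreadPoolExecutor, which relies on the implementation shown in the source above rather than on the documented return type, so treat it as an assumption. It also schedules one delayed task on a scheduled pool. The class name ExecutorsConfigExample is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorsConfigExample {
    // Describes a pool as "core/max"; assumes the service is a ThreadPoolExecutor.
    static String describe(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String description = pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize();
        pool.shutdown();
        return description;
    }

    // Schedules one task with a 20 ms delay and returns its result.
    static String scheduled() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "done", 20, TimeUnit.MILLISECONDS);
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Interrupted, failed, or timed out: surface it as an unchecked exception.
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Executors.newFixedThreadPool(4))); // 4/4
        System.out.println(describe(Executors.newCachedThreadPool())); // 0/2147483647
        System.out.println(scheduled());                               // done
    }
}
```

The pool returned by newSingleThreadExecutor() is wrapped in a delegating service, so the same cast would fail for it.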