Why use thread pools

  1. The pool technology is used to reuse created threads to avoid the loss caused by frequent creation and destruction of threads, reduce resource consumption, and improve response speed.
  2. When a large number of tasks are started on the server, the creation of a large number of threads consumes the memory space of the server and affects the server usage. Thread pools can be used to manage threads.
  3. Thread pools are extensible, allowing developers to add more functionality to them

Overall thread pool design and source code parsing

Let’s take a macro look at how thread pools work

From the running mechanism of thread pool, we can see that a producer-consumer model is built inside thread pool, which decouple threads and tasks and is not directly associated with each other. In the case of a large number of tasks, tasks are temporarily stored in task queue, so as to buffer tasks well

Thread pool running status

The thread pool itself is stateful. Here are the five states of the thread pool

Running state State description
RUNNING The thread pool can receive new task submissions and can process tasks in the blocking queue normally.
SHUTDOWN No more tasks are submitted, and the thread pool can continue processing tasks in the blocking queue.
STOP New tasks are no longer accepted, and existing tasks in the blocking queue are discarded. In addition, it interrupts tasks in progress.
TIDYING After all tasks have executed (including tasks in the blocking queue), the number of active threads in the current thread pool is reduced to zero and the terminated method will be called.
TERMINATED Terminated state of the thread pool. The thread pool will enter the state when terminated

Thread pool state flow

State transition Mode of state transition
RUNNING -> SHUTDOWN When the shutdown method of a thread pool is called, or when the Finalize method is implicitly called (which calls shutdown internally)
RUNNING, SHUTDOWN -> STOP When the shutdownNow method of the thread pool is called
SHUTDOWN -> TIDYING When both the thread pool and the blocking queue are empty
STOP -> TIDYING When the thread pool becomes empty
TIDYING->TERMINATED When the terminated method is executed

So how does a thread pool manage its own running state and the number of threads in the pool?

// CTL: three bits indicate the running status of the thread pool, and the lower 29 bits indicate the number of threads running in the thread pool
// The advantage of holding two values in one variable is that you don't have to bother (such as locking) to maintain consistency between the two states
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Obtain the current running status of the thread pool (~ : bit-inverse, that is, 0 becomes 1 and 1 becomes 0).
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Get the number of threads currently running in the thread pool
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Obtain the CTL from the thread pool status and the number of threads running
private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

Next comes source code parsing

Let’s take a macro look at the thread pool class inheritance

The core implementation class for thread pools in Java is ThreadPoolExecutor

Executor: Provides only an interface to execute tasks. Users do not need to worry about how to create threads or schedule threads, but only provide a Runnable object

ExecutorService: Adds interfaces to execute tasks, such as task submission, and thread pool life cycle management

AbstractExecutorService: an abstract class, will perform the task of process series up, ensure the realization of the lower need to focus on the method of a mission

ThreadPoolExecutor: Maintains its own lifecycle while simultaneously managing threads and tasks, enabling a good combination of the two to execute parallel tasks

The thread pool performs the task

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState  and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get();
        // If the number of threads in the thread pool is smaller than coolPoolSize
        if (workerCountOf(c) < corePoolSize) {
            // Add a thread and treat the submitted thread as a firstTask
            if (addWorker(command, true))
                return;
            Since the state of the thread pool and the number of threads running can change at any time, it is important to check the thread pool timing
            c = ctl.get();
        }
        // This judgment is entered because the above judgment is not valid, either the corePoolSize has reached the upper limit, or adding threads failed
        // The thread pool must be queued before the thread pool is queued
        if (isRunning(c) && workQueue.offer(command)) {
          // Obtain the value of CTL again, and make judgment every moment
            int recheck = ctl.get();
            // If the thread pool is not in the running state, then the subsequent remove operation will be performed, which is equivalent to a rollback to the thread executing this time
            / / remove
            if (! isRunning(recheck) && remove(command))
              	// Execute the reject policy
                reject(command);
                // The thread pool is in execution state
            else if (workerCountOf(recheck) == 0)
              	// If the worker thread is 0, create a non-core thread to prevent a situation where there is a task but no thread executing it
                addWorker(null.false);
        }
        // Failed to create a new thread, reject it directly
        else if(! addWorker(command,false))
            reject(command);
    }
Copy the code

Thread pool add task

// If core is true, core Poolsize will be used as the critical condition; if false, maximumPoolSize will be used as the critical condition
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // Get the thread pool running status
            int rs = runStateOf(c);
        // Make a decision about whether or not to create a thread
        (rs >= SHUTDOWN && (rs! = SHUTDOWN || firstTask ! = null ||
        // workQueue.isEmpty()))
        Rs > SHUTDOWN (thread pool in STOP, TIDYING, or TERMINATED state, failed to add worker threads)
        // 2.rs >= SHUTDOWN && firstTask ! = null
        // 3.rs >= SHUTDOWN && workQueue.isEmpty()
        // In all three cases, no new threads are created
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null&&! workQueue.isEmpty()))return false;

        for (;;) {
            // The number of threads running in the thread pool
            int wc = workerCountOf(c);
            // Determine whether the thread has reached the upper limit
            // If a thread in corePoolSize is added, determine whether the upper limit of corePoolSize is exceeded
            // If you add threads from maximumPoolSize, check whether the maximum maximumPoolSize is exceeded
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Add the number of threads, if successful, break out of the loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Get c again
            c = ctl.get();  // Re-read ctl
            // Whether the thread pool state is equal to the original state
            if(runStateOf(c) ! = rs)// If not, the thread pool has changed and the previous operation needs to be performed again
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
    	// Create a worker object
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if(t ! =null) {
            final ReentrantLock mainLock = this.mainLock;
            // ReentrantLock Exclusive lock
            mainLock.lock();
            try {
                // Get the thread pool status again
                int rs = runStateOf(ctl.get());
                // Make a decision to run the thread first
                // 1. The thread pool is running
                // 2. The thread pool is in SHUTDOWN state but task== NULL, because new tasks are not accepted in SHUTDOWN state
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add worker to a hashset
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                      	// Record the maximum number of threads.
                        largestPoolSize = s;
                    // Make a flag indicating that the worker thread is added to the hashset
                    workerAdded = true; }}finally {
                mainLock.unlock();
            }
            if (workerAdded) {
            	// Thread of execution
                t.start();
                // Indicates that the worker thread has started
                workerStarted = true; }}}finally {
    	// The thread failed to start. Do some rollback operations
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Copy the code

The Worker class

The Worker is an inner class of a ThreadPoolExecutor, which is used to maintain the interrupt control status of a thread executing a task. It implements the Runnable interface and inherits AQS. Implementing Runnable interface means that the Worker is a thread. AQS is inherited for exclusive locking.

    private final class Worker
      extends AbstractQueuedSynchronizer
      implements Runnable{
        /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
      	// The thread that executes the task
        final Thread thread;
        /** Initial task to run. Possibly null. */
      	// The task to perform
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
          // When a new thread is created, the value of state-1 is set to prevent it from being interrupted by another thread explicitly using the shutdown method before the thread is executed, because the interrupt must be determined that the state is greater than or equal to zero
          setState(-1); 
          this.firstTask = firstTask;
          // Create a new thread
          this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run(a) {
            runWorker(this);
        }
Copy the code

The Worker class has a default implementation for the way threads are created

static class DefaultThreadFactory implements ThreadFactory {
      	// In the system, there may be more than one thread pool, so this is a static field
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
      	Each thread belongs to a specific thread pool, so this field is not static
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private finalString namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s ! =null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();// Thread name prefix
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
              	// For threads in the thread pool, all are user threads
                t.setDaemon(false);
            if(t.getPriority() ! = Thread.NORM_PRIORITY)// Threads have the same priority
                t.setPriority(Thread.NORM_PRIORITY);
            returnt; }}Copy the code
final void runWorker(Worker w) {
        // Get the current thread
        Thread wt = Thread.currentThread();
        // Get the task
        Runnable task = w.firstTask;
        // Leave the task explicitly empty to prevent a confusing problem and get a duplicate next time
        w.firstTask = null;
        // Set the thread state to 0 (when the Worker was created, the state was -1) to allow the thread to interrupt when running
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // Loop to determine whether the task (firstTask or task obtained from the queue) is empty
            while(task ! =null|| (task = getTask()) ! =null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
              	// Check whether the thread pool is in the stop state or whether the thread is interrupted
                if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                    // Callbacks can be extended appropriately
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      	// Execute the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally{ afterExecute(task, thrown); }}finally {
                    task = null;
                    // The number of threads that have completed execution is used as a statistic
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally{ processWorkerExit(w, completedAbruptly); }}// Get the task from the blocking queue
  private Runnable getTask(a) {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // Get the thread pool status
            int rs = runStateOf(c);

            The thread pool status is STOP, TIDYING, and TERMINATED
       	    // 2. Thread pool shutdown and queue is empty.
            // If one of the above conditions is met, the number of worker threads is subtracted from wc by 1, and null is returned
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            AllowCoreThreadTimeOut Whether to allow core worker threads to time out for destruction. The default is false and can be set to true
            // The number of worker threads is greater than the number of core threads
            // If one of the conditions is met, timed is true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
						
            / / 1. (a worker thread number > maximumPoolSize) | | (timed = = true && timedOut = = true)
            // 2. The number of worker threads is greater than 1 or the queue is empty
            Generally, this condition is triggered when the number of worker threads is greater than maximumPoolSize and the task queue is empty
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
              	// If timed is true and no valid task is waiting for keepAliveTime, null is returned if timed is true and no valid task is waiting for keepAliveTime
            	// If timed is false, take() will block until there is another valid task and return null.
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if(r ! =null)
                    return r;
              	// timedOut = true when the task cannot be retrieved
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false; }}}Copy the code

Task queue

GetTask () has a step to get a task from a task queue. Different task queues have different access policies. The following are some optional task queues:

The name of the describe
ArrayBlockingQueue A bounded blocking queue implemented with an array, in which elements are sorted on a first-in, first-out (FIFO) basis and concurrency is controlled by a reentrant lock
LinkedBlockingQueue A bounded queue with a linked list structure that sorts elements on a first-in, first-out (FIFO) basis
PriorityBlockingQueue An unbounded queue that supports priority ordering of threads
DelayQueue An unbounded queue implementing PriorityBlockingQueue implements delayed fetching. When creating an element, you can specify how long to retrieve the current element from the queue
SynchronousQueue A blocking queue that does not store elements. Every PUT operation must wait for a take operation. Supports fair and unfair locking. Using SynchronousQueue will Executors. NewCachedThreadPool ()
LinkedTransferQueue An unbounded blocking queue composed of a linked list structure, equivalent to other queues, LinkedTransferQueue queue has more transfer and tryTransfer methods
LinkedBlockingDeque A two-way blocking queue consisting of a linked list structure

Thread pool Reclaiming

GetTask () is used to get control of the number of tasks and threads. If the thread pool should not have so many threads, it will return null. There are two places in the code where null is returned

The first place:

  • The thread pool status is STOP, TIDYING, and TERMINATED
  • The thread pool is shutdown and the queue is empty.

In the second place

  • Worker thread number > maximumPoolSize and the task queue is empty
  • Number of worker threads >corePoolSize and the task queue is empty
// completedAbruptly:true Indicates that the user exits abnormally
private void processWorkerExit(Worker w, boolean completedAbruptly) {
  	// If the worker thread exits unexpectedly, the number of worker threads is reduced by 1
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Count the number of completed tasks
            completedTaskCount += w.completedTasks;
            // Remove the worker thread count
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
	// Try to interrupt the idle thread
        tryTerminate();

        int c = ctl.get();
  	// If the thread pool is in the RUNNING or SHUTDOWN state
        if (runStateLessThan(c, STOP)) {
          	// The worker thread is not abnormal
            if(! completedAbruptly) {// allowCoreThreadTimeOut specifies whether core worker threads are allowed to time out for destruction
              	// If allowed, the minimum number of core worker threads is 0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              	// If the minimum number of core threads reserved is 0 and the task queue is not empty, it means that at least one more thread is needed to complete the task
                if (min == 0&&! workQueue.isEmpty())// Change the minimum number of threads to 1
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // If the number of workers currently running is less than the number of workers currently needed, the addWorker is called to add a new Worker
            addWorker(null.false); }}Copy the code

Thread pool rejection policy

Rejection policies Refused to way
AbortPolicy Throws a run-time exception directly
DiscardPolicy Silently discard the submitted task, doing nothing and throwing no exceptions
DiscardOldestPolicy Discard the longest-running task in the blocking queue (the queue head element) and leave a free space in the queue for the currently submitted task to be put into the queue
CallerRunsPolicy The submitted task is run directly by the thread that submitted it
    // The thread that submitted the task will run the submitted task directly
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy(a) {}/**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if(! e.isShutdown()) { r.run(); }}}// Throw an exception directly
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy(a) {}/**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from "+ e.toString()); }}/** * A handler for rejected tasks that silently discards the * rejected task. */
    // Silently discard the rejected task
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy(a) {}/**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
      	// This method does nothing
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}/**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy(a) {}/**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
      	// Remove the header element, removing the oldest mission
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // Check whether the thread pool is closed
            if(! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code

Three ways to submit tasks in the Abstract Class AbstractExecutorService

There are three ways to submit, and either way, the end result is to convert the incoming task into a Callable object for processing, and then invoke it

The execute method declared in the Executor interface is handled uniformly

	// All three methods of submitting tasks are similar
	public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
        }

		
	publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
        }
		

	public FutureTask(Runnable runnable, V result) {
      	// The Runnable object is converted to a Callable object
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
        }


	static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call(a) {
            task.run();
          	// Note that the result returned is null, passed in by layers
            returnresult; }}Copy the code

test

    public static void main(String[] args) {

        ExecutorService executors = new ThreadPoolExecutor(2.5.10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(4));

        IntStream.range(0.10).forEach(i -> {
            executors.submit(() -> {
                IntStream.range(0.50).forEach(j -> System.out.println(Thread.currentThread().getName()));
            });
        });


        executors.shutdown();
    }
Copy the code

The resources

Blog.csdn.net/bieber007/a…

Tech.meituan.com/2020/04/02/…