In this article you will find the following information:

  • Thread pool source interpretation
  • The thread pool performs process analysis
  • Thread pool implementation with return value
  • Delay thread pool implementation

For the sake of the reader’s understanding, this article will start with thread pool usage and then extend to the advanced content such as source code interpretation and source analysis. The reader can choose the order of reading and the chapters he or she needs to know.

1. Advantages of thread pools

A thread pool makes full use of system resources such as CPU, memory, network, and I/O. The main functions of a thread pool are as follows:

  • Thread pool can be used to reuse threads and control the maximum number of concurrent;
  • Implement task cache policy and reject mechanism;
  • Implement delayed execution

The Alibaba Java Development Manual mandates that thread resources must be provided through thread pools, as shown below:

Thread pool usage

This section describes the creation and use of the seven thread pools, the status of the thread pools, and ThreadPoolExecutor parameters.

2.1 Creating a thread Pool

There are six ways to create a thread pool using Executors, as shown below:

// Create by using Executors
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
ExecutorService workStealingPool = Executors.newWorkStealingPool();
// The original creation mode
ThreadPoolExecutor tp = new ThreadPoolExecutor(10.10.10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
Copy the code

2.1.1 Thread Pool Interpretation

  1. NewSingleThreadExecutor (), which is limited to one worker thread and operates on an unbounded work queue, ensures that all tasks are executed sequentially and that at most one task is active, and does not allow users to change the thread pool instance. So you can avoid it changing the number of threads.
  2. NewCachedThreadPool (), which is a pool of threads used to handle a large number of short-lived work tasks, has several distinct features: it attempts to reuse cached threads, and creates new worker threads when no cached threads are available; If the thread is idle for more than 60 seconds, it is terminated and removed from the cache; When idle for a long time, this thread pool consumes no resources. Internally, it uses SynchronousQueue as the work queue.
  3. NewFixedThreadPool (int nThreads), which reuses a specified number of threads (nThreads) with an unbounded work queue behind it. At most nThreads are active at any one time. This means that if the number of tasks exceeds the number of active queues, the work queue will wait for idle threads to appear; If a worker thread exits, a new worker thread will be created to make up the specified number of nThreads.
  4. NewSingleThreadScheduledExecutor () to create a single thread pool, return to ScheduledExecutorService, can carry out regular or periodic work scheduling.
  5. NewScheduledThreadPool (int corePoolSize) and newSingleThreadScheduledExecutor (), create a ScheduledExecutorService, Work can be scheduled regularly or periodically, depending on whether it is a single worker thread or multiple worker threads.
  6. NewWorkStealingPool (Int Parallelism) is a common thread pool that is often ignored. Java 8 added the ForkJoinPool creation method, which creates a ForkJoinPool that uses work-stealing algorithms to process tasks in parallel. Processing order is not guaranteed.
  7. ThreadPoolExecutor is the original thread pool creation method, and the above methods 1-3 encapsulates ThreadPoolExecutor.

Conclusion: NewSingleThreadExecutor, newCachedThreadPool, and newFixedThreadPool are encapsulated implementations of ThreadPoolExecutor. NewSingleThreadScheduledExecutor, newScheduledThreadPool ThreadPoolExecutor subclasses ScheduledThreadPoolExecutor encapsulation, delay is used to perform tasks, NewWorkStealingPool is a new addition to Java 8.

2.1.2 Significance of a single thread Pool

Can be seen from the above code newSingleThreadExecutor and newSingleThreadScheduledExecutor created are single thread pool, so what is the meaning of the single thread pool?

Although it is a single thread pool, it provides work queue, life cycle management, worker thread maintenance, and so on.

2.2 ThreadPoolExecutor interpretation

ThreadPoolExecutor is the core method of thread pools. Let’s take a look at the internal implementation of ThreadPoolExecutor and how it is called by a wrapper class.

Let’s start with the constructor, which has the following source code:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
Copy the code

Parameter description:

  • CorePoolSize: The so-called number of core threads, which can be roughly understood as the number of threads that are permanently resident (unless allowCoreThreadTimeOut is set). For different thread pools, this value can be very different; for example, newFixedThreadPool will set it to nThreads, while newCachedThreadPool will set it to 0.
  • MaximumPoolSize: As the name implies, the maximum number of threads that can be created if there are not enough threads. For the same comparison, newFixedThreadPool is nThreads, of course, because it requires a fixed size, while newCachedThreadPool is integer.max_value.
  • KeepAliveTime: the keepAliveTime of an idle thread. If the idle time exceeds this value, the thread will be shut down. Notice This value takes effect only when the idle time exceeds this value and the number of threads in the thread pool is less than or equal to the core thread size. Of course, core threads are not closed by default unless allowCoreThreadTimeOut(true) is set in which case core threads can be reclaimed.
  • TimeUnit: TimeUnit.
  • BlockingQueue: Task drop column, used to store pending tasks for the thread pool.
  • ThreadFactory: Used to generate threads, we can use the default.
  • Handler: This specifies what policy to take when a thread pool is full and new tasks are submitted. There are several ways to do this, such as throw an exception, reject and return, or you can implement your own interface to implement your own logic.

Consider a call to ThreadPoolExecutor from a thread pool wrapper class:

NewSingleThreadExecutor encapsulates ThreadPoolExecutor as follows:

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}
Copy the code

NewCachedThreadPool encapsulates ThreadPoolExecutor as follows:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
Copy the code

NewFixedThreadPool encapsulates ThreadPoolExecutor as follows:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Copy the code

ScheduledExecutorService encapsulates ThreadPoolExecutor as follows:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
Copy the code

NewSingleThreadScheduledExecutor use is a subclass of ThreadPoolExecutor ScheduledThreadPoolExecutor, as shown in the figure below:

NewScheduledThreadPool encapsulates ThreadPoolExecutor as follows:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
Copy the code

Is also a subclass of ThreadPoolExecutor ScheduledThreadPoolExecutor newScheduledThreadPool use.

2.3 Thread Pool Status

The ThreadPoolExecutor source code shows the thread status as follows:

Thread state interpretation (from javadoop.com/post/java-t…

  • RUNNING: There is nothing to say about this, this is the most normal state: accept new tasks, process tasks in the waiting queue;
  • SHUTDOWN: No new tasks are submitted, but the tasks in the waiting queue continue to be processed.
  • STOP: No new tasks are submitted, the tasks in the waiting queue are no longer processed, and the threads that are executing tasks are interrupted.
  • TIDYING: All missions are destroyed, workCount is 0. The thread pool state executes the hook method terminated() when transitioning to the TIDYING state.
  • The state of the thread pool will change to this after the TERMINATED: TERMINATED () method;

RUNNING is defined as -1, SHUTDOWN is defined as 0, and other tasks are greater than 0. Therefore, tasks cannot be submitted when the value is equal to 0. If the value is greater than 0, even the tasks in execution need to be interrupted.

Read the introduction of these states, readers can generally guess the state of nine out of ten, each state of the transition process has the following:

  • RUNNING -> SHUTDOWN: This is the most important transition that occurs when SHUTDOWN () is called;
  • ShutdownNow () -> STOP: SHUTDOWN () -> SHUTDOWN ();
  • SHUTDOWN -> TIDYING: switch from SHUTDOWN to TIDYING when the task queue and thread pool are empty;
  • STOP -> TIDYING: This transition occurs when the task queue is empty;
  • TIDYING -> TERMINATED: TERMINATED when the TERMINATED () method ends;

2.4 Thread pool execution

There are two ways to commit a thread pool task:

  • execute
  • submit

Execute can only accept Runnable tasks. Use the following command:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(new Runnable() {
    @Override
    public void run(a) { System.out.println(Thread.currentThread().getName()); }});Copy the code

Submit can accept either Runnable or Callable tasks, using the following:

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
    @Override
    public void run() { System.out.println(Thread.currentThread().getName()); }});Copy the code

2.4.1 Implementation of thread pools with return values

The Callable class is passed by Submit to get the return value of the task being executed. Callable is a feature added in JDK 1.5 to complement the case where Runnable does not return.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> result = executorService.submit(new Callable<Long>() {
    @Override
    public Long call(a) throws Exception {
        return newDate().getTime(); }});try {
    System.out.println("Run result:" + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
Copy the code

2.4.2 Delaying the implementation of thread pools

NewSingleThreadScheduledExecutor in thread pool and newScheduledThreadPool returns ScheduledExecutorService, used to perform delay the thread pool, the code is as follows:

/ / delay thread pool ScheduledExecutorService scheduledThreadPool = Executors. NewScheduledThreadPool (2); scheduledThreadPool.schedule(newRunnable() {
    @Override
    public void run() {
        System.out.println("time:" + new Date().getTime());
    }
}, 10, TimeUnit.SECONDS);
Copy the code

The full example can be downloaded from github.com/vipstone/ja…

Three, thread pool source code interpretation

A trick to reading the source code of a thread pool is to concatenate the threads in the order they are executed to make it easier to understand the implementation of a thread pool.

Source code reading process interpretation

From execute() we can see that the core method of thread pool execution is addWorker(). In addWorker() we can see that the start thread calls the start() method. The getTask() method is the key to running the program. After the getTask() method, the thread is closed. The workflow of the entire thread pool is completed. Read on (or go back and look at the source code after you’ve read it).

3.1 Execute () Source code interpretation

public void execute(Runnable command) {
    if (command== null) throw new NullPointerException(); int c = ctl.get(); // If the current number of threads is less than the number of core threads, then directly add a worker to execute the task, // create a new thread, and the current taskcommandAs the firstTask of this threadif(workerCountOf(c) < corePoolSize) {// Add task successfully, then end. Submit the task, the thread pool has accepted the task, and the method can return the result of execution, which will be wrapped in FutureTask. / / returnfalseIndicates that the thread pool is not allowed to submit tasksif (addWorker(command.true))
            return; c = ctl.get(); If the thread pool is in the RUNNING state, add the task to the workQueue. If the thread pool is in the RUNNING state, add the task to the workQueueif (isRunning(c) && workQueue.offer(command)) {/* If a task enters the workQueue, do we need to start a new thread * because [0, corePoolSize] is unconditional * if the number is greater than or equal to corePoolSize, */ int recheck = ctl.get(); // If the thread pool is no longer in the RUNNING state, remove the queued task and execute the reject policyif (! isRunning(recheck) && remove(command))
            reject(command); // If the thread pool is still RUNNING and the number of threads is 0, then start a new thread. // If the thread pool is still RUNNING and the number of threads is 0, then start a new threadelse if (workerCountOf(recheck) == 0)
            addWorker(null, false); } // If the workQueue is full, enter the branch and create a new worker with maximumPoolSize. // If the branch fails, the number of threads has reached maximumPoolSize and the policy is rejectedelse if(! addWorker(command.false))
        reject(command);
}
Copy the code

3.2 Interpretation of addWorker() source code

// The first argument is the task to be submitted to the thread for execution, which can be nulltrue// If the number of threads in the pool has reached the corePoolSize, then the request to create a thread cannot be answeredfalsePrivate Boolean addWorker(Runnable firstTask, Boolean core) {retry:for(;;) { int c = ctl.get(); int rs = runStateOf(c); // Do not create a new worker if the thread pool is closed and one of the following conditions is met: // 1. The thread pool state is greater than SHUTDOWN. The state is STOP, TIDYING, or TERMINATED. = null // 3.workqueue.isempty () // // If the thread pool is in SHUTDOWN state, the task is not allowed to submit, but the existing task continues to execute. // If the thread pool is in SHUTDOWN state, the task is not allowed to submit, and the ongoing task is interrupted. If the thread pool is SHUTDOWN, but the firstTask is null and the workQueue is not empty, then the worker is allowed to be createdif (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false; // If this is successful, all the conditions before the creation of the thread are met and the thread is ready to execute the task. // If this fails, another thread is trying to create a thread into the poolif (compareAndIncrementWorkerCount(c))
                breakretry; C = ctl.get(); // If the CAS fails, go to the next layerforIf the thread pool is closed by another thread, then the thread pool needs to go back to the outer layerforcycleif(runStateOf(c) ! = rs)continue retry;
            // elseCAS failed due to workerCount change; }} /* * At this point, we think it's time to create a thread to execute the task * * because everything is checked * * what happens after that, that's for later. */ // Whether the worker has started. Boolean workerStarted =false; // Whether the worker has been added to the workers HashSet. Boolean workerAdded =false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; W = new worker (firstTask); // Fetch the Thread object from the worker. As mentioned earlier, the worker constructor calls ThreadFactory to create a new Thread. Final Thread t = w.thread;if(t ! = null) {// This is a global lock for the entire class. Holding this lock makes the following operations "logical." // Because this lock is required to close a thread pool, at least as long as I hold the lock, the thread pool will not be closed. try { int c = ctl.get(); int rs = runStateOf(c); // If it is less than SHUTDOWN, it is RUNNING. // If it is equal to SHUTDOWN, it will not accept new tasks, but continue to execute tasks in the waiting queueif(rs < SHUTDOWN | | (rs = = SHUTDOWN && firstTask = = null)) {/ / the inside of the worker thread is not already startedif(t.isAlive()) throw new IllegalThreadStateException(); Workers.add (w); // Add to the HashSet workers. int s = workers.size(); // largestPoolSize is used to record the maximum number of workers // Since workers are increasing and decreasing, this value can be used to know the maximum size of the thread poolif (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; } } finally { mainLock.unlock(); } // Start the thread if the addition is successfulif(workerAdded) {// start the thread. workerStarted =true; }}} finally {// If the thread is not started, we need to do some cleaning, such as workCount + 1, subtracting itif(! workerStarted) addWorkerFailed(w); } // Return whether the thread started successfullyreturn workerStarted;
}
Copy the code

In this code, you can see that t.start() is called;

3.3 Interpretation of runWorker() source code

The Worker’s run() method is called immediately after the Worker’s t.tart () method is called.

public void run() {
    runWorker(this);
}
Copy the code

The runWorker() source code is as follows:

// When the worker thread starts,whileLoop (spin!) // When the worker initializes, the firstTask can be specified. Final void runWorker(Worker w) {Thread wt = thread.currentThread (); // The thread's first task (if any) Runnable task = w.firstTask; w.firstTask = null; // Allow interrupts w.unilock (); boolean completedAbruptly =true; Try {// Call getTask in a loop to get the taskwhile(task ! = null || (task = getTask()) ! = null) { w.lock(); // If the thread pool state is greater than or equal to STOP, that means the thread is also interrupted /** * If the thread pool is stopped, make sure the thread is interrupted * if it is not, make sure the thread is not interrupted * this needs to be rechecked in the second case to handle shutdownNow contention when interrupts are turned off */if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt(); Try {// This is a hook method and is left to the required subclasses to implement beforeExecute(wt, task); Throwable thrown = null; Try {// task is finally ready to execute task. Run (); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; // Thrown Error = x; // Thrown Error = x; // Thrown Error = x; throw new Error(x); } finally {// This is also a hook method that takes task and exception arguments and leaves it to subclasses that want to implement afterExecute(task, Thrown); }} finally {// getTask = null; W.com pletedTasks++; // Release the worker's exclusive lock w.unilock (); } } completedAbruptly =false; } finally {// At this point, we need to close the thread // 1. GetTask returns null, that is, the worker's mission is over, execute close // 2. Task execution process of abnormal happened / / the first case, has been in the code processing will workCount minus 1, the said in getTask method / / the second case, workCount without processing, So processWorkerExit needs to be processed in processWorkerExit(w, completedAbruptly); }}Copy the code

3.4 getTask() source code interpretation

GetTask () is inside the runWorker. Here’s how to implement it:

// This method has three possibilities // 1. By default, threads within the corePoolSize are not reclaimed, they wait for the task // 2. Exit due to timeout. When keepAliveTime is in effect, that is, if there is no task for that amount of time, it should be closed // 3. Return NULL if the following conditions occur // There are more than maximumPoolSize workers in the pool (by callingset(MaximumPoolSize is set) // the thread pool is SHUTDOWN and the workQueue is empty. Even threads in the workQueue no longer execute private RunnablegetTask() {
    boolean timedOut = false; // Did the last poll() time out?
   for(;;) {// Allow threads to recycle within the core thread count, or the current number of threads exceeds the core thread count, then timeout shutdown may occur // herebreak, in order not to proceed to the latterif(compareAndDecrementWorkerCount (c)) / / twoifIf wc > maximumPoolSize, or if the current number of threads is greater than maximumPoolSize, null is returned. // In other words, returning NULL means to close the thread. // That's because it might be called by the developersetMaximumPoolSize reduces the MaximumPoolSize of the thread pool. If this worker is interrupted, the solution is to retrysetMaximumPoolSize method, // If the developer reduces MaximumPoolSize so that it is smaller than the current number of workers, this means that the excess threads should be closed. Back into theforInt c = ctl.get(); int rs = runStateOf(c); // Checkif queue empty only if necessary.
            if(rs > = SHUTDOWN && (rs > = STOP | | workQueue. IsEmpty ())) {/ / CAS operation, reduce the number of worker threads decrementWorkerCount ();return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if(r ! = null)return r;
                timedOut = true; } catch (InterruptedException retry) {// If this worker InterruptedException occurs, retry is requiredsetMaximumPoolSize method, // If the developer reduces MaximumPoolSize so that it is smaller than the current number of workers, this means that the excess threads should be closed. Back into theforIt is natural that some threads will return null timedOut =false; }}}Copy the code

Thread pool execution process

The execution process of thread pool is as follows:

Five, the summary

The summary of this article is presented in the form of questions and answers, quoted from “in-depth interpretation of Java thread pool design ideas and source code implementation”, with a reference address at the bottom.

What are the key attributes of the thread pool?

  • Threads between corePoolSize and maximumPoolSize are reclaimed, but corePoolSize threads can also be reclaimed by setting (allowCoreThreadTimeOut(true)).

  • If the number of threads in the workQueue exceeds the corePoolSize, the thread pool will pull the task from the workQueue.

  • KeepAliveTime is used to set the idle time. If the number of threads exceeds the corePoolSize, and some threads exceed the idle time value, they will be shut down

  • RejectedExecutionHandler used to handle the case when a thread pool can’t perform this task, the default has thrown RejectedExecutionException anomaly, ignore the task and submit task threads to use to perform this task and will delete the queue waiting for the longest tasks, The task is then committed to each of the four policies, which by default throw an exception.

When are threads created in the thread pool?

  • If the current number of threads is less than corePoolSize, a new thread is created when the task is submitted and the task is executed by this thread.

  • If the current number of threads has reached corePoolSize, the submitted task is added to the queue and waits for threads in the thread pool to fetch the task from the queue.

  • If the queue is full, a new thread is created to perform the task, ensuring that the number of threads in the pool does not exceed maximumPoolSize. If the number of threads exceeds maximumPoolSize, then the rejection policy is executed.

3. What can I do if an exception occurs during task execution?

If a task fails to execute, the thread executing the task is closed instead of continuing to receive other tasks. A new thread is then started in its place.

4. When will the rejection policy be implemented?

  • When the number of workers reaches the corePoolSize, the task is enqueued successfully, and the thread pool is closed at the same time, and the closing of the thread pool does not remove the task from the queue, then the rejection policy is executed. This is a very borderline issue, queueing and closing thread pools concurrently, and the reader takes a closer look at how the execute method gets into the first reject(Command).
  • The number of workers is greater than or equal to the corePoolSize, and the workers are ready to join the queue, but the queue is full, the task fails to join the queue, and the new thread is ready to start, but the number of threads has reached maximumPoolSize, and the rejection policy is implemented.

6. Reference materials

Book: Code Efficiently: A Java Development Manual

Java core technology 36 talk: t.cn/EwUJvWA

Deep interpretation of the Java thread pool design ideas and source code implementation: javadoop.com/post/java-t…

Java thread pool – ThreadPoolExecutor source code parsing (based on Java8) : www.imooc.com/article/429…

Course Recommendation: