Java version: 8U261.

For thread pools in Java, most interviews ask about the meaning of the parameters in the thread pool, or the process that the thread pool executes, as if this has become a fixed pattern and routine. But if I were the interviewer and I wanted to ask some more detailed questions, would you be able to answer them? Such as:

  1. How does a thread pool implement thread reuse?
  2. If a thread throws an exception while executing a task, will the task be discarded?
  3. There are currently ten threads in the thread pool, and one thread is executing a task. What state are the remaining nine threads in?

These questions are difficult to answer without looking at the source code implementation of thread pools. Can you handle blocking queues and AQS in Java?

1 introduction

Because threads are scarce resources, if they are created and destroyed without limit in high concurrency, they will consume system resources and degrade system stability. So thread pools are designed to solve these problems. Thread pooling improves performance by reusing existing thread resources and reducing the number of threads created and destroyed. It also allows for uniform allocation, tuning, and monitoring.

In Java, you can create thread pools by using the Executors class newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool, or other methods. They all build directly or indirectly from ThreadPoolExecutor, passing in different arguments to achieve different effects. (newScheduledThreadPool in particular overrides part of ThreadPoolExecutor’s logic, Later I will write an article on the source analysis was carried out on the ScheduledThreadPoolExecutor).

1.1 Thread pool parameters

There are seven parameters in ThreadPoolExecutor:

  • CorePoolSize: The number of core threads that will remain alive even if no tasks need to be executed (unless the allowCoreThreadTimeOut parameter is set to true, in which case even core threads will be destroyed by timeout);

  • MaximumPoolSize: the maximum number of threads allowed in the thread pool;

  • KeepAliveTime: Maintains the idle time allowed by the worker thread. If the worker thread waits longer than keepAliveTime, it will be destroyed.

  • Unit: Specifies the keepAliveTime unit, such as timeunit.seconds.

  • WorkQueue: A blocking queue that holds tasks waiting to be executed. Common examples are ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue, etc.

  • ThreadFactory: a threadFactory that provides the ability to create new threads. The default implementation is Executors. DefaultThreadFactory (), namely through the way of new Thread;

  • Handler: If the current blocking queue is full and the current number of threads has exceeded the maximum number of threads, the corresponding rejection policy is executed. There are four types (you can also implement them yourself) :

    • AbortPolicy: the default implementation, can throw RejectedExecutionException directly;
    • CallerRunsPolicy: Executes the task with the caller’s thread;
    • DiscardPolicy: Directly discards the task.
    • DiscardOldestPolicy: Discards the most advanced task in the blocking queue and executes the current task.

The implementation of these four rejection strategies is very simple and will not be illustrated here, so readers can see for themselves.

1.2 Operation Process

ThreadPoolExecutor runs like this:

If using a bounded blocking queue:

If a new task needs to be executed and the number of threads in the current thread pool is smaller than the number of core threads, a core thread is created to execute it. If the number of current threads is greater than the number of core threads, all tasks other than those handled by the core thread are added to the blocking queue for execution. If the queue is full, a new non-core thread is created if the current number of threads is not greater than the maximum number of threads. When the keepAliveTime idle time is reached, the non-core thread is destroyed directly. You just have to decrease to the number of remaining threads to the number of core threads. The difference between core threads and non-core threads is only in determining whether a threshold has been reached: core threads determine the number of core threads, while non-core threads determine the maximum number of threads. There is only one difference. I’ll emphasize this later in the source code. If the current number of threads is greater than the maximum number of threads, the corresponding rejection policy is executed.

If using an unbounded blocking queue:

Compared with bounded blocking queues, unbounded blocking queues do not fail to enqueue tasks unless system resources are exhausted. When a new task arrives and the number of threads in the system is smaller than the number of core threads, a core thread is created to execute it. Once the number of core threads is reached, it does not increase further. If new tasks are added but no idle thread resources are available, the task is directly placed in the blocking queue to wait. If the speed of task creation and processing is very different, the unbounded blocking queue keeps growing rapidly until it runs out of system memory.

1.3 Thread pool status

There are five states in ThreadPoolExecutor:

  • RUNNING: Initial state, in which new tasks can be received and added tasks can be processed;
  • SHUTDOWN: The thread pool changes to the SHUTDOWN state by calling the SHUTDOWN method. At this time, no new tasks are received, but the added tasks can be processed.
  • STOP: The thread pool is stopped by calling the shutdownNow method. In this case, new tasks are not received, added tasks are not processed, and ongoing tasks are interrupted.
  • TIDYING: Enters TIDYING state when all tasks in the thread pool have terminated, the number of tasks is zero, and the blocking queue is empty. A hook method terminated is called. It is an empty implementation that can be overridden by the caller.
  • TREMINATED: indicates the status of the thread pool that is completely terminated. When the thread pool is in TIDYING state, it enters that state after execution of the terminated method.

In ThreadPoolExecutor state is represented by the high three bits in the CTL attribute:

 1 // CTL contains two parts of information: the higher 3 bits indicate the RUNNING status, the lower 29 bits store the number of worker threads, and the initial state is RUNNING
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 / / 29
 4 private static final int COUNT_BITS = Integer.SIZE - 3;
 5 //1 moves 29 bits to the left. Indicates the maximum number of worker threads
 6 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 7
 8 // CTL = 111
 9 private static final int RUNNING = -1 << COUNT_BITS;
10 // CTL with the highest 3 bits 000 and the lowest 29 bits 0
11 private static final int SHUTDOWN = 0 << COUNT_BITS;
12 // CTL 0 = 0
13 private static final int STOP = 1 << COUNT_BITS;
14 // CTL 0 = 0
15 private static final int TIDYING = 2 << COUNT_BITS;
16 // CTL 0 = 0
17 private static final int TERMINATED = 3 << COUNT_BITS;
18
19 // Get the top three bits of the CTL (all 29 bits are 0), which is to get the running state
20 private static int runStateOf(int c) {
21     return c & ~CAPACITY;
22 }
23
24 // Get the lowest 29 bits of the CTL (the highest three bits are 0), that is, get the number of worker threads
25 private static int workerCountOf(int c) {
26     return c & CAPACITY;
27 }
28
29 // To get the concatenated value of the health state and the number of worker threads
30 private static int ctlOf(int rs, int wc) {
31     return rs | wc;
32 }
33
34 // Check whether CTL is less than the value of the state represented by s
35 private static boolean runStateLessThan(int c, int s) {
36     return c < s;
37 }
38
39 // Check whether CTL is greater than or equal to the value of the state represented by s
40 private static boolean runStateAtLeast(int c, int s) {
41     return c >= s;
42 }
43
44 // Check whether the CTL is RUNNING
45 private static boolean isRunning(int c) {
46     return c < SHUTDOWN;
47 }
Copy the code

1.4 the Worker

Worker is an inner class in ThreadPoolExecutor that encapsulates Worker threads:

 1 private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable {
 4     / /...
 5
 6     // The thread running the Worker
 7     final Thread thread;
 8     // Incoming tasks
 9     Runnable firstTask;
10     // The number of tasks completed by this Worker, used for follow-up statistics and monitoring
11     volatile long completedTasks;
12
13     Worker(Runnable firstTask) {
14         /* 15 The initial state of AQS is set to -1 in order to delay the interruption of the thread until the task actually starts to run. In other words, 16 forbids the interruption of the thread before the task is executed. The thread is aborted during calls to methods like shutdown and shutdownNow, and the tryLock method is called to attempt locking before 17. When set to -1, the tryLock method returns false, so you can't interrupt 18 */
19         setState(-1);
20         this.firstTask = firstTask;
21         this.thread = getThreadFactory().newThread(this);
22     }
23
24     // Since the Worker class implements the Runnable interface, this is where thread.start will end up when called
25     public void run(a) {
26         runWorker(this);
27     }
28
29     / /...
30 }
Copy the code

As you can see from above, the Worker inherits AQS (I’ve written about source code analysis of AQS, ReentrantLock, and blocking queues before, ReentrantLock – AQS source code analysis of conditional queues – do you know how to implement blocking queues in Java? And implements the Runnable interface. Later, when analyzing the source code, we will see that the reason why ReentrantLock is not used as an exclusive lock when running the Worker is that ReentrantLock is a ReentrantLock. When you manually change the number of core threads, such as some setCorePoolSize methods, if the value is smaller than the original value, the extra threads will be interrupted, interrupting the running thread. So using a self-implemented non-reentrantlock instead of a ReentrantLock is to prevent methods like setCorePoolSize from reacquiring lock resources and causing running threads to self-interrupt. All of the above is explained in the Worker class comment:

2 the constructor

 1 /** 2 * ThreadPoolExecutor: 3 * all-argument constructor, which all other constructors will eventually call 4 */
 5 public ThreadPoolExecutor(int corePoolSize,
 6                           int maximumPoolSize,
 7                           long keepAliveTime,
 8                           TimeUnit unit,
 9                           BlockingQueue<Runnable> workQueue,
10                           ThreadFactory threadFactory,
11                           RejectedExecutionHandler handler) {
12     // Invalid parameter verification
13     if (corePoolSize < 0 ||
14             maximumPoolSize <= 0 ||
15             maximumPoolSize < corePoolSize ||
16             keepAliveTime < 0)
17         throw new IllegalArgumentException();
18     // Non-null check
19     if (workQueue == null || threadFactory == null || handler == null)
20         throw new NullPointerException();
21     // If the security manager is not empty, access is granted.
22     this.acc = System.getSecurityManager() == null ?
23             null :
24             AccessController.getContext();
25     this.corePoolSize = corePoolSize;
26     this.maximumPoolSize = maximumPoolSize;
27     this.workQueue = workQueue;
28     // Convert keepAliveTime to nanoseconds
29     this.keepAliveTime = unit.toNanos(keepAliveTime);
30     this.threadFactory = threadFactory;
31     this.handler = handler;
32 }
Copy the code

3 the execute method

 1 /** 2 * ThreadPoolExecutor: 3 */
 4 public void execute(Runnable command) {
 5     // Non-null check
 6     if (command == null)
 7         throw new NullPointerException();
 8     int c = ctl.get();
 9     // If the current number of threads is less than the number of core threads, create a core thread
10     if (workerCountOf(c) < corePoolSize) {
11         if (addWorker(command, true))
12             return;
13         /* 14 if the thread pool is SHUTDOWN or above, or the number of threads exceeds the threshold, get the CTL value again and follow the logic 16 */
17         c = ctl.get();
18     }
19     The thread pool is in the RUNNING state. If the thread pool is in the RUNNING state, the thread pool is in the RUNNING state. The thread pool is in the RUNNING state. Instead of blocking, the offer immediately returns false 23 */
24     if (isRunning(c) && workQueue.offer(command)) {
25         int recheck = ctl.get();
26         If the thread pool is not in the RUNNING state, the thread pool will be deleted and the corresponding rejection policy will be executed. 29 */
30         if(! isRunning(recheck) && remove(command))31             reject(command);
32         /* 33 Also checks to see if the current number of worker threads is zero (perhaps they were destroyed after the last workerCountOf 34 check (line 10) (allowCoreThreadTimeOut set to true)). If 35, create a new non-core thread with an empty task. Note that the addWorker method is passed as an empty task, since task 36 already exists in the blocking queue, so when the Worker executes, The task will be pulled directly from the blocking queue to execute 37, so the meaning here is that in order to keep the thread pool in the RUNNING state, there must be a thread to execute the task 38 */
39         else if (workerCountOf(recheck) == 0)
40             addWorker(null.false);
41     } else if(! addWorker(command,false))
42         /* the thread pool is not in the RUNNING state, or the blocking queue is full, so create a non-core thread to execute. Or if the current number of threads is greater than or equal to the maximum number of threads 45, the corresponding rejection policy 46 */ is executed
47         reject(command);
48 }
49
50 /** 51 * line 30:52 */
53 public boolean remove(Runnable task) {
54     // Block the queue to remove the task
55     boolean removed = workQueue.remove(task);
56     // Decide whether to terminate the thread pool based on the thread pool state
57     tryTerminate(); 
58     return removed;
59 }
60
61 /** 62 * at line 31 and 47:63 */
64 final void reject(Runnable command) {
65     // Execute the logic according to which rejection strategy is used (the code of the specific four rejection strategies is not seen here, but is very simple)
66     handler.rejectedExecution(command, this);
67 }
Copy the code

4 addWorker method

The addWorker method is called when a task is added above:

  1 /** 2 * ThreadPoolExecutor: 3 */
  4 private boolean addWorker(Runnable firstTask, boolean core) {
  5     retry:
  6     for(; ;) {7         int c = ctl.get();
  8         // Get the running status of the current thread pool
  9         int rs = runStateOf(c);
 10 
 11         /* 12 If the current thread pool state is greater than SHUTDOWN, return false to indicate that no new Worker will be added. If firstTask is not null (equivalent to a new task) or the blocking queue is empty 15 (empty means there is no need to create the Worker), return false and do not add new Worker 16 */
 17         if (rs >= SHUTDOWN &&
 18! (rs == SHUTDOWN &&19                         firstTask == null &&
 20! workQueue.isEmpty()))21             return false;
 22 
 23         for(; ;) {24             // Retrieves the number of worker threads in the current thread pool
 25             int wc = workerCountOf(c);
 26             /* 27 <1> If the current number of threads is greater than or equal to the maximum value; 28 <2.1> In the case of core threads, the number of current threads is greater than or equal to the number of core threads. 29 <2.2> If the number of non-core threads is greater than or equal to the maximum number of threads 30, the current number of threads has reached the threshold. If 31 is met, false is returned and no new task 32 */ is added
 33             if (wc >= CAPACITY ||
 34                     wc >= (core ? corePoolSize : maximumPoolSize))
 35                 return false;
 36             /* 37 CAS attempts to apply the CTL +1, i.e. the number of worker threads +1. If successful, the loop is broken and 38 continues at line 58 to execute 39 */
 40             if (compareAndIncrementWorkerCount(c))
 41                 break retry;
 42             // If CAS+1 fails, reread the latest value of the CTL
 43             c = ctl.get();
 44             /* 45 If you find that the running state is not the same as when you first entered the method, 46 indicates that the state has changed during this period, so try again from the beginning 47 */
 48             if(runStateOf(c) ! = rs)49                 continue retry;
 50             /* the state has not changed, but the CAS operation of CTL +1 failed, so repeat 53 */ from line 25, line 52
 54         }
 55     }
 56
 57     // The above loop is mainly for the +1 operation on the CTL, while the following loop is for the creation of the Worker
 58     boolean workerStarted = false;
 59     boolean workerAdded = false;
 60     Worker w = null;
 61     try {
 62         // Create a Worker based on firstTask (as mentioned above, state in AQS starts at -1 to prevent interruption)
 63         w = new Worker(firstTask);
 64         // Each Worker creates a Thread
 65         final Thread t = w.thread;
 66         if(t ! =null) {
 67             final ReentrantLock mainLock = this.mainLock;
 68             / / lock
 69             mainLock.lock();
 70             try {
 71                 // Reobtains the current thread pool running state
 72                 int rs = runStateOf(ctl.get());
 73 
 74                 /* 75 If the thread pool is currently in the RUNNING state or SHUTDOWN state and firstTask 76 is empty (meaning that new tasks are not processed but tasks in the blocking queue), 77 new workers can be added to the workers set 78 */
 79                 if (rs < SHUTDOWN ||
 80                         (rs == SHUTDOWN && firstTask == null)) {
 81                     /* 82 The thread has not started yet, but isAlive returns true, indicating that the thread is faulty. 83 raises an exception 84 */
 85                     if (t.isAlive())
 86                         throw new IllegalThreadStateException();
 87                     // Add the Worker to the workers set (HashSet, because it is locked, HashSet is ok)
 88                     workers.add(w);
 89                     int s = workers.size();
 90                     /* 91 If the number of threads in the current thread pool exceeds largestPoolSize, update the largestPoolSize to 92, which stores the maximum number of threads in the current thread pool
 94                     if (s > largestPoolSize)
 95                         largestPoolSize = s;
 96                     // Created the Worker successfully
 97                     workerAdded = true;
 98                 }
 99             } finally {
100                 / / releases the lock
101                 mainLock.unlock();
102             }
103             if (workerAdded) {
104                 // If the Worker set above succeeds in adding Worker, the thread in Worker is used to start the thread
105                 t.start();
106                 workerStarted = true;
107             }
108         }
109     } finally {
110         if(! workerStarted)111             // If the adding fails, the processing fails
112             addWorkerFailed(w);
113     }
114     return workerStarted;
115 }
116
117 private void addWorkerFailed(Worker w) {
118     final ReentrantLock mainLock = this.mainLock;
119     / / lock
120     mainLock.lock();
121     try {
122         // If the Worker was created successfully, delete it from the workers collection
123         if(w ! =null)
124             workers.remove(w);
125         // execute ctl-1, which uses an infinite loop to ensure that the CAS operation succeeds
126         decrementWorkerCount();
127         // Decide whether to terminate the thread pool based on the thread pool state
128         tryTerminate();
129     } finally {
130         / / releases the lock
131         mainLock.unlock();
132     }
133 }
Copy the code

5 runWorker method

Since the Worker class implements the Runnable interface, calling thread.start will end up calling Worker’s run method:

  1 /** 2 * ThreadPoolExecutor: 3 * this is where the t.start() method is eventually called 4 */
  5 public void run(a) {
  6     runWorker(this);
  7 }
  8
  9 final void runWorker(Worker w) {
 10     // Get the current thread (thread in Worker)
 11     Thread wt = Thread.currentThread();
 12     Runnable task = w.firstTask;
 13     // Clear the firstTask from the Worker as it will be executed next
 14     w.firstTask = null;
 15     /* 16 Because the state of the AQS was initially set to -1 when the Worker was created in order to prevent the thread from being interrupted 17, the unlock method resets the state to 0, meaning that it has entered the runWorker method 18 and can allow the interruption of 19 */
 20     w.unlock();
 21     boolean completedAbruptly = true;
 22     try {
 23         // If the task is not empty, or the task is fetched from the blocking queue
 24         while(task ! =null|| (task = getTask()) ! =null) {
 25             /* 26 lock (note that the Worker is used for the lock, not the ReentrantLock, to ensure that the code below 27 is not re-entranted by the same thread, but can be executed concurrently by different threads) 28 */
 29             w.lock();
 30             /* 31 If the current thread pool state is greater than or equal to STOP, make sure that the current thread also needs to be interrupted (since 32 terminates the thread pool at this point, no new thread can be added); Otherwise, if shutdownNow 33 method is called after the above judgment is not satisfied (note that shutdownNow method is locked by ReentrantLock, while the code goes to 34 where the current Worker is locked, the two locks are not the same, so they can be executed concurrently), Before 35, the state was either RUNNING or SHUTDOWN. After the first runStateAtLeast 36 judgment condition was not met, the shutdownNow method was executed to change the state to STOP. 37 At the same time, the Worker interrupt bit was set. If thread.interrupted () returns true, 38 if the status of the Thread pool has changed to STOP, the Thread will be interrupted. In this case, 39 or even the entire ThreadPoolExecutor interrupt thread does not interrupt. 40 wt.interrupt() sets an interrupt flag, which requires the user to use the run method first via isInterrupted. Whether the following business code should be executed) 42 */
 43             if ((runStateAtLeast(ctl.get(), STOP) ||
 44                     (Thread.interrupted() &&
 45                             runStateAtLeast(ctl.get(), STOP))) &&
 46! wt.isInterrupted())47                 wt.interrupt();
 48             try {
 49                 // Hook method, empty implementation
 50                 beforeExecute(wt, task);
 51                 Throwable thrown = null;
 52                 try {
 53                     // This is a thread specific task.
 54                     task.run();
 55                 } catch (RuntimeException x) {
 56                     thrown = x;
 57                     throw x;
 58                 } catch (Error x) {
 59                     thrown = x;
 60                     throw x;
 61                 } catch (Throwable x) {
 62                     thrown = x;
 63                     throw new Error(x);
 64                 } finally {
 65                     // Hook method, empty implementation
 66                     afterExecute(task, thrown);
 67                 }
 68             } finally {
 69                 // Set task to null and the next task in the blocking queue will be fetched the next time the loop is looped
 70                 task = null;
 71                 // Number of completed tasks +1
 72                 w.completedTasks++;
 73                 / / releases the lock
 74                 w.unlock();
 75             }
 76         }
 77         // The loop executes the above while loop to pick up the task, which means there are no more tasks in the Worker and the blocking queue
 78         completedAbruptly = false;
 79     } finally {
 80         // Put the finishing touches on the Worker
 81         processWorkerExit(w, completedAbruptly);
 82     }
 83 }
 84
 85 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 86     /* 87 completedAbruptly is true if an exception is thrown in the while loop of the runWorker method, then the 88 worker thread does not have -1, Requires -1 (normally -1 on the last call to the getTask method in the while loop) 89 */
 90     if (completedAbruptly)
 91         decrementWorkerCount();
 92
 93     final ReentrantLock mainLock = this.mainLock;
 94     / / lock
 95     mainLock.lock();
 96     try {
 97         // Add the number of tasks completed by all workers for statistical monitoring
 98         completedTaskCount += w.completedTasks;
 99         /* 100 Remove the current Worker (i.e., the current thread) from the workers set and wait for GC 101 The timed flag must be true in the getTask method (if the timed flag is false, it will always block in the getTask 102 take method, and the interrupt wake is not possible because the getTask method will continue to loop in this case). Either the idle core thread timed out needs to be destroyed, or the idle non-core thread timed out needs to be destroyed. Either way, the current thread is 104 to be destroyed at 105 */
106         workers.remove(w);
107     } finally {
108         / / releases the lock
109         mainLock.unlock();
110     }
111
112     // Decide whether to terminate the thread pool based on the thread pool state
113     tryTerminate();
114 
115     int c = ctl.get();
116     // If the thread pool is in the RUNNING or SHUTDOWN state
117     if (runStateLessThan(c, STOP)) {
118         // From the previous analysis, if completedAbruptly is false, there are no more tasks to execute
119         if(! completedAbruptly) {120             // If allowCoreThreadTimeOut is true, min is 0, otherwise it is the number of core threads
121             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
122             /* 123 If the blocking queue is not empty (perhaps there is more data in the blocking queue at this point) and allowCoreThreadTimeOut 124 is true, change the min to 1 125 */
126             if (min == 0 && !workQueue.isEmpty())
127                 min = 1;
128             /* 129 Two cases: 130 <1> If the blocking queue is not empty and allowCoreThreadTimeOut is true, check whether the number of current worker threads is greater than or equal to 1. If allowCoreThreadTimeOut is false, check whether the current number of worker threads is greater than or equal to the number of core threads. If not, the number of working threads is smaller than the number of core threads, so add a non-core thread 134 */
135             if (workerCountOf(c) >= min)
136                 return;
137         }
138         /* 139 Having examined the two cases where completedAbruptly is false, let's examine the third case where completedAbruptly is 140 true. If completedAbruptly is true, an exception is thrown in the while loop of the runWorker method, and a 141 non-core thread is added (although the error task is cleared in the finally clause). However, the afterExecute 142 hook method can be overridden to save the failed task for subsequent processing. From this point of view, adding a non-core thread makes 143 sense. In addition, as previously analyzed, in line 34 of addWorker method, the difference between core thread and non-core thread only lies in the judgment of threshold value, 144 everything else is the same. So it's ok to add a non-core thread here, which is below the threshold anyway) 145 */
146         addWorker(null.false);
147     }
148 }

Copy the code

6 getTask method

As shown above, in line 24 of the code, when the task in the Worker is empty, the task will be taken from the blocking queue, that is, the getTask method will be called:

 1 /** 2 * ThreadPoolExecutor: 3 */
 4 private Runnable getTask(a) {
 5     // the timedOut flag is used to determine whether the poll task timedOut
 6     boolean timedOut = false;
 7 
 8     for(; ;) {9         int c = ctl.get();
10         // Reobtains the current thread pool running state
11         int rs = runStateOf(c);
12 
13         /* 14 if the current thread pool is SHUTDOWN and the blocking queue is empty; Or if the current thread pool state is greater than or equal to STOP 15, the worker thread -1 will be returned null. Because there's no need for 16 quests in either case. After Worker thread -1, the Worker is subsequently removed from the workers collection in the processWorkerExit method waiting for the GC's 17 */
18         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
19             decrementWorkerCount();
20             return null;
21         }
22
23         26 27 Retrieve the number of working threads in the current thread pool 28 */
29         int wc = workerCountOf(c);
30 
31         If allowCoreThreadTimeOut is set to true, an idle core thread should also be destroyed by timeout. Or the current thread count is greater than 34 core threads (this condition indicates that idle non-core threads are to be destroyed. If allowCoreThreadTimeOut is false, 35 will retain a maximum of "core threads passed into the thread pool"). Timed is set to true 36 */
37         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
38
39         /* 40 If the current number of working threads is greater than the maximum number of working threads, the setMaximumPoolSize method may be called to reduce the maximum number of working threads. Line 34 of the addWorker method already determines that the number of threads is greater than the maximum number); 42 timedOut true indicates that this is not the first loop and a poll timeout occurred in the last loop. So in summary what this if condition means is: 43 <1.1> If the number of current worker threads is greater than the maximum number of threads 44 <1.2> or the current thread is idle and needs to be destroyed 45 <2.1> and the current worker thread must have more than one 46 <2.2> or the current blocking queue is empty 47 meet the above two conditions, the worker thread -1, Remove the current redundant thread and return 48 */ directly
49         if ((wc > maximumPoolSize || (timed && timedOut))
50                 && (wc > 1 || workQueue.isEmpty())) {
51             // The difference between this method and the decrementWorkerCount method is that it does not keep trying CAS indefinitely, and returns false on failure
52             if (compareAndDecrementWorkerCount(c))
53                 return null;
54             // If cas-1 fails, proceed to the next loop
55             continue;
56         }
57
58         try {
59             If timed is true, the poll method is timed. If timed is true, the poll method is timed. If timed is true, the poll method is timed. That is, the thread will block until there is more data in the 62 blocking queue. So if timed is false, these worker threads will always be blocked here
64             Runnable r = timed ?
65                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
66                     workQueue.take();
67             if(r ! =null)
68                 // If the task is retrieved, it is returned to the Worker directly
69                 return r;
70             /* 71 A poll timeout has occurred, so set timedOut to true and retry on the next loop 72 (most likely returning null at line 53) 73 */
74             timedOut = true;
75         } catch (InterruptedException retry) {
76             // If an interrupt occurs during blocking, set timedOut to false and enter the next loop again
77             timedOut = false;
78         }
79     }
80     /* The difference between a core thread and a non-core thread is not that there is a core thread attribute in a Worker. Workers are stateless. 82 All workers are the same. This is done by determining whether the number of worker threads is greater than the number of core threads (because 83 will create new non-core threads only when the blocking queue is full, thus making the number of worker threads greater than the number of core threads). If more than, no matter before this thread is the core thread 84 or non-core thread, now I decided that the current thread is "non-core" thread, such as the thread "non-core" free time more than keepAliveTime after 85 is destroyed 86 * /
87 }


Copy the code

7 shutdown method

Shutting down a thread pool is usually called with the shutdown method instead of the shutdownNow method:

 1 /** 2 * ThreadPoolExecutor: 3 */
 4 public void shutdown(a) {
 5     final ReentrantLock mainLock = this.mainLock;
 6     / / lock
 7     mainLock.lock();
 8     try {
 9         If there is a security manager, make sure the caller has permission to close the thread pool.
10         checkShutdownAccess();
11         // Change the thread pool state to SHUTDOWN, which uses an infinite loop to ensure that the CAS operation succeeds
12         advanceRunState(SHUTDOWN);
13         interruptIdleWorkers();
14         // Hook method, empty implementation
15         onShutdown();
16     } finally {
17         / / releases the lock
18         mainLock.unlock();
19     }
20     // Decide whether to terminate the thread pool based on the thread pool state
21     tryTerminate();
22 }
23
24 /**
25  * 第13行代码处:
26  */
27 private void interruptIdleWorkers(a) {
28     // Interrupt all idle threads
29     interruptIdleWorkers(false);
30 }
31
32 private void interruptIdleWorkers(boolean onlyOne) {
33     final ReentrantLock mainLock = this.mainLock;
34     / / lock
35     mainLock.lock();
36     try {
37         for (Worker w : workers) {
38             Thread t = w.thread;
39             if(! t.isInterrupted() && w.tryLock()) {40                 try {
41                     /* 42 If the thread in the current Worker has not been interrupted and the attempt to lock is successful, the interrupt flag of 43 will be reset to true, which means to interrupt the idle Worker 44 */
45                     t.interrupt();
46                 } catch (SecurityException ignore) {
47                 } finally {
48                     // Reset state in AQS to 0 and restore the state before tryLock
49                     w.unlock();
50                 }
51             }
52             if (onlyOne)
53                 // if onlyOne is true, onlyOne interrupt is attempted
54                 break;
55         }
56     } finally {
57         / / releases the lock
58         mainLock.unlock();
59     }
60 }

Copy the code

8 tryTerminate method

In the above implementation you can see that the tryTerminate method is called several times to determine whether the current thread pool should be terminated:

 1 /** 2 * ThreadPoolExecutor: 3 * 4 */
 5 final void tryTerminate(a) {
 6     for(; ;) {7         int c = ctl.get();
 8         <2> The thread pool is TERMINATED in TIDYING or TERMINATED state, and TERMINATED in TERMINATED state. <3> if the current thread pool is SHUTDOWN and the blocking queue is not empty, the state of the terminated thread pool cannot be changed. The state of the terminated thread pool cannot be changed
15         if (isRunning(c) ||
16                 runStateAtLeast(c, TIDYING) ||
17(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))18             return;
19         If the thread pool is SHUTDOWN and the blocking queue is empty, the thread pool can be stopped. If the thread pool is SHUTDOWN and the blocking queue is empty, the thread pool can be stopped. Note that the current thread is not the last thread to execute task 22 (because if the current thread to be destroyed is idle, it will eventually do -1 in the getTask method (throwing an exception while executing will do -1 in the processWorkerExit method 23), That is to say, each should have destroyed the idle thread when finally couldn't take the task will be 1, 24 so if it is found that the current working threads without reduced to 0, means that the current thread is not the last one thread of execution), then it won't end thread pool 25 (the end of a thread pool to the last thread to do). ONLY_ONE is always true, which means that if the current thread was not the last one to perform the task on 26, it would simply interrupt an idle thread (itself) and return 27 */
28         if(workerCountOf(c) ! =0) {
29             interruptIdleWorkers(ONLY_ONE);
30             return;
31         }
32
33         /* the current thread count is 0, which means that the current thread is the last thread to execute the task
37         final ReentrantLock mainLock = this.mainLock;
38         / / lock
39         mainLock.lock();
40         try {
41             //CAS changes the CTL status to TIDYING
42             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
43                 try {
44                     // Hook method, empty implementation
45                     terminated();
46                 } finally {
47                     // Put the thread pool state to terminated after executing the terminated method
48                     ctl.set(ctlOf(TERMINATED, 0));
49                     /* 50 Maybe a thread called awaitTermination before this and blocked, 51, with no timeout and no interrupt. At this point you need to wake up the thread pool 52 */
53                     termination.signalAll();
54                 }
55                 return;
56             }
57         } finally {
58             / / releases the lock
59             mainLock.unlock();
60         }
61         // CAS failed to change the state to TIDYING
62     }
63 }
Copy the code

The longer I work in this industry, the more I feel: great oaks from little acorns grow, it’s a truth! In the application business for too long many bottom things are often easy to ignore, the plan at the beginning of this year is to do a summary of the commonly used JDK source tools, see the end of the year is approaching, by the recent free, hurriedly to make up.

  1. Do you know ArrayList? Describe the difference between remove for foreach and iterator
  2. Have you ever wondered why Internet companies always ask about collections? Let’s talk about the classic data structure HashMap
  3. AQS source in-depth analysis of exclusive mode -ReentrantLock lock features in detail
  4. AQS source code for in-depth analysis of sharing mode – Why PROPAGATE state in AQS?
  5. AQS source code in depth analysis of conditional queue -Java blocking queue is how to achieve?
  6. AQS source code in-depth analysis of the application tool CountDownLatch (creation)
  7. CyclicBarrier: AQS source code analysis tool
  8. ConcurrentHashMap is a bug in Java 8. And there’s more than one! This pit is still relatively large, will focus on the summary behind! (Finished)
  9. ThreadPoolExecutor source code analysis – The implementation process of the Java thread pool is broken, and many people are still confused. (Current article)
  10. ScheduledThreadPoolExecutor source analysis – often timer thread pool focus on how to achieve delay the execution and cycle!
  11. ThreadLocal source code analysis – key summary, memory leak, soft reference weak reference false reference, interview often like to ask, I also like to ask another
  12. Red black tree TreeMap, LinkedHashMap
  13. An in-depth understanding of the ordered and threaded Map container ConcurrentSkipListMap
  14. LinkedList (Not sure if you want to write it, if you have time, it depends on the project)
  15. 1T data quicksort! Summary of ten classical sorting algorithms

Each summary is the knowledge of the degree of examination, technology is not easy, every day a little better, with everyone.

In addition, the author’s public account: Geek time, there are more wonderful articles, interested students, you can pay attention to