The premise
I have planned to read the source code implementation of JUC thread pool ThreadPoolExecutor for a long time, but I have not had time to compile the source code analysis article due to the busy recently. As mentioned earlier in the analysis of the Future of extending the thread pool to implement callbacks, concurrency master Doug Lea designed the top-level interface Executor of thread pool ThreadPoolExecutor to submit tasks with only one stateless execution method:
public interface Executor {
void execute(Runnable command);
}
ExecutorService provides a lot of extension methods and the bottom layer is basically based on the Executor#execute() method. This paper focuses on the analysis of the implementation of ThreadPoolExecutor#execute(). The author will conduct detailed analysis from the implementation principle, source code implementation and other perspectives combined with simplified examples. The source code for ThreadPoolExecutor is essentially unchanged from JDK8 to JDK11, which was used at the time of writing this article.
The principle of ThreadPoolExecutor
AbstractQueuedSynchronizer ThreadPoolExecutor inside to use JUC synchronizer framework (commonly known as AQS), a large number of operations, the CAS operation. ThreadPoolExecutor provides several important features: fixed active threads (core threads), extra threads (the pool size of the thread — the number of additional threads created in the core thread, hereafter referred to as non-core threads), task queues, and denial policies.
JUC Synchronizer Framework
The JUC synchronizer framework is used in ThreadPoolExecutor for four purposes:
- Global lock
mainLock
Member property is a reentrant lockReentrantLock
Type, mainly used to access worker threadsWorker
Collect and perform lock operations when recording statistics. - Condition variables,
termination
.Condition
Type, mainly used for thread terminationsawaitTermination()
Method with a duration block. - Task queue
workQueue
.BlockingQueue
Type, task queue, which holds the tasks to be executed. - Worker thread, inner class
Worker
Type is the actual worker thread object in the thread pool.
About AQS before I wrote an article on the relevant source code analysis, JUC synchronizer framework AbstractQueuedSynchronizer source graphic analysis.
Core thread
Here we refer to the implementation of ThreadPoolExecutor and simplify it. To implement a thread pool with only core threads, the requirements are as follows:
- Leave aside for the moment the handling of task execution exceptions.
- The task queue is unbounded.
- The thread pool capacity is fixed as the number of core threads.
- Leave the rejection strategy aside for the moment.
public class CoreThreadPool implements Executor { private BlockingQueue<Runnable> workQueue; private static final AtomicInteger COUNTER = new AtomicInteger(); private int coreSize; private int threadCount = 0; public CoreThreadPool(int coreSize) { this.coreSize = coreSize; this.workQueue = new LinkedBlockingQueue<>(); } @Override public void execute(Runnable command) { if (++threadCount <= coreSize) { new Worker(command).start(); } else { try { workQueue.put(command); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } private class Worker extends Thread { private Runnable firstTask; public Worker(Runnable runnable) { super(String.format("Worker-%d", COUNTER.getAndIncrement())); this.firstTask = runnable; } @Override public void run() { Runnable task = this.firstTask; while (null ! = task || null ! = (task = getTask())) { try { task.run(); } finally { task = null; } } } } private Runnable getTask() { try { return workQueue.take(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } public static void main(String[] args) throws Exception { CoreThreadPool pool = new CoreThreadPool(5); IntStream.range(0, 10) .forEach(i -> pool.execute(() -> System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i)))); Thread.sleep(Integer.MAX_VALUE); }}
The result of a run is as follows:
Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9
This thread pool is designed so that the core thread is created lazily and blocks the take() method of the task queue if the thread is idle. The same is true for ThreadPoolExecutor. However, if KeepAliveTime is used and core thread timeout is allowed (allowCoreThreadTimeout is set to true) then BlockingQueue#poll(KeepAliveTime) is used instead of permanent blocking.
Other Additional Features
When you build a ThreadPoolExecutor instance, you define MaximumPoolSize (the maximum number of threads in the thread pool) and CorePoolSize (the number of core threads). When the task queue is a bounded blocking queue, the core thread is full, and the task queue is full, an attempt is made to create additional MaximumPoolSize-corePoolSize threads to execute the newly committed task. The two main additional functions implemented here by ThreadPoolExecutor are:
- A non-core thread is created to execute a task under certain conditions. The collection cycle of a non-core thread (the end of the thread’s life cycle) is
keepAliveTime
, the thread life cycle ends when the next time a task is fetched from the task queue and the time it lives exceedskeepAliveTime
. - Provides a rejection policy that triggers a rejection policy when the core thread is full, the task queue is full, and the non-core thread is full.
Source code analysis
The key attributes of the thread pool are examined first, followed by its state control, and finally the focus is on the ThreadPoolExecutor#execute() method.
Key attributes
Public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger CTL = new; private final AtomicInteger CTL = new AtomicInteger(ctlOf(RUNNING, 0)); Private final BlockingQueue<Runnable> WorkQueue; Private final hashSet <Worker> workers = new hashSet <Worker> (); private final hashSet <Worker> workers = new hashSet <Worker> (); Private final ReentrantLock mainLock = new ReentrantLock(); Private final Condition termination = MainLock. NewCondition (); private final Condition termination = MainLock. Private int LargestPoolSize; Private long completedTaskCount; private long completedTaskCount; Private volatile threadFactory; private volatile threadFactory; private volatile threadFactory; Private volatile rejectedExecutionHandler handler private volatile rejectedExecutionHandler handler private volatile rejectedExecutionHandler handler Private volatile Long KeepAliveTime; private volatile Long KeepAliveTime; private volatile Long KeepAliveTime; Private volatile Boolean allowCoreThreadTimeout (); private volatile Boolean allowCoreThreadTimeout (); Private volatile int corePoolSize; Private volatile int MaximumPoolSize; // Sort out other code}
Let’s look at the constructor with the longest argument list:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
You can customize the number of core threads, thread pool capacity (maximum number of threads), idle thread waiting task cycle, task queue, thread factory, rejection policy. The following is a brief analysis of the meaning and function of each parameter:
- CorePoolSize: int type, number of core threads.
- MaximumPoolSize: Type int. Maximum number of threads, which is the size of the thread pool.
- KeepAliveTime: Long, the idle wait time of a thread, also related to the life of the worker thread, as discussed below.
unit
:TimeUnit
Type,keepAliveTime
The time unit of the parameter, actuallykeepAliveTime
It will eventually convert to nanoseconds.workQueue
:BlockingQueue
Type, waiting queue or task queue.threadFactory
:ThreadFactory
Type, Thread Factory, used to create worker threads (both core and non-core threads), used by defaultExecutors.defaultThreadFactory()
As an instance of a built-in thread factory, a custom thread factory is generally the best way to keep track of worker threads.handler
:-
RejectedExecutionHandler
A rejection policy is executed when the blocking queue is full, there are no free threads (both core and non-core threads), and the task is still to be submitted. Four built-in rejection policy implementations are provided:
AbortPolicy
: direct rejection policy, that is, will not execute the task, directly throwRejectedExecutionException
, this isDefault rejection policy.DiscardPolicy
: Discard policy, that is, simply ignore the submitted task (in general, the empty implementation).DiscardOldestPolicy
: Discard the oldest quest strategy, i.e. Passpoll()
Method fetches the task in the queue header and executes the currently committed task.-
CelerRunsPolicy: The caller executes a policy whereby the thread currently calling Executor#execute() directly invokes the task Runnable#run(). This policy is usually chosen to avoid task loss, but from a practical point of view, the original intent of the asynchronous invocation degenerates into a synchronous invocation.
State control
The state control mainly revolves around the atomic integer member variable CTL:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
Private static int runStateOf(int c) {return c & ~COUNT_MASK; } private static int WorkCountOf (int c) {return c & Count_Mask; }
/ / by running the state and the value of the worker thread count CTL or operation private static int ctlOf (int the rs, int wc) {return rs | wc. }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/ / the CAS operation thread by the increase in the number one private Boolean compareAndIncrementWorkerCount (int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/ / CAS number of threads to reduce 1 private Boolean compareAndDecrementWorkerCount (int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
Private void DecrementWorkerCount () {private void DecrementWorkerCount () {
ctl.addAndGet(-1);
}
Next we look at the thread pool state variable. The length of the upper number of bits of the worker thread is' COUNT_BITS ', and its value is' integer.size-3 ', which is the positive Integer 29: > We know that the Integer wrapper type Integer has an instance size of 4 bytes, 32 bits in total, so there are 32 bits for 0 or 1. The ThreadPoolExecutor implementation uses a 32-bit integer wrapper type to hold the number of worker threads and the state of the thread pool. The low 29 bits are used to store the number of worker threads, and the high 3 bits are used to store the state of the thread pool, so the state of the thread pool can only be 2^3 at most. The maximum number of worker threads is 2^29-1, exceeding 500 million, and this number is not considered to exceed the limit in a short time. Next, look at the worker thread upper number mask 'COUNT_MASK', its value is' (1 < COUNT_BITS) -l ', that is, 1 moves to the left 29 bits, minus 1, if 32 bits are completed, its bit view is as follows:! [](https://upload-images.jianshu.io/upload_images/15462057-7779f5276218b4db.png? ImageMogr2 / Auto-Orient /strip% 7cImageView2/2 /w/1240) The status constants of the thread pool are as follows:
/ / – 1 complement for: 111-11111111/111111111111111111111 / left after 29:111-00000000/000000000000000000000 / decimal value is: Private static final int RUNNING = -1 << COUNT_BITS; private static final int RUNNING = -1 << COUNT_BITS;
The composition of the control variable 'CTL' is obtained from the thread pool running state 'rs' and the number of worker threads' wc' by ** or operation ** :
/ / rs = RUNNING value is: 111-00000000 / / wc has a value of 000000000000000000000 0:00 | 0-00000000000000000000000000000 / / rs wc results as follows: 111-00000000000000000000000000000 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) {
return rs | wc;
}
So how do we extract the high 3 bit thread pool state from 'CTL'? The 'runStateOf()' method provided in the source code above is to extract the running state:
/ / to take the COUNT_MASK (~ COUNT_MASK), get: 000000000000000000000 / / CTL 111-00000000 bitmap features are: XXX – do with yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy / / two arithmetic can get high three XXX private static int runStateOf (int c) {
return c & ~COUNT_MASK;
}
Similarly, remove the low number of worker threads 29 only need to put the ` CTL ` and ` COUNT_MASK ` (` 000-11111111 111111111111111111111 `) do with operation * * * * at a time. If the number of worker threads is 0, summarize the running state constant of the thread pool:! [](https://upload-images.jianshu.io/upload_images/15462057-b513d5f0cac42860.png? ImageMogr2 / Auto-Orient /strip% 7cImageView2/2 /w/1240) There is a special trick here, because the running state value is in the higher 3 bits, so you can directly use the decimal-value (** can even ignore the lower 29 bits, directly use 'CTL' for comparison), Or use 'CTL' to compare the thread pool state constant **) to compare and determine the state of the thread pool: > worker thread count is zero: Running (-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(TERMINATED(TERMINATED(-536870912) < TIDYING(TERMINATED(TERMINATED)
Private static Boolean runStatelessthan (int c, int s) {private static Boolean runStatelessthan (int c, int s) {
return c < s;
}
Private static Boolean runStatateLeast (int c, int s) {// static Boolean runStatateLeast (int c, int s) {
return c >= s;
}
Private static Boolean isRunning(int c) {public static Boolean isRunning(int c) {public static Boolean isRunning(int c) {public static Boolean isRunning(int c) {
return c < SHUTDOWN;
}
Finally, the jump diagram of thread pool state:! [](https://upload-images.jianshu.io/upload_images/15462057-b53e2c5417c0015d.png? ImageMogr2 / auto - received/strip % 7 cimageview2 > PS / 2 / w / 1240) : There are many intermediate variables in the thread pool source code with a simple single letter, such as C is CTL, WC is worker count, RS is running status. ** ThreadPoolExecutor#execute() ** ThreadPoolExecutor#execute() ** ThreadPoolExecutor#execute()
Public void execute(Runnable command) {public void execute(Runnable command) {
If (command == null) throw new NullPointerException(); Int c = CTL. Get (); // If the current number of worker threads is less than the number of core threads, If (workerCountOf(c) < CorePoolSize) {if (addWorker(command, true)) // return if (addWorker(command, true)); C c = CTL. Get (); c = CTL. Get (); } / / go to the instructions to create a new core thread failure, which is the current working thread number greater than or equal to corePoolSize / / whether the thread pool in running state, If (IsRunning (c) &&WorkQueue.offer (command)) {int recheck = ctl.get(); If the thread pool is not in the running state, the current task will be removed from the task queue. If the thread pool is not in the running state, the current task will be removed from the task queue. IsRunning (recheck) && remove(command)) // Invoke the reject policy processing task - returns reject(command); // Go to the "else if" branch below, stating the following premise: // Incoming tasks may fail to be removed from the task queue (the only possible way to remove the task is if the task was already executed) // If the current number of worker threads is 0, the current number of worker threads is 0. // If the number of pre-worker threads is not zero, it should be the last else branch, but it can do nothing. Else if (WorkerCountof (recheck) == 0) addWorker(null, false); } // The following premise is illustrated here: // The total number of worker threads in the thread pool is already greater than or equal to CorePoolSize. // 1. The thread pool may not be RUNNING If a task fails to be placed on the task queue, then an attempt is made to create a non-core thread for the task to execute. AddWorker (command, false)) // Call the reject policy processing task - return reject(command);
}
1. If the total number of current worker threads is less than 'corePoolSize', the core thread will be created directly to execute the task (the task instance will be passed in directly to construct the worker thread instance). 2. If the current total number of worker threads is greater than or equal to 'corePoolSize', determine whether the thread pool is in running state, and try to use non-blocking method to put tasks into the task queue. Here, the thread pool running state will be checked twice. A non-core thread is created and the task object passed in is NULL. 3. If the task queue fails to be placed (the task queue is full), an attempt will be made to create a non-core thread to pass the task instance to execute. 4. If the creation of a non-core thread fails, the task needs to be rejected and a rejection policy is called to process the task. ** Here is a puzzle point ** : why do we need to double check the running state of the thread pool, the current number of worker threads is 0, try to create a non-core thread and pass in the task object is NULL? If a task is successfully queued, we need to double check whether we need to add a worker thread (because all surviving worker threads may have died after the last check) or whether the thread pool has been shutdown while executing the current method. So we need to double check the status of the thread pool, remove the task from the task queue if necessary or create a new worker thread if no worker thread is available. The task submission process looks like this from the caller's point of view:! [](https://upload-images.jianshu.io/upload_images/15462057-3ed4dbff7795890b.png? ImageMogr2 /auto-orient/strip% 7cImageView2/2 /w/1240) ** 'AddWorker (Runnable FirstTask) **' Boolean addWorker(Runnable FirstTask) The first argument to the Boolean core) 'method can be used to pass in the task instance directly, and the second argument is used to identify whether the worker thread to be created is a core thread. The method source is as follows:
/ / add worker threads, if return false shows no new create a worker thread, if return true success thread that create and start work private Boolean addWorker (Runnable firstTask, Boolean core) {
Retry: // Note that this is an infinite loop -- outermost loop for (int c = ctl.get();;) {// This is a very complex condition, so first split the and (&&) condition: // 1. The thread pool state should be at least SHUTDOWN, i.e. RS >= SHUTDOWN(0) // 2. If the thread pool state is shutdown, no new tasks will be allowed. If the thread pool state is shutdown, no new tasks will be allowed. If the thread pool state is shutdown, no new tasks will be allowed. If (runStatateLeast (c, SHUTDOWN) && (runStatateLeast (c, SHUTDOWN), no new thread should be added if the state has reached STOP, if the incoming task is not empty, or if the task queue is empty (there are no backlogs) STOP) || firstTask ! = null || workQueue.isEmpty())) return false; // Note that this is also an infinite loop - two layer loop for (;;) {// The number of worker threads is retrieved for each iteration of the loop. If the incoming core is true, say you're going to create the core thread, through the wc and corePoolSize judgment, if the wc > = corePoolSize, said return false to create core thread failure / / 1. Wc = maximumPoolSize; wc = maximumPoolSize; wc = maximumPoolSize; wc = maximumPoolSize; If (WorkerCountOf (c) >= ((core? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; / / success by CAS update working threads wc, break to the outermost loop if (compareAndIncrementWorkerCount (c)) break retry. C = CTL.get (); // CTL.get (); // CTL.get (); If (runstatatatleast (c, SHUTDOWN)) continue retry; if (runstatatatleast (c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; // else CAS failed due to workerCount change; Retry inner loop}} Boolean WorkerStarted = false; Boolean WorkerAdded = false; // Indicates whether the worker thread was created successfully. Worker w = null; Try {// firstTask creates a Worker (); // firstTask creates a Worker (); // firstTask creates a Worker (); W = new Worker(FirstTask); W = new Worker(FirstTask); final Thread t = w.thread; if (t ! = null) {final ReentrantLock mainLock = this.mainLock; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); The ThreadFactory thread has been SHUTDOWN, and the thread pool has been SHUTDOWN. The thread pool has been SHUTDOWN, and the thread pool has been SHUTDOWN. If the thread pool status is still RUNNING, it is only necessary to determine whether the thread instance is alive, add it to the Worker thread collection and start a new Worker // 2. If the state of the thread pool is less than STOP (i.e., Running or Shutdown), and the task instance firstTask is NULL, then the Worker should be added to the set of Worker threads and a new Worker should be started. If the task instance firstTask is not NULL, it will not be added to the Worker thread collection and will not start a new Worker. This Worker is likely to succeed the next round of GC object is garbage collected the if (set (c) | | (runStateLessThan (c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // Add the created worker instance to the worker thread set Workers. Add (w); int s = workers.size(); If (s > largestPoolSize) largestPoolSize = s; The thread-start () method is called to start a real instance of the thread (workerAdded = true). } } finally { mainLock.unlock(); } // If the Worker thread is added successfully, call the Thread#start() method of the Worker internal thread instance T to start the real thread instance if (WorkerAdded) {t.art (); WorkerStarted = true; workerStarted = true; }}} finally {// The thread failed to start. Remove the corresponding Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted;
}
Worker private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); Try {// remove if (w! = null) workers.remove(w); // DecrementWorkerCount (); // Try to terminate the thread pool tryTerminate() based on state; } finally { mainLock.unlock(); }
}
I found out that 'Doug Lea' likes complex conditional judgments very much and doesn't like curly braces for single-line complex judgments, as is common in many of his libraries:
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || firstTask ! = null || workQueue.isEmpty())) return false;
/ /… Boolean atleastShutdown = runStatateLeast (c, Shutdown); # rs >= SHUTDOWN(0) boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask ! = null || workQueue.isEmpty(); if (atLeastShutdown && atLeastStop){ return false; }
Note that when the 'Worker' instance is created, an instance of the Java Thread 'Thread' is created in its constructor using the 'ThreadFactory'. The 'Worker' will be checked twice to see if the 'Worker' instance needs to be added to the set of 'workers' and if the' Worker 'instance needs to be started. Only when the' Worker 'instance instance is started, the' Worker 'will actually start working. Otherwise it is just a useless temporary object. 'Worker' itself also implements the 'Runnable' interface, which can be regarded as a 'Runnable' adapter. * * * * Worker Worker thread inner class source code analysis each concrete work threads in thread pool is packing for the inner class ` Worker ` instance, ` Worker ` inheritance in ` AbstractQueuedSynchronizer (AQS) `, implements the ` Runnable ` interface:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; // Save the Thread instance created by the ThreadFactory. If the ThreadFactory Thread fails, the final Thread Thread is null. // Save the incoming Runnable task instance Runnable FirstTask; Volatile long completedTasks; // volatile long completedTasks; // Disallow thread interruptions until the runWorker() method executes setState(-1); // Disallow thread interruptions until the runWorker() method executes setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; / / create a thread through ThreadFactory instance, pay attention to the Worker instance itself as a Runnable is used to create a new thread instance. This thread = getThreadFactory () newThread (this); } public void run() {runWorker(this);} public void run() {runWorker(this);} public void run(); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // Protected Boolean isHeldExclusively() {return getState()! = 0; } // Attempts to retrieve the resource in exclusive mode, where no variables are passed in, CAS directly determines whether the update from 0 to 1 is successful or not. Protected Boolean tryAcquire(int unused) {if (compareAndSetState(0, 0); 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } in exclusive mode, set state to 0 protected Boolean tryRelease(int unused) {setExclusiveOwnerThread(null); setState(0); return true; } public void Lock () {acquire(1); } public Boolean tryLock() {return tryAcquire(1); } // Unlock public void unlock() {Release (1); } public Boolean isLocked() {return isheldUser (); } // interrupt the Thread after startup. Note that the Thread instance is determined to be false with the interrupt flag. Void interruptifStarted () {Thread t; if (getState() >= 0 && (t = thread) ! = null && ! t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
}
The logic in the constructor of 'Worker' is very important. The instance of 'Thread' created by 'ThreadFactory' is passed to the instance of 'Worker' at the same time. Because 'Worker' itself implements' Runnable ', it can be submitted to the Thread for execution as a task. As long as the 'Worker' holds the thread instance 'w' calls the 'Thread#start()' method, the 'Worker#run()' can be executed at the appropriate time. The logic is simplified as follows:
// createWorker Worker = createWorker(); ThreadFactory = getThreadFactory(); threadFactory = getThreadFactory(); // Worker constructor Thread Thread = threadFactory.newThread(Worker); // Start Thread.start () in the addWorker() method;
'Worker' inherits from 'AQS', and the exclusive mode of 'AQS' is used here. A trick is to set the resource (state) of 'AQS' to -1 by 'setState(-1)' when constructing 'Worker'. This is because the default value of 'state' in 'AQS' when the 'Worker' instance was created is 0. The thread has not yet started and cannot be interrupted at this time. See the 'Worker#interruptIfStarted()' method. The two methods covering 'AQS' in 'Worker', 'tryAcquire()' and 'tryRelease()', do not judge external incoming variables. The former directly 'CAS(0,1)' and the latter directly 'setState(0)'. Moving on to the core method 'ThreadPoolExecutor#runWorker()' :
final void runWorker(Worker w) {
WT = Thread.currentThread(); // Get the currentThread, which is actually the same Thread as the Worker's Thread instance. Runnable task = w.irsttask; // Get the task object that was passed in during initialization held in the Worker. // Set the task object passed in during initialization held in Worker to null; / / as a result of the Worker in AQS upon initial state is set to 1, here to do a first unlock the state update to 0, allows the thread interrupt w.u nlock (); Boolean completedAbruptly = true; // allow interrupts // log whether the thread is terminated because of a user exception. // getTask() is not null when the task object is initialized, or the task is not null when the task is retrieved from the task queue (the task is updated to the temporary variable task). If getTask() returns to NULL, the thread will break out of the loop and terminate while (task! = null || (task = getTask()) ! = null) {Worker (Worker) {Worker (Worker) {Worker (Worker) {Worker (Worker); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while If the thread pool is being stopped (that is, changing from the RUNNING or SHUTDOWN state to the STOP state), make sure the current worker thread is in an interrupted state otherwise, To ensure that the current Thread is not interrupted status if ((runStateAtLeast (CTL) get (), STOP) | | (Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP))) &&! wt.isInterrupted()) wt.interrupt(); BeforeExecute (wt, task); try {// beforeExecute(wt, task); try { task.run(); // hook method, after task execution - normally afterExecute(task, null); } catch (Throwable ex) {// After Execute(task, ex); throw ex; }} finally {// Empty the task temporary variable. This is important, otherwise the while will execute the same task task = null; // The total number of tasks completed by the Worker w.completedTasks++; // The Worker unlocks the resource, which is essentially the AQS unlocks the resource, and sets the state to 0 W. Unlock (); }} // The thread exits completedAbruptly = false when getTask() returns null; } finally {// ProcessWorkerExit (w, light);} finally {// ProcessWorkerExit (w, light); }
}
Here's a look at the code that determines the interrupt status of the current worker thread:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();
Rs.> = STOP(1) Boolean atLeastStop = runStatatEastLeast (ctl.get(), STOP); // Check if the thread pool state is at least STOP, And judge the current Thread state of disruption and to empty the current Thread state of interrupt Boolean interruptedAndAtLeastStop = Thread. Interrupted () && runStateAtLeast (CTL) get (), STOP); if (atLeastStop || interruptedAndAtLeastStop && ! wt.isInterrupted()){
wt.interrupt();
}
The 'Thread.interrupted()' method gets the interrupted status of the Thread and clears the interrupted status. This method is called because it is possible to call 'shutdownNow()' while executing the 'if' logic above. There is also logic to interrupt all 'Worker' threads in 'shutdownNow()' method. However, since all 'workers' thread interrupts are traversal in' shutdownNow() 'method, it may not be possible to interrupt the task before it is submitted to' Worker 'for execution in time. So the interrupt logic is executed inside the 'Worker', which is the logic of the 'if' block. Also note that the 'STOP' state will reject all newly submitted tasks, will no longer execute tasks in the task queue, and will interrupt all 'Worker' threads. That is, ** even if the task Runnable has been pulled out of the first half of the logic in 'RunWorker ()', it may be interrupted before it reaches the point where its Runnable#run() is called **. If the logic to enter the 'if' block happens and the 'shutdownNow()' method is called externally, then the thread interrupt status will be determined in the 'if' logic and reset. The 'interruptWorkers()' called in the 'shutdownNow()' method would not have caused a second interrupt thread (which would cause an exception) due to a problem with the interrupt status determination. To summarize the core process of the 'runWorker()' method above: 1. The 'Worker' performs an unlocking operation first, which is used to remove the uninterruptible state. 2. Get the task from the task queue by calling the 'getTask()' method through the 'while' loop (of course, the first round of the loop may also be an externally passed FirstTask task instance). 3. If the thread pool changes to the 'STOP' state, make sure that the worker thread is in the interrupt state and handle the interrupt. Otherwise, make sure that the worker thread is not in the interrupt state. 4. Execute the task instance 'runnal# run()' method. Before Execute() 'and' afterExecute() 'hooks are called before and after the task instance executes (including normal and abnormal execution), respectively. 5. When the 'while' loop breaks out, the 'runWorker()' method ends and the worker thread's life cycle ends (the 'Worker#run()' life cycle ends). The 'processWorkerExit()' is called to handle the subsequent work after the worker thread exits. ! [](https://upload-images.jianshu.io/upload_images/15462057-7a5241c2da3c3c6b.png? ImageMogr2 / auto - received/strip % 7 cimageview2/2 / w / 1240) # # # * * write * * welcome to my public attention in the last number, calm, such as code * * * * 】 【 vast Java related articles, learning materials will be updated in the inside, Organized information will also be placed in it.