preface
Hello everyone, I am more of the small white hard, we know whether the interview or job, will be in contact with the thread pool, pool, the knowledge of the interviewer to ask today I bring you a look at the thread pool of source code, while watching the source code is a very dull thing, but you don’t, someone, who also not content to be optimized!!! Follow me, roll it up!!
Inheritance system
The thread pool inheritance architecture is shown below
Thread pool state
Thread pool state transitions are shown below
-
The thread pool is created first and is in a normal RUNNING state, RUNNING, before the shutdown method is called. In the RUNNING state, your submitted tasks (by calling the Submit or execute methods) will be executed by the thread pool. In the RUNNING state, the thread pool can accept new submitted tasks and also process tasks in the blocking queue.
-
When the RUNNING thread pool calls shutdown, the state of the pool changes to shutdown. If you submit a new task (call submit or execute), the pool will reject it. But the thread pool in SHUTDOWN state has a special point: it does not exit all threads immediately, but exits only after all the previous tasks in the queue have been completed.
-
A thread pool in the STOP state cannot accept newly submitted tasks or process tasks in the blocking queue, interrupting the thread that is working on the task. When the thread pool is in the RUNNING or SHUTDOWN state, calling the shutdownNow method will put the thread pool into that state.
-
The number of threads in the thread pool when all tasks have terminated
workCount
for0
“, the thread pool will enterTIDYING
state -
The last thread to exit (that is, after executing the last task in the task queue) changes the state of the thread pool to
TIDYING
And then callterminated
Method, the thread pool becomesTERMINATED
The state.
Rejection policies
The four rejection strategies are as follows:
AbortPolicy
: Throws an exception, default (more common)CallerRunsPolicy
: hands off to the thread from which the thread pool call is madeDiscardPolicy
: Directly discards the taskDiscardOldestPolicy
: Discards the oldest task in the queue and continues the current task to the thread pool
attribute
// High 3 bits: indicates the current running state of the thread pool. Low (29 bits) : indicates the number of threads in the current thread pool
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// indicates that in CTL, the low COUNT_BITS bit is used to store the current number of threads. The value of COUNT_BITS is 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// The maximum value that can be expressed by the low COUNT_BITS bit. 000 111111111111111111 => 500 million.
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// The value of -1 is the same as the value of -1. The value of -1 is the same as the value of -1
//111 000000000000000000 converted to an integer, actually a negative number
private static final int RUNNING = -1 << COUNT_BITS;
/ / 000 000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
/ / 001 000000000000000000
private static final int STOP = 1 << COUNT_BITS;
/ / 010 000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
/ / 011 000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Task queue. When the number of threads in the thread pool reaches the core number of threads, the submitted task will be directly submitted to the workQueue
//workQueue instanceOf ArrayBrokingQueue LinkedBrokingQueue Synchronization queue
private final BlockingQueue<Runnable> workQueue;
// Thread pool global lock. MainLock is required when adding worker (thread) and when reducing worker (thread). MainLock is also required when changing thread pool running state.
private final ReentrantLock mainLock = new ReentrantLock();
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */
// The thread pool where the worker->thread is actually stored.
private final HashSet<Worker> workers = new HashSet<Worker>();
/** * Wait condition to support awaitTermination */
// When an external thread calls awaitTermination(), the external thread waits until the current thread pool state is Termination.
// How is waiting implemented? The external Thread is wrapped as a waitNode and placed in the Condition queue. The waitNode.Thread is the external Thread and will be park (WAITING).
// When the thread pool state changes to Termination, these threads are woken up. With termination. SignalAll (), after wake up these threads will enter the blocking queue and the header will preempt the mainLock.
// The preempted thread continues to execute awaitTermination(). These threads, at the end, will execute normally.
// simple understanding: termination. Await () will block the thread here.
// termination. SignalAll () will wake up the threads blocking here in turn
private final Condition termination = mainLock.newCondition();
/** * Tracks largest attained pool size. Accessed only under * mainLock. */
// Record the maximum number of threads in the thread pool lifetime
private int largestPoolSize;
/** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. */
// Records the total number of tasks completed by the thread pool. When the worker (thread) exits, the tasks completed by the worker will be accumulated to completedTaskCount
private long completedTaskCount;
// Thread factory is used when creating threads by using Executors. NewFix... newCache... When creating a thread pool, use DefaultThreadFactory
// It is generally not recommended to use the Default thread pool
private volatile ThreadFactory threadFactory;
/** * Handler called when saturated or shutdown in execute. */
// The juC package provides 4 methods. The default is Abort.. The way an exception is thrown.
private volatile RejectedExecutionHandler handler;
When allowCoreThreadTimeOut == false, the number of threads in the core thread count is maintained, and the amount beyond the core thread count is reclaimed.
//allowCoreThreadTimeOut == true The number of threads in the core that are idle will also be reclaimed.
private volatile long keepAliveTime;
/** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */
// Controls whether threads within the core thread count can be reclaimed. True does, false does not.
private volatile boolean allowCoreThreadTimeOut;
/** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */
// Limit the number of core threads.
private volatile int corePoolSize;
/** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */
// Maximum number of threads in the thread pool.
private volatile int maximumPoolSize;
/** * The default rejected execution handler */
AbortPolicy throws an exception.
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
Copy the code
-
CTL: high 3 bits: indicates the current running state of the thread pool (runState), minus the high 3 bits (29 bits) : indicates the number of threads in the current thread pool (workerCount).
-
COUNT_BITS: The value is 29(integer.size -3 = 32-3 = 29), indicating that in CTL, the lower COUNT_BITS bit is the one used to store the current number of threads.
-
CAPACITY: The maximum value of the low COUNT_BITS bit. 000 111111111111111111 => 500 million.
-
RUNNING: Indicates that the thread pool is RUNNING properly. -1 is 32 1’s, moved 29 bits to the left, and filled with zeros. -1 << COUNT_BITS is left with three 1’s and 29 0’s (111 000000000000000000).
-
0 << COUNT_BITS, that is 000000000000000000…. That’s 0
-
STOP: The thread pool is stopped. In the same way, 1 << COUNT_BITS, that is, 001 000000000000000000…..
-
TIDYING: indicates that the thread pool is TIDYING. Similarly, 2 << COUNT_BITS, that is, 010 000000000000000000…..
-
TERMINATED: it means the thread pool in the TERMINATED state, in the same way 3 < < COUNT_BITS, namely 011000000000000000000…
-
WorkQueue: A task queue. When the number of threads in the thread pool reaches the core number, the task will be submitted directly to the workQueue.
-
MainLock: global lock of the thread pool. MainLock is required when adding workers and reducing workers and when changing the running status of the thread pool.
-
Workers: Threads are encapsulated as workers in the thread pool. Workers really store all workers, that is, all threads.
-
LargestPoolSize: records the maximum number of threads in the thread pool life cycle.
-
CompletedTaskCount: Records the total number of tasks completed by the thread pool. When the worker (thread) exits, the tasks completed by the worker will be accumulated to completedTaskCount
-
ThreadFactory: a threadFactory is used when creating threads. The default is not recommended. It is recommended to implement threadFactory yourself
-
Handler: Reject policy. The JUC package provides four methods.
AbortPolicy
: Throws an exception, default (more common)CallerRunsPolicy
: hands off to the thread from which the thread pool call is madeDiscardPolicy
: Directly discards the taskDiscardOldestPolicy
: Discards the oldest task in the queue and continues the current task to the thread pool
-
KeepAliveTime: keepAliveTime of idle threads. If allowCoreThreadTimeOut == false, keepAliveTime of idle threads is maintained for the number of core threads. AllowCoreThreadTimeOut == true The number of threads in the core that are idle will also be reclaimed.
-
AllowCoreThreadTimeOut: Controls whether threads within the number of core threads can be reclaimed. True does, false does not.
-
CorePoolSize: Limit on the number of core threads
-
MaximumPoolSize: maximum number of threads in a thread pool.
-
DefaultHandler: The default reject policy using AbortPolicy to throw an exception.
Small methods
Commonly used small method source as follows:
// Packing and unpacking ctl
// Get the current thread pool running state, that is, get the three digits of CTL +29 zeros
// Set the CTL value & ~CAPACITY CAPACITY to 0 and the lower 29 bits to 1, i.e. 000 11111111111111111111
//~000 11111111111111111111 => 111 000000000000000000000
//c == ctl = 111 000000000000000000111
/ / 111 000000000000000000111
/ / 111 000000000000000000000
/ / 111 000000000000000000000
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Get the number of current thread pool threads
//c == ctl = 111 000000000000000000111
/ / 111 000000000000000000111
/ / 000 111111111111111111111
/ / 000, 000000000000000000111 = > 7
private static int workerCountOf(int c) { return c & CAPACITY; }
// used when resetting the current thread pool CTL value
// Rs indicates the state of the thread pool. Wc indicates the number of workers (threads) in the current thread pool
/ / rs: 111 000000000000000000
// |
/ / wc: 000 000000000000000111
/ / =
/ / 111 000000000000000111
private static int ctlOf(int rs, int wc) { return rs | wc; }
/* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */
// Compare whether the current thread pool CTL state is less than a certain state s
//c = 111 000000000000000111 < 000 000000000000000000 == true
// All cases, RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// Compare whether the current thread pool CTL state is greater than or equal to a certain state s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// RUNNING must be less than SHUTDOWN. SHUTDOWN == 0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// Use CAS to set the CTL value to +1, return true on success, false on failure
// Increase the number of threads +1 for the lower 29 bits of the CTL
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// Use CAS to set CTL to -1, true on success, false on failure
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// Decrease the CTL value by one
private void decrementWorkerCount(a) {
// This will continue to retry until success.
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
Copy the code
runStateOf
: Gets the current thread pool running statusctl
The third digit value of +29
a0
.workerCountOf
: Gets the number of threads in the current thread pool, i.ectl
The low29
The value of a.ctlOf
: Used to reset the current thread poolctl
The value of,rs
Represents the thread pool state,wc
Represents the current thread poolworker
Number of threads.runStateLessThan
: compares the current thread poolctl
Is the state represented less than a certain states
We knowRUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
.runStateAtLeast
: compares the current thread poolctl
Is the state represented greater than or equal to a certain states
.isRunning
: less thanSHUTDOWN
The must beRUNNING
.SHUTDOWN == 0
.compareAndIncrementWorkerCount
Use:CAS
Way to makectl
Increment the number of threads in thetrue
, return on failurefalse
compareAndDecrementWorkerCount
Use:CAS
Way to makectl
If the number of threads in thetrue
, return on failurefalse
decrementWorkerCount
Use:CAS
Way to makectl
If the number of threads is reduced by one, this method will succeed.
A constructor
Constructor source code is as follows:
public ThreadPoolExecutor(intCorePoolSize,// limit the number of core threadsintMaximumPoolSize,// Maximum thread limitlongKeepAliveTime,// TimeUnit unit,// seconds nano.. BlockingQueue<Runnable> workQueue,// task queue ThreadFactory ThreadFactory,// ThreadFactory RejectedExecutionHandler handler/* Reject policy */) {
// Check whether the parameter is out of bounds
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// Work queues and thread factories and rejection policies cannot be empty.
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Copy the code
The 7 parameters in the constructor are as follows:
corePoolSize
: Number of core threads in the thread pool.- When the number of threads in the thread pool is less than
corePoolSize
When the number of core threads is specified, a new thread will be created by the thread pool to handle the newly submitted task, even if other threads in the thread pool are idle.The underlying calladdWorker
Methods the newworker
(thread), and treat the newly submitted task as newly createdworker
The first task of the thread, i.efirstTask
). - When the number of threads in the thread pool is greater than or equal to
corePoolSize
Number of core threads and less thanmaximumPoolSize
Maximum number of threads for newly submitted tasks, if the task queueworkQueue
If not, the newly submitted task is added to the task queueworkQueue
In the. If the task queueworkQueue
When full, a new thread is created (The underlying calladdWorker
Methods the newworker
(thread), and treat the newly submitted task as newly createdworker
The first task of the thread, i.efirstTask
) to handle newly submitted tasks. - When the number of threads in the thread pool is greater than or equal to
maximumPoolSize
Maximum number of threads, if the task queueworkQueue
If not, the newly submitted task is added to the task queueworkQueue
In the. If the task queueworkQueue
When it is full, it passeshandler
The rejection policy specified to process newly submitted tasks.
- When the number of threads in the thread pool is less than
maximumPoolSize
: Maximum number of threads in the thread pool.workQueue
: Task queue (blocking queue)keepAliveTime
: When the number of threads in the thread pool is greater thancorePoolSize
Number of core threads, and other threads outside these core threads are idle (no new tasks are submitted), waitkeepAliveTime
This part of the thread will be reclaimed after time.unit
:keepAliveTime
The time unit of a parameter, for exampleTimeUnit.SECONDS
Said the second.handler
: Rejection policy.threadFactory
: thread factory, used to actually create threads, used by defaultDefaultThreadFactory
In thenewThread
Method to create threads in a thread pool, which sets the first created thread to non-daemons and priority to 5, i.eThread.NORM_PRIORITY
, the highest thread priority is 10, i.eThread.MAX_PRIORITY
.DefaultThreadFactory
The structure of the class is as follows:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private finalString namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s ! =null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix ="pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
/** * create thread */ with this method
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
/ / thread
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
// Set to non-daemon thread
t.setDaemon(false);
if(t.getPriority() ! = Thread.NORM_PRIORITY)// Set the thread priority to 5 and the highest priority to 10
t.setPriority(Thread.NORM_PRIORITY);
returnt; }}Copy the code
DefaultThreadFactory
Of the classnewThread
The method will be in the inner class belowWorker
Called in the constructor of. The inner classWorker
The constructor will be calledaddWorker
Method to createworker
(thread).
The inner class
The more important internal class Worker is used to encapsulate threads when they are created. Its structure and important methods are as follows:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//Worker uses exclusive mode of AQS
// Exclusive mode: Two important attributes state and ExclusiveOwnerThread
//state: 0 indicates that the lock is not occupied > 0 indicates that the lock is occupied < 0 indicates the initial state. In this case, locks cannot be snatched.
//ExclusiveOwnerThread: exclusive lock thread.
/** Thread this worker is running in. Null if factory fails. */
// The worker thread encapsulated within the worker
final Thread thread;
/** Initial task to run. Possibly null. */
// if firstTask is not empty, then when the worker starts (the internal thread starts) firstTask will be executed first, and when the firstTask is finished, the queue will fetch the next task.
Runnable firstTask;
/** Per-thread task counter */
// Record the number of tasks completed by the current worker.
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
//firstTask can be null. Null will be retrieved from the queue after startup.
Worker(Runnable firstTask) {
// Set the exclusive mode of AQS to the initialization state, which cannot be preempted lock.
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// A thread factory is used to create a thread with the current worker specified as Runnable, that is, the thread starts with worker.run() as the entry point.
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// When the worker starts, run() is executed
public void run(a) {
ThreadPoolExecutor->runWorker() ¶
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// Check whether the current worker's exclusive lock is exclusive.
//0 indicates that it is not occupied
//1 indicates that it is occupied
protected boolean isHeldExclusively(a) {
returngetState() ! =0;
}
// Try to occupy worker's exclusive lock
// The return value indicates whether the preemption succeeded
protected boolean tryAcquire(int unused) {
// Use CAS to change the state in AQS. The expected value is 0(0 indicates that the current thread is not occupied)
// Set ExclusiveOwnerThread to the current thread.
if (compareAndSetState(0.1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
Unlock -> aqs.release () ->tryRelease()
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
If the lock fails, the current thread will be blocked until the lock position is obtained.
public void lock(a) { acquire(1); }
If the current lock is not held, it will return true, otherwise it will not block the current thread, and will return false.
public boolean tryLock(a) { return tryAcquire(1); }
// Normally, we call unlock to ensure that the current thread holds the lock.
// In special case, when worker state == -1, call unlock to initialize state and set state == 0
Unlock () is called before starting the worker. ExclusiveOwnerThread == NULL State==0 is forcibly refreshed
public void unlock(a) { release(1); }
// Returns whether the current worker's lock is occupied.
public boolean isLocked(a) { return isHeldExclusively(); }
void interruptIfStarted(a) {
Thread t;
if (getState() >= 0&& (t = thread) ! =null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Copy the code
-
The attributes are as follows:
thread
:worker
Internally encapsulated worker threads.firstTask
: createWorker
Argument when the constructor is calledfirstTask
If it is not empty, then whenworker
Execution takes precedence after startup (internal thread startup)firstTask
When the execution is completefirstTask
After, the call will be madegetTask
Method to the task queueworkQueue
To get the next task to execute; iffirstTask
If the value is empty, it is createdworker
Direct callgetTask
Method to the task queueworkQueue
To get the task to execute.completedTasks
: Record currentworker
Number of tasks completed.
-
Constructor first call setState (1) set up the state field of AQS value of 1 (because the Worker class inherits the AbstractQueuedSynchronizer), Then call getThreadFactory().newThread(this) to actually create the thread. (If ThreadFactory is specified in the constructor when creating the thread pool, the newThread method of the corresponding ThreadFactory is called.) If not specified, the DefaultThreadFactory DefaultThreadFactory is used. Note that newThread(this) passes this as the worker instance, since the worker also implements Runnable. So when the thread in the real worker calls the start method to start, the run method in the worker will be executed.
-
run
Method for whenworker
Is called when the thread instart
Methodrun
Method, andrun
Method by calling the core methodrunWorker
.
The execute method
The execute method is called when we submit a task in the thread pool. The source code for this method is as follows:
// Command can be a normal Runnable implementation class or FutureTask
public void execute(Runnable command) {
// Non-null judgment..
if (command == null)
throw new NullPointerException();
// get the latest CTL value and assign it to C, CTL: the high 3 bits represent the state of the thread pool, and the low level represents the number of threads in the current thread pool.
int c = ctl.get();
//workerCountOf(c) gets the number of current threads
// The condition is true: the number of current threads is smaller than the number of core threads. This time, a new worker is directly created, and a new thread is added to the thread pool.
if (workerCountOf(c) < corePoolSize) {
//addWorker is a thread creation process that creates worker objects and uses Command as firstTask
//core == true indicates that the number of core threads is limited. False indicates that maximumPoolSize is used
if (addWorker(command, true))
// After the creation is successful, return directly. The addWorker method will start the newly created worker and execute firstTask.
return;
// addWorker must have failed...
// What are the possibilities?
WorkerCountOf (c) < corePoolSize;
// Other threads may also be set up and create workers to the thread pool. At this point the number of core threads in the thread pool has been reached, so...
//2. The current thread pool state has changed. RUNNING SHUTDOWN STOP TIDYINGTERMINATION
// If the thread pool is not RUNNING, addWorker(firstTask! = null, true | false) is bound to fail.
// The system can be successfully created in the SHUTDOWN state. If firstTask == null and the current queue is not empty. Special circumstances.
c = ctl.get();
}
// How many times does this happen?
//1. The current number of threads has reached corePoolSize
/ / 2. AddWorker failure..
If the thread pool is in the running state, the task is added to the workQueue.
if (isRunning(c) && workQueue.offer(command)) {
// The task was submitted successfully.
// Get the CTL again and save it to recheck.
int recheck = ctl.get();
// Condition 1:! If isRunning(recheck) is enabled, the state of the thread pool has been changed by an external thread after you commit to the queue, for example: shutdown() shutdownNow()
// Delete the task you just submitted.
// Condition 2: remove(command) Either succeeds or fails
// Success: threads in the thread pool have not been consumed (processed) after the commit
// Failed: the commit is processed by the thread in the thread pool before shutdown() shutdownNow().
if (! isRunning(recheck) && remove(command))
// The thread pool status is not running after the commit and the task has been queued successfully.
reject(command);
// How many cases can I get here?
//1. The current thread pool is running.
//2. The thread pool status is not running, but the task submitted by remove fails.
// The thread pool is running, but the number of threads alive in the pool is 0. If the thread pool is 0, the task will not run.
// This is a guarantee mechanism to ensure that at least one thread is working in the running state.
else if (workerCountOf(recheck) == 0)
addWorker(null.false);
}
// How many cases are there?
//1. The offer method fails
//2. The thread pool is not in the running state
//1. Failed offer, what do I need to do? The queue is full. If the current number of threads has not reached maximumPoolSize, a new worker will be created to execute command directly
// If the current number of threads reaches maximumPoolSize, this will also fail.
//2. The thread pool state is not running. = null addWorker must return false.
else if(! addWorker(command,false))
reject(command);
}
Copy the code
The process for executing the execute method is as follows:
-
When the number of threads in the thread pool is less than the number of corePoolSize core threads, addWorker(command, true) is called to create a new worker(thread) to execute the newly submitted command. Core == true indicates that the number of core threads is limited. False indicates that maximumPoolSize is used.
-
When the number of threads in the thread pool is greater than or equal to corePoolSize, and the task queue is not full, workqueue.offer (command) is called to add the task command to the task queue.
- After the task is added to the queue, the state of the current thread pool is determined, if not
RUNNING
(…! isRunning(recheck)
The thread pool state is changed by an external thread after you commit to the queue.shutdown() ,shutdownNow()
) needs to be calledremove(command)
Remove the task from the task queue,remove(command)
May or may not be successful, success: threads in the thread pool have not been consumed (processed) after the commit; Failed: after submission, inshutdown()
shutdownNow()
Before, it was processed by threads in the thread pool. - If the call
remove(command)
If the task has been successfully removed from the task queue, it is then calledreject(command)
Go with the “no” strategy. - If the current thread pool state is
RUNNING
If state or the previous steps fail to remove the task from the task queue, then check whether the number of threads in the current thread pool is equal to0
, if equal to0
, you need to calladdWorker(null, false)
Create a new oneworker
, this is actually a guarantee mechanism to ensure that the thread pool inRUNNING
State, at least one thread must be working.
- After the task is added to the queue, the state of the current thread pool is determined, if not
-
If the number of threads in the thread pool is greater than or equal to corePoolSize and less than the maximum number of threads maximumPoolSize, and the blocking queue in the thread pool is full, addWorker(Command, false) is called to create and start a thread to execute the newly submitted task.
-
If the previous step found that the number of threads in the thread pool is greater than or equal to maximumPoolSize and the blocking queue in the thread pool is full, AddWorker (command, false) returns false and then calls reject(command) to reject the policy.
The execution flow of the execute method is as follows:
AddWorker method
The main function of addWorker is to create a worker (thread) to handle the newly submitted task firstTask. FirstTask specifies the firstTask to be executed by the first created worker (thread). If firstTask is empty, Then the newly created worker (thread will get the task from the task queue to execute it. If the core parameter is true, the current active thread count is smaller than corePoolSize when adding a worker. If the core parameter is false, the current active thread count is smaller than maximumPoolSize before adding a worker.
//firstTask can be null, which means that after starting the worker, the worker automatically obtains the task from the queue.. If not null, the worker executes firstTask first
If true, use the core thread limit. False Use the maximumPoolSize thread limit.
// Return value summary:
//true: the worker is created successfully and the thread is started
//false indicates that the creation fails.
Thread pool status rs > SHUTDOWN (STOP/TIDYING/TERMINATION)
Rs == SHUTDOWN but there is no task in the queue or the current state is SHUTDOWN and the queue is not empty but firstTask is not null
//3. The current thread pool has reached the specified threshold (coprePoolSize or maximumPoolSIze).
//4. ThreadFactory creates a null thread
private boolean addWorker(Runnable firstTask, boolean core) {
// Spin determines whether the current thread pool state allows thread creation.
retry:
for (;;) {
// Get the current CTL value and save it to c
int c = ctl.get();
// Get the current thread pool running state and save it to rs
int rs = runStateOf(c);
// Check if queue empty only if necessary.
Rs >= SHUTDOWN: the current thread pool is not in the running state
// The current thread pool state is not running! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
//rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
// Indicates that the current thread pool state is SHUTDOWN and the submitted task is empty. AddWorker may not be called by execute. & The current task queue is not empty
If the thread pool is SHUTDOWN and there are tasks in the queue, workers can be added, but tasks cannot be submitted again.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))// When does this return false?
// Thread pool state rs > SHUTDOWN
//rs == SHUTDOWN but there is no task in the queue or rs == SHUTDOWN and firstTask! = null
return false;
// Check whether the current state of the thread pool allows adding threads.
// Internal spin gets the process of creating a thread token.
for (;;) {
// Get the number of threads in the current thread pool and save it in wc
int wc = workerCountOf(c);
// Wc >= CAPACITY can never be established because CAPACITY is a large number of 500 million
Wc >= (core? corePoolSize : maximumPoolSize)
//core == true, determine whether the current number of threads >=corePoolSize, will limit the number of core threads.
//core == false, determine whether the current number of threads >=maximumPoolSize, will limit the maximum number of threads.
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// The specified limit has been reached
return false;
// The number of logging threads has been increased by 1, which is equivalent to applying for a token.
// Conditional failure: another thread may have changed the value of CTL.
// What could have happened?
//1. The other thread applied for the token before this. Cause CAS to fail (i.e. the c you got is old)
//2. An external thread may have called shutdown() or shutdownNow(), resulting in a change in the state of the thread pool
// The CAS fails when the state changes.
if (compareAndIncrementWorkerCount(c))
// Cas must be successful if you enter the cas. We got the token
// Directly jumps out of the for spin outside the retry.
break retry;
//CAS failed to apply for a token
// Get the latest CTL value
c = ctl.get(); // Re-read ctl
// Determine whether the current thread pool state has changed, if the external call shutdown.shutdownNow before this state will change.
if(runStateOf(c) ! = rs)// Return to the outer loop. The outer loop is responsible for determining the current state of the thread pool and whether it is allowed to create threads.
continue retry;
// else CAS failed due to workerCount change; retry inner loop}}// Indicates whether the created worker is started. False: Not started. True: Started
boolean workerStarted = false;
// Indicates whether the created worker is added to the pool. Default false: Not added. True: Added.
boolean workerAdded = false;
//w is a reference to create the worker later.
Worker w = null;
try {
// Create the Worker. After executing, the thread should be created.
w = new Worker(firstTask);
// Assign the thread of the newly created worker node to t
final Thread t = w.thread;
// Why t! = null?
ThreadFactory is an interface that can be implemented by anyone.
// Create thread null,;
//Doug Lea thinks things through. Will definitely prevent his own program from reporting null Pointers, so do it here!
if(t ! =null) {
// Save a reference to the global lock to mainLock
final ReentrantLock mainLock = this.mainLock;
// Hold the global lock, which may block until it is acquired successfully. All operations that manipulate the thread pool must hold the lock at the same time.
mainLock.lock();
// No other thread can modify the current thread pool state.
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// Get the latest thread pool running state and save it to rs
int rs = runStateOf(ctl.get());
If rs < SHUTDOWN is set, the current thread pool is RUNNING.
// Condition 2: The current thread pool state is not RUNNING.
//(rs == SHUTDOWN && firstTask == null) The current state is SHUTDOWN and firstTask is empty. What we're really looking at is a special case of SHUTDOWN,
// The queue is empty
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// t.isalive () returns true when the thread is started.
ThreadFactory creates a thread and starts it before returning it to the outside world.
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// Add the worker we created to the thread pool.
workers.add(w);
// Get the latest number of current thread pool threads
int s = workers.size();
// The current number of threads is a new high. Update largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
// indicates that the thread has been appended to the thread pool.
workerAdded = true; }}finally {
// Release the thread pool global lock.
mainLock.unlock();
}
// If the condition is true, the worker is added successfully
// Conditional failure: The state of the thread pool changed before the lock, resulting in the failure to add the thread pool.
if (workerAdded) {
// After success, the created worker will start and the thread will start.
t.start();
// The startup flag is set to true
workerStarted = true; }}}finally {
// Conditional:! WorkerStarted Indicates that startup fails and clearing work needs to be done.
if (! workerStarted)
// What cleanup should I do when I fail?
//1. Release the token
//2. Clear the current worker from the workers collection
addWorkerFailed(w);
}
// Returns whether the newly created thread is started.
return workerStarted;
}
Copy the code
The addWorker method is executed as follows:
- First, determine whether to create a thread pool based on its state
worker
(thread), when the thread pool state is greater thanSHUTDOWN
That is, the thread pool is inSTOP,TIDYING,TERMINATED
(The thread pool in these states interrupts the thread that is processing the task and does not receive new tasks or process tasks in the task queue, thus exiting the thread pool in turn) or when the thread pool is inSHUTDOWN
, but the task queue is empty (Because the thread pool is inSHUTDOWN
In state, it simply does not receive new submitted tasks, but continues to process the tasks in the task queue. If the task queue is empty, there is no need to add tasksworker
Thread the) or when the thread pool is inSHUTDOWN
andfirstTask
Not empty, the code to judge is as follows:
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))// When does this return false?
// Thread pool state rs > SHUTDOWN
//rs == SHUTDOWN but there is no task in the queue or rs == SHUTDOWN and firstTask! = null
return false;
Copy the code
- Then, it will not be created if the following conditions are met
woker
① The number of threads in the current thread pool is greater thanCAPACITY
(CAPACITY
Is equal to more than 500 million, usually this can’t be greater than); ② According to the incomingcore
Parameter (based on the number of core threads) to determine whether the value is greater than the number of core threadscorePoolSize
Or maximum number of threadsmaximumPoolSize
, the code is as follows:
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// The specified limit has been reached
return false;
Copy the code
- And then it goes through
cas
Change the value of the number of threads in the thread pool by one, i.ectl
The low29
Bit plus one, and then it passesw = new Worker(firstTask)
The bottom passes through the topThreadFactory
thenewThread
Method) createworker
Thread. - Finally get the global lock that will be newly created
worker
Added to theworkers
Release all locks to the newly createdworker
A thread call encapsulated instart
Method to start.After the thread is started, it will eventually executeworker
In the instancerun
Methods.
RunWorker method
As we know from the above, when the thread in worker starts, the run method in the worker instance will be executed, and the run method really calls the runWorker method. The source code of this method is as follows:
//w starts the worker
final void runWorker(Worker w) {
//wt == w.thread
Thread wt = Thread.currentThread();
// Assign the initial execution task to the task
Runnable task = w.firstTask;
// Clear the current w.firstTask reference
w.firstTask = null;
// Why call unlock first? Initialize worker state == 0 (start with -1) and exclusiveOwnerThread ==null
w.unlock(); // allow interrupts
// If the thread exits suddenly, true-> An exception occurred. The current thread exits suddenly
//false-> Exit normally.
boolean completedAbruptly = true;
try {
// task! If firstTask is not null, execute it directly inside the loop body.
Task = getTask())! GetTask = null: the current thread succeeded in obtaining the task from the queue. GetTask is a method that blocks the thread
GetTask If null is returned, the current thread needs to execute the end logic.
while(task ! =null|| (task = getTask()) ! =null) {
//worker sets the exclusive lock to the current thread
// Why do you set an exclusive lock? When shutdown, the current worker status is judged, and whether the current worker is working is determined according to whether the exclusive lock is idle.
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// runStateAtLeast(ctl.get(), STOP) indicates that the thread pool is in STOP/TIDYING/TERMINATION state
RunStateAtLeast (ctl.get(), STOP)&&! wt.isInterrupted()
If the thread pool status is >=STOP and the current thread status is not set to interrupt, then you need to enter the if, to interrupt the current thread.
// Assume: runStateAtLeast(ctl.get(), STOP) == false
// (thread.interrupted () &&runstateatleast (CTL. Get (), STOP))
// Thread.interrupted() gets the current interruption status and sets the interrupt bit to false. Called twice in a row, this method must return false the second time.
// runStateAtLeast(ctl.get(), STOP)
// It forces the current thread to refresh its interrupt bit false because it is possible that the last time the task was executed, the current thread's interrupt bit was set to true in the business code and no processing was done
// You must force a refresh here. This does not affect subsequent tasks.
Thread.interrupted() == true and runStateAtLeast(ctl.get(), STOP)) == true
// Is there any chance of this happening?
// It is possible because the external thread has the opportunity to call shutdown and shutdownNow after the first (runStateAtLeast(ctl.get(), STOP) == false to change the thread pool state
// At this point, the interrupt flag bit of the current thread is set back to the interrupt state.
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
// The hook method is left to subclasses to implement
beforeExecute(wt, task);
Thrown an exception. If thrown between time zones is not null, an exception has been thrown to the upper layer.
Throwable thrown = null;
try {
// Task can be either FutureTask or a generic Runnable interface implementation class.
// If the previous runnable/ Callable was submitted via submit(), it will be encapsulated as FutureTask. This is not clear, look at the previous issue, station B.
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// The hook method is left to subclasses to implementafterExecute(task, thrown); }}finally {
// Set the local variable task to Null
task = null;
// Update the number of tasks completed by the worker
w.completedTasks++;
// When the worker finishes processing a task, it releases the exclusive lock
//1. Normally, getTask() gets the task again while(getTask...)
//2. Task.run () an internal exception was thrown..w.unlock(); }}// Under what circumstances will you come here?
The getTask() method returns null, indicating that the current thread should perform exit logic.
completedAbruptly = false;
} finally {
When an exception is thrown inside task.run(), jump directly from w.lock () to this line.
// Exit completedAbruptly == false
// Abnormal exit completedAbruptly == trueprocessWorkerExit(w, completedAbruptly); }}Copy the code
The execution flow of this method is as follows:
-
First, w.nlock () is used to change the value of state in AQS to 0 (setState(-1) method in worker constructor is used to initialize the value of state to -1 when creating worker).
-
Then enter the while loop. If firstTask is not empty, the current worker will execute the task in firstTask first. If firstTask is empty, the current worker will get the task from the workQueue and execute it until there are no tasks in the task queue.
-
If the state of the current thread pool changes to STOP, an interrupt message, wt.interrupt(), is given to the current thread. Task.run () is used to execute the task. After executing the task, the number of completedTasks in the current worker completedTasks will be added by one.
-
Finally, if an exception occurs while executing the task, processWorkerExit(w, completedAbruptly) will be executed, which will be examined next.
GetTask method
The getTask method is used by threads to get tasks from the workQueue. The getTask method is used by threads to get tasks from the workQueue.
// When is null returned?
//1. Rs >= STOP
//2. The precondition state is SHUTDOWN, workqueue.isempty ().
//3. When the number of threads in the thread pool exceeds the maximum limit, some threads return Null
//4. If the number of threads in the thread pool exceeds corePoolSize, null will be returned after some threads timeout.
private Runnable getTask(a) {
// Indicates whether the current thread has timed out. Default Value False True indicates that the task has timed out
boolean timedOut = false; // Did the last poll() time out?
/ / spin
for (;;) {
// Get the latest CTL value and save it to c.
int c = ctl.get();
// Get the current running status of the thread pool
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// Condition 1: rs >= SHUTDOWN: The current thread pool is not in the RUNNING state, which may be SHUTDOWN/STOP....
/ / condition 2: (rs > = STOP | | workQueue. IsEmpty ())
//2.1:rs >= STOP; //2.1:rs >= STOP
Workqueue.isempty () : the current thread pool is SHUTDOWN and the task queue isEmpty. Null must be returned.
// Return null, the runWorker method will execute the thread that returns null to exit the thread pool logic.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// use CAS+ infinite loop to set CTL to -1
decrementWorkerCount();
return null;
}
// How many cases are there?
//1. The thread pool is RUNNING
//2. The thread pool is SHUTDOWN but the queue is not empty.
// Get the number of threads in the thread pool
int wc = workerCountOf(c);
// Are workers subject to culling?
//timed == true Indicates that the current thread supports the timeout mechanism when obtaining a task. Poll (XXX, XXX) is used. When fetching task times out, the next spin may return NULL.
//timed == false indicates that the current thread does not support the timeout mechanism when obtaining a task and uses queue.take().
//queue.poll and queue.take() both block the current thread
AllowCoreThreadTimeOut == true indicates that threads within the number of core threads can also be reclaimed.
Poll (XXX, XXX) timeout mechanism is used by all threads to fetch tasks.
AllowCoreThreadTimeOut == false Indicates that the current thread pool maintains a core number of threads.
//wc > corePoolSize
// The number of threads in the thread pool is larger than the number of core threads.
// It is possible that some threads fail to get the task, return Null, and then.. The runWorker performs thread exit logic.
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
A: (wc / / conditions > maximumPoolSize | | (timed && timedOut))
//1.1: wc > maximumPoolSize The setMaximumPoolSize() method may set the maximum number of threads in the thread pool to a smaller number than when initialized by an external thread
//1.2: The (timed && timedOut) condition is valid: the prerequisite condition is that the current thread obtains the task in poll mode. The last time the poll method was used to get the task in the loop, it timed out
// A condition of true indicates that threads can be recycled, meet the recycling criteria, and recycle when they really need to be recycled.
/ / condition 2: (wc > 1 | | workQueue. IsEmpty ())
If wc > 1 is true, there are other threads in the current thread pool. The current thread can be directly reclaimed and return NULL
//2.2: workqueue.isempty () if wc == 1, this condition is true: the current task queue isEmpty, and the last thread can be relieved to exit.
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// Use the CAS mechanism to subtract the CTL value -1 from the successful thread and return null
// If CAS is successful, Null is returned
/ / CAS failure? Why did CAS fail?
//1. Other programs exit before you
//2. The thread pool state has changed.
if (compareAndDecrementWorkerCount(c))
return null;
// Timed is likely to be false when the current thread fails the CAS and another thread exits successfully
// Check that the current thread is not in the collection scope.
continue;
}
try {
// Get the task logic
// Get the task from the task queue
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// Condition true: return to task
if(r ! =null)
return r;
// The current thread timed out...
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; }}}Copy the code
The main execution process of this method is as follows:
-
If the current thread pool is SHUTDOWN and the workQueue is empty, or the thread pool is stopped or greater, the number of threads in the pool will be increased by one and null will be returned. The recycling thread logic is then handled in the runWorker through the processWorkerExit method.
-
It then passes the timeed flag (null is returned if the task from the workQueue exceeds the keepAliveTime value specified when the thread pool was created, and the thread logic is then recycled through the processWorkerExit method in the runWorker), Otherwise, workqueue.take () is used to get the return of the task in the task queue.
ProcessWorkerExit method
The processWorkerExit method is used to process worker logic in the recycle thread pool.
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// The condition is true: it means that the current worker w has an abnormal exit, and an exception was thrown up during the execution of the task.
// When the exception exits, the CTL counts, and there is no -1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// Get the global lock reference for the thread pool
final ReentrantLock mainLock = this.mainLock;
/ / lock
mainLock.lock();
try {
// Aggregate the number of tasks completed by the current worker to the thread pool's completedTaskCount
completedTaskCount += w.completedTasks;
// Remove worker from pool..
workers.remove(w);
} finally {
// Release the global lock
mainLock.unlock();
}
// Decide whether to terminate the thread pool based on the thread pool state
tryTerminate();
// Get the latest CTL value
int c = ctl.get();
// The thread pool status is RUNNING or SHUTDOWN
if (runStateLessThan(c, STOP)) {
// The current thread is exiting normally..
if(! completedAbruptly) {//min Indicates the minimum number of threads in the thread pool
//allowCoreThreadTimeOut == true => Indicates that threads within the number of core threads will also be recycled due to timeout. min == 0
//allowCoreThreadTimeOut == false => min == corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// Thread pool status: RUNNING SHUTDOWN
// if min == 0
// Condition 2:! Workqueue.isempty () indicates that there are still tasks in the task queue and at least one thread should be left.
if (min == 0&&! workQueue.isEmpty()) min =1;
// The condition is true: there are enough threads in the thread pool.
WorkerCountOf (c) >= min => (0 >= 0)?
// It's possible!
// In what case? This occurs when the number of core threads in the thread pool can be reclaimed, in which case the number of threads in the current thread pool becomes zero
// The next time the task is submitted, another thread will be created.
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//1. A new worker must be created on top of the current thread when an exception occurs during task execution.
/ / 2.! Workqueue.isempty () indicates that there are still tasks in the task queue and at least one thread should be left. The current state of RUNNING | | SHUTDOWN
//3. If the number of threads is < corePoolSize, the number of threads will be created.
addWorker(null.false); }}Copy the code
The main execution process of this method is as follows:
-
Firstly, the global lock is obtained, and the number of completed tasks in the worker to be removed is added to the number of completed tasks completedTaskCount in the thread pool, and then the worker is removed from workers.
-
The tryTerminate method is then called to determine whether to terminate the thread pool based on its state. Let’s talk about this method.
-
Finally, a guarantee mechanism will be added. If the current thread pool is smaller than STOP, that is, when the thread pool is RUNNING or SHUTWODN, there are still tasks in the task queue, at this time, it is necessary to ensure at least one thread in the thread pool, that is, execute addWorker(null, false) to add the worker.
TryTerminate method
The tryTerminate method is used to determine whether to terminate the thread pool.
final void tryTerminate(a) {
/ / spin
for (;;) {
// Get the latest CTL value
int c = ctl.get();
// if isRunning(c) is enabled, the thread pool is normal.
// runStateAtLeast(c, TIDYING) indicates that another thread is TERMINATED at TIDYING -> TERMINATED.
RunStateOf (c) == SHUTDOWN &&! workQueue.isEmpty())
//SHUTDOWN special case, if this case, directly back. Wait until the tasks in the queue are finished before switching to the status.
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// What happens when this is executed?
Thread pool status >= STOP
//2. The thread pool is SHUTDOWN and the queue is empty
// The number of threads in the current thread pool is > 0
if(workerCountOf(c) ! =0) { // Eligible to terminate
// Interrupt an idle thread.
// Free thread, where is free? queue.take() | queue.poll()
//1. The awakened thread returns null in getTask()
//2. When the exit logic is executed, tryTerminate() is called again to wake up the next idle thread
/ / 3. The state is because the thread pool (thread pool status > = STOP | | thread pool status to SHUTDOWN and the queue is empty) eventually call addWorker, will fail.
// This is where idle threads will eventually exit. Non-idle threads will also call tryTerminate when they have finished executing their current task, and may end up here.
interruptIdleWorkers(ONLY_ONE);
return;
}
// Who is the thread that executes here?
//workerCountOf(c) == 0.
// The last thread to exit. We know, in the (state of the thread pool > = STOP | | thread pool status to SHUTDOWN and the queue is empty)
// After the thread wakes up, the exit logic will be executed. During the exit process, the workerCount will be counted -1 => ctl-1.
// Before the tryTerminate method is called, it is already subtracted, so 0 means that this is the last thread to exit.
final ReentrantLock mainLock = this.mainLock;
// Get the thread pool global lock
mainLock.lock();
try {
// Set the thread pool state to TIDYING.
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// Call the hook method
terminated();
} finally {
// Set the thread pool state to TERMINATED.
ctl.set(ctlOf(TERMINATED, 0));
// Wake up the thread calling awaitTermination().
termination.signalAll();
}
return; }}finally {
// Release the thread pool global lock.
mainLock.unlock();
}
// else retry on failed CAS}}Copy the code
The main execution process of this method is as follows:
-
TERMINATED thread pool is RUNNING, greater than or equal to TIDYING, or is SHUTDOWN and the workQuue task queue is TERMINATED. It just returns without performing any logic.
-
If this condition is not met, the current thread pool is terminated. If the number of threads in the pool is not equal to zero, interruptIdleWorkers(ONLY_ONE) is called to signal an interrupt to all threads.
-
Ctl.com pareAndSet(c, ctlOf(TIDYING, 0)) is executed to change the state of the thread pool to TIDYING and the number of threads terminated. The hook method is terminated. Finally change the state of the thread pool to TERMINATED.
ShutDown method
When the thread pool is in the RUNNING state, the shutDown method changes the state of the thread pool to SHUTWODN.
public void shutdown(a) {
final ReentrantLock mainLock = this.mainLock;
// Get the thread pool global lock
mainLock.lock();
try {
// Do not care about this line
checkShutdownAccess();
// Set the thread pool state to SHUTDOWN
advanceRunState(SHUTDOWN);
// Interrupt the idle thread
interruptIdleWorkers();
// Empty method, subclasses can be extended
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// Release the thread pool global lock
mainLock.unlock();
}
// Go back to...
tryTerminate();
}
/** * Set the thread pool state to SHUTDOWN */
private void advanceRunState(int targetState) {
/ / spin
for (;;) {
int c = ctl.get();
If targetState == SHUTDOWN, the current thread pool state is >= SHUTDOWN
If targetState == SHUTDOWN, the current thread pool state is RUNNING.
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break; }}private void interruptIdleWorkers(a) {
interruptIdleWorkers(false);
}
//onlyOne == true interrupts onlyOne thread; false interrupts all threads
// Common premise: worker is idle.
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
// Hold the global lock
mainLock.lock();
try {
// Iterate over all workers
for (Worker w : workers) {
// Get the current worker's thread and save it to t
Thread t = w.thread;
// Condition 1: True:! T.isinterrupted () == true means that the thread in the current iteration has not been interrupted.
If the current worker is idle, it can send an interrupt signal to the worker. The worker threads in the queue. Take () | queue. The poll ()
// blocking. Because when the worker executes the task, it is locked!
if(! t.isInterrupted() && w.tryLock()) {try {
// Give the current thread an interrupt signal.. A thread that is blocked in its queue will wake up, and when it wakes up, it may return NULL on its next spin. Perform exit logic.
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// Release worker's exclusive lock.w.unlock(); }}if (onlyOne)
break; }}finally {
// Release the global lock.mainLock.unlock(); }}Copy the code
Just read the notes above without going into details.
ShutDownNow method
Call shutDownNow to STOP the thread pool.
public List<Runnable> shutdownNow(a) {
// Return value reference
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// Get the thread pool global lock
mainLock.lock();
try {
checkShutdownAccess();
// Set the thread pool state to STOP
advanceRunState(STOP);
// Interrupts all threads in the thread pool
interruptWorkers();
// Export unprocessed tasks
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// Returns the unprocessed tasks in the current task queue.
return tasks;
}
/** * Change the thread pool state to STOP */
private void advanceRunState(int targetState) {
/ / spin
for (;;) {
int c = ctl.get();
If targetState == SHUTDOWN, the current thread pool state is >= SHUTDOWN
If targetState == SHUTDOWN, the current thread pool state is RUNNING.
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break; }}/** * gives an interrupt signal to all threads in the thread pool, whether they are working on a task or idle */
private void interruptWorkers(a) {
final ReentrantLock mainLock = this.mainLock;
// Get the thread pool global lock
mainLock.lock();
try {
// Iterate over all workers
for (Worker w : workers)
//interruptIfStarted() Gives an interrupt signal if the thread in the worker is started.
w.interruptIfStarted();
} finally {
// Release the thread pool global lockmainLock.unlock(); }}void interruptIfStarted(a) {
Thread t;
if (getState() >= 0&& (t = thread) ! =null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
Copy the code
Just read the notes above without going into details.