The thread pool you cannot avoid
The name ThreadPoolExecutor alone tells you that this class has to do with thread pools.
Anyone who has done Java development knows about thread pools and can say at least a little about them. Even so, the author will take the trouble to go over them again to leave a deeper impression 😀😀
Thread pooling is a pooling technique. There are many similar pooling techniques in Java. The common ones are:
- Database connection pool
- Redis connection pool
- HTTP connection pool
- Memory pool
- Thread pool
What pooling does: it places reusable resources (such as connections and threads) into a pre-initialized pool so that they can be managed in one place. Benefits:
- Avoids the overhead of repeated creation, destruction, and scheduling, and improves performance
- Keeps the CPU cores fully utilized while preventing excessive scheduling
- Lets you tune parameters to get the best results
ThreadPoolExecutor basics
Ways to create a thread pool in Java
Not recommended
Creating the following thread pools with the static factory methods of the Executors class:
- FixedThreadPool
- ScheduledThreadPool (Perform periodic tasks)
- WorkStealingPool (generate the number of threads based on the number of cpus in the current computer)
- CachedThreadPool (with caching)
- SingleThreadPool (single thread)
Recommended
Creating the thread pool with the ThreadPoolExecutor constructor:
// Give the threads names with business meaning
// (ThreadFactoryBuilder with setNameFormat is assumed here to be the Guava class)
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%s").build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        5,                             // corePoolSize: number of core threads in the pool
        10,                            // maximumPoolSize: once reached, no more threads are added to the pool
        1000,                          // keepAliveTime: how long threads beyond corePoolSize may stay idle
        TimeUnit.MILLISECONDS,         // Unit of keepAliveTime: milliseconds
        new LinkedBlockingQueue<>(50), // Bounded work queue in which tasks wait for a worker thread
        threadFactory,                 // Custom thread factory
        new ThreadPoolExecutor.AbortPolicy()); // Rejection policy used when the pool is saturated
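The snippet above needs a third-party thread factory builder. If you would rather stay within the JDK, a rough sketch could look like the following. The class name `NamedPoolDemo`, the helper methods, and the `order-pool-` prefix are made up for illustration; only the `ThreadPoolExecutor` constructor and its arguments come from the text above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolDemo {
    // Hypothetical helper: same parameters as the snippet above, but the
    // thread factory is a plain lambda that names threads "<prefix><n>".
    static ThreadPoolExecutor newPool(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = r -> new Thread(r, prefix + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                5, 10, 1000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(50),
                factory,
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Runs one task and returns the name of the worker thread that executed it.
    static String firstWorkerName(String prefix) {
        ThreadPoolExecutor pool = newPool(prefix);
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName("order-pool-")); // prints order-pool-1
    }
}
```

A meaningful prefix makes thread dumps and log lines much easier to attribute to a specific pool.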
Why
Let's see what Alibaba's "Java Development Manual" says about this.
Now let's look at how the source code is written.
SingleThreadPool is itself a thread pool created through the ThreadPoolExecutor constructor. It is not recommended because it uses a LinkedBlockingQueue as the work queue, and the default constructor of that queue sets its capacity to Integer.MAX_VALUE. The queue is effectively unbounded, so tasks can pile up until memory runs out.
FixedThreadPool has the same problem.
CachedThreadPool uses a SynchronousQueue, a queue with no buffer that hands each task straight to a thread, and its maximum number of threads is Integer.MAX_VALUE, so the number of threads can grow without limit.
ScheduledThreadPool uses a DelayedWorkQueue, which is an unbounded blocking queue, and its maximum number of threads is Integer.MAX_VALUE.
By contrast, when we construct a ThreadPoolExecutor ourselves, we decide whether the blocking queue is bounded or unbounded, and we can give it an explicit capacity.
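To check these claims yourself, one option is to ask the factory-created pools about their own configuration. The sketch below is only an illustration: the class and method names are made up, and the casts to `ThreadPoolExecutor` assume the factory methods return that type (or a subclass), which is how OpenJDK implements them but is not promised by the `Executors` API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

class FactoryPoolInspection {
    // Remaining capacity of the (still empty) work queue behind newFixedThreadPool.
    static int fixedPoolQueueCapacity() {
        // Assumption: the factory returns a plain ThreadPoolExecutor, as OpenJDK does.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Upper bound on the number of threads a cached pool may create.
    static int cachedPoolMaxThreads() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    // Whether the cached pool hands tasks over through a SynchronousQueue.
    static boolean cachedPoolUsesSynchronousQueue() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getQueue() instanceof SynchronousQueue;
        } finally {
            pool.shutdown();
        }
    }

    // Upper bound on the number of threads of a scheduled pool.
    static int scheduledPoolMaxThreads() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newScheduledThreadPool(1);
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedPoolQueueCapacity());         // prints 2147483647
        System.out.println(cachedPoolMaxThreads());           // prints 2147483647
        System.out.println(cachedPoolUsesSynchronousQueue()); // prints true
        System.out.println(scheduledPoolMaxThreads());        // prints 2147483647
    }
}
```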
ThreadPoolExecutor source analysis
Thread pool life cycle
RUNNING -- shutdown() --> SHUTDOWN -- queue empty and worker count 0 --> TIDYING
RUNNING or SHUTDOWN -- shutdownNow() --> STOP -- worker count 0 --> TIDYING
TIDYING -- terminated() completes --> TERMINATED
Thread pool status | State definition |
---|---|
RUNNING | The initial state of the thread pool after it has been created. It can accept new submitted tasks and also process tasks in the blocking queue |
SHUTDOWN | Closed state, no longer accepting new submitted tasks, but can continue processing tasks that have entered the blocking queue |
STOP | Accepts no new tasks, does not process tasks in the queue, and interrupts threads that are processing tasks |
TIDYING | All tasks have terminated and workerCount (the effective number of worker threads) is 0 |
TERMINATED | The terminated() method has completed and the thread pool is fully terminated |
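The difference between SHUTDOWN and STOP in the table can be observed through the public API. Here is a minimal sketch, assuming a pool with a single worker so that later tasks are guaranteed to wait in the queue; `LifecycleDemo` and its helper methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LifecycleDemo {
    private static ThreadPoolExecutor singleWorkerPool() {
        return new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
    }

    // A task that parks on the latch until it is released or interrupted.
    private static Runnable waitOn(CountDownLatch latch) {
        return () -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // shutdown(): RUNNING -> SHUTDOWN. Tasks already in the queue still run.
    static int queuedTasksRunAfterShutdown() {
        ThreadPoolExecutor pool = singleWorkerPool();
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        pool.execute(waitOn(gate));          // keeps the only worker busy
        pool.execute(ran::incrementAndGet);  // waits in the queue
        pool.execute(ran::incrementAndGet);  // waits in the queue
        pool.shutdown();
        gate.countDown();                    // let the worker continue
        awaitEnd(pool);
        return ran.get();
    }

    // shutdownNow(): -> STOP. Queued tasks are handed back instead of being run.
    static int queuedTasksReturnedByShutdownNow() {
        ThreadPoolExecutor pool = singleWorkerPool();
        pool.execute(waitOn(new CountDownLatch(1))); // only an interrupt ends this task
        pool.execute(() -> { });                     // waits in the queue
        pool.execute(() -> { });                     // waits in the queue
        List<Runnable> neverRun = pool.shutdownNow();
        awaitEnd(pool);
        return neverRun.size();
    }

    // Waits for the TERMINATED state.
    private static void awaitEnd(ThreadPoolExecutor pool) {
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                throw new IllegalStateException("pool did not reach TERMINATED");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedTasksRunAfterShutdown());      // prints 2
        System.out.println(queuedTasksReturnedByShutdownNow()); // prints 2
    }
}
```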
Tip: Never confuse thread pool states with thread states. A thread state diagram from the web is added here for comparison.
Tip: Calling start() on a thread does not mean it runs in the JVM immediately. It may first have to wait for the operating system to allocate resources (the READY state), and only once it gets them (the RUNNING state) does it actually execute. Java reports both of these as Thread.State.RUNNABLE.
Rejection policies
- CallerRunsPolicy (runs the rejected task in the thread that called execute)
- AbortPolicy (throws a RejectedExecutionException directly)
- DiscardPolicy (silently discards the rejected task)
- DiscardOldestPolicy (discards the task that has been waiting in the queue the longest, then retries the submission)
AbortPolicy is used by default when no rejection policy is specified explicitly
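To see the policies in action, we can saturate a deliberately tiny pool and watch what happens to the task that no longer fits. This is only a sketch: the class `RejectionPolicyDemo`, its method, and the result strings are made up for illustration, and the pool sizes (one thread, one queue slot) are chosen just to force a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Saturates a 1-thread, 1-slot pool, submits one more task,
    // and reports what happened to that extra task.
    static String overflowOutcome(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] outcome = { "never ran" };
        try {
            pool.execute(blocker); // occupies the only worker
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> outcome[0] = "ran on " + Thread.currentThread().getName());
            return outcome[0];
        } catch (RejectedExecutionException e) {
            return "rejected with exception";
        } finally {
            gate.countDown();      // release the blocked tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // When started from the main thread, the first line reads "ran on main".
        System.out.println(overflowOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(overflowOutcome(new ThreadPoolExecutor.AbortPolicy()));   // rejected with exception
        System.out.println(overflowOutcome(new ThreadPoolExecutor.DiscardPolicy())); // never ran
    }
}
```

CallerRunsPolicy is worth a second look: because the submitting thread is busy running the task itself, it cannot submit more work in the meantime, which acts as a simple form of back-pressure.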
ThreadPoolExecutor class diagram
IntelliJ IDEA's Diagrams feature can display the UML class diagram, which makes the inheritance relationships clear at a glance.
ThreadPoolExecutor has many methods. The core ones are the constructor, which builds the thread pool, and execute, which runs a task.
Task execution mechanism
- Through the execute method
This method returns no value. It is implemented by ThreadPoolExecutor and takes an object of type Runnable.
- Through the submit method
This method returns a Future object. ThreadPoolExecutor inherits it from AbstractExecutorService. Internally it calls the execute method declared in the Executor interface, and the implementation that actually runs is still ThreadPoolExecutor's execute.
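The practical difference shows up when a task produces a result or fails. A small sketch under the assumption of a single-threaded pool; `ExecuteVsSubmitDemo` and its methods are illustrative names only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteVsSubmitDemo {
    private static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
    }

    // submit() wraps the task in a Future, so the caller can wait for a result.
    static int sumViaSubmit(int a, int b) {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // An exception thrown inside a submitted task is stored in the Future
    // and resurfaces from get(), wrapped in an ExecutionException.
    static String failureViaSubmit() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<Integer> failing = () -> { throw new IllegalStateException("boom"); };
            pool.submit(failing).get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaSubmit(2, 3)); // prints 5
        System.out.println(failureViaSubmit()); // prints boom
    }
}
```

With execute there is no Future, so an exception from the task propagates out of the worker thread instead of being handed to the caller.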
execute() flowchart
execute() source code walkthrough
// ctl is an AtomicInteger: the high 3 bits record the thread pool state, the low 29 bits record the number of worker threads
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// An int has 32 bits (range [-2^31, 2^31 - 1]); Integer.SIZE - 3 = 32 - 3 = 29 is the shift distance used below
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3 bits store the run state, so the remaining 29 bits give the maximum worker count: 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// The thread pool states are constants stored in the high 3 bits
private static final int RUNNING    = -1 << COUNT_BITS; // Accepts new tasks and processes tasks in the blocking queue
private static final int SHUTDOWN   =  0 << COUNT_BITS; // Accepts no new tasks, but still processes tasks in the blocking queue
private static final int STOP       =  1 << COUNT_BITS; // Accepts no new tasks, does not process queued tasks, and interrupts running tasks
private static final int TIDYING    =  2 << COUNT_BITS; // All tasks have finished and the worker count is zero; terminated() is about to be called
private static final int TERMINATED =  3 << COUNT_BITS; // Final state, reached after terminated() has completed
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; } // Extract the thread pool state
private static int workerCountOf(int c)  { return c & CAPACITY; }  // Extract the number of worker threads
private static int ctlOf(int rs, int wc) { return rs | wc; }       // Combine a state and a worker count into a ctl value
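The bit packing is easier to follow with concrete numbers. The class below copies the constants and the three helpers into a standalone sketch so they can be run outside the JDK; `CtlDemo` and `top3Bits` are hypothetical names added for illustration.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical helper: renders the high 3 bits of a ctl value as a string.
    static String top3Bits(int c) {
        String bits = Integer.toBinaryString(c >>> COUNT_BITS);
        return "000".substring(bits.length()) + bits;
    }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);                        // RUNNING pool with 5 workers
        System.out.println(top3Bits(ctl));                  // prints 111
        System.out.println(workerCountOf(ctl));             // prints 5
        System.out.println(runStateOf(ctl) == RUNNING);     // prints true
        System.out.println(top3Bits(TIDYING));              // prints 010
        System.out.println(CAPACITY);                       // prints 536870911
        // RUNNING is negative, so the states are ordered numerically:
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // prints true
    }
}
```

That numeric ordering is what lets the source use comparisons such as `rs >= SHUTDOWN` or `runStateAtLeast(c, STOP)` instead of checking each state one by one.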
public void execute(Runnable command) {
    if (command == null) // A null task is answered with an NPE
        throw new NullPointerException();
    int c = ctl.get(); // Current ctl value (32 bits: the high 3 bits are the run state, the low 29 bits the worker count)
    if (workerCountOf(c) < corePoolSize) { // Fewer worker threads than core threads
        if (addWorker(command, true)) // Create a core worker in addWorker and let it run the task
            return;
        c = ctl.get(); // addWorker failed, so re-read ctl
    }
    // All core threads are in use (worker count >= corePoolSize)
    if (isRunning(c) && workQueue.offer(command)) { // The pool is RUNNING and the task was added to the blocking queue
        int recheck = ctl.get(); // Double check, because the pool may have left the RUNNING state since the last check
        if (!isRunning(recheck) && remove(command)) // The pool is no longer RUNNING: take the task back out of the queue
            reject(command); // Apply the rejection policy
        else if (workerCountOf(recheck) == 0) // No worker is left: add one without a first task to consume the workQueue
            addWorker(null, false);
    }
    // The pool is not RUNNING or the blocking queue is full: try to add a non-core worker for command
    else if (!addWorker(command, false))
        // That failed too: the pool is shut down or saturated
        reject(command); // Apply the rejection policy
}
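The order in which execute() tries its options (core thread, then queue, then non-core thread, then rejection) can be made visible with a deliberately small pool. A sketch, assuming one core thread, a maximum of two threads, and a queue with a single slot; the class name and the trace format are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Submits four blocking tasks and records the pool's shape after each submission.
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); // default policy: AbortPolicy
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await(); // keeps the worker busy until the gate opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> out = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    out.add("task " + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    out.add("task " + i + ": rejected");
                }
            }
        } finally {
            gate.countDown();
            pool.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task 1: threads=1 queued=0   (new core thread)
        // task 2: threads=1 queued=1   (core thread busy, task is queued)
        // task 3: threads=2 queued=1   (queue full, non-core thread created)
        // task 4: rejected             (queue full and maximumPoolSize reached)
    }
}
```

Note the step that surprises many people: the pool only grows beyond corePoolSize after the queue is full, not before.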
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // Label used to break out of, or continue, the outer loop
    for (;;) { // Infinite loop
        int c = ctl.get();
        int rs = runStateOf(c); // Thread pool state

        // Check if queue empty only if necessary.
        // The condition below is equivalent to the more intuitive form:
        // rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            // Return false in any of the following cases:
            // (1) The thread pool is in the STOP, TIDYING, or TERMINATED state
            // (2) The thread pool is in the SHUTDOWN state and the task to execute is not null
            // (3) The thread pool is in the SHUTDOWN state and the task queue is empty
            return false;

        // CAS spin to increase the worker count
        for (;;) {
            int wc = workerCountOf(c); // The current number of worker threads
            // Fail if worker count >= pool capacity, or >= corePoolSize (core worker) / maximumPoolSize (non-core worker)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // CAS the worker count up by one
                break retry; // Success: leave both loops
            // The CAS failed, so re-read ctl
            c = ctl.get();
            // If the pool state changed, jump to the outer loop and check the state again;
            // otherwise retry the CAS in the inner loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // To summarize the CAS phase above:
    // (1) If the worker count has reached its limit, return false
    // (2) If the CAS succeeds, leave both loops; if it fails, check whether the pool state has changed
    // (3) If it has, go back to the outer loop and read the pool state again; if not, retry the CAS in the inner loop

    // At this point the worker count has been incremented, but the worker has not been created or started yet
    boolean workerStarted = false; // Whether the worker's thread has been started
    boolean workerAdded = false; // Whether the worker has been added to the workers set
    Worker w = null;
    try {
        w = new Worker(firstTask); // Create a worker instance
        final Thread t = w.thread; // The thread held by the worker
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock; // The pool-wide reentrant lock
            mainLock.lock(); // Lock to control concurrency
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // Current thread pool state
                // The pool is RUNNING, or it is SHUTDOWN and there is no new task
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // The thread must not have been started already
                        throw new IllegalThreadStateException();
                    workers.add(w); // workers is the HashSet of all worker threads; it is only accessed while holding mainLock
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock(); // Release the lock in the finally block
            }
            if (workerAdded) { // The worker was added successfully
                t.start(); // Start the worker's thread
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) // The thread failed to start: roll back in addWorkerFailed
            addWorkerFailed(w);
    }
    return workerStarted;
}
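The inner loop of addWorker is an instance of a common pattern: read the current value, check a bound, try a compare-and-set, and retry if another thread got there first. Stripped of everything pool-specific, the pattern might be sketched like this. `BoundedCounter` is a made-up class, not part of the JDK, and it leaves out the run-state check that addWorker performs in the outer loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Same shape as the inner loop of addWorker: read, check the bound, CAS, retry.
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false;                        // bound reached, give up
            if (count.compareAndSet(current, current + 1))
                return true;                         // CAS won, slot reserved
            // CAS lost a race with another thread: loop and read the value again
        }
    }

    int get() {
        return count.get();
    }

    // Lets several threads race for the slots and returns how many increments succeeded.
    static int race(int limit, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] racers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            racers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++)
                    if (counter.tryIncrement())
                        successes.incrementAndGet();
            });
            racers[i].start();
        }
        try {
            for (Thread t : racers)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (successes.get() != counter.get())
            throw new IllegalStateException("successes and counter disagree");
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(100, 8, 1000)); // prints 100
    }
}
```

No matter how the threads interleave, the counter never exceeds the limit, which is exactly the guarantee addWorker needs for corePoolSize and maximumPoolSize without taking a lock.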
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w); // The thread failed to start: remove the worker that was added earlier
        decrementWorkerCount(); // Decrement the worker count in ctl
        tryTerminate(); // Try to move the thread pool toward the TERMINATED state
    } finally {
        mainLock.unlock();
    }
}
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // The pool does not move toward TERMINATED in the following cases:
        // (1) The thread pool is in the RUNNING state
        // (2) The thread pool is already in the TIDYING state or beyond
        // (3) The thread pool is in the SHUTDOWN state and the work queue is not empty
        // (4) The number of worker threads is not 0
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // There are still worker threads
            interruptIdleWorkers(ONLY_ONE); // Interrupt one thread that is waiting for a task
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS ctl to the TIDYING state with a worker count of 0; if the CAS fails, the outer loop retries
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // Call the terminated() hook
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // Set the state to TERMINATED with a worker count of 0
                    termination.signalAll(); // Wake up all threads waiting on the termination Condition
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
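Since terminated() is a protected hook, a subclass can use it to learn exactly when the pool passes through TIDYING. A minimal sketch; the class name `TerminatedHookDemo` and the flag are illustrative assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminatedHookDemo {
    // Returns true if the terminated() hook had run by the time the pool reported TERMINATED.
    static boolean hookRanBeforeTermination() {
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10)) {
            @Override
            protected void terminated() {
                hookRan.set(true); // called by tryTerminate() while the state is TIDYING
            }
        };
        pool.execute(() -> { });
        pool.shutdown();
        try {
            // awaitTermination waits on the Condition that tryTerminate() signals
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return done && pool.isTerminated() && hookRan.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hookRanBeforeTermination()); // prints true
    }
}
```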
Worker source code interpretation
Worker is an inner class of ThreadPoolExecutor. Only its most important parts, the constructor and the run method, are covered here.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    // The thread this worker runs in
    final Thread thread;
    // The initial task to run (may be null)
    Runnable firstTask;
    // Per-thread counter of completed tasks
    volatile long completedTasks;

    // The constructor
    Worker(Runnable firstTask) {
        setState(-1); // Inhibit interrupts until runWorker() is called
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // Create the thread through the ThreadFactory
    }

    // Implements the run method of the Runnable interface
    public void run() {
        runWorker(this);
    }

    // ... other methods are omitted
}
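Worker implements the Runnable interface, and its thread is created with the worker itself as the target. So when addWorker calls start() on that thread, what actually executes is the worker's run method, which in turn calls runWorker: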
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // The worker's first task (may be null)
    w.firstTask = null;
    w.unlock(); // Set the state to 0 to allow interrupts
    boolean completedAbruptly = true; // Whether the worker is exiting because of an unexpected exception
    try {
        // If there is a current task, run it directly; otherwise call getTask() to fetch one from the task queue
        while (task != null || (task = getTask()) != null) {
            w.lock(); // Lock the worker while it runs a task
            // If the pool is at least STOP, make sure the thread is interrupted;
            // otherwise Thread.interrupted() clears any pending interrupt before the task runs
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); // Interrupt the current thread
            try {
                beforeExecute(wt, task); // Callback before the task runs; empty by default, can be overridden in a subclass
                Throwable thrown = null;
                try {
                    task.run(); // Run the task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // Callback after the task runs; empty by default, can be overridden in a subclass
                }
            } finally {
                task = null; // Clear the loop variable to mark the task as processed
                w.completedTasks++; // Increment this worker's completed task count
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
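The two callbacks in runWorker are the usual place to hang logging or timing code. The following sketch overrides both in an anonymous subclass and records the order of events; `HookDemo` and the event names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    // Runs one task and returns the order in which the hooks and the task fired.
    static List<String> events() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10)) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                log.add("before");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                log.add("after");   // t would be non-null if the task had thrown
                done.countDown();
            }
        };
        try {
            pool.execute(() -> log.add("run"));
            if (!done.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("task did not finish in time");
            return new ArrayList<>(log);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(events()); // prints [before, run, after]
    }
}
```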
Getting a task from the task queue
private Runnable getTask() {
    boolean timedOut = false; // Whether the last poll() timed out, i.e. the thread's idle time has expired
    // Infinite loop
    for (;;) {
        int c = ctl.get(); // Thread pool ctl value
        int rs = runStateOf(c); // Current thread pool state
        // If the state is >= STOP, or it is SHUTDOWN and the task queue is empty,
        // return null so that the current worker is destroyed
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // Decrement the worker count
            return null;
        }
        int wc = workerCountOf(c); // Number of worker threads in the pool
        // Whether the current thread may be destroyed on timeout:
        // yes if core threads are allowed to time out or the worker count > corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If (worker count > maximumPoolSize, or timeout destruction is allowed and the idle time has expired)
        // and (worker count > 1 or the blocking queue is empty),
        // decrement the worker count and return null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // Take a task from the head of the queue: poll with a timeout if the thread may time out,
            // otherwise take, which blocks until a task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r; // Return the task taken from the queue
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
To summarize the cases where getTask() returns null:
- The thread pool is in the SHUTDOWN state and the task queue is empty
- The thread pool is in the STOP, TIDYING, or TERMINATED state
- The number of worker threads exceeds the maximum number of threads
- The worker is eligible for timeout (allowCoreThreadTimeOut is set or the worker count exceeds corePoolSize), it timed out waiting for a new task, and it is either not the last worker or the queue is empty
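The last case, timeout-based recycling, can be watched from the outside by letting a pool grow past its core size and then go idle. A sketch under assumed parameters (one core thread, two threads at most, 50 ms keep-alive); `KeepAliveDemo` is a hypothetical name and the polling loop is just a simple way to wait for the idle thread to time out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns { pool size while saturated, pool size after the extra thread timed out }.
    static int[] peakAndSettledSize() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 50, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // core worker
            pool.execute(blocker); // queued
            pool.execute(blocker); // queue full, so a second (non-core) worker is created
            int peak = pool.getPoolSize();
            gate.countDown();      // all tasks finish, both workers go idle

            // One idle worker's poll(keepAliveTime) returns null, getTask() returns null,
            // and the worker exits. Poll until that has happened (at most 5 seconds).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline)
                Thread.sleep(10);
            return new int[] { peak, pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = peakAndSettledSize();
        System.out.println(sizes[0] + " -> " + sizes[1]); // prints 2 -> 1
    }
}
```

The pool never drops below one thread here because allowCoreThreadTimeOut is false by default, so the remaining worker blocks in take() instead of poll().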
Worker thread exit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If completedAbruptly is true, an unhandled exception was thrown while a task was running,
    // so the worker count has not been decremented yet and must be decremented here
    if (completedAbruptly)
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add the exiting worker's completed task count to the pool's total
        completedTaskCount += w.completedTasks;
        workers.remove(w); // Remove the worker from the workers set
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the thread pool
    tryTerminate();
    int c = ctl.get();
    // The pool is in the RUNNING or SHUTDOWN state
    if (runStateLessThan(c, STOP)) {
        // The worker exited normally (no exception)
        if (!completedAbruptly) {
            // The minimum number of threads is 0 if core threads may time out, otherwise corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If the blocking queue is not empty, at least one thread must stay to process the remaining tasks
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // If the current worker count already meets the minimum, no replacement thread is needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Create a new worker to replace the one that exited
        addWorker(null, false);
    }
}
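The replacement logic at the end of processWorkerExit explains why a pool survives a task that throws: the failing worker dies, and a fresh one takes its place. A sketch that checks this on a single-threaded pool; the class name is made up, and the silent uncaught-exception handler is only there to keep the demo's stderr output clean.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WorkerReplacementDemo {
    // Returns true if the task submitted after a failing task ran on a different thread.
    static boolean replacedAfterFailure() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep stderr quiet for the demo
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), quiet);
        try {
            Thread[] first = new Thread[1];
            CountDownLatch started = new CountDownLatch(1);
            pool.execute(() -> {
                first[0] = Thread.currentThread();
                started.countDown();
                throw new IllegalStateException("task failed"); // escapes runWorker: completedAbruptly stays true
            });
            started.await();

            // The failing worker exits through processWorkerExit; this task runs on another worker.
            Callable<Thread> whoAmI = Thread::currentThread;
            Thread second = pool.submit(whoAmI).get(5, TimeUnit.SECONDS);
            return first[0] != second;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(replacedAfterFailure()); // prints true
    }
}
```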
References
Implementation principles of the Java thread pool and its practice in Meituan's business