Overview of Java Thread Pools - ThreadPoolExecutor
ThreadPoolExecutor automatically adjusts the pool size within the bounds set by corePoolSize and maximumPoolSize.
When a new task is submitted through execute(Runnable) and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If at least corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only when the queue is full. You can create a fixed-size thread pool by setting corePoolSize equal to maximumPoolSize. By setting maximumPoolSize to an essentially unbounded value, such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Typically, the core and maximum pool sizes are set only at construction time, but they can also be changed dynamically with setCorePoolSize and setMaximumPoolSize.
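The growth rule described above can be observed with a small experiment. This is a minimal sketch, not part of the original article: the class name PoolGrowthDemo, the method observeGrowth, and the sizes (core 1, maximum 2, queue capacity 1) are chosen only for illustration. Every task blocks on a latch, so each submission sees all earlier tasks still occupying their thread or queue slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    /** Returns the pool size seen after each of four submissions; -1 marks a rejected task. */
    static int[] observeGrowth() {
        // Illustrative configuration: corePoolSize 1, maximumPoolSize 2, bounded queue of 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task parks until the latch opens, so no thread becomes free during the test.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[4];
        try {
            for (int i = 0; i < sizes.length; i++) {
                try {
                    pool.execute(blocker);
                    sizes[i] = pool.getPoolSize();
                } catch (RejectedExecutionException e) {
                    sizes[i] = -1; // default AbortPolicy: queue full and pool at maximum
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observeGrowth()));
    }
}
```

The first task starts a core thread, the second waits in the queue, the third finds the queue full and starts a non-core thread, and the fourth is rejected because the pool has reached maximumPoolSize.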
The process flow of tasks in the thread pool is shown below:
ThreadFactory
CPU-intensive: number of CPU cores + 1 (C + 1)
I/O-intensive: 2 × number of CPU cores + 1 (2C + 1)
I/O-intensive refers to workloads dominated by read and write operations, such as database reads and writes. CPU-intensive refers to workloads dominated by computation.
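The two rules of thumb above translate directly into code. The sketch below is illustrative only: the class name PoolSizing and its methods are hypothetical, and the formulas are starting points to be tuned by measurement rather than fixed rules.

```java
class PoolSizing {
    /** CPU-intensive workloads: C + 1 threads, where C is the number of cores. */
    static int cpuBound(int cores) {
        return cores + 1;
    }

    /** I/O-intensive workloads: 2C + 1 threads, where C is the number of cores. */
    static int ioBound(int cores) {
        return 2 * cores + 1;
    }

    public static void main(String[] args) {
        // availableProcessors() reports the cores visible to this JVM.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-intensive pool size: " + cpuBound(cores));
        System.out.println("I/O-intensive pool size: " + ioBound(cores));
    }
}
```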
/*
 * 1. If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task. The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldn't, by returning false.
 *
 * 2. If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 *
 * 3. If we cannot queue task, then we try to add a new
 * thread. If it fails, we know we are shut down or saturated
 * and so reject the task.
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Step 1: fewer worker threads than corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // Start a new thread directly; core == true makes addWorker check workerCount against corePoolSize again
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: the pool is running, so try to add the task to the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool may have been shut down after the task was queued: roll back and reject
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: the queue is full, so try a new thread bounded by maximumPoolSize
    else if (!addWorker(command, false))
        reject(command);
}
- addWorker(command, true): creates a core thread to execute the task;
- addWorker(command, false): creates a non-core thread to execute the task;
- addWorker(null, false): creates a non-core thread with no initial task; it takes its work from the queue.
private boolean addWorker(Runnable firstTask, boolean core) {
    // Part 1: spin with CAS on ctl to decide whether a worker may be created.
    // Break out of the loop if so; otherwise return false.
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // CAS incremented workerCount: leave both loops
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs) // runState changed: restart the outer loop
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // Part 2: create the worker; this part uses a ReentrantLock
    boolean workerStarted = false; // Whether the thread has been started
    boolean workerAdded = false; // Whether the worker has been added to the workers set
    Worker w = null;
    try {
        w = new Worker(firstTask); // Create the worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // ctl must be read again after the lock is acquired, because
                // runState may have changed before this thread obtained the lock.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // Start the thread
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w); // Roll back the failed attempt
    }
    return workerStarted;
}
The work of addWorker can be divided into two parts:
- Part 1: an atomic check. By spinning with CAS operations on ctl, it decides whether a worker may be created, and either proceeds to creation or returns false. The spin is generally very short.
- Part 2: while holding mainLock, create the worker and register it, then start its thread.
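The spin-and-CAS pattern in Part 1 can be shown in isolation. The sketch below is a simplified stand-in, not the JDK code: BoundedCounter and tryIncrement are hypothetical names, and a plain AtomicInteger replaces ctl, which in the real class also packs the run state into the same integer.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    /** Increments the counter unless it has reached the limit; never blocks. */
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false; // saturated, like wc >= maximumPoolSize
            if (count.compareAndSet(current, current + 1))
                return true; // this thread won the CAS
            // Another thread changed the value first; re-read and retry.
        }
    }

    int get() {
        return count.get();
    }
}
```

Because the bound check and the increment are tied together by the compare-and-set, two racing threads can never push the count past the limit, which is the property addWorker relies on for workerCount.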
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker runs in; null if the ThreadFactory fails to create it */
    final Thread thread;
    /** Initial task to run, possibly null */
    Runnable firstTask;
    /** Number of tasks completed by this worker */
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); // Disallow interrupts until runWorker starts
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates the main run loop to runWorker */
    public void run() {
        runWorker(this);
    }
    // State 0 means unlocked; state 1 means locked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock() { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock() { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    // Interrupt the thread only if it has been started
    void interruptIfStarted() {
        Thread t;
        // The initial state is -1, so a worker that has not started is never interrupted
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
Worker implements a simple non-reentrant mutex instead of using ReentrantLock, so that a worker's task cannot reacquire the lock when it calls pool control methods such as setCorePoolSize. Note that the Worker lock serves a different purpose from a conventional lock: it mainly controls whether the thread may be interrupted and supports monitoring, for example determining whether the thread is active (executing a task).
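The difference between this lock and ReentrantLock is easiest to see when the same thread tries to acquire it twice. The following is a minimal sketch that copies only the locking methods of Worker into a standalone class; the name NonReentrantMutex is hypothetical and the class is not part of the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class NonReentrantMutex extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only when the state moves from 0 (unlocked) to 1 (locked),
        // so a second attempt fails even for the thread that already owns the lock.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    public boolean tryLock() { return tryAcquire(1); }
    public void unlock() { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}
```

A ReentrantLock would let the owning thread call tryLock() a second time and succeed. Here the second call returns false, which is how the pool can tell that a worker is in the middle of a task and should not be treated as idle.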
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop until task == null (pool shut down, keep-alive timeout, etc.)
        // Note getTask() here: this is where the configured blocking queue takes effect
        while (task != null || (task = getTask()) != null) {
            w.lock(); // Lock before executing the task
            // If the pool is stopping, ensure the thread is interrupted;
            // if not, ensure the thread is not interrupted. The second case
            // requires re-reading ctl to handle the shutdownNow race
            // while clearing the interrupt.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // Extension point
                Throwable thrown = null;
                try {
                    task.run(); // Actually execute the task's run method
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // Extension point
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // Clean up when the worker thread exits
    }
}
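The beforeExecute and afterExecute calls in runWorker are the hooks a subclass can override. The sketch below is only an illustration: the class CountingExecutor and its demo method are hypothetical. It counts started tasks and tasks that ended with an exception.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingExecutor extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet(); // runs in the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        if (thrown != null)
            failed.incrementAndGet(); // thrown is non-null when task.run() threw
    }

    /** Runs one normal and one failing task; returns {started, failed}. */
    static int[] demo() {
        CountingExecutor pool = new CountingExecutor(1);
        pool.execute(() -> { });
        // The exception reaches afterExecute, then propagates and ends that worker thread.
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.started.get(), pool.failed.get()};
    }
}
```

The exception is visible to afterExecute only because the tasks are passed to execute. With submit, the task is wrapped in a FutureTask that captures the exception, so thrown would be null.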