Why use thread pools
- The pool technology is used to reuse created threads to avoid the loss caused by frequent creation and destruction of threads, reduce resource consumption, and improve response speed.
- When a large number of tasks are started on the server, the creation of a large number of threads consumes the memory space of the server and affects the server usage. Thread pools can be used to manage threads.
- Thread pools are extensible, allowing developers to add more functionality to them
Overall thread pool design and source code parsing
Let’s take a macro look at how thread pools work
From the running mechanism of thread pool, we can see that a producer-consumer model is built inside thread pool, which decouple threads and tasks and is not directly associated with each other. In the case of a large number of tasks, tasks are temporarily stored in task queue, so as to buffer tasks well
Thread pool running status
The thread pool itself is stateful. Here are the five states of the thread pool
Running state | State description |
---|---|
RUNNING | The thread pool can receive new task submissions and can process tasks in the blocking queue normally. |
SHUTDOWN | No more tasks are submitted, and the thread pool can continue processing tasks in the blocking queue. |
STOP | New tasks are no longer accepted, and existing tasks in the blocking queue are discarded. In addition, it interrupts tasks in progress. |
TIDYING | After all tasks have executed (including tasks in the blocking queue), the number of active threads in the current thread pool is reduced to zero and the terminated method will be called. |
TERMINATED | Terminated state of the thread pool. The thread pool will enter the state when terminated |
Thread pool state flow
State transition | Mode of state transition |
---|---|
RUNNING -> SHUTDOWN | When the shutdown method of a thread pool is called, or when the Finalize method is implicitly called (which calls shutdown internally) |
RUNNING, SHUTDOWN -> STOP | When the shutdownNow method of the thread pool is called |
SHUTDOWN -> TIDYING | When both the thread pool and the blocking queue are empty |
STOP -> TIDYING | When the thread pool becomes empty |
TIDYING->TERMINATED | When the terminated method is executed |
So how does a thread pool manage its own running state and the number of threads in the pool?
// CTL: three bits indicate the running status of the thread pool, and the lower 29 bits indicate the number of threads running in the thread pool
// The advantage of holding two values in one variable is that you don't have to bother (such as locking) to maintain consistency between the two states
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Obtain the current running status of the thread pool (~ : bit-inverse, that is, 0 becomes 1 and 1 becomes 0).
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Get the number of threads currently running in the thread pool
private static int workerCountOf(int c) { return c & CAPACITY; }
// Obtain the CTL from the thread pool status and the number of threads running
private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code
Next comes source code parsing
Let’s take a macro look at the thread pool class inheritance
The core implementation class for thread pools in Java is ThreadPoolExecutor
Executor: Provides only an interface to execute tasks. Users do not need to worry about how to create threads or schedule threads, but only provide a Runnable object
ExecutorService: Adds interfaces to execute tasks, such as task submission, and thread pool life cycle management
AbstractExecutorService: an abstract class, will perform the task of process series up, ensure the realization of the lower need to focus on the method of a mission
ThreadPoolExecutor: Maintains its own lifecycle while simultaneously managing threads and tasks, enabling a good combination of the two to execute parallel tasks
The thread pool performs the task
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();
// If the number of threads in the thread pool is smaller than coolPoolSize
if (workerCountOf(c) < corePoolSize) {
// Add a thread and treat the submitted thread as a firstTask
if (addWorker(command, true))
return;
Since the state of the thread pool and the number of threads running can change at any time, it is important to check the thread pool timing
c = ctl.get();
}
// This judgment is entered because the above judgment is not valid, either the corePoolSize has reached the upper limit, or adding threads failed
// The thread pool must be queued before the thread pool is queued
if (isRunning(c) && workQueue.offer(command)) {
// Obtain the value of CTL again, and make judgment every moment
int recheck = ctl.get();
// If the thread pool is not in the running state, then the subsequent remove operation will be performed, which is equivalent to a rollback to the thread executing this time
/ / remove
if (! isRunning(recheck) && remove(command))
// Execute the reject policy
reject(command);
// The thread pool is in execution state
else if (workerCountOf(recheck) == 0)
// If the worker thread is 0, create a non-core thread to prevent a situation where there is a task but no thread executing it
addWorker(null.false);
}
// Failed to create a new thread, reject it directly
else if(! addWorker(command,false))
reject(command);
}
Copy the code
Thread pool add task
// If core is true, core Poolsize will be used as the critical condition; if false, maximumPoolSize will be used as the critical condition
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// Get the thread pool running status
int rs = runStateOf(c);
// Make a decision about whether or not to create a thread
(rs >= SHUTDOWN && (rs! = SHUTDOWN || firstTask ! = null ||
// workQueue.isEmpty()))
Rs > SHUTDOWN (thread pool in STOP, TIDYING, or TERMINATED state, failed to add worker threads)
// 2.rs >= SHUTDOWN && firstTask ! = null
// 3.rs >= SHUTDOWN && workQueue.isEmpty()
// In all three cases, no new threads are created
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))return false;
for (;;) {
// The number of threads running in the thread pool
int wc = workerCountOf(c);
// Determine whether the thread has reached the upper limit
// If a thread in corePoolSize is added, determine whether the upper limit of corePoolSize is exceeded
// If you add threads from maximumPoolSize, check whether the maximum maximumPoolSize is exceeded
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// Add the number of threads, if successful, break out of the loop
if (compareAndIncrementWorkerCount(c))
break retry;
// Get c again
c = ctl.get(); // Re-read ctl
// Whether the thread pool state is equal to the original state
if(runStateOf(c) ! = rs)// If not, the thread pool has changed and the previous operation needs to be performed again
continue retry;
// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Create a worker object
w = new Worker(firstTask);
final Thread t = w.thread;
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;
// ReentrantLock Exclusive lock
mainLock.lock();
try {
// Get the thread pool status again
int rs = runStateOf(ctl.get());
// Make a decision to run the thread first
// 1. The thread pool is running
// 2. The thread pool is in SHUTDOWN state but task== NULL, because new tasks are not accepted in SHUTDOWN state
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// Add worker to a hashset
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// Record the maximum number of threads.
largestPoolSize = s;
// Make a flag indicating that the worker thread is added to the hashset
workerAdded = true; }}finally {
mainLock.unlock();
}
if (workerAdded) {
// Thread of execution
t.start();
// Indicates that the worker thread has started
workerStarted = true; }}}finally {
// The thread failed to start. Do some rollback operations
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Copy the code
The Worker class
The Worker is an inner class of a ThreadPoolExecutor, which is used to maintain the interrupt control status of a thread executing a task. It implements the Runnable interface and inherits AQS. Implementing Runnable interface means that the Worker is a thread. AQS is inherited for exclusive locking.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// The thread that executes the task
final Thread thread;
/** Initial task to run. Possibly null. */
// The task to perform
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// When a new thread is created, the value of state-1 is set to prevent it from being interrupted by another thread explicitly using the shutdown method before the thread is executed, because the interrupt must be determined that the state is greater than or equal to zero
setState(-1);
this.firstTask = firstTask;
// Create a new thread
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run(a) {
runWorker(this);
}
Copy the code
The Worker class has a default implementation for the way threads are created
static class DefaultThreadFactory implements ThreadFactory {
// In the system, there may be more than one thread pool, so this is a static field
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
Each thread belongs to a specific thread pool, so this field is not static
private final AtomicInteger threadNumber = new AtomicInteger(1);
private finalString namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s ! =null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();// Thread name prefix
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
// For threads in the thread pool, all are user threads
t.setDaemon(false);
if(t.getPriority() ! = Thread.NORM_PRIORITY)// Threads have the same priority
t.setPriority(Thread.NORM_PRIORITY);
returnt; }}Copy the code
final void runWorker(Worker w) {
// Get the current thread
Thread wt = Thread.currentThread();
// Get the task
Runnable task = w.firstTask;
// Leave the task explicitly empty to prevent a confusing problem and get a duplicate next time
w.firstTask = null;
// Set the thread state to 0 (when the Worker was created, the state was -1) to allow the thread to interrupt when running
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// Loop to determine whether the task (firstTask or task obtained from the queue) is empty
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// Check whether the thread pool is in the stop state or whether the thread is interrupted
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
// Callbacks can be extended appropriately
beforeExecute(wt, task);
Throwable thrown = null;
try {
// Execute the task
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally{ afterExecute(task, thrown); }}finally {
task = null;
// The number of threads that have completed execution is used as a statistic
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally{ processWorkerExit(w, completedAbruptly); }}// Get the task from the blocking queue
private Runnable getTask(a) {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Get the thread pool status
int rs = runStateOf(c);
The thread pool status is STOP, TIDYING, and TERMINATED
// 2. Thread pool shutdown and queue is empty.
// If one of the above conditions is met, the number of worker threads is subtracted from wc by 1, and null is returned
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
AllowCoreThreadTimeOut Whether to allow core worker threads to time out for destruction. The default is false and can be set to true
// The number of worker threads is greater than the number of core threads
// If one of the conditions is met, timed is true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/ / 1. (a worker thread number > maximumPoolSize) | | (timed = = true && timedOut = = true)
// 2. The number of worker threads is greater than 1 or the queue is empty
Generally, this condition is triggered when the number of worker threads is greater than maximumPoolSize and the task queue is empty
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// If timed is true and no valid task is waiting for keepAliveTime, null is returned if timed is true and no valid task is waiting for keepAliveTime
// If timed is false, take() will block until there is another valid task and return null.
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if(r ! =null)
return r;
// timedOut = true when the task cannot be retrieved
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; }}}Copy the code
Task queue
GetTask () has a step to get a task from a task queue. Different task queues have different access policies. The following are some optional task queues:
The name of the | describe |
---|---|
ArrayBlockingQueue | A bounded blocking queue implemented with an array, in which elements are sorted on a first-in, first-out (FIFO) basis and concurrency is controlled by a reentrant lock |
LinkedBlockingQueue | A bounded queue with a linked list structure that sorts elements on a first-in, first-out (FIFO) basis |
PriorityBlockingQueue | An unbounded queue that supports priority ordering of threads |
DelayQueue | An unbounded queue implementing PriorityBlockingQueue implements delayed fetching. When creating an element, you can specify how long to retrieve the current element from the queue |
SynchronousQueue | A blocking queue that does not store elements. Every PUT operation must wait for a take operation. Supports fair and unfair locking. Using SynchronousQueue will Executors. NewCachedThreadPool () |
LinkedTransferQueue | An unbounded blocking queue composed of a linked list structure, equivalent to other queues, LinkedTransferQueue queue has more transfer and tryTransfer methods |
LinkedBlockingDeque | A two-way blocking queue consisting of a linked list structure |
Thread pool Reclaiming
GetTask () is used to get control of the number of tasks and threads. If the thread pool should not have so many threads, it will return null. There are two places in the code where null is returned
The first place:
- The thread pool status is STOP, TIDYING, and TERMINATED
- The thread pool is shutdown and the queue is empty.
In the second place
- Worker thread number > maximumPoolSize and the task queue is empty
- Number of worker threads >corePoolSize and the task queue is empty
// completedAbruptly:true Indicates that the user exits abnormally
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If the worker thread exits unexpectedly, the number of worker threads is reduced by 1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Count the number of completed tasks
completedTaskCount += w.completedTasks;
// Remove the worker thread count
workers.remove(w);
} finally {
mainLock.unlock();
}
// Try to interrupt the idle thread
tryTerminate();
int c = ctl.get();
// If the thread pool is in the RUNNING or SHUTDOWN state
if (runStateLessThan(c, STOP)) {
// The worker thread is not abnormal
if(! completedAbruptly) {// allowCoreThreadTimeOut specifies whether core worker threads are allowed to time out for destruction
// If allowed, the minimum number of core worker threads is 0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// If the minimum number of core threads reserved is 0 and the task queue is not empty, it means that at least one more thread is needed to complete the task
if (min == 0&&! workQueue.isEmpty())// Change the minimum number of threads to 1
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// If the number of workers currently running is less than the number of workers currently needed, the addWorker is called to add a new Worker
addWorker(null.false); }}Copy the code
Thread pool rejection policy
Rejection policies | Refused to way |
---|---|
AbortPolicy | Throws a run-time exception directly |
DiscardPolicy | Silently discard the submitted task, doing nothing and throwing no exceptions |
DiscardOldestPolicy | Discard the longest-running task in the blocking queue (the queue head element) and leave a free space in the queue for the currently submitted task to be put into the queue |
CallerRunsPolicy | The submitted task is run directly by the thread that submitted it |
// The thread that submitted the task will run the submitted task directly
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy(a) {}/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if(! e.isShutdown()) { r.run(); }}}// Throw an exception directly
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy(a) {}/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from "+ e.toString()); }}/** * A handler for rejected tasks that silently discards the * rejected task. */
// Silently discard the rejected task
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy(a) {}/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
// This method does nothing
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy(a) {}/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
// Remove the header element, removing the oldest mission
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Check whether the thread pool is closed
if(! e.isShutdown()) { e.getQueue().poll(); e.execute(r); }}}Copy the code
Three ways to submit tasks in the Abstract Class AbstractExecutorService
There are three ways to submit, and either way, the end result is to convert the incoming task into a Callable object for processing, and then invoke it
The execute method declared in the Executor interface is handled uniformly
// All three methods of submitting tasks are similar
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public FutureTask(Runnable runnable, V result) {
// The Runnable object is converted to a Callable object
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call(a) {
task.run();
// Note that the result returned is null, passed in by layers
returnresult; }}Copy the code
test
public static void main(String[] args) {
ExecutorService executors = new ThreadPoolExecutor(2.5.10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(4));
IntStream.range(0.10).forEach(i -> {
executors.submit(() -> {
IntStream.range(0.50).forEach(j -> System.out.println(Thread.currentThread().getName()));
});
});
executors.shutdown();
}
Copy the code
The resources
Blog.csdn.net/bieber007/a…
Tech.meituan.com/2020/04/02/…