Welcome to the public account: Wuxiaozhu
What is a thread pool
In a typical application, a task is often run by creating and starting a thread with new Thread().start(). When the task finishes, the thread is destroyed. Both creating and destroying the thread take time.
To reduce the overhead of frequently creating and destroying threads, we can reuse threads that have already been created. Database connection pooling works the same way: every query needs a connection to be established and later torn down, so connection pools (c3p0, Druid, etc.) keep connections around and hand them out on demand. A thread pool applies the same idea to threads. The pool holds available threads, and when a task needs to run, a thread is taken directly from the pool instead of being created from scratch.
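To make the reuse idea concrete, here is a minimal sketch. The class name ThreadReuseDemo and the pool size are our own choices for illustration, not part of any library. It runs many short tasks on a small fixed pool and counts how many distinct threads actually executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts the distinct threads that served a batch of tasks.
class ThreadReuseDemo {

    /** Runs taskCount short tasks on a pool of poolSize threads and returns the number of distinct worker threads. */
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Each task only records the name of the thread it runs on.
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("tasks: 100, distinct threads: " + distinctThreadsUsed(2, 100));
    }
}
```

With new Thread().start() the same 100 tasks would need 100 threads. Here the count can never exceed the pool size.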
Implementation of thread pools
Here is a UML class diagram of ThreadPoolExecutor.
The top-level interface is Executor
It has only one method, execute()
public interface Executor {
    // Execute the given task
    void execute(Runnable command);
}
The ExecutorService interface
ExecutorService extends Executor and enriches its ability to execute and manage tasks. It provides methods to manage the thread pool, such as shutting it down, and methods that return a Future for one or more asynchronous tasks:
// Start an orderly shutdown: previously submitted tasks still run, but no new tasks are accepted
void shutdown();
boolean isShutdown();
boolean isTerminated();
// After a shutdown request, block until all tasks have completed, the timeout elapses, or the current thread is interrupted, whichever happens first
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// Submit a value-returning Callable task and return a Future representing its pending result
<T> Future<T> submit(Callable<T> task);
// Submit a Runnable task and return a Future representing it
Future<?> submit(Runnable task);
// Execute the given tasks and, when all are complete, return a list of Futures holding their status and results
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// Execute the given tasks and return the result of one that completed successfully (that is, without throwing an exception); the remaining unfinished tasks are cancelled
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
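As a rough illustration of these methods, the sketch below uses invokeAll to run a batch of Callable tasks and reads each result from its Future. The class name ExecutorServiceDemo and the squaring task are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class showing invokeAll and Future.get.
class ExecutorServiceDemo {

    /** Squares every input on the pool and returns the sum of the results. */
    static int sumOfSquares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer n : inputs) {
                tasks.add(() -> n * n); // one Callable per input
            }
            int sum = 0;
            // invokeAll blocks until every task is done, then returns the Futures in input order.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3)));
    }
}
```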
The abstract class AbstractExecutorService
AbstractExecutorService implements several ExecutorService methods, such as submit, invokeAny, and invokeAll, so that users of a thread pool only need to focus on writing tasks and submitting them to the pool. It does not implement execute, which is left to ThreadPoolExecutor.
The implementation class ThreadPoolExecutor
This is the core thread pool class, which maintains the thread pool lifecycle and core parameters. More on that below.
ForkJoinPool
ForkJoinPool is the divide-and-conquer framework introduced in Java 7. It computes a task in parallel by splitting it into smaller subtasks and combining the results of the subtasks into a single result.
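A small sketch of the divide-and-conquer style, assuming a simple array-sum problem. The class names and the THRESHOLD value are chosen only for this example. A range is split in half until it is small enough to add up directly, and the partial sums are combined on the way back.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class: sums an array with a RecursiveTask.
class ForkJoinSumDemo {

    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // illustrative cutoff, not a tuned value
        private final long[] data;
        private final int from;
        private final int to; // exclusive

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum; // small enough: compute directly
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                              // run the left half asynchronously
            return right.compute() + left.join();     // compute the right half here, then combine
        }
    }

    /** Sums the numbers 1..n using a ForkJoinPool. */
    static long sumOfFirst(int n) {
        long[] data = new long[n];
        for (int i = 0; i < n; i++) {
            data[i] = i + 1;
        }
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfFirst(10_000));
    }
}
```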
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor extends ThreadPoolExecutor and implements the ScheduledExecutorService interface. On top of ThreadPoolExecutor, it can execute asynchronous tasks after a delay or periodically.
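The two scheduling modes can be sketched as follows. The class name ScheduledDemo and the delays used are assumptions made for the example: one helper runs a task once after a delay, the other runs a task at a fixed rate until it has fired a given number of times.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for delayed and periodic execution.
class ScheduledDemo {

    /** Schedules a one-shot task and returns how many milliseconds passed before it ran. */
    static long runAfterDelay(long delayMillis) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
        try {
            long start = System.nanoTime();
            ScheduledFuture<Long> future =
                    scheduler.schedule(() -> System.nanoTime() - start, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    /** Runs a periodic task until it has fired at least `runs` times and returns the observed count. */
    static int countPeriodicRuns(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
                counter.incrementAndGet();
                latch.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            latch.await();        // wait for the requested number of runs
            handle.cancel(false); // stop further executions
            return counter.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran after ms: " + runAfterDelay(50));
        System.out.println("periodic runs: " + countPeriodicRuns(3, 10));
    }
}
```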
Thread pool core ThreadPoolExecutor
Status of the thread pool
The thread pool maintains its own life cycle internally. Two key values, the run state (rs) and the number of worker threads (wc), are packed into a single 32-bit AtomicInteger named ctl: the high 3 bits hold the run state and the low 29 bits hold the worker count.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Extract the run state (high 3 bits)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Extract the number of threads in the pool (low 29 bits)
private static int workerCountOf(int c) { return c & CAPACITY; }
// Pack a run state and a worker count into one int
private static int ctlOf(int rs, int wc) { return rs | wc; }
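To see the packing at work outside the JDK, the constants and helper methods above can be copied into a standalone class. CtlBitsDemo is our own name, and the class only mirrors the arithmetic without touching a real pool.

```java
// Hypothetical standalone copy of the ctl bit arithmetic, for experimentation only.
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;     // low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;           // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;           // high bits 000
    static final int STOP = 1 << COUNT_BITS;               // high bits 001
    static final int TIDYING = 2 << COUNT_BITS;            // high bits 010
    static final int TERMINATED = 3 << COUNT_BITS;         // high bits 011

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5); // a running pool with 5 workers
        System.out.println("ctl bits     : " + Integer.toBinaryString(ctl));
        System.out.println("is RUNNING   : " + (runStateOf(ctl) == RUNNING));
        System.out.println("worker count : " + workerCountOf(ctl));
    }
}
```

Because RUNNING is the only negative state and the others grow in order, the states compare numerically as RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, which is what makes checks such as runStateAtLeast(c, STOP) a plain integer comparison.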
State | Description |
---|---|
RUNNING | Accept the new task and process the tasks in the workQueue |
SHUTDOWN | New task submissions are not accepted, but tasks in the workQueue are processed |
STOP | Does not accept new task submissions, does not process tasks in the workQueue, and interrupts executing tasks |
TIDYING | All tasks have terminated and workerCount is 0; the pool is about to run the terminated() hook method |
TERMINATED | The terminated() hook method has completed |
- RUNNING -> SHUTDOWN: occurs when shutdown() is called
- (RUNNING or SHUTDOWN) -> STOP: occurs when shutdownNow() is called
- SHUTDOWN -> TIDYING: occurs when both the task queue and the pool are empty
- STOP -> TIDYING: occurs when the pool is empty, that is, when the number of worker threads reaches 0
- TIDYING -> TERMINATED: occurs when the terminated() hook method has completed
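The public API only exposes part of this life cycle (isShutdown, isTerminated, awaitTermination), but that is enough to observe the RUNNING -> SHUTDOWN -> TERMINATED path. The following is a sketch with a hypothetical LifecycleDemo class.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes a pool before and after shutdown().
class LifecycleDemo {

    /**
     * Returns four observations:
     * [0] isShutdown() while RUNNING, [1] isShutdown() after shutdown(),
     * [2] whether a task submitted after shutdown() was rejected,
     * [3] isTerminated() after awaitTermination().
     */
    static boolean[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        boolean shutdownWhileRunning = pool.isShutdown();
        pool.execute(() -> { });          // accepted: the pool is RUNNING

        pool.shutdown();                  // RUNNING -> SHUTDOWN
        boolean shutdownAfterCall = pool.isShutdown();

        boolean rejected = false;
        try {
            pool.execute(() -> { });      // new tasks are no longer accepted
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // wait for TIDYING -> TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { shutdownWhileRunning, shutdownAfterCall, rejected, pool.isTerminated() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```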
The core parameters of the thread pool
Parameter | Description |
---|---|
corePoolSize | Number of core threads kept in the pool |
maximumPoolSize | Maximum number of threads the thread pool is allowed to create |
workQueue | The queue that holds tasks before they are executed. It only holds Runnable tasks submitted through the execute method |
keepAliveTime | How long threads in excess of corePoolSize may stay idle before they are terminated |
unit | Time unit of the keepAliveTime parameter |
threadFactory | The factory the executor uses to create new threads |
handler | The handler used when a task cannot be accepted because the thread limit and the queue capacity have both been reached |
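For reference, all seven parameters map directly onto the longest ThreadPoolExecutor constructor. In the sketch below the sizes, the queue capacity, and the thread name prefix "demo-worker-" are arbitrary values picked for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: builds a pool with every constructor parameter spelled out.
class PoolConfigDemo {

    static ThreadPoolExecutor newBoundedPool() {
        AtomicInteger sequence = new AtomicInteger(1);
        // threadFactory: gives worker threads a recognizable name
        ThreadFactory factory = r -> new Thread(r, "demo-worker-" + sequence.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<>(10),           // workQueue (bounded)
                factory,                                // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    /** Submits one task and returns the name of the worker thread that ran it. */
    static String firstWorkerName() {
        ThreadPoolExecutor pool = newBoundedPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName());
    }
}
```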
Thread pool running rules
- If the number of threads < corePoolSize, a new thread is created and started to execute the newly submitted task
- If the number of threads >= corePoolSize and the workQueue is not full, the task is added to the workQueue
- If the workQueue is full and the number of threads < maximumPoolSize, a new thread is created and started to execute the task
- If the workQueue is full and the number of threads >= maximumPoolSize, the task is rejected
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // If the number of threads < corePoolSize, try to start a new core thread with this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // If the number of threads >= corePoolSize and the thread pool is RUNNING, add the task to the workQueue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the thread pool is no longer RUNNING, remove the queued task and apply the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If no worker thread is left, start one so that the queued task gets executed
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the workQueue is full, try to start a new non-core thread (allowed while the number of threads < maximumPoolSize)
    // If this fails, the pool is shut down or the number of threads has reached maximumPoolSize, so the task is rejected
    else if (!addWorker(command, false))
        reject(command);
}
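The four rules can be watched on a deliberately tiny pool. The sketch below is our own construction, not taken from the JDK: it uses corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1, and submits four tasks that block on a latch so that no thread becomes free in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: traces how execute() treats four consecutive tasks.
class RunningRulesDemo {

    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        // Every task blocks until the latch opens, so workers stay busy during the submissions.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> trace = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    trace.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The first task starts the core thread, the second waits in the queue, the third finds the queue full and gets a second (non-core) thread, and the fourth is rejected by the default AbortPolicy.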
Creating a thread pool
Thread pools can be created using factory methods provided by the Executors class.
- Executors.newWorkStealingPool
Creates a work-stealing thread pool that maintains enough threads to support the given parallelism level. It is backed by a ForkJoinPool and was introduced in Java 8.
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        // Runtime.getRuntime().availableProcessors() returns the number of available processors, which is used as the parallelism of the pool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
- Executors.newCachedThreadPool: a thread pool that creates new threads as needed and reuses previously created threads when they are available
- Executors.newScheduledThreadPool: a thread pool that supports delayed and periodic execution; unlike newCachedThreadPool, it does not reclaim its idle core threads
- Executors.newSingleThreadExecutor: a thread pool with a single worker thread
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
- Executors.newFixedThreadPool: a fixed-size thread pool. The number of core threads equals the maximum number of threads, so no extra (non-core) threads are ever created, and keepAliveTime is 0.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Thread pool rejection policy
When the task cache queue in the thread pool is full and the number of threads in the thread pool reaches maximumPoolSize, the task needs to be rejected.
- AbortPolicy: the default policy of the thread pool. It rejects the task and throws a RejectedExecutionException.
- DiscardPolicy: silently discards the task without throwing an exception.
- DiscardOldestPolicy: discards the oldest task in the task queue and then resubmits the current task.
- CallerRunsPolicy: as long as the thread pool has not been shut down, the thread that submitted the task runs the task itself.
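As one example of these policies, the sketch below shows CallerRunsPolicy in action. The class name CallerRunsDemo and the pool dimensions are assumptions of the example: the pool has one thread and a queue of capacity 1, both are occupied, and the third task therefore runs on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows which thread runs a task rejected under CallerRunsPolicy.
class CallerRunsDemo {

    /** Returns the name of the thread that executed the task the saturated pool could not accept. */
    static String threadOfRejectedTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // Pool and queue are full: the policy runs this task on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + threadOfRejectedTask());
    }
}
```

Since the caller is busy running the task, it cannot submit more work in the meantime, which acts as a simple form of back-pressure.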
Hook method
The addWorker method adds a worker thread: it creates a Worker with new Worker(firstTask) and then starts the worker's thread with start(), which in turn calls runWorker().
runWorker(Worker w)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    // help GC
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            // lock the worker
            w.lock();
            // If the pool is stopping but this thread has not been interrupted yet, interrupt it
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Execute the before hook method
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Execute the after hook method
                    afterExecute(task, thrown);
                }
            } finally {
                // The task is completed
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
Inside runWorker(), ThreadPoolExecutor calls two hook methods, beforeExecute(wt, task) and afterExecute(task, thrown). A third hook, terminated(), is called when the pool terminates. We can override these hook methods to extend our thread pool.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorDemo {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("Commence execution:" + r);
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("Execution completed:" + r);
            }
            @Override
            protected void terminated() {
                System.out.println("Thread Pool terminated!");
            }
        };
        executorService.submit(() -> {
            System.out.println("Task Running!!!");
        });
        executorService.shutdown();
    }
}
Execution result:
Commence execution:java.util.concurrent.FutureTask@212aa004
Task Running!!!
Execution completed:java.util.concurrent.FutureTask@212aa004
Thread Pool terminated!
Considerations for using thread pools
- The size of the thread pool affects the performance of the application, so the thread pool parameters need to be chosen carefully.
- FixedThreadPool and SingleThreadExecutor allow a request queue length of Integer.MAX_VALUE, which may let a large number of requests pile up and cause an OOM. CachedThreadPool and ScheduledThreadPool allow up to Integer.MAX_VALUE threads to be created, which may create a large number of threads and also cause an OOM. For these reasons, the Alibaba Java Development Manual requires thread pools to be created through ThreadPoolExecutor instead of Executors.
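These limits can be checked directly. The sketch below relies on an implementation detail: the pools returned by newFixedThreadPool and newCachedThreadPool are plain ThreadPoolExecutor instances, which matches the factory source shown earlier but is not promised by the documented return type. The class name ExecutorsLimitsDemo is ours.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class: inspects the limits of pools created by the Executors factories.
class ExecutorsLimitsDemo {

    /** Remaining capacity of the (empty) task queue behind newFixedThreadPool. */
    static int fixedPoolQueueCapacity() {
        // Assumption: the factory returns a ThreadPoolExecutor, so the cast succeeds.
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return fixed.getQueue().remainingCapacity();
        } finally {
            fixed.shutdown();
        }
    }

    /** Maximum number of threads newCachedThreadPool is allowed to create. */
    static int cachedPoolMaxThreads() {
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return cached.getMaximumPoolSize();
        } finally {
            cached.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
        System.out.println("cached pool max threads  : " + cachedPoolMaxThreads());
    }
}
```

Both values are Integer.MAX_VALUE, so in practice neither the queue nor the thread count is bounded. Constructing ThreadPoolExecutor directly lets you pick a bounded queue and an explicit rejection policy instead.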