1. Introduction to thread pools
1.1 Why use thread pools
- What’s the downside of using threads if they don’t apply to thread pools?
- Creating threads repeatedly is expensive
- Too many threads take up too much memory
- The benefits of using thread pools
- Speed up response times without repeatedly creating and destroying threads
- CPU and memory can be used wisely and controlled by thread pools
- Unified resource management, when there are many tasks, we can use thread pools to unify operations.
1.2 Applicable to thread pools
- The server receives a large number of requests, so we can reuse threads through thread pools to reduce the overhead of thread creation and destruction.
- In practice we recommend that threads be managed by thread pools.
2. Details about thread pool parameters
2.1 Parameter list of the thread pool
The parameter name | type | explain |
corePoolSize | int | Core threads |
maxPoolSize | int | Maximum number of threads |
keepAliveTime | long | Survival time |
workQueue | BlockingQueue | Task storage queue |
threadFactory | ThreadFactory | The thread pool creates the factory class for the new thread |
Handler | RejectedExecutionHandler | Reject policy for task execution |
- CorePoolSize: After the thread pool is initialized, there are no threads in the default thread pool. When a task arrives, a new thread is created to execute the task until corePoolSize is full.
- MaxPoolSize: specifies the maximum number of threads that can be added to the thread pool
2.2 corePoolSize and maxPoolSize control the process of adding threads:
- If the number of threads is smaller than corePoolSize, a new thread will be created to run the task even if other worker threads are idle
- If the number of threads is equal to or greater than corePoolSize but less than maxPoolSize, it is placed in the BlockingQueue.
- If the queue is full and the thread is smaller than maxPoolSize, a new thread is created to run the task
- If the queue is full and the number of threads reaches maxPoolSize, the reject policy is executed
Characteristics of thread pool increase and decrease
If corePoolSize and maxPoolSize are set to the same, the thread pool size is fixed
The thread pool wants to keep the number of threads small and will only add threads if the queue is full
If maxPoolSize is set to a high value, such as integer.max_value, the thread can be allowed to hold any number of concurrent tasks since this value is almost never reached
Since a thread larger than corePoolSize is created only when the queue is full, if you use an unbounded queue such as LinkedBlockingQueue, the number of threads will not exceed corePoolSize
2.2 KeepAliveTime KeepAliveTime
- If the thread pool currently has more threads than
, the extra threads when idle time exceedsKeepAliveTime
“Will be terminated. - If you don’t pass
Set up theallowCoreThreadTimeOut
The thread is not terminated.
2.3 threadFactory a threadFactory
- The new thread is created by
To create. Executors
Use the defaultExecutors.defaultThreadFactory()
Create and cut all created threads are in the same thread group, thread priority isThread.NORM_PRIORITY
Which is 5, not the daemon thread.- If you specify a ThreadFactory, you can specify the thread name, thread group, priority, daemon thread, and so on
2.4 workQueue workQueue
- Common queues:
- Direct exchange:
, which can contain only a queue of one element. The thread that inserts an element into the queue blocks until another thread retrieves the stored element from the queue. Similarly, if a thread tries to fetch an element and none currently exists, the thread will block until it inserts the element into the queue. - Unbounded queue:
Because it’s a linked list structure, it won’t fill up. If I set it up like this,maxPoolSize
It would be ineffective, but it would be easy to do if the number of missions soaredOOM
the - Bounded queue: ArrayBlockingQueue, which is the queue that has the maximum value if the queue is full and
Is greater thancorePoolSize
A new thread is created to perform the task.
- Direct exchange:
3. Create a thread pool instance
3.1 the use ofExecutors
Creating a thread pool
- FixedThreadPool
* FixedThreadPool
* @author yiren
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for(int i = 0; i < 1000; i++) { executorService.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); }}}Copy the code
. pool-1-thread-4 pool-1-thread-2 pool-1-thread-4 pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-1 pool-1-thread-4 pool-1-thread-3 ...Copy the code
- We can see that the console is always only used by 4 threads back and forth
- Let’s look at the source code
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
new LinkedBlockingQueue<Runnable>());
Copy the code
- What’s actually created in there is
And then the parameter ofcore
Set to the same value, and the work queue is unbounded. So it will not create more thancorePoolSize
Number of threads. - If the queue is too long, OOM will be generated
* FixedThreadPool OOM
* -Xmx8m -Xms8m
* @author yiren
public class FixedThreadPoolOom {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Runnable runnable = () -> {
try {
} catch(InterruptedException e) { e.printStackTrace(); }};for (int i = 0; i < Integer.MAX_VALUE; i++) { executorService.execute(runnable); }}}Copy the code
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.imyiren.concurrency.threadpool.FixedThreadPoolOom.main(FixedThreadPoolOom.java:23)
Copy the code
- SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(a) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
Copy the code
- You can see that in the method,
Both are 1, a single-threaded thread pool, and its blocking queue is also a linked list of unbounded queues. Is equivalent tonewFixedThreadPool(1)
I won’t do it here.
- CachedThreadPool
public static ExecutorService newCachedThreadPool(a) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
Copy the code
The core of phi is zero,max
And the task force is listed as a direct exchange queue, so the number of threads to create as many threads, and this thread task use after the end, will not immediately terminate, will wait60s
, do a cache processing, improve the utilization rate, obsolete will not use their destruction. Due to themax
The number of threads is particularly large and easy to OOM
- ScheduledThreadPool
- Perform tasks periodically
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
Copy the code
- We can see from the above source code,
Just specify the number of core threads and create oneThreadPoolExecutor
A subclass ofScheduledThreadPoolExecutor
And its work queue is a delay queue. - So let’s see, how do we use it
* ScheduledThreadPool
* @author yiren
public class ScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.schedule(() -> System.out.println(LocalDateTime.now() + ""+ Thread.currentThread().getName() + " delay 5s"), 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(LocalDateTime.now() + ""+ Thread.currentThread().getName()), 1.3, TimeUnit.SECONDS); }}Copy the code
2020-02-16T18:35:21.598 2020-02-16T18:35:22.606 Pool-1-thread -1 2020-02-16T18:35:25 2020-02-16T18:35:31.606 DELAY 5s 2020-02-16T18:35:31.607 Delay 5s 2020-02-16T18:35:31.608 The 2020-02-16 - thread pool - 1-1 T18: ". The 610 - thread pool - 1-1Copy the code
- The first usage is:
schedule(Runnable, long, TimeUnit);
, how long to delay execution, the unit of delay time. - The second usage is:
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
, the time of initial execution delayinitialDelay
And then every otherperiod
Execute once and specify the time unit.
- workStealingPool
public static ExecutorService newWorkStealingPool(a) {
return new ForkJoinPool
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
"ForkJoinPool-" + nextPoolId() + "-worker-");
Copy the code
- JDK1.8 adds a new thread pool that we can see is not used
, but the new thread pool classForkJoinPool
It is suitable for time-consuming tasks because of its reasonable use of CPU for parallel operation. ForkJoinPool
It is a parallel thread pool, and the number of concurrent threads is passed in as a parameter. There is a significant difference between the previous four thread pools, which all have core threads, maximum threads, etc., and this uses the number of concurrent threads to solve the problem. This thread poolThe order of tasks is not guaranteedIt’s called WorkStealing, stealing.
Through the above we understand the characteristics of each thread. And internal parameters. Using Executors often doesn’t fit easily into our business needs. In Ali’s Java code convention, it is clearly stated as follows: (from IDEA hint) :
- Do not use Executors to create a thread pool. Use ThreadPoolExecutor to clear the running rules of the thread pool and avoid resource depletion. 1) FixedThreadPool and SingleThreadPool: The allowed request queue length is Integer.MAX_VALUE, which may accumulate a large number of requests and result in OOM. 2) CachedThreadPool: the number of threads allowed to create is integer. MAX_VALUE, which may create a large number of threads, resulting in OOM.
3.2 the use ofThreadPoolExecutor
Creating a thread pool
If using ThreadPoolExecutor, how do we set the number of threads is a problem
- CPU intensive (encryption, hash, etc.) : The optimal number of threads is 1-2 times the number of CPU cores
- Time consuming IO type (read and write database, file, network flow, etc.) : the optimal number of threads is usually many times greater than the number of CPU cores, based on the BUSY state of JVM threads monitoring display, to ensure that idle threads can be connected, refer to Brain Goetz recommended calculation method:
- Number of threads = number of CPU cores * (1 + Average waiting time/Average working time)
- A more accurate way is to do a pressure test
Recommended method for creating a thread pool using ThreadPoolExecutor (from ali Java plug-in)
// Positive example 1: //org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1.new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); Copy the code
/ / Positive example 2: ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); //Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5.200.0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.execute(()-> System.out.println(Thread.currentThread().getName())); pool.shutdown();//gracefully shutdown Copy the code
Positive example 3:<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="100" /> <property name="queueCapacity" value="2000" /> <property name="threadFactory" value= threadFactory /> <property name="rejectedExecutionHandler"> <ref local="rejectedExecutionHandler" /> </property> </bean> //in code userThreadPool.execute(thread); Copy the code
4. Method of stopping the thread pool
4.1 shutdown()
- Once this method is executed, it tells the thread pool to stop, but it does not stop immediately. The thread pool will stop only after executing the current task and the tasks in the queue.
- During this period, no new tasks will be accepted and an error will be reported if a new task is submitted
4.2 isShutdown()
- If we don’t know if the thread is in
, we can do this by callingisShutdown()
To judge. Pay attention to thisisShutdown()
Is to determine whether it is calledshutdown
() method, not means to stop completely.
4.3 isTerminated()
- Do we need to determine if the thread pool has stopped completely?
4.4 awaitTermination(timeout, TimeUnit);
- This method is used to block and wait a certain amount of time to see if it has stopped completely
4.5 case:
/ * * *@author yiren
public class ThreadPoolShutdown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Runnable runnable = () -> {
try {
} catch (InterruptedException e) {
for (int i = 0; i < 50; i++) {
System.out.println("= = = = = = > shutdown!");
try {
executorService.execute(() -> {
System.out.println("new Task-1");
} catch (Exception e) {
boolean isTerminated = executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("executorService.awaitTermination(3, TimeUnit.SECONDS) = " + isTerminated);
if (executorService.isShutdown()) {
System.out.println("Thread has entered the closed phase and cannot commit.");
} else {
System.out.println("new Task-2");
System.out.println("executorService.isTerminated() = "+ executorService.isTerminated()); }}Copy the code
pool-1-thread-2 ...... - the thread pool - 1-3 = = = = = = > shutdown! The thread has entered the shutdown phase and cannot commit pool-1-thread-1...... pool-1-thread-2 java.util.concurrent.RejectedExecutionException: Task com.imyiren.concurrency.threadpool.ThreadPoolShutdown$$Lambda$2/1072408673@5b480cf9 rejected from java.util.concurrent.ThreadPoolExecutor@6f496d9f[Shutting down, pool size = 5, active threads = 5, queued tasks = 35, completed tasks = 10]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.imyiren.concurrency.threadpool.ThreadPoolShutdown.main(ThreadPoolShutdown.java:28)
executorService.isTerminated() = true
Copy the code
4.6 shutdownNow()
All threads are stopped by an interrupt signal, and thread tasks in the concurrent work queue are returned as a Runnable list
/ * * *@author yiren
public class ThreadPoolShutdownNow {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Runnable runnable = () -> {
try {
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " Interrupted"); }};for (int i = 0; i < 50; i++) {
List<Runnable> runnableList = executorService.shutdownNow();
System.out.println("runnableList.size() = "+ runnableList.size()); }}Copy the code
pool-1-thread-1 Interrupted
pool-1-thread-4 Interrupted
pool-1-thread-2 Interrupted
pool-1-thread-3 Interrupted
runnableList.size() = 35
pool-1-thread-5 Interrupted
Process finished with exit code 0
Copy the code
5. How do I reject threaded tasks
5.1 Rejection Opportunity
- Submitting new tasks will be rejected when Executor is down
- When the Executor’s maximum thread and work queue, they use a finite size and have reached the maximum
5.2 Rejection Policy
AbortPolicy throws an exception
DiscardPolicy silently discards, you can’t get notified
DiscardOldestPolicy Silently discards the oldest
CallerRunsPolicy lets the thread submitting the task execute
6. Use cycle functions to customize threads
6.1 Paused thread pools
/** * Demo task execution cycle before and after task execution **@author yiren
public class CanPauseThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final Lock lock = new ReentrantLock();
private Condition unPaused = lock.newCondition();
public CanPauseThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
private void pause(a) {
try {
isPaused = true;
} finally{ lock.unlock(); }}@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
try {
while(isPaused) { unPaused.await(); }}catch (InterruptedException e) {
}finally{ lock.unlock(); }}public void resume(a) {
try {
isPaused = false;
}finally{ lock.unlock(); }}public static void main(String[] args) throws InterruptedException {
CanPauseThreadPool canPauseThreadPool = new CanPauseThreadPool(4.10.10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024));
for (int i = 0; i < 100; i++) {
try {
System.out.println(Thread.currentThread().getName() + "" + LocalDateTime.now());
} catch(InterruptedException e) { e.printStackTrace(); }}); } TimeUnit.SECONDS.sleep(1);
// Start a pause
System.out.println("====> Start pause" + LocalDateTime.now());
System.out.println("====> Restore thread pool"+ LocalDateTime.now()); canPauseThreadPool.resume(); }}Copy the code
220-220-02-16T19:58:56.685 220-220-02-16T19:58:56.787 220-220-02-16T19:58:56.685 220-220-02-16T19:58:56.685 220-220-02-16T19:58:56.790 220-220-02-16T19:58:56.790 220-220-02-16T19:58:56.790 2020-02-16T19:58:56.894 Pool -1 thread-1 2020-02-16T19:58:56.894 pool-1 thread-1 2020-02-16T19:58:56.894 2020-02-16T19:58:56.894 ====> Start stop 2020-02-16T19:58:56.996 2020-02-16T19:58:56.997 Pool-1: Thread -1 2020-02-16T19:58:56.997 Pool-2: thread-2 2020-02-16T19:58:56 ====> Restore thread pool 2020-02-16T19:58:58.955 pool-1-thread-1 2020-02-16T19:58:59.056 Pool-1-thread-3 2020-02-16T19:58:59.056 2020-02-16T19:58:59.056 Pool -1 thread-1 2020-02-16T19:58:59.056 Pool -1 thread-2 2020-02-16T19:58:59.056 pool-1 thread-2 2020-02-16T19:58:59.056 - the thread pool - 1-2 2020-02-16 T19:58:59. The 157 - thread pool - 1-4, 2020-02-16 T19:58:59. 157Copy the code
- As we can see from the above example, we can override the cycle function of the thread pool to stop the execution of the task before the thread pool executes it.
6.2 afterExecute
- In addition to the above
, thread pools also provideafterExecute
7. Simple analysis of thread pools
7.1 the diagram
We can use the Diagrams tool of IDEA to select these interfaces and classes and display the diagram above.
An Executor has only one method to execute a task
public interface Executor { void execute(Runnable command); } Copy the code
ExecutorService inherits from Executor and includes several management methods
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<? > submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }Copy the code
AbstractExecutorService and ThreadPoolExecutor are examples of this
Executors is a tool class that AIDS in creating threads, etc.
7.2 Composition of a thread pool
It is mainly composed of the following components:
Thread pool manager
The worker thread
Task queue
Task interface
7.3 Reusing tasks by thread Pools
- First let’s look at the execute(Runnable) method
public void execute(Runnable command) {
if (command== null) throw new NullPointerException(); int c = ctl.get(); // Add core threadsif (workerCountOf(c) < corePoolSize) {
if (addWorker(command.true))
return; c = ctl.get(); } // Add to queueif (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
else if (workerCountOf(recheck) == 0)
addWorker(null, false); } // Execute the reject policyelse if(! addWorker(command.false))
Copy the code
- Once we’ve done the task, the thread will encapsulate into one
In thread reuse, the run method of fixed threads is used to continuously check whether there is a task in the queue and execute it if there is one. The main isWorker
The inside of therunWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while(task ! =null|| (task = getTask()) ! =null) {
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally{ afterExecute(task, thrown); }}finally {
task = null;
completedAbruptly = false;
} finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
- We can see that we take the Task and call the run method. It also covers the periodic functions before and after above
8. Thread pool status
- RUNNING: Can accept new tasks and handle queuing tasks
- SHUTDOWN: does not accept new tasks, but can process queueing tasks
- STOP: does not accept new tasks, does not process queued tasks, and interrupts ongoing tasks
- TIDYING: All threads have terminated, and when workerCount is zero, the thread transitions to TIDYING state and is called
methods - TERMINATED:
Operation to complete
- Article content source:
- Java concurrent programming art, JDK1.8 version of the source code, MOOC wukong JUC course
- Think you can point a thumbs-up
About me
- Majoring in Computer Science and technology, general university, Hangzhou.
- Graduated in 20 years, mainly engaged in back-end development of Java technology stack.
- GitHub: github.com/imyiren
- Blog : imyi.ren