Preface
java.util.concurrent (JUC) is Java's toolkit for concurrent programming. This article discusses ThreadPoolExecutor, the class used most often when working with thread pools, and one that rewards deeper study.
1. Common parameter description
Take a look at a constructor of the ThreadPoolExecutor class
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
1.1 Core Parameters:
- corePoolSize: the number of core threads, which defines the minimum number of threads that can run at the same time.
- maximumPoolSize: when the number of tasks in the queue reaches the queue capacity, the number of threads that can run simultaneously grows up to this maximum.
- workQueue: when a new task arrives, the pool checks whether the number of running threads has reached the core size. If so, the new task is placed in this queue.
Processing logic after a task is submitted to the thread pool:
- If fewer than corePoolSize threads are currently running, a new worker thread is created to execute the task.
- If there are no free threads to perform the task, the current number of threads is equal to corePoolSize, and the BlockingQueue is not full, the task is added to the BlockingQueue without adding a new thread.
- If there are no free threads to perform the task, the blocking queue is full, and the number of threads in the pool is less than maximumPoolSize, a new thread is created to execute the task.
- If there are no free threads to perform the task, the blocking queue is full, and the number of threads in the pool is equal to maximumPoolSize, the new task is rejected according to the policy specified by the constructor's handler argument.
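The four steps above can be observed with a small experiment. This is a minimal sketch, not part of the original example: the class name SubmissionFlowDemo and the helper trace are hypothetical, and the tasks simply block on a latch so that no worker becomes free while tasks are being submitted. After each submission it records "pool size/queue size", or "rejected" when the default AbortPolicy throws.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionFlowDemo {

    /** Submits blocking tasks one by one and records the pool state after each submission. */
    static String trace(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        // Every task waits on this latch, so no worker becomes free during submission
        CountDownLatch release = new CountDownLatch(1);
        StringBuilder result = new StringBuilder();
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    result.append(pool.getPoolSize()).append('/')
                          .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    // The default handler (AbortPolicy) throws once the pool is saturated
                    result.append("rejected ");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return result.toString().trim();
    }

    public static void main(String[] args) {
        // core = 2, max = 3, queue capacity = 1, five tasks
        System.out.println(trace(2, 3, 1, 5)); // prints "1/0 2/0 2/1 3/1 rejected"
    }
}
```

With two core threads, a queue of one, and a maximum of three threads, the first two tasks each start a core thread, the third is queued, the fourth starts a non-core thread, and the fifth is rejected.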
Other notes:
- The thread pool does not mark which threads are core and which are non-core. It only cares about the number of core threads.
- When ThreadPoolExecutor creates a new thread, it updates the thread pool state through CAS.
1.2 Other Common Parameters:
- keepAliveTime: when the number of threads in the pool is greater than corePoolSize and no new tasks are submitted, the threads beyond the core size are not destroyed immediately. They are reclaimed only after they have been idle for longer than keepAliveTime.
- unit: the time unit of keepAliveTime.
- threadFactory: the factory the executor uses when it creates a new thread.
- handler: the saturation policy.
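As an illustration of the threadFactory parameter, the sketch below passes a factory that gives worker threads a readable name. The class NamedThreadFactoryDemo, the helpers named and firstWorkerName, and the "order-worker" prefix are all hypothetical names chosen for this example. Only the seven-argument ThreadPoolExecutor constructor shown earlier is taken from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {

    /** Builds a factory that names threads "<prefix>-1", "<prefix>-2", ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    /** Runs one task and returns the name of the worker thread that executed it. */
    static String firstWorkerName(String prefix) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 8, TimeUnit.SECONDS,   // corePoolSize, maximumPoolSize, keepAliveTime, unit
                new ArrayBlockingQueue<>(4), // workQueue
                named(prefix),               // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName("order-worker")); // prints "order-worker-1"
    }
}
```

Named threads make log output and thread dumps easier to read than the default pool-N-thread-M names.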
ThreadPoolExecutor saturation policy definition
- ThreadPoolExecutor.AbortPolicy: throws RejectedExecutionException to reject new tasks.
- ThreadPoolExecutor.CallerRunsPolicy: the task is run by the thread that submitted it to the thread pool.
- ThreadPoolExecutor.DiscardPolicy: discards new tasks without processing them.
- ThreadPoolExecutor.DiscardOldestPolicy: discards the oldest submitted but unprocessed task (the one at the head of the queue), then retries the new task.
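To see one of these policies in action, here is a small sketch of CallerRunsPolicy. The class CallerRunsDemo and the method whoRunsOverflowTask are hypothetical names. The pool is deliberately tiny (one thread, queue capacity one) so that the third submission finds it saturated.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Returns the name of the thread that ends up running the task the pool cannot accept. */
    static String whoRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] runner = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // The pool is saturated, so CallerRunsPolicy runs this task in the submitting thread
            pool.execute(() -> runner[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return runner[0];
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + whoRunsOverflowTask());
    }
}
```

Because the submitting thread is busy running the overflow task, it cannot submit more work in the meantime, which naturally slows down the producer.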
2. ThreadPoolExecutor example
Create a task of type Runnable
public class ThreadTask implements Runnable {
    // Identifies the task being run by the current thread
    private final int number;

    public ThreadTask(int number) {
        this.number = number;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " Start -- " + number);
            // Simulate 10 seconds of work
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName() + " End -- " + number);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Use the ThreadPoolExecutor thread pool to run tasks
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    /** Thread pool constructor arguments */
    private static final Integer corePoolSize = 4;
    private static final Integer maximumPoolSize = 8;
    private static final Long keepAliveTime = (long) 8;
    private static final TimeUnit timeUnit = TimeUnit.SECONDS;
    private static final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(16);
    private static final RejectedExecutionHandler handle = new ThreadPoolExecutor.AbortPolicy();

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, handle);
        // Submit five tasks
        for (int i = 0; i < 5; i++) {
            ThreadTask threadTask = new ThreadTask(i);
            threadPoolExecutor.execute(threadTask);
        }
        threadPoolExecutor.shutdown();
        // Busy-wait until every submitted task has finished
        while (!threadPoolExecutor.isTerminated()) { }
        System.out.println("Finished all threads");
    }
}
The example creates a thread pool with four core threads and then submits five tasks. Because the core pool size is four, the pool starts four worker threads to process the first four tasks, and the fifth task waits in the queue. Once a task is done, the free worker thread picks up the task waiting in the queue. The four core threads continue to exist until the thread pool is shut down.
Here is the output of the program:
pool-1-thread-4 Start -- 3
pool-1-thread-1 Start -- 0
pool-1-thread-2 Start -- 1
pool-1-thread-3 Start -- 2
pool-1-thread-4 End -- 3
pool-1-thread-1 End -- 0
pool-1-thread-4 Start -- 4
pool-1-thread-2 End -- 1
pool-1-thread-3 End -- 2
pool-1-thread-4 End -- 4
Finished all threads
When creating an instance of ThreadPoolExecutor we can specify not only the number of core threads but also the maximum size of the thread pool, and we can supply our own RejectedExecutionHandler implementation to handle rejected tasks.
Custom rejection handler:
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " Rejected.");
    }
}
Change the thread pool parameters in ThreadPoolTest.java slightly to reduce the blocking queue capacity to 1. The maximum pool size is 8 and the blocking queue can hold only one task, so the pool can accept at most nine tasks at once. When 10 tasks are submitted, one of them is rejected by the custom saturation policy.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    /** Thread pool constructor arguments */
    private static final Integer corePoolSize = 4;
    private static final Integer maximumPoolSize = 8;
    private static final Long keepAliveTime = (long) 8;
    private static final TimeUnit timeUnit = TimeUnit.SECONDS;
    /** Reduced blocking queue capacity */
    private static final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1);
    /** Set to our own implementation */
    private static final RejectedExecutionHandler handle = new RejectedExecutionHandlerImpl();

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, handle);
        // Submit ten tasks
        for (int i = 0; i < 10; i++) {
            ThreadTask threadTask = new ThreadTask(i);
            threadPoolExecutor.execute(threadTask);
        }
        threadPoolExecutor.shutdown();
        // Busy-wait until every accepted task has finished
        while (!threadPoolExecutor.isTerminated()) { }
        System.out.println("Finished all threads");
    }
}
Execution Result:
cn.testproject.thread.runnable.ThreadTask@e2f02457 Rejected.
pool-1-thread-1 Start -- 0
pool-1-thread-3 Start -- 2
pool-1-thread-5 Start -- 5
pool-1-thread-7 Start -- 7
pool-1-thread-4 Start -- 3
pool-1-thread-2 Start -- 1
pool-1-thread-8 Start -- 8
pool-1-thread-6 Start -- 6
pool-1-thread-4 End -- 3
pool-1-thread-3 End -- 2
pool-1-thread-1 End -- 0
pool-1-thread-7 End -- 7
pool-1-thread-8 End -- 8
pool-1-thread-5 End -- 5
pool-1-thread-4 Start -- 4
pool-1-thread-6 End -- 6
pool-1-thread-2 End -- 1
pool-1-thread-4 End -- 4
Finished all threads
3. Creating thread pools with the ThreadPoolExecutor constructor is recommended
The “Concurrent Processing” section of the Alibaba Java Development Manual states that thread pools must not be created through Executors, but through ThreadPoolExecutor. This makes the running rules of the thread pool explicit to developers and avoids the risk of resource exhaustion.
The pools returned by Executors have the following drawbacks. newFixedThreadPool and newSingleThreadExecutor use a request queue whose capacity is Integer.MAX_VALUE, so a large number of tasks may pile up and result in OOM. newCachedThreadPool allows up to Integer.MAX_VALUE threads to be created, which may create a large number of threads and result in OOM.
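The limits behind this recommendation can be inspected directly. The sketch below is illustrative only: the class ExecutorsLimits and its methods are hypothetical names, and the casts to ThreadPoolExecutor assume the behavior of current OpenJDK releases, where newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor. The last method shows a bounded alternative built with the constructor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorsLimits {

    /** Queue capacity of a pool created by Executors.newFixedThreadPool. */
    static int fixedPoolQueueCapacity() {
        // Assumption: the returned ExecutorService is a ThreadPoolExecutor (true in OpenJDK)
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Maximum thread count of a pool created by Executors.newCachedThreadPool. */
    static int cachedPoolMaxThreads() {
        // Assumption: the returned ExecutorService is a ThreadPoolExecutor (true in OpenJDK)
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** Queue capacity of a fixed-size pool built explicitly with a bounded queue. */
    static int boundedPoolQueueCapacity(int threads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
        System.out.println("cached pool max threads:   " + cachedPoolMaxThreads());
        System.out.println("bounded pool queue:        " + boundedPoolQueueCapacity(4, 16));
    }
}
```

With the explicit constructor, both the thread count and the queue length have an upper bound that is visible at the call site, and overload is handled by the chosen saturation policy instead of by unbounded growth.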
4. Setting an appropriate thread pool size
Here's a simple rule of thumb that works for a wide range of applications:
- CPU-intensive tasks (N+1): this type of task mainly consumes CPU resources. The number of threads can be set to N (the number of CPU cores) + 1. The extra thread covers occasional page faults or pauses caused by other reasons: once a task pauses, the CPU would otherwise sit idle, and the extra thread can make use of that idle CPU time.
- I/O-intensive tasks (2N): with this type of task the system spends most of its time on I/O, and a thread does not occupy the CPU while it waits for I/O, so the CPU can be handed over to another thread. For I/O-intensive workloads we can therefore configure more threads; the specific figure is 2N.
How do you tell whether a task is CPU-intensive or I/O-intensive?
A CPU-intensive task is simply one that uses the computing power of the CPU, such as sorting large amounts of data in memory. Anything that involves network reads or file reads is I/O-intensive. The characteristic of this type of task is that the CPU computation time is very small compared with the time spent waiting for the I/O operation to complete.
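The N+1 and 2N rules can be written down as two small helpers. This is only a sketch of the rule of thumb above: the class PoolSizing and its method names are hypothetical, and N is taken from Runtime.getRuntime().availableProcessors(), which reports the number of processors available to the JVM.

```java
class PoolSizing {

    /** CPU-intensive workloads: N + 1 threads. */
    static int cpuBound(int cores) {
        return cores + 1;
    }

    /** I/O-intensive workloads: 2N threads. */
    static int ioBound(int cores) {
        return 2 * cores;
    }

    public static void main(String[] args) {
        // N: number of processors available to the JVM
        int n = Runtime.getRuntime().availableProcessors();
        System.out.println("cores = " + n);
        System.out.println("CPU-intensive pool size = " + cpuBound(n));
        System.out.println("I/O-intensive pool size = " + ioBound(n));
    }
}
```

On an 8-core machine this gives 9 threads for CPU-intensive work and 16 threads for I/O-intensive work. The results are starting points to be checked against measurements of the real workload.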
5. How is a thread pool shut down?
A thread pool is shut down with the shutdown() and shutdownNow() methods:
- shutdownNow(): the thread pool rejects newly submitted tasks and shuts down immediately. It attempts to stop running tasks by interrupting their threads, and tasks still waiting in the queue are not executed but returned to the caller.
- shutdown(): the thread pool rejects newly submitted tasks, but tasks that were already submitted are still executed. The pool terminates once all of them have completed.
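The difference between the two methods can be checked with a single-threaded pool. In this sketch the class ShutdownDemo and its methods are hypothetical names. It also uses awaitTermination, which is not mentioned above, to block until the pool has terminated instead of polling isTerminated().

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    private static ThreadPoolExecutor singleWorkerPool() {
        return new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
    }

    /** shutdown(): tasks that were already submitted still run to completion. */
    static int tasksCompletedAfterShutdown() {
        ThreadPoolExecutor pool = singleWorkerPool();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> completed.incrementAndGet());
        }
        pool.shutdown(); // no new tasks are accepted from here on
        try {
            // Block until the submitted tasks are done (or the timeout expires)
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    /** shutdownNow(): queued tasks are handed back instead of being executed. */
    static int queuedTasksReturnedByShutdownNow() {
        ThreadPoolExecutor pool = singleWorkerPool();
        pool.execute(() -> {
            try {
                Thread.sleep(60_000); // long-running task that keeps the only worker busy
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the worker, which ends the sleep early
            }
        });
        pool.execute(() -> { }); // waits in the queue
        pool.execute(() -> { }); // waits in the queue
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println("completed after shutdown():  " + tasksCompletedAfterShutdown());
        System.out.println("returned by shutdownNow():   " + queuedTasksReturnedByShutdownNow());
    }
}
```

Note that shutdownNow() can only stop a running task if that task responds to interruption, as the sleeping task does here.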