preface

JUC (java.util.concurrent) is a tool for processing threads. This article discusses the thread pool class ThreadPoolExecutor, which is used frequently in the process of using thread pools. This class also requires more in-depth learning


1. Common parameter description

Take a look at a constructor of the ThreadPoolExecutor class

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, 
                          RejectedExecutionHandler handler)
Copy the code


1.1 Core Parameters:

  • The number of threads defines the minimum number of threads that can run at the same time.
  • MaximumPoolSize: When the number of tasks in the queue reaches the queue capacity, the number of threads that can run simultaneously becomes the maximum number of threads.
  • WorkQueue: When a new task arrives, the system determines whether the number of threads currently running has reached the core number. If so, the new task is placed in the queue.


Processing logic after a task is submitted to the thread pool:

  1. Whether the thread pool first has fewer threads currently runningcorePoolSizeIf so, a new worker thread is created to execute the task.
  2. If there are no free threads to perform the task and the current number of threads is equal tocorePoolSize At the same timeBlockingQueue If not, add the taskBlockingQueueWithout adding a new thread.
  3. If there are no free threads to perform the task and the blocking queue is full and the number of threads in the pool is less thanmaximumPoolSize, a new thread is created to execute the task.
  4. If there are no free threads to perform the task and the blocking queue is full and the number of threads in the pool is equal tomaximumPoolSize, according to the constructorhandler Specifies a policy to reject new tasks.

The other:

  1. The thread pool does not mark which thread is core and which is non-core. The thread pool only cares about the number of core threads.
  2. whenThreadPoolExecutorWhen creating a new thread, passCASTo update the state of the thread pool


1.2 Other Common Parameters:

  • KeepAliveTime: When the number of threads in the thread pool is greater than corePoolSize, if no new task is submitted at this time, the threads outside the core thread will not be destroyed immediately, but will wait until the keepAliveTime is exceeded before being recycled for destruction

  • Unit: keepAliveTime Time unit

  • ThreadFactory: Used when executor creates a new thread

  • Handle: saturation policy


ThreadPoolExecutor saturation policy definition

  • ThreadPoolExecutor.AbortPolicy: throwRejectedExecutionExceptionTo reject new tasks
  • ThreadPoolExecutor.CallerRunsPolicy: Performed by the thread that submitted the task to the thread pool
  • ThreadPoolExecutor.DiscardPolicy: Discard new tasks without processing them
  • ThreadPoolExecutor.DiscardOldestPolicy: Discards the first submitted but unprocessed task


2. ThreadPoolExecutor example

Create a task of type Runnable

public class RunnableTask implements Runnable {
	// Identifies the task being run by the current thread
    int number;

    public ThreadTask(int number) {
        this.number = number;
    }

    @Override
    public void run(a) {
        try {
            System.out.println(Thread.currentThread().getName() + " Start -- " + number);
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName() + " End -- " + number);
        } catch(InterruptedException e) { e.printStackTrace(); }}}Copy the code


Use the ThreadPoolExecutor thread pool to run tasks

public class ThreadPoolTest {
    /** The thread pool corresponds to the argument */
    private static final Integer corePoolSize = 4;
    private static final Integer maximumPoolSize = 8;
    private static final Long keepAliveTime = (long) 8;
    private static final TimeUnit timeUnit = TimeUnit.SECONDS;
    private static final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(16);
    private static final RejectedExecutionHandler handle = new ThreadPoolExecutor.AbortPolicy();

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, handle);

        for (int i = 0; i < 4; i++) {
            RunnableTask runnableTask = new RunnableTask(i);
            threadPoolExecutor.execute(threadTask);
        }
        threadPoolExecutor.shutdown();
        while(! threadPoolExecutor.isTerminated()) { } System.out.println("Finished all threads"); }}Copy the code


Example creates a fixed size for the four thread pool, and then assign the five working, because the thread pool size to four, it will start the four line Mr. Cheng four processing work, other work is in a wait state, once the work is done, free worker threads will pick up the other work performed in the waiting queue. These four threads will continue to exist until the thread pool dies.

Here is the output of the program:

pool-1-thread-4 Start -- 3
pool-1-thread-1 Start -- 0
pool-1-thread-2 Start -- 1
pool-1-thread-3 Start -- 2
pool-1-thread-4 End -- 3
pool-1-thread-1 End -- 0
pool-1-thread-4 Start -- 4
pool-1-thread-2 End -- 1
pool-1-thread-3 End -- 2
pool-1-thread-4 End -- 4
Finished all threads
Copy the code


Not only can we specify the number of active threads when creating an instance of ThreadPoolExecutor, we can also limit the size of the thread pool and create our own RejectedExecutionHandler implementation to handle rejected tasks

Custom rejection handler:

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + "Rejected."); }}Copy the code


Change the thread pool parameter in ThreadPooltest.java slightly to reduce the blocking queue to 1. Since the maximum thread pool is 8, the blocking queue can only hold one task, and when 10 tasks are submitted, one of them will be rejected by the custom saturation policy

public class ThreadPoolTest {
    /** The thread pool corresponds to the argument */
    private static final Integer corePoolSize = 4;
    private static final Integer maximumPoolSize = 8;
    private static final Long keepAliveTime = (long) 8;
    private static final TimeUnit timeUnit = TimeUnit.SECONDS;
    /** reduces blocking queues */
    private static final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1);
    /** set to your own implementation */
    private static final RejectedExecutionHandler handle = new RejectedExecutionHandlerImpl();

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue, handle);
		// Submit ten tasks
        for (int i = 0; i < 10; i++) {
            ThreadTask threadTask = new ThreadTask(i);
            threadPoolExecutor.execute(threadTask);
        }
        threadPoolExecutor.shutdown();
        while(! threadPoolExecutor.isTerminated()) { } System.out.println("Finished all threads"); }}Copy the code


Execution Result:

Cn. Testproject. Thread. Runnable. ThreadTask @ e2f02 was refused the pool - 4571-thread-1 Start -- 0
pool-1-thread-3 Start -- 2
pool-1-thread-5 Start -- 5
pool-1-thread-7 Start -- 7
pool-1-thread-4 Start -- 3
pool-1-thread-2 Start -- 1
pool-1-thread-8 Start -- 8
pool-1-thread-6 Start -- 6
pool-1-thread-4 End -- 3
pool-1-thread-3 End -- 2
pool-1-thread-1 End -- 0
pool-1-thread-7 End -- 7
pool-1-thread-8 End -- 8
pool-1-thread-5 End -- 5
pool-1-thread-4 Start -- 4
pool-1-thread-6 End -- 6
pool-1-thread-2 End -- 1
pool-1-thread-4 End -- 4
Finished all threads
Copy the code


3. Use the ThreadPoolExecutor constructor to create a thread pool is recommended

In the “Concurrent Processing” section of the Alibaba Java Development Manual, it’s clear that thread pools are not allowed to be created by Executors, but by ThreadPoolExecutor. In this way, students can be more clear about the running rules of the thread pool and avoid the risk of resource exhaustion

The number of threads allowed to be created in the thread pool returned by Executors is Integer.MAX_VALUE, which may create a large number of threads and result in OOM.


4. Thread pool Settings Set the appropriate size

Here’s a simple formula that works for a wide range of applications:

  • CPU intensive task (N+1) : This type of task consumes CPU resources. The number of threads can be set to N (number of CPU cores) +1. One more thread is used to prevent occasional page miss interrupts or pauses caused by other reasons. Once the task is paused, the CPU is idle, and in this case the extra thread can take full advantage of the idle CPU time.
  • I/ O-intensive task (2N) : This is a task in which the system spends most of its time processing I/O interactions, and the thread does not occupy CPU for the time it is processing I/O, so the CPU can be handed over to another thread. Therefore, in the application of I/O intensive tasks, we can configure more threads, the specific calculation method is 2N.

How to determine whether a CPU – intensive task or an IO – intensive task?

CPU intensive is simply a task that uses the computing power of the CPU such as sorting large amounts of data in memory. When it comes to network reading alone, file reading is IO intensive. The characteristic of this type of task is that the CPU computation time is very small compared to the time spent waiting for THE I/O operation to complete. Most of the time is spent waiting for the I/O operation to complete.


5. How are tasks closed in the thread pool?

The thread pool shuts down the task using the shutdown() and shutdownNow() methods

  • ShutdownNow () : The thread pool rejects newly submitted tasks and immediately closes the thread pool. Tasks in the thread pool are no longer executed

  • Shutdown () : The thread pool rejects newly submitted tasks and closes the thread pool while waiting for all tasks in the thread pool to complete