preface

Thread pools are the most widely used concurrency framework in Java, and can be used by almost any program that needs to perform tasks asynchronously or concurrently. Proper use of thread pools provides several benefits:

(1) Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.

(2) Improve the response speed. When processing execution tasks, tasks can be executed immediately without waiting for a thread to be created.

(3) Improve the manageability of threads. Threads are scarce resources. If they are created without limit, they will consume system resources and reduce system stability. Thread pools can be used for unified allocation, tuning, and monitoring.

1. Implementation principle of thread pool

The thread pool process is shown in the figure above

The CTL field in the thread pool represents the current state in the thread pool. The primary pool control state CTL is of type AtomicInteger and wraps two conceptual fields: WorkerCount and runState. WorkerCount indicates the number of valid threads, and runState indicates whether they are running or closing. The CTL field is used to represent both concepts, and the first three bits of the CTL represent the thread pool state, where the workerCount limit is (2^29)-1 (approximately 500 million) threads, rather than (2^31)-1 (2 billion) threads. WorkerCount is the number of workers allowed to start and not allowed to stop. This value can be temporarily different from the actual number of active threads, for example, when ThreadFactory fails to create a thread when it is asked, and when the exiting thread is still counting until it terminates. The pool size visible to the user is reported as the current size of the working set. RunState provides major life cycle control, as shown in the following table:

The field name meaning
RUNNING Accept new tasks and handle queuing tasks
SHUTDOWN Does not accept new tasks, but handles queuing tasks
STOP Do not accept new tasks, do not process queued tasks, and interrupt ongoing tasks
TIDYING All tasks have terminated, workerCount is zero, and the thread that transitions to state TIDYING runs terminate()
TERMINATED The terminate() method completes

RunState changes over time, and threads waiting in awaitTermination() return when state is TERMINATED. The state transition is:

RUNNING -> SHUTDOWN may be implicit in Finalize () when SHUTDOWN () is called

(RUNNING or SHUTDOWN)-> STOP when shutdownNow() is called

SHUTDOWN -> TIDYING When both the queue and thread pool are empty

STOP -> TIDYING When the thread pool is empty

TIDYING -> TERMINATED when the terminate() method is TERMINATED

Developers who need to process thread pools when they become TIDYING can do so by overloading the terminated() function.

The thread pool ThreadPoolExecutor execution process is illustrated in the figure above. There are four scenarios for executing tasks submitted to the thread pool using execute() :

(1) The number of threads running in the thread pool is less than corePoolSize, and a new thread is created to execute the task.

(2) The number of running threads in the thread pool is not less than corePoolSize. Add the task to the BlockingQueue.

(3) If the task cannot be added to the blocking queue (the queue is full), create a new thread to process the task (here the global lock is required).

(4) when creating a new thread number of the currently running thread in the thread pool than maximumPoolSize, refused to task in the thread pool, call RejectedExecutionHandler. RejectedExecution () method.

Source code analysis:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  // If the number of threads is less than the basic number of threads, create a thread to execute
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  // If the number of threads is not less than the base number of threads, add the task to the queue
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null.false);
  }
  // If the queue is full, create a new thread to process it
  else if(! addWorker(command,false))
    // Execute the reject policy
    reject(command);
}	
Copy the code

When the thread pool creates a thread, it will encapsulate the thread as a Worker thread Worker. After the Worker completes the task, it will cycle to obtain the task in the work queue for execution.

2. Create and use thread pools

Creating a thread pool

Before creating a thread pool, you need to know the core parameters in creating a thread pool:

CorePoolSize: When a task is submitted to the thread pool, the thread pool creates a thread to execute the task, even if other free base threads are able to execute new tasks, until the number of tasks to execute is greater than the number of core threads.

RunnableTaskQueue (TaskQueue) : Used to hold the blocking queue of tasks waiting to be executed. Generally, the following options are selected:

ArrayBlockingQueue: Array-based bounded blocking queue that sorts elements according to FIFO.

LinkedBlockingQueue: A blocking queue based on a linked list that sorts elements according to FIFO.

SynchronousQueue: Synchronousblocking queue, also a queue that does not store elements. Each insert operation must wait until another thread calls the remove operation, otherwise the insert operation is blocked.

PriorityBlockingQueue: PriorityBlockingQueue, an infinite blocking queue with a priority.

MaximumPoolSize: The maximum number of threads allowed to be created by the thread pool. If the queue is full and the number of threads in the pool is less than the maximum number of threads, the thread pool will create a new thread to execute the task. This parameter is useless when unbounded queues are used.

RejectedExecutionHandler: When both the task queue and the thread pool are full, the thread pool is saturated and must be RejectedExecutionHandler to handle the newly submitted task. There are four built-in rejection policies in the JDK:

AbortPolicy: Directly throws an exception

CallerRunsPolicy: Executes the task using the caller’s thread

DiscardOldestPolicy: Discards the most recent task in the queue to execute the current task

DiscardPolicy: Discards cards without processing them

The RejectedExecutionHandler interface can be used to customize processing policies according to application scenarios.

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
Copy the code

KeepAliveTime: The length of time that a worker thread in a thread pool stays alive after it is idle.

TimeUnit: Optional DAYS, HOURS, MINUTES, MILLISECONDS, MICROSECONDS, and NANOSECONDS.

ThreadFactory: You can use a ThreadFactory to create threads with meaningful names.

Create a thread pool by following the following steps: Create a thread pool by following the Executors Factory class. Create a thread pool by following the following steps: Thread pools are not allowed to be created by Executors because of the risk of running out of resources, according to the Specification in Ali Java Development Manual.

Create using Executors Factory class

Create a single threaded thread pool

public static ExecutorService newSingleThreadExecutor(a) {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}
Copy the code

Create a thread pool with a fixed number of threads

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}
Copy the code

The above two methods of creating a thread pool use a linked list to block a queue to store tasks. In actual scenarios, a large number of requests may accumulate and result in OOM

Create a cacheable thread pool

public static ExecutorService newCachedThreadPool(a) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}
Copy the code

The maximum number of threads allowed to be created is integer. MAX_VALUE. If a large number of threads are created, the CPU will be overloaded and OOM will occur

Create a custom thread pool
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
}
Copy the code

Submit tasks to the thread pool

You can submit tasks to a thread pool using two methods, execute() and submit().

The execute() method is used to submit tasks that do not require a return value, so there is no way to determine whether the task was successfully executed by the thread pool. The execute() method passes in an instance of the Runnable class.

public static void main(String [] args){... threadPool.execute(new Runnable{
  public void run(a){
    //do something...}}); . }Copy the code

The submit() method is used to submit tasks that require a return value. The thread pool returns an object of type Future, which can be used to determine whether the task was successfully executed, and the return value can be retrieved from the Future’s get() method. The get() method blocks the current thread until the task is complete. Using the GET (long timeout, TimeUnit Unit) method blocks the current thread for a period of time and then returns immediately. In this case, the task may not be complete.

public static void main(String [] args){... Future<Object> future = threadPool.submit(handleTask);try{
  	Objects res = future.get();
  }catch(InterruptedException e){
  	// Handle the interrupt exception
  }catch(ExecutionException e){
  	// Handle the exception that cannot be executed
  }finally{ threadPool.shutdown(); }... }Copy the code

Closing the thread pool

A thread pool can be shutdown by calling its shutdown() or shutdownNow() methods. They work by iterating through worker threads in a thread pool and then interrupting them one by one by calling the interrupt() method, so a task that cannot respond to an interrupt may never be terminated.

public void shutdown(a) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(SHUTDOWN);
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
}

public List<Runnable> shutdownNow(a) {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    interruptWorkers();
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}
Copy the code

The difference between the shutdown() and shutdownNow() methods is that the shutdownNow method first sets the state of the thread pool to STOP, then tries to STOP the thread that is executing or suspending the task and returns the list of tasks waiting to be executed, Shutdown simply sets the state of the thread pool to shutdown and interrupts all threads that are not executing.

//shutdownNow()
private void interruptWorkers(a) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers)
      w.interruptIfStarted();
  } finally{ mainLock.unlock(); }}...void interruptIfStarted(a) {
  Thread t;
  if (getState() >= 0&& (t = thread) ! =null && !t.isInterrupted()) {
    try {
      t.interrupt();
    } catch (SecurityException ignore) {
    }
  }
}
Copy the code
//shutdown()
private void interruptIdleWorkers(a) {
  interruptIdleWorkers(false); }...private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if(! t.isInterrupted() && w.tryLock()) {try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally{ w.unlock(); }}if (onlyOne)
        break; }}finally{ mainLock.unlock(); }}Copy the code

3. Setting thread pool parameters is recommended

The core problem with thread pool usage is that thread pool parameters are not easy to configure. On the one hand, the operation mechanism of thread pool is not well understood, and rational configuration depends on the personal experience and knowledge of developers. On the other hand, thread pool execution is highly dependent on the type of task, and IO intensive and CPU intensive tasks perform very differently, resulting in no mature empirical strategies for developers to refer to.

(1) Simple assessment based on task-based reference:

Assume the thread pool size setting (N is the number of cpus)

For pure computing tasks, multi-threading does not improve performance, because CPU processing capacity is a scarce resource. On the contrary, it costs a lot of thread switching. In this case, it is recommended to set the number of threads to CPU or +1. —- why +1? Because you can prevent one thread out of N from accidentally stopping or exiting, the CPU does not wait idle.

For IO intensive applications, set the size of the thread pool to 2N+1. Number of threads = Number of CPU cores x target CPU usage x (1 + Average waiting time/Average working time)

(2) Ideal state assessment based on the number of tasks:

1) Default values

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
Copy the code

– Tasks: indicates the number of tasks per second (suppose 500 to 1000). – taskCost: indicates the time taken by each task. Suppose 0.1s

  • Calculations to obtain
    • CorePoolSize = How many threads per second are required to process?

      • threadcount = tasks / ( 1 / taskCost ) =tasks*taskcout = (500X0.1 = 50, 1000)100 threads. The corePoolSize setting should be greater than 50
      • According to the 8020 rule, if 80% of tasks per second are less than 800, then corePoolSize is set to 80
    • queueCapacity = ( coreSizePool / taskCost ) * responsetime

      • QueueCapacity = 80/0.1*1 = 800. This means that the thread in the queue can wait for 1s, and if it exceeds that, it needs to open a new thread to execute
      • Remember not to set it to integer.max_value because the queue will be large and the number of threads will only remain at corePoolSize. When a task increases rapidly, no new threads can be started and the response time will increase rapidly.
    • maxPoolSize = (max(tasks) – queueCapacity) / ( 1 / taskCost)

      • MaxPoolSize = (1000-800)/10 = 20 (+50)
      • (Maximum number of tasks – queue capacity)/capacity per thread per second = maximum number of threads
    • RejectedExecutionHandler: The task can be discarded if it is not important. If it is important, use some buffer mechanism to handle it

    • KeepAliveTime and allowCoreThreadTimeout are usually satisfied by default

All of the above are ideal values, but the actual situation depends on the machine performance. If the machine CPU load is full before the maximum number of threads is reached, you need to upgrade the hardware and optimize the code to reduce taskCost.

(This is only a simple ideal state assessment and can be used as a reference for thread pool parameter Settings)

4. Thread pool usage scenarios are recommended

The scene of a

Slave businesses that do not have a direct data dependency on the master business can be handled using asynchronous thread pools, which are created at project initialization and submitted to the asynchronous thread pool for execution to reduce response time.

/** * create a thread pool using asynchronous annotations called */
@Bean
public ThreadPoolTaskExecutor asyncExecutorPool(a) {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  /** Set the number of core threads */
  executor.setCorePoolSize(20);
  /** Sets the maximum number of threads */
  executor.setMaxPoolSize(100);
  /** Set the queue size */
  executor.setQueueCapacity(500);
  /** Wait for tasks to complete while shutdown -- waiting for all threads to complete */
  executor.setWaitForTasksToCompleteOnShutdown(true);
  /** Wait time (default is 0, stop immediately), does not wait xx seconds to forcibly stop */
  executor.setAwaitTerminationSeconds(60);
  /** Configures the thread name prefix */ in the thread pool
  executor.setThreadNamePrefix("test-async-thread-");
  Rejection -policy: how to process a new task when the pool has reached Max size (CALLER_RUNS: not in a new thread, but in the caller's thread)*/
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  /** Initializes the actuator */
  executor.initialize();

  return executor;
}

// Use @async Async annotations
@Async("asyncExecutorPool")
public void processTask1(a) {
  //doSomething
}
Copy the code

Do not thread in business code!!

/** * create thread pool directly with */
public class ThreadPoolExecutorTest{
  
  private ThreadPoolExecutor executor;
  
  @PostConstruct
  public void init(a) {
    /** Thread pool initialization */
    ThreadPoolExecutor threadPoolExecutor = 
      new ThreadPoolExecutor(30.60.60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
    threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    ThreadFactory threadFactory = new CustomizableThreadFactory("test-creates-thread-");
    threadPoolExecutor.setThreadFactory(threadFactory);
    this.executor = threadPoolExecutor; }...public void processTask(a){
    Future<> future = executor.submit(
    //doSomething...); }}Copy the code

Scenario 2

When tasks need to be executed in a specified order (FIFO, LIFO, priority), it is recommended to create a thread pool that uses single threading.

public class SingleExecutorTest{
  
  private HashMap<Long,ThreadPoolExecutor> executorMap = newHashMap<>(); .public void init(a) {
  /** Thread pool initialization */
    for (int i = 0; i < 5; i++) {
      /** Task queue capacity: 1000*/
      ThreadPoolExecutor threadPoolExecutor = 
        new ThreadPoolExecutor(1.1.0, 
                               TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
      /** Reject policy: silently discard without throwing exceptions */
      threadPoolExecutor.setRejectedExecutionHandler(
        new ThreadPoolExecutor.DiscardPolicy());
      ThreadFactory threadFactory = new CustomizableThreadFactory("testSingle-"+ i +"-"); threadPoolExecutor.setThreadFactory(threadFactory); executorMap.put(Long.valueOf(i),threadPoolExecutor); }}.../** Tasks that need to be executed sequentially */ 
  public void processTask(a){.../** get a single thread pool */
    ThreadPoolExecutor executor = executorMap.get(Long.valueOf(id % 5));
    /** Submit the task to the thread pool */
    excutor.submit(
      //doSomething...); }}Copy the code

conclusion

This article mainly explains the execution principle and creation method of thread pool and recommends thread pool parameter Settings and common usage scenarios. In the development, developers need to create and use thread pools reasonably according to business to reduce resource consumption and improve response speed.

Reference:

Ali Java Development Manual (Taishan Edition)

The Art of Concurrent Programming in Java

Recommended reading

Java Basics – Lock core

Process and optimization of a search performance

Kubernetes Scheduler source code parsing and custom resource scheduling algorithm practice

Introduction to the JVM (Take you to the JVM from a different perspective)

, recruiting

Zhengcaiyun Technology team (Zero) is a passionate, creative and executive team based in picturesque Hangzhou. The team has more than 300 r&d partners, including “old” soldiers from Alibaba, Huawei and NetEase, as well as newcomers from Zhejiang University, University of Science and Technology of China, Hangzhou Electric And other universities. Team in the day-to-day business development, but also in cloud native, chain blocks, artificial intelligence, low code platform system, middleware, data, material, engineering platform, the performance experience, visualization technology areas such as exploration and practice, to promote and fell to the ground a series of internal technical products, continue to explore new frontiers of technology. In addition, the team is involved in community building, Currently, There are Google Flutter, SciKit-Learn, Apache Dubbo, Apache Rocketmq, Apache Pulsar, CNCF Dapr, Apache DolphinScheduler, and Alibaba Seata and many other contributors to the excellent open source community. If you want to change something that’s been bothering you, want to start bothering you. If you want to change, you’ve been told you need more ideas, but you don’t have a solution. If you want change, you have the power to make it happen, but you don’t need it. If you want to change what you want to accomplish, you need a team to support you, but you don’t have the position to lead people. If you want to change the original savvy is good, but there is always a layer of fuzzy window…… If you believe in the power of believing, believing that ordinary people can achieve extraordinary things, believing that you can meet a better version of yourself. If you want to be a part of the process of growing a technology team with deep business understanding, sound technology systems, technology value creation, and impact spillover as your business takes off, I think we should talk. Any time, waiting for you to write something and send it to [email protected]

Wechat official account

The article is published synchronously, the public number of political cloud technology team, welcome to pay attention to