1 Introduction to thread pools

1.1 Why use thread pools

  • Reduce system resource consumption by reusing existing threads to reduce the consumption caused by thread creation and destruction;
  • Improve system response speed. When a task arrives, it can be executed immediately without waiting for the creation of a new thread by reusing the existing thread.
  • It is convenient to control the number of concurrent threads, because unlimited creation of threads can lead to excessive memory usageOOMAnd will causecpuOverswitching (cpuThere is a time cost to switching threads (the need to keep the current thread of execution alive and restore the thread of execution alive)
  • Provides more powerful function for delayed timing thread pool

1.2 Why do thread pools need queues

If the thread is created without limit, it may take up too much memory and generate OOM, and it may cause excessive CPU switching.

The cost of creating a thread pool is high or the thread pool needs to acquire the global lock mainlock, which affects the concurrency efficiency. The blocking queue can be well buffered

1.3 Why do thread pools use blocking queues instead of non-blocking queues

The blocking queue can ensure that when there is no task in the task queue, the thread that obtains the task is blocked, so that the thread enters the wait state and releases CPU resources. When there is a task in the queue, the corresponding thread is awakened to fetch the message from the queue for execution. So that threads do not occupy CPU resources all the time. (After executing the task, the thread loops to retrieve the task from the task queue again for execution. Code snippet: While (task! = null || (task = getTask()) ! = null) {}).

No blocking queue is also possible, but it is more difficult to implement, there is a good reason not to use

1.4 How do I Configure a Thread Pool

  • Cpu-intensive tasks should use as small a thread pool as possible, typically the number of CPU cores +1. Cpu-intensive tasks cause high CPU usage. If too many threads are enabled, excessive CPU switchover may occur

  • IO intensive tasks can use slightly larger thread pools, typically 2*CPU cores. IO intensive tasks have low CPU usage. Therefore, you can make full use of CPU time by allowing other threads to handle other tasks while waiting for I/OS

  • Hybrid tasks can be divided into IO – and CPU-intensive tasks, which can then be processed in separate thread pools. As long as the execution time of the two tasks is similar, it will be more efficient than serial execution because if there is a data-level difference in the execution time of the two tasks after partitioning, then it is meaningless to split. Since the first task has to wait for the later task, the final time still depends on the later task, plus the cost of task splitting and merging, which is not worth the cost

1.5 Execute () and submit() methods

  1. execute()Execute a task without returning a value
  2. submit()Submit a thread task with a return value

Submit (Callable

task) gets its return value through future.get() (block until the task completes). Generally, FutureTask+Callable together with Submit (Runnable Task, T result) can indirectly obtain the return value of the thread through the incoming carrier result. Submit (Runnable task) returns no value, even if the return value is null

The future.get () method blocks the thread fetching the result until it completes, wakes it up, and returns the result

1.6 Spring thread pools

Spring uses TaskExecutor to implement multithreading and concurrent programming, ThreadPoolTaskExecutor to implement a TaskExecutor based thread pool, and @enableAsync to enable asynasync. And declare an exception thread pool that Spring has implemented for asynchronous tasks by using the @async annotation in the desired asynchronous method:

  • SimpleAsyncTaskExecutor: Not a real thread pool, this class does not reuse threads and creates a new thread each time it is called.
  • SyncTaskExecutorThis class does not implement asynchronous calls, just a synchronous operation. Only applicable where multithreading is not required
  • ConcurrentTaskExecutor:ExecutorIs not recommended. ifThreadPoolTaskExecutorThis class should only be considered if the requirements are not met
  • SimpleThreadPoolTaskExecutorIs:The Quartz SimpleThreadPoolIn the class. This class is required only if thread pools are used by both Quartz and non-Quartz
  • ThreadPoolTaskExecutor: Most commonly used, recommended. The essence of that is rightjava.util.concurrent.ThreadPoolExecutorThe packing of the

1.7 Transaction handling mechanism in @async calls

Transactional annotation also uses @transactional annotation in the @async annotation method; There is no transaction management control when it invokes database operations because it is based on asynchronous processing.

So how do you add transaction management to these operations? You can place methods that require transaction-management operations inside asynchronous methods and add @Transactional on methods that are called internally

Example:

  • Method A, using the@Async/@TransactionalTo annotate, but does not generate transaction control purposes.
  • Method B, using the@AsyncTo mark,BCall theC, D.C/Dusing@TransactionalAnnotations are made to achieve the purpose of transaction control

Example 2

2.1 Thread pool configuration classes

package cn.jzh.thread; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @configuration @ComponentScan("cn.jzh.thread") @enableAsync public Class TaskExecutorConfig implements asynchronous operations AsyncConfigurer {/** * ThreadPoolTaskExecutor is configured with the getAsyncExecutor method to obtain a thread pool-based TaskExecutor ** @return */ @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(5); Pool.setmaxpoolsize (10); // The maximum number of threads pool.setQueuecapacity (25); Pool.initialize (); // The thread initializes the return pool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; }}Copy the code

Configuration method in class description: in the Spring ThreadPoolExecutor is using the JDK and contract awarding the Java. The util. Concurrent. ThreadPoolExecutor. Some of these values have the following meanings:

  • int corePoolSize:A thread pool maintains a minimum number of threads
  • int maximumPoolSize:The thread pool maintains a maximum number of threads, the maximum number of threads allowed in the thread pool, the number of threads in the thread pool does not exceed this value. If the queue is full and the current number of threads is less thanmaximumPoolSize, a new thread is created to execute the task.
  • long keepAliveTime:The lifetime of idle threadsTimeUnit
  • unit:Unit of time, now nanosecond, microsecond, millisecond, second
  • BlockingQueue workQueue:Holds a queue of tasks waiting to be executed
  • RejectedExecutionHandlerThe rejection policy of the handler thread pool refers to the action taken when a task added to the thread pool is rejected.

    When a task is added to a thread pool, it may be rejected due to one of the following reasons: First, the thread pool is abnormally closed. Second, the number of tasks exceeds the maximum limit of the thread pool.

    RejectThere are four types of predefined policies:
  1. ThreadPoolExecutor.AbortPolicyPolicy, which is the default policy, is thrown at runtime if a handler is rejectedRejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicyPolicy, the caller’s thread executes the task and discards it if the executor is closed.
  3. ThreadPoolExecutor.DiscardPolicyPolicy. Tasks that cannot be executed are discarded.
  4. ThreadPoolExecutor.DiscardOldestPolicyIf the executor is not closed, the task at the head of the work queue is deleted and the executor is retried (e.g

    If it fails again, repeat the process.)

2.2 Asynchronous Methods

The @async annotation can be used on a method to indicate that the method is an asynchronous method, or it can be used on a class to indicate that all methods of this class are asynchronous methods and the asynchronous methods are automatically injected using ThreadPoolTaskExecutor as the TaskExecutor

package cn.jzh.thread; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; @Service public class AsyncTaskService { /** * * @param i */ @Async public void executeAsync(Integer i) throws Exception{system.out.println (" Thread ID: "+ thread.currentThread ().getid () +") "+ thread.currentThread ().getName()+"; } @async public Future<String> executeAsyncPlus(Integer I) throws Exception {system.out.println (" Thread ID: "+ thread.currentThread ().getid ()+" + thread.currentThread ().getName()+ "; return new AsyncResult<>("success:"+i); }}Copy the code

2.3 Starting tests

package cn.jzh.thread; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.Future; Public class MainApp {public static void main(String[] args) throws Exception{system.out.println (" Main id: "+ thread.currentThread ().getid () +" ); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class); AsyncTaskService service = context.getBean(AsyncTaskService.class); for (int i = 0; i<10; i++){ service.executeAsync(i); Future<String> result = service.executeAsyncPlus(i); System.out.println(" the asynchronous program finishes, and the child thread returns something (blocking the current main thread)" + result.get()); } context.close(); System.out.println(" currentThread() : "+ thread.currentThread ().getid () +" ); }}Copy the code

Note:

  1. If the main thread does not get the result of the child thread (future.get ()), then the main thread should not block at all. So, at this point, the main thread and the child thread are completely asynchronous. This capability can be made into something like MQ messaging middleware, where messages are sent asynchronously

  2. If the returned data type is Future, it is an interface. The specific result type is AsyncResult, which is something to be aware of. Call the asynchronous method that returns the result, and use future.isdone () to determine whether the execution is complete

    public void testAsyncAnnotationForMethodsWithReturnType()

    throws InterruptedException, ExecutionException {

    System.out.println(“Invoking an asynchronous method. ” + Thread.currentThread().getName());

    Future future = asyncAnnotationExample.asyncMethodWithReturnType();

    While (true) {/// use the loop judgment, If (future.isdone ()) {system.out.println ("Result from asynchronous process - "+ future.get()); break; } System.out.println("Continue doing something else. "); Thread.sleep(1000); }Copy the code

    }

This is done by constantly checking the state of the Future to see if the current asynchronous method has finished executing