1 Introduction to thread pools
1.1 Why use thread pools
- Reduce system resource consumption by reusing existing threads to reduce the consumption caused by thread creation and destruction;
- Improve system response speed. When a task arrives, it can be executed immediately without waiting for the creation of a new thread by reusing the existing thread.
- It is convenient to control the number of concurrent threads, because unlimited creation of threads can lead to excessive memory usage
OOM
And will causecpu
Overswitching (cpu
There is a time cost to switching threads (the need to keep the current thread of execution alive and restore the thread of execution alive) - Provides more powerful function for delayed timing thread pool
1.2 Why do thread pools need queues
If the thread is created without limit, it may take up too much memory and generate OOM, and it may cause excessive CPU switching.
The cost of creating a thread pool is high or the thread pool needs to acquire the global lock mainlock, which affects the concurrency efficiency. The blocking queue can be well buffered
1.3 Why do thread pools use blocking queues instead of non-blocking queues
The blocking queue can ensure that when there is no task in the task queue, the thread that obtains the task is blocked, so that the thread enters the wait state and releases CPU resources. When there is a task in the queue, the corresponding thread is awakened to fetch the message from the queue for execution. So that threads do not occupy CPU resources all the time. (After executing the task, the thread loops to retrieve the task from the task queue again for execution. Code snippet: While (task! = null || (task = getTask()) ! = null) {}).
No blocking queue is also possible, but it is more difficult to implement, there is a good reason not to use
1.4 How do I Configure a Thread Pool
-
Cpu-intensive tasks should use as small a thread pool as possible, typically the number of CPU cores +1. Cpu-intensive tasks cause high CPU usage. If too many threads are enabled, excessive CPU switchover may occur
-
IO intensive tasks can use slightly larger thread pools, typically 2*CPU cores. IO intensive tasks have low CPU usage. Therefore, you can make full use of CPU time by allowing other threads to handle other tasks while waiting for I/OS
-
Hybrid tasks can be divided into IO – and CPU-intensive tasks, which can then be processed in separate thread pools. As long as the execution time of the two tasks is similar, it will be more efficient than serial execution because if there is a data-level difference in the execution time of the two tasks after partitioning, then it is meaningless to split. Since the first task has to wait for the later task, the final time still depends on the later task, plus the cost of task splitting and merging, which is not worth the cost
1.5 Execute () and submit() methods
execute()
Execute a task without returning a valuesubmit()
Submit a thread task with a return value
Submit (Callable
task) gets its return value through future.get() (block until the task completes). Generally, FutureTask+Callable together with Submit (Runnable Task, T result) can indirectly obtain the return value of the thread through the incoming carrier result. Submit (Runnable task) returns no value, even if the return value is null
The future.get () method blocks the thread fetching the result until it completes, wakes it up, and returns the result
1.6 Spring thread pools
Spring uses TaskExecutor to implement multithreading and concurrent programming, ThreadPoolTaskExecutor to implement a TaskExecutor based thread pool, and @enableAsync to enable asynasync. And declare an exception thread pool that Spring has implemented for asynchronous tasks by using the @async annotation in the desired asynchronous method:
SimpleAsyncTaskExecutor
: Not a real thread pool, this class does not reuse threads and creates a new thread each time it is called.SyncTaskExecutor
This class does not implement asynchronous calls, just a synchronous operation. Only applicable where multithreading is not requiredConcurrentTaskExecutor
:Executor
Is not recommended. ifThreadPoolTaskExecutor
This class should only be considered if the requirements are not metSimpleThreadPoolTaskExecutor
Is:The Quartz SimpleThreadPool
In the class. This class is required only if thread pools are used by both Quartz and non-QuartzThreadPoolTaskExecutor
: Most commonly used, recommended. The essence of that is rightjava.util.concurrent.ThreadPoolExecutor
The packing of the
1.7 Transaction handling mechanism in @async calls
Transactional annotation also uses @transactional annotation in the @async annotation method; There is no transaction management control when it invokes database operations because it is based on asynchronous processing.
So how do you add transaction management to these operations? You can place methods that require transaction-management operations inside asynchronous methods and add @Transactional on methods that are called internally
Example:
Method A
, using the@Async/@Transactional
To annotate, but does not generate transaction control purposes.Method B
, using the@Async
To mark,B
Call theC, D
.C/D
using@Transactional
Annotations are made to achieve the purpose of transaction control
Example 2
2.1 Thread pool configuration classes
package cn.jzh.thread; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @configuration @ComponentScan("cn.jzh.thread") @enableAsync public Class TaskExecutorConfig implements asynchronous operations AsyncConfigurer {/** * ThreadPoolTaskExecutor is configured with the getAsyncExecutor method to obtain a thread pool-based TaskExecutor ** @return */ @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(5); Pool.setmaxpoolsize (10); // The maximum number of threads pool.setQueuecapacity (25); Pool.initialize (); // The thread initializes the return pool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; }}Copy the code
Configuration method in class description: in the Spring ThreadPoolExecutor is using the JDK and contract awarding the Java. The util. Concurrent. ThreadPoolExecutor. Some of these values have the following meanings:
int corePoolSize:
A thread pool maintains a minimum number of threadsint maximumPoolSize:
The thread pool maintains a maximum number of threads, the maximum number of threads allowed in the thread pool, the number of threads in the thread pool does not exceed this value. If the queue is full and the current number of threads is less thanmaximumPoolSize
, a new thread is created to execute the task.long keepAliveTime:
The lifetime of idle threadsTimeUnit
unit:
Unit of time, now nanosecond, microsecond, millisecond, secondBlockingQueue workQueue:
Holds a queue of tasks waiting to be executedRejectedExecutionHandler
The rejection policy of the handler thread pool refers to the action taken when a task added to the thread pool is rejected.
When a task is added to a thread pool, it may be rejected due to one of the following reasons: First, the thread pool is abnormally closed. Second, the number of tasks exceeds the maximum limit of the thread pool.
Reject
There are four types of predefined policies:
ThreadPoolExecutor.AbortPolicy
Policy, which is the default policy, is thrown at runtime if a handler is rejectedRejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
Policy, the caller’s thread executes the task and discards it if the executor is closed.ThreadPoolExecutor.DiscardPolicy
Policy. Tasks that cannot be executed are discarded.ThreadPoolExecutor.DiscardOldestPolicy
If the executor is not closed, the task at the head of the work queue is deleted and the executor is retried (e.g
If it fails again, repeat the process.)
2.2 Asynchronous Methods
The @async annotation can be used on a method to indicate that the method is an asynchronous method, or it can be used on a class to indicate that all methods of this class are asynchronous methods and the asynchronous methods are automatically injected using ThreadPoolTaskExecutor as the TaskExecutor
package cn.jzh.thread; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; @Service public class AsyncTaskService { /** * * @param i */ @Async public void executeAsync(Integer i) throws Exception{system.out.println (" Thread ID: "+ thread.currentThread ().getid () +") "+ thread.currentThread ().getName()+"; } @async public Future<String> executeAsyncPlus(Integer I) throws Exception {system.out.println (" Thread ID: "+ thread.currentThread ().getid ()+" + thread.currentThread ().getName()+ "; return new AsyncResult<>("success:"+i); }}Copy the code
2.3 Starting tests
package cn.jzh.thread; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.Future; Public class MainApp {public static void main(String[] args) throws Exception{system.out.println (" Main id: "+ thread.currentThread ().getid () +" ); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class); AsyncTaskService service = context.getBean(AsyncTaskService.class); for (int i = 0; i<10; i++){ service.executeAsync(i); Future<String> result = service.executeAsyncPlus(i); System.out.println(" the asynchronous program finishes, and the child thread returns something (blocking the current main thread)" + result.get()); } context.close(); System.out.println(" currentThread() : "+ thread.currentThread ().getid () +" ); }}Copy the code
Note:
-
If the main thread does not get the result of the child thread (future.get ()), then the main thread should not block at all. So, at this point, the main thread and the child thread are completely asynchronous. This capability can be made into something like MQ messaging middleware, where messages are sent asynchronously
-
If the returned data type is Future, it is an interface. The specific result type is AsyncResult, which is something to be aware of. Call the asynchronous method that returns the result, and use future.isdone () to determine whether the execution is complete
public void testAsyncAnnotationForMethodsWithReturnType()
throws InterruptedException, ExecutionException {
System.out.println(“Invoking an asynchronous method. ” + Thread.currentThread().getName());
Future future = asyncAnnotationExample.asyncMethodWithReturnType();While (true) {/// use the loop judgment, If (future.isdone ()) {system.out.println ("Result from asynchronous process - "+ future.get()); break; } System.out.println("Continue doing something else. "); Thread.sleep(1000); }Copy the code
}
This is done by constantly checking the state of the Future to see if the current asynchronous method has finished executing