Spring asynchronous calls, multithreading
- An overview of the
- Quick start
- An asynchronous callback
- Asynchronous exception handling
- Custom actuators
1, an overview of the
In daily development, our logic is called synchronously and executed sequentially. But in some cases we want to call asynchronously, separating the main thread from part of the logic, for faster program execution and improved performance. For example, high concurrency interfaces, user operation logs, etc.
Asynchronous invocation, corresponding to synchronous invocation.
- Synchronous invocation: programs are executed in a defined order. Each line of programs must wait until the previous line of programs finishes executing.
- Asynchronous call: the sequential execution of programs without waiting for the asynchronous call to return the execution result.
For asynchronous reliability, we usually consider introducing message queues, such as RabbitMQ, RocketMQ, Kafka, etc. But in some cases, we don’t need such high reliability and can use in-process queues or thread pools.
public static void main(String[] args) {
// Create a thread pool. This is just a temporary test, development specification...
ExecutorService executor = Executors.newFixedThreadPool(10);
// Submit the task to the thread pool for execution.
executor.submit(new Runnable() {
@Override
public void run(a) {
System.out.println("I heard I was called asynchronously."); }}); }Copy the code
Queues or thread pools within processes are less reliable because the tasks in queues and thread pools are only stored in memory, and if a JVM process is abnormally shut down, it will be lost and not executed.
In distributed message queues, asynchronous calls are stored as a message on the message server, so even if the JVM process is abnormally interrupted, the message remains on the server of the message service queue
So when using in-process queues or thread pools for asynchronous calls, it is important to make sure that JVM processes are shut down gracefully and that they are executed before they are shut down.
The Spring Task module of the Spring Framework provides the @async annotation, which can be added to a method to automatically implement asynchronous calls to the method
In simple terms, we can use declarative Async with @Transactional Transactional transactions, using the @async annotation provided by SpringTask. In the implementation principle, it is also based on Spring AOP interception to achieve asynchronous submission of the operation to the thread pool, to achieve the purpose of asynchronous invocation.
2. Quick start
2.1 Importing Dependencies
<?xml version="1.0" encoding="UTF-8"? >
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1. RELEASE</version>
<relativePath/> <! -- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lab-29-async-demo</artifactId>
<dependencies>
<! Spring Boot dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<! Write unit tests later -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Copy the code
Since Spring Task is a module of the Spring Framework, we don’t need to specifically introduce it after we introduce the Spring-boot-Web dependency.
2.2 Application
Create the Application class and add @EnableAsync to enable @Async support
@SpringBootApplication
@EnableAsync // Enable @async support
public class Application {
public static void main(String[] args) { SpringApplication.run(Application.class, args); }}Copy the code
- Add to class
@EnableAsync
Annotations to enable asynchrony.
2.3 DemoService
package cn.iocoder.springboot.lab29.asynctask.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Service
public class DemoService {
private Logger logger = LoggerFactory.getLogger(getClass());
public Integer execute01(a) {
logger.info("[execute01]");
sleep(10);
return 1;
}
public Integer execute02(a) {
logger.info("[execute02]");
sleep(5);
return 2;
}
private static void sleep(int seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
throw newRuntimeException(e); }}@Async
public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
throw new RuntimeException("Programmers don't need girlfriends."); }}Copy the code
-
Define execute01 and execute02 methods to simulate sleep for 10 seconds and 5 seconds, respectively.
-
At the same time, in the method, use logger to print the log, so that we can see the execution time of each method, and the execution thread
2.4 Synchronous Invocation test
Write the DemoServiceTest test class, add the # Task01 () method, and call the above method synchronously as follows:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private DemoService demoService;
@Test
public void task01(a) {
long now = System.currentTimeMillis();
logger.info("[task01][Start execution]");
demoService.execute01();
demoService.execute02();
logger.info("[task01][end execution, elapsed time {} ms]", System.currentTimeMillis() - now); }}Copy the code
Run the unit test and print the following log:
2020-06-02 09:16:03.391 INFO 3108. -- - [the main] C.I.S.L.A.S ervice DemoServiceTest: [task01] [started]2020-06-02 09:16:03.402 INFO 3108 --- [ main] c.i.s.l.asynctask.service.DemoService : [execute01]
2020-06-02 09:16:13.403 INFO 3108 --- [ main] c.i.s.l.asynctask.service.DemoService : [execute02]
2020-06-02 09:16:18.403 INFO 3108. -- - [the main] C.I.S.L.A.S ervice DemoServiceTest: [task01] [end of execution, and the consumption time15012Ms]Copy the code
- Both methods are executed sequentially and take 15 seconds to execute.
- Both execute on the main thread.
2.5 Asynchronous Invocation Test
Modify DemoServiceTest to add execute01Async() and execute02Async() async call methods:
@Async
public Integer execute01Async(a) {
return this.execute01();
}
@Async
public Integer execute02Async(a) {
return this.execute02();
}
Copy the code
- in
execute01Async()
和execute01Async()
, add@Async
Implementing asynchronous calls
Modify the DemoServiceTest class and write the # Task02 () method to call the above two methods asynchronously.
@Test
public void task02(a) {
long now = System.currentTimeMillis();
logger.info("[task02][Start execution]");
demoService.execute01Async();
demoService.execute02Async();
logger.info("[task02][End execution, elapsed time {} ms]", System.currentTimeMillis() - now);
}
Copy the code
Print logs:
The 2020-06-02 10:57:41. 14416-643 the INFO [main] C.I.S.L.A.S ervice. DemoServiceTest: [task02] [started] 10:57:41 2020-06-02. 14416-675 the INFO [main] O.S.S.C oncurrent. ThreadPoolTaskExecutor: Initializing ExecutorService'applicationTaskExecutor'The 2020-06-02 10:57:41. 14416-682 the INFO [main] C.I.S.L.A.S ervice. DemoServiceTest: [task02] [end of execution, and the consumption time 39 milliseconds]Copy the code
- DemoService’s two methods execute asynchronously, so the main thread only takes about 39 milliseconds. Notice that the actual execution of these two methods is not complete.
- Both methods of DemoService are executed in an asynchronous thread pool.
2.6 Waiting for an asynchronous call to complete the test
In the above ** asynchronous call, the two methods are only asynchronous calls and the method is not finished. In some business scenarios, we have an asynchronous call and the main thread returns a result, so we need the main thread to block and wait for the result of the asynchronous call.
Modify DemoService to add execute01AsyncWithFuture() and execute01AsyncWithFuture() asynchronous calls and return a Future object. Code:
@Async
public Future<Integer> execute01AsyncWithFuture(a) {
return AsyncResult.forValue(this.execute01());
}
@Async
public Future<Integer> execute02AsyncWithFuture(a) {
return AsyncResult.forValue(this.execute02());
}
Copy the code
- In the two asynchronous methods here, add
AsyncResult.forValue(this.execute02());
, returns with the result of executionThe Future object
Modify the DemoServiceTest class by writing a # Task02 () method that calls the above two methods asynchronously and blocks the thread for the result of the asynchronous call
Code:
@Test
public void task03(a) throws ExecutionException, InterruptedException {
long now = System.currentTimeMillis();
logger.info("[task03][Start execution]");
// Execute the task
Future<Integer> execute01Result = demoService.execute01AsyncWithFuture();
Future<Integer> execute02Result = demoService.execute02AsyncWithFuture();
// block to wait for results
execute01Result.get();
execute02Result.get();
logger.info("[task03][End execution, elapsed time {} ms]", System.currentTimeMillis() - now);
}
Copy the code
- Call both methods asynchronously and return the corresponding Future object. The asynchronous invocation logic of these two can be executed in parallel.
- The Future object of
get()
Method, effect: blocks the thread to wait for the result to return.
Print logs:
2020-06-02 13:56:43.955 INFO 7828. -- - [the main] C.I.S.L.A.S ervice DemoServiceTest: [task03] [started]2020-06-02 13:56:43.987 INFO 7828 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 13:56:44.008 INFO 7828 --- [ task-1] c.i.s.l.asynctask.service.DemoService : [execute01]
2020-06-02 13:56:44.008 INFO 7828 --- [ task-2] c.i.s.l.asynctask.service.DemoService : [execute02]
2020-06-02 13:56:54.008 INFO 7828. -- - [the main] C.I.S.L.A.S ervice DemoServiceTest: [task03] [end of execution, and the consumption time10053Ms]Copy the code
- Two asynchronous invocation methods, one by thread pool
task-1
andtask-2
Simultaneous execution. Because the main thread blocks and waits for the execution result, the execution time is 10 seconds. When there are multiple asynchronous calls at the same time, the thread blocks and waits, and the execution time is determined by the asynchronous call logic that consumes the longest.
2.7 Applying the Configuration File
In application, add the Spring Task configuration
spring:
task:
The TaskExecutionProperties configuration class. This executor is used for Spring asynchronous tasks.
execution:
thread-name-prefix: task- The prefix of the thread name for the thread pool. The default value is task-. You are advised to set this parameter based on your application
pool: Thread pool correlation
core-size: 8 Number of core threads, the number of threads initialized when the thread pool is created. The default value is 8.
max-size: 20 # maximum number of threads, the maximum number of threads in the pool. Only after the buffer queue is full will more threads than the number of core threads be applied. The default is an Integer. MAX_VALUE
keep-alive: 60s The idle time allowed for threads other than the core thread is destroyed when the idle time is reached. The default value is 60 seconds
queue-capacity: 200 Buffer queue size, the size of the queue used to buffer tasks. The default value is integer.max_value.
allow-core-thread-timeout: true Whether to allow core thread timeout, that is, enable the thread pool to grow and shrink dynamically. The default is true.
shutdown:
await-termination: true Whether to wait for scheduled tasks to complete when the application is closed. The default value is false. You are advised to set it to true
await-termination-period: 60 # Maximum time to wait for a task to complete, in seconds. The default value is 0, depending on your application
Copy the code
- Spring itself relies on Spring Task
- in
spring.task.execution
Configuration item, Spring Task scheduling Task configuration, correspondingTaskExecutionProperties
The configuration class - Spring Boot TaskExecutionAutoConfiguration automation, configuration, and realized the Spring automatic configuration, the Task is created
ThreadPoolTaskExecutor
Thread pool-based task executor, in factThreadPoolTaskExecutor
isThreadPoolExecutor
The sub-assembly, the main increase in the execution of the task, and returnListenableFutureObject functionality.
The reliability of asynchrony mentioned earlier, shut down the process gracefully. Spring, task execution. Shutdown configuration closed, in order to realize the spring task elegant shut down. During the execution of an asynchronous task, if the application is shut down, the beans used by the asynchronous task will be destroyed. For example, if the asynchronous task needs to access the database connection pool while the asynchronous task is still in execution, an error will be reported if there is no corresponding Bean when the asynchronous task needs to access the database.
-
After await-termination: true is configured, asynchronous tasks are waiting for completion when the application is closed. When the application shuts down, Spring waits for ThreadPoolTaskExecutor to complete its task before destroying the Bean.
-
In some business scenarios it is not possible to keep Spring waiting for asynchronous tasks to complete when the application is shut down. By setting await-abortion-period to 60, you can set the maximum waiting time of Spring. When the time reaches, asynchronous tasks will not wait for completion.
3. Asynchronous callback
In a service scenario, a callback may be required after an asynchronous task is executed. The following describes how to implement custom callbacks after asynchronous execution is complete.
3.1 AsyncResult source code interpretation
The AsyncResult class we saw in 2.6 Waiting for an Asynchronous call to complete represents asynchronous results. The returned results are divided into two cases:
-
On success, the AsyncResult#forValue(V value) static method is called and returns a successful ListenableFuture object,
Source:
/** * Create a new async result which exposes the given value from {@link Future#get()}. * @param value the value to expose * @since 4.2 * @see Future#get() */ public static <V> ListenableFuture<V> forValue(V value) { return new AsyncResult<>(value, null); } Copy the code
-
When an exception is executed, the AsyncResult#forExecutionException(Throwable ex) static method is called and the ListenableFuture object of the exception is returned. Source:
/** * Create a new async result which exposes the given exception as an * {@link ExecutionException} from {@link Future#get()}. * @param ex the exception to expose (either an pre-built {@link ExecutionException} * or a cause to be wrapped in an {@link ExecutionException}) * @since 4.2 * @see ExecutionException */ public static <V> ListenableFuture<V> forExecutionException(Throwable ex) { return new AsyncResult<>(null, ex); } Copy the code
AsyncResult also implements the ListenableFuture interface, which provides callback processing for asynchronous execution results.
public class AsyncResult<V> implements ListenableFuture<V>
Copy the code
ListenableFuture interface, source:
public interface ListenableFuture<T> extends Future<T> {
// Add callback methods to handle success and exception cases uniformly.
void addCallback(ListenableFutureCallback<? super T> callback);
// Add success and failure callback methods to handle success and exception cases, respectively.
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
// Convert ListenableFuture to CompletableFuture provided by JDK8.
// Then we can use ListenableFuture to set the callback
default CompletableFuture<T> completable(a) {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
returncompletable; }}Copy the code
ListenableFuture inherits the Future, so AsyncResult also implements the Future interface.
public interface Future<V> {
// If the task has not yet started, cancel(...) Method will return false;
// If the task has been started, executing the cancel(true) method attempts to stop the task by interrupting the thread executing the task, and returns true on success;
// When the task has been started, executing the cancel(false) method will not affect the thread executing the task (let the thread execute normally until completion), and returns false;
// When the task is complete, cancel(...) Method will return false.
The mayInterruptRunning parameter indicates whether to interrupt the executing thread.
boolean cancel(boolean mayInterruptIfRunning);
// If the task is cancelled before completion, true is returned.
boolean isCancelled(a);
// Return true if the task is completed, whether it is completed normally, cancelled, or if an exception occurs.
boolean isDone(a);
// Gets the result of the asynchronous execution. If no result is available, this method blocks until the asynchronous calculation is complete.
V get(a) throws InterruptedException, ExecutionException;
If no result is available, this method blocks, but with a time limit, and throws an exception if the block lasts longer than a timeout.
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Copy the code
In the AsyncResult addCallback (…). Method callback implementation, source code:
@Override
public void addCallback(ListenableFutureCallback<? super V> callback) {
addCallback(callback, callback);
}
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException ! =null) { / / "1"
failureCallback.onFailure(exposedException(this.executionException));
}
else { / / "2"
successCallback.onSuccess(this.value); }}catch (Throwable ex) { / / "3"
// Ignore}}// Get the original exception from ExecutionException.
private static Throwable exposedException(Throwable original) {
if (original instanceof ExecutionException) {
Throwable cause = original.getCause();
if(cause ! =null) {
returncause; }}return original;
}
Copy the code
- From ListenableFutureCallback, the ListenableFutureCallback interface inherits both SuccessCallback and FailureCallback interfaces
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback
Copy the code
- 1. If the fault is handled abnormally, the failureCallback callback is invoked
- 2. Call the successCallback if the processing result is successful
- 3. If the callback logic is abnormal, it is ignored directly. Assume that multiple callbacks, one of which appears on the agenda, will not affect the other callbacks.
In fact AsyncResult is executed as a result of asynchrony. Since it is the result, the execution is complete. So, before we call #addCallback(…) Interface methods to add a callback must directly use the callback to process the result of execution.
All methods defined by AsyncResult to the Future are implemented as follows:
// AsyncResult.java
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false; // AsyncResult is the result of execution, so return false to indicate cancellation failure.
}
@Override
public boolean isCancelled(a) {
return false; // AsyncResult is the result of execution, so return false to indicate that it is not cancelled.
}
@Override
public boolean isDone(a) {
return true; // Since AsyncResult is the result of execution, return true to indicate completion.
}
@Override
@Nullable
public V get(a) throws ExecutionException {
// If an exception occurs, the exception is thrown.
if (this.executionException ! =null) {
throw (this.executionException instanceof ExecutionException ?
(ExecutionException) this.executionException :
new ExecutionException(this.executionException));
}
// If the execution succeeds, the value result is returned
return this.value;
}
@Override
@Nullable
public V get(long timeout, TimeUnit unit) throws ExecutionException {
return get();
}
Copy the code
3.2 ListenableFutureTask
When we call a method with the @Async annotation, if the method returns type ListenableFuture, the actual method returns ListenableFutureTask object.
The ListenableFutureTask class, which also implements the ListenableFuture interface, inherits the FutureTask class, which implements ListenableFuture’s FutureTask class.
ListenableFutureTask defines #addCallback(…) for ListenableFuture. Method, the implementation source code is as follows:
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.callbacks.addSuccessCallback(successCallback);
this.callbacks.addFailureCallback(failureCallback);
}
Copy the code
- You can see in the
ListenableFutureTask
Middle, temporary save callback toListenableFutureCallbackRegistry
In the
ListenableFutureTask overrides the #done() method implemented by FutureTask. The implementation source code is as follows:
@Override
protected void done(a) {
Throwable cause;
try {
// Get the execution result
T result = get();
// The callback is successfully executed
this.callbacks.success(result);
return;
}
catch (InterruptedException ex) { InterruptedException InterruptedException interrupts the current thread
Thread.currentThread().interrupt();
return;
}
catch (ExecutionException ex) { // If there is an ExecutionException exception, get the real exception and set it to cause
cause = ex.getCause();
if (cause == null) { cause = ex; }}catch (Throwable ex) {
cause = ex; // Set the exception to cause
}
// Execute the exception, execute the exception callback
this.callbacks.failure(cause);
}
Copy the code
3.3 Specific Examples
Modify the DemoService code to add an asynchronous call to #execute02() and return the ListenableFuture object. The code is as follows:
@Async
public ListenableFuture<Integer> execute01AsyncWithListenableFuture(a) {
try {
//int i = 1 / 0;
return AsyncResult.forValue(this.execute02());
} catch (Throwable ex) {
returnAsyncResult.forExecutionException(ex); }}Copy the code
- Depending on the result of execution, wrap AsyncResult objects with success or exception.
DemoServiceTest tests the class by writing the # Task04 () method to call the above method asynchronously, adding the corresponding Callback method while the plug waits for execution to complete. Code:
@Test
public void task04(a) throws ExecutionException, InterruptedException {
long now = System.currentTimeMillis();
logger.info("[task04][Start execution]");
// <1> Execute the task
ListenableFuture<Integer> execute01Result = demoService.execute01AsyncWithListenableFuture();
logger.info("[task04][execute01Result is of type :({})]",execute01Result.getClass().getSimpleName());
execute01Result.addCallback(new SuccessCallback<Integer>() { // <2.1> Adds a successful callback
@Override
public void onSuccess(Integer result) {
logger.info("[onSuccess][result: {}]", result); }},new FailureCallback() { // <2.1> Adds failed callbacks
@Override
public void onFailure(Throwable ex) {
logger.info("[onFailure][exception occurred]", ex); }}); execute01Result.addCallback(new ListenableFutureCallback<Integer>() { // <2.2> Adds a unified callback for success and failure
@Override
public void onSuccess(Integer result) {
logger.info("[onSuccess][result: {}]", result);
}
@Override
public void onFailure(Throwable ex) {
logger.info("[onFailure][exception occurred]", ex); }});// <3> block to wait for results
execute01Result.get();
logger.info("[task04][End execution, elapsed time {} ms]", System.currentTimeMillis() - now);
}
Copy the code
-
< 1 >, call DemoService# execute01AsyncWithListenableFuture () method, asynchronous calls, this method and return ListenableFutureTask object. Here, let’s look at the printed log.
The 2020-06-08 14:13:16. 5060-738 the INFO [main] C.I.S.L.A.S ervice. DemoServiceTest: [task04][execute01Result is of type :(ListenableFutureTask)]Copy the code
-
<2.1>, add successful callback and failed callback.
-
<2.2>, add the unified callback for success and failure.
-
At <3>, the block waits for the result. After execution, we will see that the callback is executed and the following log is printed:
2020-06-08 14:13:21.752 INFO 5060. -- - [the main] C.I.S.L.A.S ervice DemoServiceTest: [task04] [end of execution, and the consumption time5057Ms]2020-06-08 14:13:21.752 INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest : [onSuccess][result: 2] 2020-06-08 14:13:21.752 INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest : [onSuccess][result: 2] Copy the code
4. Asynchronous exception handlers
By implementing AsyncUncaughtExceptionHandler interface, achieve the unity of the exception handling of asynchronous calls.
Create GlobalAsyncExceptionHandler class, global unified asynchronous invoke exception handler. Code:
@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.error("[handleUncaughtException][method({}) params({}) is abnormal", method, params, ex); }}Copy the code
- Class, we added
@Component
Annotation, considering that fat friends may inject some Spring beans into properties. - implementation
#handleUncaughtException(Throwable ex, Method method, Object... params)
Method to print an exception log.
Note that AsyncUncaughtExceptionHandler can intercept asynchronous calls the method return type of the Future. By looking at AsyncExecutionAspectSupport# handleError (Throwable ex, Method Method, the Object… Params) source, can easily get this conclusion, code:
// AsyncExecutionAspectSupport.java
protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
// key!! If the return type is Future, the exception is thrown directly.
if (Future.class.isAssignableFrom(method.getReturnType())) {
ReflectionUtils.rethrowException(ex);
} else {
/ / otherwise, to AsyncUncaughtExceptionHandler to deal with.
// Could not transmit the exception to the caller with default executor
try {
this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
} catch (Throwable ex2) {
logger.warn("Exception handler for async method '" + method.toGenericString() +
"' threw unexpected exception itself", ex2); }}}Copy the code
- By the way, is AsyncExecutionAspectSupport AsyncExecutionInterceptor parent yo. So, asynchronous calling methods that return type Future need to be handled by “3. Asynchronous callback”.
4.2 AsyncConfig
Create the AsyncConfig class to configure the exception handler. Code:
@Configuration
@EnableAsync // Enable @async support
public class AsyncConfig implements AsyncConfigurer {
@Autowired
private GlobalAsyncExceptionHandler exceptionHandler;
@Override
public Executor getAsyncExecutor(a) {
return null;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(a) {
returnexceptionHandler; }}Copy the code
- Add to class
@EnableAsync
Annotations to enable asynchrony. So “2. Application”@EnableAsync
Annotations, you can get rid of them. - The AsyncConfigurer interface is implemented to implement global configuration related to asynchrony. At this point, fat people have the WebMvcConfigurer interface that they didn’t think of SpringMVC.
- implementation
#getAsyncUncaughtExceptionHandler()
Methods, return we define GlobalAsyncExceptionHandler object. - implementation
#getAsyncExecutor()
Method to return Spring Task asynchronous tasksDefault actuator. Here, we’re backnull
, does not define a default executor. So you will use TaskExecutionAutoConfiguration automation configuration class created ThreadPoolTaskExecutor task executor, as the default actuators.
4.3 DemoService
#zhaoDaoNvPengYou(…) Asynchronous invocation of. The code is as follows:
@Async
public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
throw new RuntimeException("Asynchronous global exception");
}
Copy the code
4.4 Simple Test
@Test
public void testZhaoDaoNvPengYou(a) throws InterruptedException {
demoService.zhaoDaoNvPengYou(1.2);
// sleep 1 second to ensure that the asynchronous call is executed
Thread.sleep(1000);
}
Copy the code
Run the unit test with the following execution log:
2020-06-08 15:26:35.120 ERROR 11388 --- [ task-1] .i.s.l.a.c.a.GlobalAsyncExceptionHandler : [handleUncaughtException][method(public java.lang.Integer cn.iocoder.springboot.lab29.asynctask.service.DemoService.zhaoDaoNvPengYou(java.lang.Integer,java.lang.Integer)) params([1.2An exception occurs]]) Java. Lang. RuntimeException: asynchronous exceptionCopy the code
5. Customize the actuator
In the above, we use the Spring Boot TaskExecutionAutoConfiguration automation configuration class, realize automatic configuration ThreadPoolTaskExecutor task executor.
In this section, we want two custom ThreadPoolTaskExecutor task executors that implement different methods to use them separately.
5.1 Importing Dependencies
<?xml version="1.0" encoding="UTF-8"? >
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1. RELEASE</version>
<relativePath/> <! -- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lab-29-async-demo</artifactId>
<dependencies>
<! Spring Boot dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<! Write unit tests later -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Copy the code
- Same as introducing dependencies above.
5.2 Applying a Configuration File
In application.yml, add the Spring Task scheduled Task configuration as follows:
spring:
task:
The TaskExecutionProperties configuration class. This executor is used for Spring asynchronous tasks.
execution-one:
thread-name-prefix: task-one- The prefix of the thread name for the thread pool. The default value is task-. You are advised to set this parameter based on your application
pool: Thread pool correlation
core-size: 8 Number of core threads, the number of threads initialized when the thread pool is created. The default value is 8.
max-size: 20 # maximum number of threads, the maximum number of threads in the pool. Only after the buffer queue is full will more threads than the number of core threads be applied. The default is an Integer. MAX_VALUE
keep-alive: 60s The idle time allowed for threads other than the core thread is destroyed when the idle time is reached. The default value is 60 seconds
queue-capacity: 200 Buffer queue size, the size of the queue used to buffer tasks. The default value is integer.max_value.
allow-core-thread-timeout: true Whether to allow core thread timeout, that is, enable the thread pool to grow and shrink dynamically. The default is true.
shutdown:
await-termination: true Whether to wait for scheduled tasks to complete when the application is closed. The default value is false. You are advised to set it to true
await-termination-period: 60 # Maximum time to wait for a task to complete, in seconds. The default value is 0, depending on your application
The TaskExecutionProperties configuration class. This executor is used for Spring asynchronous tasks.
execution-two:
thread-name-prefix: task-two- The prefix of the thread name for the thread pool. The default value is task-. You are advised to set this parameter based on your application
pool: Thread pool correlation
core-size: 8 Number of core threads, the number of threads initialized when the thread pool is created. The default value is 8.
max-size: 20 # maximum number of threads, the maximum number of threads in the pool. Only after the buffer queue is full will more threads than the number of core threads be applied. The default is an Integer. MAX_VALUE
keep-alive: 60s The idle time allowed for threads other than the core thread is destroyed when the idle time is reached. The default value is 60 seconds
queue-capacity: 200 Buffer queue size, the size of the queue used to buffer tasks. The default value is integer.max_value.
allow-core-thread-timeout: true Whether to allow core thread timeout, that is, enable the thread pool to grow and shrink dynamically. The default is true.
shutdown:
await-termination: true Whether to wait for scheduled tasks to complete when the application is closed. The default value is false. You are advised to set it to true
await-termination-period: 60 # Maximum time to wait for a task to complete, in seconds. The default value is 0, depending on your application
Copy the code
- in
spring.task
Under configuration, we addedexecution-one
和execution-two
Configuration of two actuators. In terms of format, we keep and see in “2.7 Applying profiles”spring.task.exeuction
Consistent, so that we can reuse the TaskExecutionProperties property configuration class for mapping.
5.3 AsyncConfig
Create the AsyncConfig class and configure two actuators. The code is as follows:
@Configuration
@EnableAsync // Enable @async support
public class AsyncConfig
{
public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";
@Configuration
public static class ExecutorOneConfiguration
{
@Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
@Primary
@ConfigurationProperties(prefix = "spring.task.execution-one")
// Read the spring.task.executionProperties configuration into the TaskExecutionProperties object
public TaskExecutionProperties taskExecutionProperties(a)
{
return new TaskExecutionProperties();
}
@Bean(name = EXECUTOR_ONE_BEAN_NAME)
public ThreadPoolTaskExecutor threadPoolTaskExecutor(a)
{
// Create a TaskExecutorBuilder object
TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
Create a ThreadPoolTaskExecutor object
returnbuilder.build(); }}@Configuration
public static class ExecutorTwoConfiguration
{
@Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
@ConfigurationProperties(prefix = "spring.task.execution-two")
// Read the spring.task.executionProperties configuration into the TaskExecutionProperties object
public TaskExecutionProperties taskExecutionProperties(a)
{
return new TaskExecutionProperties();
}
@Bean(name = EXECUTOR_TWO_BEAN_NAME)
public ThreadPoolTaskExecutor threadPoolTaskExecutor(a)
{
// Create a TaskExecutorBuilder object
TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
Create a ThreadPoolTaskExecutor object
returnbuilder.build(); }}private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties)
{
/ / the Pool properties
TaskExecutionProperties.Pool pool = properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
/ / Shutdown properties
TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
// Other basic attributes
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
// builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
// builder = builder.taskDecorator(taskDecorator.getIfUnique());
returnbuilder; }}Copy the code
- Reference Spring Boot TaskExecutionAutoConfiguration automation configuration class, we created the ExecutorOneConfiguration and ExecutorTwoConfiguration configuration class, To create beans named
executor-one
和executor-two
Two actuators.
5.4 DemoService
@Service
public class DemoService
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
public Integer execute01(a)
{
logger.info("[execute01]");
return 1;
}
@Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
public Integer execute02(a)
{
logger.info("[execute02]");
return 2; }}Copy the code
- in
@Async
On the annotation, we set the name of the Bean for the executor it uses.
5.5 Simple Test
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest
{
@Autowired
private DemoService demoService;
@Test
public void testExecute(a) throws InterruptedException
{
demoService.execute01();
demoService.execute02();
// sleep 1 second to ensure that the asynchronous call is executed
Thread.sleep(1000); }}Copy the code
Run the unit test with the following execution log:
2020-06-08 15:38:28.846 INFO 12020 --- [ task-one-1] c.i.s.l.asynctask.service.DemoService : [execute01]
2020-06-08 15:38:28.846 INFO 12020 --- [ task-two-1] c.i.s.l.asynctask.service.DemoService : [execute02]
Copy the code
- From the logs, we can see that,
#execute01()
Methods in theexecutor-one
In the executor, while#execute02()
Methods in theexecutor-two
Execute in the executor.