Spring asynchronous calls, multithreading

  • Overview
  • Quick start
  • Asynchronous callbacks
  • Asynchronous exception handling
  • Custom executors

1. Overview


In day-to-day development, most of our logic is invoked synchronously and executed in order. In some cases, however, we want to invoke part of the logic asynchronously, off the main thread, so that the caller returns faster and overall performance improves. Typical examples are high-concurrency interfaces and user operation logs.

Asynchronous invocation is the counterpart of synchronous invocation.

  • Synchronous invocation: the program executes in the defined order. Each line must wait until the previous line has finished executing.
  • Asynchronous invocation: the program continues executing in order without waiting for the asynchronous call to return its result.
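The difference between the two modes can be sketched with plain JDK classes. This is a minimal illustration, not code from the article's project: the class name SyncVsAsyncSketch and the event strings are made up, and a CountDownLatch holds the asynchronous step back so that the ordering is deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class SyncVsAsyncSketch {

    /** Synchronous: the step finishes before the caller moves on. */
    static List<String> runSync() {
        List<String> events = new CopyOnWriteArrayList<>();
        events.add("step done");        // runs on the calling thread
        events.add("caller continues");
        return events;
    }

    /** Asynchronous: the caller moves on while the step runs on another thread. */
    static List<String> runAsync() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch callerMovedOn = new CountDownLatch(1);

        CompletableFuture<Void> step = CompletableFuture.runAsync(() -> {
            try {
                callerMovedOn.await();  // hold the step back so the ordering is deterministic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("step done");
        });

        events.add("caller continues"); // reached without waiting for the step
        callerMovedOn.countDown();
        step.join();                    // only so the sketch can return a complete list
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runSync());
        System.out.println(runAsync());
    }
}
```

In the synchronous variant the step is recorded before the caller continues; in the asynchronous variant the order is reversed, because the caller never waits for the step.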

For asynchronous reliability, we usually consider introducing message queues, such as RabbitMQ, RocketMQ, Kafka, etc. But in some cases, we don’t need such high reliability and can use in-process queues or thread pools.

public static void main(String[] args) {
    // Create a thread pool. This is just a temporary test, development specification...
    ExecutorService executor = Executors.newFixedThreadPool(10);

    // Submit the task to the thread pool for execution.
    executor.submit(new Runnable() {

        @Override
        public void run() {
            System.out.println("I heard I was called asynchronously.");
        }

    });
}

In-process queues and thread pools are less reliable because their tasks are stored only in memory: if the JVM process shuts down abnormally, the pending tasks are lost and never executed.

With a distributed message queue, an asynchronous call is stored as a message on the message server, so even if the JVM process is interrupted abnormally, the message remains on the message queue server.

So when using in-process queues or thread pools for asynchronous calls, it is important to make sure that the JVM process shuts down gracefully and that pending tasks are executed before it exits.
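A graceful shutdown of an in-process pool might look like the following. This is a plain JDK sketch under assumptions: the class name, the pool size of 2, the 20 ms of simulated work and the 10-second wait limit are all arbitrary choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownSketch {

    /** Submits the given number of short jobs, shuts down gracefully, returns how many completed. */
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completed.incrementAndGet();
            });
        }

        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            // Wait for queued and running tasks; the 10-second limit is an arbitrary choice.
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up and interrupt whatever is left
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed = " + runAndShutdown(10));
    }
}
```

Calling shutdown() before awaitTermination(...) is what lets the queued tasks drain: without it the pool keeps accepting work and never terminates.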

The Spring Task module of the Spring Framework provides the @Async annotation, which can be added to a method to make calls to that method asynchronous automatically.

Simply put, just as @Transactional gives us declarative transactions, the @Async annotation provided by Spring Task gives us declarative asynchronous calls. The implementation principle is also the same: Spring AOP intercepts the call and submits the operation to a thread pool, which achieves the asynchronous invocation.
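The interception idea can be modelled with a JDK dynamic proxy. This is only a sketch of the principle and not Spring's implementation: the Notifier interface, the asyncProxy helper and the thread name async-1 are hypothetical, and the sketch supports void methods only.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncProxySketch {

    /** Hypothetical business interface; not part of Spring. */
    interface Notifier {
        void send(String message);
    }

    /** Wraps the target so every call is handed to the executor instead of running on the caller. */
    @SuppressWarnings("unchecked")
    static <T> T asyncProxy(T target, Class<T> type, ExecutorService executor) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> {
                    // Submitted as a Callable, so the reflective call may throw checked exceptions.
                    executor.submit(() -> method.invoke(target, args));
                    return null; // this sketch supports void methods only
                });
    }

    /** Calls through the proxy and reports the name of the thread that ran the target method. */
    static String threadThatRanTheCall() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Notifier target = message -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        };
        try {
            asyncProxy(target, Notifier.class, executor).send("hello");
            done.await(5, TimeUnit.SECONDS); // wait until the target method has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("target ran on: " + threadThatRanTheCall());
    }
}
```

The caller only ever talks to the proxy. This also suggests why a call such as this.someMethod() inside the same object would bypass interception: it never goes through the proxy.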

2. Quick start

2.1 Importing Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-29-async-demo</artifactId>

    <dependencies>
        <!-- Spring Boot dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Write unit tests later -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>


Since Spring Task is a module of the Spring Framework, we do not need to introduce it separately once the spring-boot-starter dependency is in place.

2.2 Application


Create the Application class and add @EnableAsync to enable @Async support

@SpringBootApplication
@EnableAsync // Enable @Async support
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

  • Add the @EnableAsync annotation to the class to enable asynchronous support.

2.3 DemoService

package cn.iocoder.springboot.lab29.asynctask.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service
public class DemoService {

    private Logger logger = LoggerFactory.getLogger(getClass());
    
    public Integer execute01() {
        logger.info("[execute01]");
        sleep(10);
        return 1;
    }

    public Integer execute02() {
        logger.info("[execute02]");
        sleep(5);
        return 2;
    }

    private static void sleep(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Async
    public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
        throw new RuntimeException("Programmers don't need girlfriends.");
    }

}

  • The execute01 and execute02 methods simulate work by sleeping for 10 seconds and 5 seconds, respectively.

  • Each method also prints a log line through logger, so that we can see when each method executes and on which thread.

2.4 Synchronous Invocation test

Write the DemoServiceTest test class and add the #task01() method, which calls the above methods synchronously. Code:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DemoService demoService;

    @Test
    public void task01() {
        long now = System.currentTimeMillis();
        logger.info("[task01][Start execution]");

        demoService.execute01();
        demoService.execute02();

        logger.info("[task01][end execution, elapsed time {} ms]", System.currentTimeMillis() - now);
    }

}

Run the unit test and print the following log:

2020-06-02 09:16:03.391  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task01][Start execution]
2020-06-02 09:16:03.402  INFO 3108 --- [      main] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-02 09:16:13.403  INFO 3108 --- [      main] c.i.s.l.asynctask.service.DemoService    : [execute02]
2020-06-02 09:16:18.403  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task01][end execution, elapsed time 15012 ms]
  • Both methods are executed sequentially and take 15 seconds to execute.
  • Both execute on the main thread.

2.5 Asynchronous Invocation Test


Modify DemoService to add the execute01Async() and execute02Async() asynchronous methods:

    @Async
    public Integer execute01Async() {
        return this.execute01();
    }

    @Async
    public Integer execute02Async() {
        return this.execute02();
    }

  • On execute01Async() and execute02Async(), the @Async annotation makes the call asynchronous.

Modify the DemoServiceTest class and write the #task02() method, which calls the above two methods asynchronously.

    @Test
    public void task02() {
        long now = System.currentTimeMillis();
        logger.info("[task02][Start execution]");

        demoService.execute01Async();
        demoService.execute02Async();

        logger.info("[task02][End execution, elapsed time {} ms]", System.currentTimeMillis() - now);
    }

Print logs:

2020-06-02 10:57:41.643  INFO 14416 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task02][Start execution]
2020-06-02 10:57:41.675  INFO 14416 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 10:57:41.682  INFO 14416 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task02][End execution, elapsed time 39 ms]
  • DemoService’s two methods execute asynchronously, so the main thread only takes about 39 milliseconds. Notice that the actual execution of these two methods is not complete.
  • Both methods of DemoService are executed in an asynchronous thread pool.

2.6 Waiting for an asynchronous call to complete the test

In the asynchronous test above, the two methods were only invoked asynchronously; they had not finished executing when the test method returned. In some business scenarios the main thread needs the result of an asynchronous call, so it has to block and wait for that result.

Modify DemoService to add the execute01AsyncWithFuture() and execute02AsyncWithFuture() asynchronous methods, which return a Future object. Code:

    @Async
    public Future<Integer> execute01AsyncWithFuture() {
        return AsyncResult.forValue(this.execute01());
    }

    @Async
    public Future<Integer> execute02AsyncWithFuture() {
        return AsyncResult.forValue(this.execute02());
    }

  • In both asynchronous methods, AsyncResult.forValue(...) wraps the execution result, for example AsyncResult.forValue(this.execute02()), in a Future object that is returned to the caller.

Modify the DemoServiceTest class and write the #task03() method, which calls the above two methods asynchronously and blocks the thread to wait for the results of the asynchronous calls.

Code:

	@Test
    public void task03() throws ExecutionException, InterruptedException {
        long now = System.currentTimeMillis();
        logger.info("[task03][Start execution]");

        // Execute the task
        Future<Integer> execute01Result = demoService.execute01AsyncWithFuture();
        Future<Integer> execute02Result = demoService.execute02AsyncWithFuture();
        // block to wait for results
        execute01Result.get();
        execute02Result.get();

        logger.info("[task03][End execution, elapsed time {} ms]", System.currentTimeMillis() - now);
    }

  • Both methods are called asynchronously and each returns its Future object. The logic of the two asynchronous calls can therefore execute in parallel.
  • The get() method of each Future object blocks the calling thread until the result is returned.

Print logs:

2020-06-02 13:56:43.955  INFO 7828 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task03][Start execution]
2020-06-02 13:56:43.987  INFO 7828 --- [      main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-06-02 13:56:44.008  INFO 7828 --- [    task-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-02 13:56:44.008  INFO 7828 --- [    task-2] c.i.s.l.asynctask.service.DemoService    : [execute02]
2020-06-02 13:56:54.008  INFO 7828 --- [      main] c.i.s.l.a.service.DemoServiceTest        : [task03][End execution, elapsed time 10053 ms]

  • The two asynchronous methods are executed simultaneously by the thread pool threads task-1 and task-2. Because the main thread blocks and waits for both results, the total execution time is about 10 seconds. When several asynchronous calls run at the same time and the caller blocks on all of them, the elapsed time is determined by the slowest call.
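The "slowest call wins" behaviour can be reproduced without Spring. The following is a scaled-down sketch using a plain ExecutorService; the class name and the 400 ms and 200 ms durations are assumptions standing in for the 10-second and 5-second methods above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelWaitSketch {

    /** Simulates a slow call that sleeps and then returns its own duration. */
    static int slowCall(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return millis;
    }

    /** Starts both calls, blocks on both results, returns {sumOfResults, elapsedMillis}. */
    static long[] runBoth(int firstMillis, int secondMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        long start = System.nanoTime();
        try {
            Future<Integer> first = executor.submit(() -> slowCall(firstMillis));
            Future<Integer> second = executor.submit(() -> slowCall(secondMillis));
            long sum = first.get() + second.get(); // blocks until both calls are done
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return new long[] {sum, elapsed};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] result = runBoth(400, 200);
        System.out.println("sum=" + result[0] + ", elapsed ms=" + result[1]);
    }
}
```

On an otherwise idle machine the elapsed time should land close to the longer of the two durations rather than their sum, because both calls are already running before the first get() blocks.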

2.7 Applying the Configuration File

In application.yml, add the Spring Task configuration:

spring:
  task:
    # Spring task execution configuration, corresponding to the TaskExecutionProperties configuration class. This executor is used for Spring asynchronous tasks.
    execution:
      thread-name-prefix: task- # Prefix of the thread names in the pool. The default value is task-. You are advised to set it based on your application.
      pool: # Thread pool settings
        core-size: 8 # Number of core threads kept in the pool. The default value is 8.
        max-size: 20 # Maximum number of threads in the pool. Threads beyond the core size are created only after the buffer queue is full. The default is Integer.MAX_VALUE.
        keep-alive: 60s # Idle time allowed for threads beyond the core size; they are destroyed once it is reached. The default value is 60 seconds.
        queue-capacity: 200 # Size of the queue used to buffer tasks. The default is Integer.MAX_VALUE.
        allow-core-thread-timeout: true # Whether core threads may time out, which lets the pool grow and shrink dynamically. The default is true.
      shutdown:
        await-termination: true # Whether to wait for tasks to complete when the application is closed. The default value is false. You are advised to set it to true.
        await-termination-period: 60s # Maximum time to wait for tasks to complete (60 seconds here). The default value is 0; set it according to your application.
  • Spring itself relies on Spring Task
  • The spring.task.execution configuration items configure Spring Task execution and correspond to the TaskExecutionProperties configuration class.
  • Spring Boot's TaskExecutionAutoConfiguration auto-configuration class configures Spring Task automatically and creates a ThreadPoolTaskExecutor, a task executor based on a thread pool. ThreadPoolTaskExecutor is in fact a wrapper around ThreadPoolExecutor; its main addition is the ability to submit a task and get back a ListenableFuture object.
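The interplay of core-size, queue-capacity and max-size can be observed directly on the JDK ThreadPoolExecutor that ThreadPoolTaskExecutor wraps. This is a sketch with deliberately tiny, made-up numbers (core 1, max 2, queue capacity 1) so that three submissions are enough to show each stage.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthSketch {

    /** Returns the pool size observed after each of three submissions (core=1, max=2, queue=1). */
    static int[] poolSizesAfterEachSubmit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[3];
        try {
            pool.execute(blocked);         // 1st task: starts the core thread
            sizes[0] = pool.getPoolSize();
            pool.execute(blocked);         // 2nd task: core thread is busy, so it waits in the queue
            sizes[1] = pool.getPoolSize();
            pool.execute(blocked);         // 3rd task: queue is full, so a second thread starts
            sizes[2] = pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(poolSizesAfterEachSubmit())); // prints [1, 1, 2]
    }
}
```

With the default unbounded queue capacity the queue never fills up, so the pool would never grow beyond its core size; that is one reason to set queue-capacity explicitly when max-size is meant to take effect.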

This brings us back to the reliability of asynchronous calls mentioned earlier: the process has to shut down gracefully. The spring.task.execution.shutdown configuration items implement graceful shutdown for Spring Task. If the application shuts down while an asynchronous task is still executing, the beans used by that task may already have been destroyed. For example, if the task needs to access the database but the connection pool bean is already gone, an error is reported.

  • With await-termination: true, Spring waits for ThreadPoolTaskExecutor to finish its asynchronous tasks when the application shuts down, and only then destroys the beans.

  • In some business scenarios Spring cannot be kept waiting indefinitely for asynchronous tasks at shutdown. Setting await-termination-period to 60 seconds caps the time Spring waits; once it is reached, Spring stops waiting for the remaining tasks.
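The effect of a bounded wait can be sketched on a plain JDK executor. This models the idea only and is not Spring's shutdown code; the class name and the millisecond values are assumptions chosen to keep the example fast.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedShutdownSketch {

    /** Returns true if the running task finished within waitMillis, false if the wait timed out. */
    static boolean drainedInTime(long taskMillis, long waitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                Thread.sleep(taskMillis); // simulated asynchronous task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown(); // comparable in spirit to await-termination: true
        try {
            // Comparable in spirit to await-termination-period: wait, but only this long.
            boolean drained = executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
            if (!drained) {
                executor.shutdownNow(); // stop waiting and interrupt the task
            }
            return drained;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(drainedInTime(50, 5000));  // short task, generous wait
        System.out.println(drainedInTime(5000, 50));  // long task, short wait
    }
}
```

The first call lets the task finish; the second gives up after the wait period, which is the trade-off the period setting expresses: a bounded shutdown time in exchange for possibly abandoning slow tasks.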

3. Asynchronous callback

In a service scenario, a callback may be required after an asynchronous task is executed. The following describes how to implement custom callbacks after asynchronous execution is complete.

3.1 AsyncResult source code interpretation

The AsyncResult class we saw in "2.6 Waiting for an asynchronous call to complete the test" represents the result of an asynchronous call. There are two cases:

  • On success, the static method AsyncResult#forValue(V value) is called and returns a ListenableFuture object representing success. Source:

    	/**
    	 * Create a new async result which exposes the given value from {@link Future#get()}.
    	 * @param value the value to expose
    	 * @since 4.2
    	 * @see Future#get()
    	 */
    	public static <V> ListenableFuture<V> forValue(V value) {
    		return new AsyncResult<>(value, null);
    	}
  • When execution throws an exception, the static method AsyncResult#forExecutionException(Throwable ex) is called and returns a ListenableFuture object carrying the exception. Source:

    	/**
    	 * Create a new async result which exposes the given exception as an
    	 * {@link ExecutionException} from {@link Future#get()}.
    	 * @param ex the exception to expose (either an pre-built {@link ExecutionException}
    	 * or a cause to be wrapped in an {@link ExecutionException})
    	 * @since 4.2
    	 * @see ExecutionException
    	 */
    	public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
    		return new AsyncResult<>(null, ex);
    	}

AsyncResult also implements the ListenableFuture interface, which provides callback processing for asynchronous execution results.

public class AsyncResult<V> implements ListenableFuture<V>

ListenableFuture interface, source:

public interface ListenableFuture<T> extends Future<T> {

    // Add callback methods to handle success and exception cases uniformly.
	void addCallback(ListenableFutureCallback<? super T> callback);

	// Add success and failure callback methods to handle success and exception cases, respectively.
	void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);


	// Convert the ListenableFuture to a CompletableFuture, available since JDK 8.
	// The callbacks can then be set through the CompletableFuture.
	default CompletableFuture<T> completable() {
		CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
		addCallback(completable::complete, completable::completeExceptionally);
		return completable;
	}

}

ListenableFuture extends Future, so AsyncResult also implements the Future interface.

public interface Future<V> {

    // Attempts to cancel the task.
    // If the task has not yet started, cancel(...) returns true and the task never runs;
    // If the task is already running, cancel(true) tries to stop it by interrupting the thread that executes it;
    // If the task is already running, cancel(false) does not interrupt that thread, which is allowed to run to completion;
    // If the task has already completed, cancel(...) returns false.
    // The mayInterruptIfRunning parameter indicates whether to interrupt the executing thread.
    boolean cancel(boolean mayInterruptIfRunning);

    // Returns true if the task was cancelled before it completed normally.
    boolean isCancelled();

    // Returns true if the task is completed, whether it completed normally, was cancelled, or threw an exception.
    boolean isDone();

    // Gets the result of the asynchronous execution. If no result is available yet, this method blocks until the asynchronous computation is complete.
    V get() throws InterruptedException, ExecutionException;

    // Like get(), but with a time limit: if the result is still not available after the timeout, a TimeoutException is thrown.
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

The callback handling in AsyncResult's addCallback(…) methods, source code:

	@Override
	public void addCallback(ListenableFutureCallback<? super V> callback) {
		addCallback(callback, callback);
	}

	@Override
	public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
		try {
			if (this.executionException != null) { // <1>
				failureCallback.onFailure(exposedException(this.executionException));
			}
			else { // <2>
				successCallback.onSuccess(this.value);
			}
		}
		catch (Throwable ex) { // <3>
			// Ignore
		}
	}

	// Get the original exception from ExecutionException.
	private static Throwable exposedException(Throwable original) {
		if (original instanceof ExecutionException) {
			Throwable cause = original.getCause();
			if (cause != null) {
				return cause;
			}
		}
		return original;
	}
  • As the first addCallback(...) overload shows, the ListenableFutureCallback interface extends both the SuccessCallback and FailureCallback interfaces:
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback
  • At marker 1, if execution ended with an exception, the failureCallback is invoked.
  • At marker 2, if execution succeeded, the successCallback is invoked.
  • At marker 3, if the callback logic itself throws, the exception is simply ignored. That way, if there are multiple callbacks and one of them throws, the others are not affected.

AsyncResult represents the result of an asynchronous execution. Since it is a result, the execution has already completed. So when we call the #addCallback(…) interface methods to add a callback, the callback is invoked immediately with the execution result.
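The "already completed, so call back immediately" behaviour can be sketched in a few lines of plain Java. The class below is a simplified stand-in and not Spring's AsyncResult: its name is made up, it uses java.util.function.Consumer in place of Spring's callback interfaces, and it swallows only RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CompletedResultSketch<V> {

    private final V value;
    private final Throwable failure;

    private CompletedResultSketch(V value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    static <V> CompletedResultSketch<V> forValue(V value) {
        return new CompletedResultSketch<>(value, null);
    }

    static <V> CompletedResultSketch<V> forFailure(Throwable failure) {
        return new CompletedResultSketch<>(null, failure);
    }

    /** The outcome is already known, so the matching callback runs right here, on the caller's thread. */
    void addCallback(Consumer<? super V> onSuccess, Consumer<? super Throwable> onFailure) {
        try {
            if (failure != null) {
                onFailure.accept(failure);
            } else {
                onSuccess.accept(value);
            }
        } catch (RuntimeException ex) {
            // A callback that throws is ignored, as in the source shown above.
        }
    }

    /** Registers three callbacks and records what happened, in order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CompletedResultSketch.forValue(2)
                .addCallback(v -> log.add("success:" + v), ex -> log.add("failure"));
        CompletedResultSketch.<Integer>forFailure(new IllegalStateException("boom"))
                .addCallback(v -> log.add("success:" + v), ex -> log.add("failure:" + ex.getMessage()));
        CompletedResultSketch.forValue(3)
                .addCallback(v -> { throw new IllegalArgumentException("bad callback"); },
                        ex -> log.add("not reached"));
        log.add("after addCallback");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [success:2, failure:boom, after addCallback]
    }
}
```

Every callback has already run by the time addCallback returns, which is why "after addCallback" is always the last entry.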

AsyncResult implements the methods defined by Future as follows:

// AsyncResult.java

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    return false; // AsyncResult is the result of execution, so return false to indicate cancellation failure.
}

@Override
public boolean isCancelled() {
    return false; // AsyncResult is the result of execution, so return false to indicate that it is not cancelled.
}

@Override
public boolean isDone() {
    return true; // Since AsyncResult is the result of execution, return true to indicate completion.
}

@Override
@Nullable
public V get() throws ExecutionException {
    // If an exception occurs, the exception is thrown.
    if (this.executionException != null) {
        throw (this.executionException instanceof ExecutionException ?
                (ExecutionException) this.executionException :
                new ExecutionException(this.executionException));
    }
    // If the execution succeeds, the value result is returned
    return this.value;
}

@Override
@Nullable
public V get(long timeout, TimeUnit unit) throws ExecutionException {
    return get();
}

3.2 ListenableFutureTask

When we call a method annotated with @Async whose return type is ListenableFuture, the object actually returned is a ListenableFutureTask.

The ListenableFutureTask class also implements the ListenableFuture interface and extends the FutureTask class; it is the FutureTask implementation of ListenableFuture.

ListenableFutureTask implements the #addCallback(…) methods defined by ListenableFuture as follows:

private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();

@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
    this.callbacks.addCallback(callback);
}

@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
    this.callbacks.addSuccessCallback(successCallback);
    this.callbacks.addFailureCallback(failureCallback);
}

  • As you can see, ListenableFutureTask temporarily stores the callbacks in a ListenableFutureCallbackRegistry.

ListenableFutureTask overrides the #done() method implemented by FutureTask. The implementation source code is as follows:

@Override
protected void done() {
    Throwable cause;
    try {
        // Get the execution result
        T result = get();
        // Execute the success callbacks
        this.callbacks.success(result);
        return;
    }
    catch (InterruptedException ex) { // On InterruptedException, restore the interrupt flag of the current thread
        Thread.currentThread().interrupt();
        return;
    }
    catch (ExecutionException ex) { // On ExecutionException, get the real exception and set it to cause
        cause = ex.getCause();
        if (cause == null) {
            cause = ex;
        }
    }
    catch (Throwable ex) {
        cause = ex; // Set the exception to cause
    }
    // Execution failed, so execute the failure callbacks
    this.callbacks.failure(cause);
}
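The same pattern, storing callbacks first and firing them from FutureTask's done() hook, can be tried out with the JDK alone. This is a reduced sketch and not Spring's class: the name CallbackTaskSketch is made up, Consumer replaces Spring's callback interfaces, callbacks must be registered before the task runs, and cancellation is not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;

class CallbackTaskSketch<T> extends FutureTask<T> {

    private final List<Consumer<? super T>> successCallbacks = new ArrayList<>();
    private final List<Consumer<? super Throwable>> failureCallbacks = new ArrayList<>();

    CallbackTaskSketch(Callable<T> callable) {
        super(callable);
    }

    /** Stores the callbacks; in this simplified sketch they must be added before the task runs. */
    void addCallback(Consumer<? super T> onSuccess, Consumer<? super Throwable> onFailure) {
        successCallbacks.add(onSuccess);
        failureCallbacks.add(onFailure);
    }

    /** FutureTask calls done() once the task has finished, whatever the outcome. */
    @Override
    protected void done() {
        try {
            T result = get(); // does not block here: the task is already complete
            successCallbacks.forEach(callback -> callback.accept(result));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            failureCallbacks.forEach(callback -> callback.accept(cause));
        }
    }

    /** Runs one succeeding and one failing task and records which callbacks fired. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();

        CallbackTaskSketch<Integer> ok = new CallbackTaskSketch<>(() -> 2);
        ok.addCallback(v -> log.add("success:" + v), ex -> log.add("failure"));
        ok.run(); // run on the calling thread to keep the sketch deterministic

        CallbackTaskSketch<Integer> bad = new CallbackTaskSketch<>(() -> {
            throw new IllegalStateException("boom");
        });
        bad.addCallback(v -> log.add("success"), ex -> log.add("failure:" + ex.getMessage()));
        bad.run();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [success:2, failure:boom]
    }
}
```

In a real application the task would be handed to an executor rather than run inline, and the callbacks would then fire on the worker thread, which matches the task-1 thread seen in the logs of the example that follows.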

3.3 Specific Examples

Modify the DemoService code to add an asynchronous call to #execute02() and return the ListenableFuture object. The code is as follows:

    @Async
    public ListenableFuture<Integer> execute01AsyncWithListenableFuture() {
        try {
            //int i = 1 / 0;
            return AsyncResult.forValue(this.execute02());
        } catch (Throwable ex) {
            return AsyncResult.forExecutionException(ex);
        }
    }

  • Depending on the outcome, the result is wrapped in an AsyncResult object for success or for the exception.

Modify the DemoServiceTest class and write the #task04() method, which calls the above method asynchronously, adds the corresponding callbacks, and blocks until execution completes. Code:

    @Test
    public void task04() throws ExecutionException, InterruptedException {
        long now = System.currentTimeMillis();
        logger.info("[task04][Start execution]");

        // <1> Execute the task
        ListenableFuture<Integer> execute01Result = demoService.execute01AsyncWithListenableFuture();
        logger.info("[task04][execute01Result is of type :({})]", execute01Result.getClass().getSimpleName());
        execute01Result.addCallback(new SuccessCallback<Integer>() { // <2.1> Adds a success callback

            @Override
            public void onSuccess(Integer result) {
                logger.info("[onSuccess][result: {}]", result);
            }

        }, new FailureCallback() { // <2.1> Adds a failure callback

            @Override
            public void onFailure(Throwable ex) {
                logger.info("[onFailure][exception occurred]", ex);
            }

        });
        execute01Result.addCallback(new ListenableFutureCallback<Integer>() { // <2.2> Adds a unified callback for success and failure

            @Override
            public void onSuccess(Integer result) {
                logger.info("[onSuccess][result: {}]", result);
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.info("[onFailure][exception occurred]", ex);
            }

        });
        // <3> Block to wait for the result
        execute01Result.get();

        logger.info("[task04][End execution, elapsed time {} ms]", System.currentTimeMillis() - now);
    }
  • At <1>, the DemoService#execute01AsyncWithListenableFuture() method is called asynchronously; it returns a ListenableFutureTask object, as the printed log shows:

    2020-06-08 14:13:16.738  INFO 5060 --- [   main] c.i.s.l.a.service.DemoServiceTest   : [task04][execute01Result is of type :(ListenableFutureTask)]
  • <2.1>, add successful callback and failed callback.

  • <2.2>, add the unified callback for success and failure.

  • At <3>, the thread blocks and waits for the result. Once execution completes, the callbacks are invoked and the following log is printed:

    2020-06-08 14:13:21.752  INFO 5060 --- [   main] c.i.s.l.a.service.DemoServiceTest   : [task04][End execution, elapsed time 5057 ms]
    2020-06-08 14:13:21.752  INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest   : [onSuccess][result: 2]
    2020-06-08 14:13:21.752  INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest   : [onSuccess][result: 2]

4. Asynchronous exception handlers

Implementing the AsyncUncaughtExceptionHandler interface gives asynchronous calls unified exception handling.

Create the GlobalAsyncExceptionHandler class, a global, unified exception handler for asynchronous calls. Code:

@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("[handleUncaughtException][method({}) params({}) is abnormal]", method, params, ex);
    }

}

  • The class carries the @Component annotation, since you may want to inject other Spring beans into its fields.
  • The #handleUncaughtException(Throwable ex, Method method, Object... params) implementation prints an exception log.

Note that AsyncUncaughtExceptionHandler only handles asynchronous calls whose method return type is not Future. This is easy to confirm from the source of AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params):

// AsyncExecutionAspectSupport.java

protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
    // key!! If the return type is Future, the exception is thrown directly.
    if (Future.class.isAssignableFrom(method.getReturnType())) {
        ReflectionUtils.rethrowException(ex);
    } else {
        // Otherwise, hand it over to AsyncUncaughtExceptionHandler.
        // Could not transmit the exception to the caller with default executor
        try {
            this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
        } catch (Throwable ex2) {
            logger.warn("Exception handler for async method '" + method.toGenericString() +
                    "' threw unexpected exception itself", ex2);
        }
    }
}

  • By the way, AsyncExecutionAspectSupport is the parent class of AsyncExecutionInterceptor. So exceptions from asynchronous methods whose return type is Future have to be handled through "3. Asynchronous callback".
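The routing rule in handleError boils down to a single type check on the method's return type. The sketch below isolates that check; the class name and the returned strings are illustrative only, while Future.class.isAssignableFrom(...) is the same test the Spring source above uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class AsyncErrorRoutingSketch {

    /** Describes where an exception from an async method with this return type would go. */
    static String route(Class<?> returnType) {
        if (Future.class.isAssignableFrom(returnType)) {
            return "rethrown to the Future"; // the caller sees it when calling Future#get()
        }
        return "uncaught exception handler"; // nobody holds a Future that could receive it
    }

    public static void main(String[] args) {
        System.out.println(route(Future.class));            // rethrown to the Future
        System.out.println(route(CompletableFuture.class)); // rethrown to the Future
        System.out.println(route(Integer.class));           // uncaught exception handler
        System.out.println(route(void.class));              // uncaught exception handler
    }
}
```

Any subtype of Future counts, so methods returning ListenableFuture or CompletableFuture never reach the global handler either.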

4.2 AsyncConfig


Create the AsyncConfig class to configure the exception handler. Code:

@Configuration
@EnableAsync // Enable @Async support
public class AsyncConfig implements AsyncConfigurer {

    @Autowired
    private GlobalAsyncExceptionHandler exceptionHandler;

    @Override
    public Executor getAsyncExecutor() {
        return null;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return exceptionHandler;
    }

}

  • The @EnableAsync annotation on the class enables asynchronous support, so the @EnableAsync annotation on the Application class in "2.2 Application" can be removed.
  • Implementing the AsyncConfigurer interface provides global configuration for asynchronous calls. It plays a role similar to the WebMvcConfigurer interface in Spring MVC.
  • The #getAsyncUncaughtExceptionHandler() implementation returns the GlobalAsyncExceptionHandler object we defined.
  • The #getAsyncExecutor() implementation returns the default executor for Spring Task asynchronous tasks. Here we return null, which means no default executor is defined, so the ThreadPoolTaskExecutor created by the TaskExecutionAutoConfiguration auto-configuration class is used as the default executor.

4.3 DemoService

The #zhaoDaoNvPengYou(…) method is invoked asynchronously. The code is as follows:

@Async
public Integer zhaoDaoNvPengYou(Integer a, Integer b) {
    throw new RuntimeException("Asynchronous global exception");
}

4.4 Simple Test

    @Test
    public void testZhaoDaoNvPengYou() throws InterruptedException {
        demoService.zhaoDaoNvPengYou(1, 2);

        // sleep 1 second to ensure that the asynchronous call is executed
        Thread.sleep(1000);
    }

Run the unit test with the following execution log:

2020-06-08 15:26:35.120 ERROR 11388 --- [         task-1] .i.s.l.a.c.a.GlobalAsyncExceptionHandler : [handleUncaughtException][method(public java.lang.Integer cn.iocoder.springboot.lab29.asynctask.service.DemoService.zhaoDaoNvPengYou(java.lang.Integer,java.lang.Integer)) params([1, 2]) is abnormal]
java.lang.RuntimeException: Asynchronous global exception

5. Custom executors

In the sections above, Spring Boot's TaskExecutionAutoConfiguration auto-configuration class configured the ThreadPoolTaskExecutor task executor for us automatically.

In this section, we define two custom ThreadPoolTaskExecutor task executors, so that different methods can each use their own executor.
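The idea of keeping two independent executors can be previewed with plain JDK pools before bringing in the Spring configuration. This is a sketch under assumptions: the class and helper names are made up, task-one- matches the thread name prefix used in the configuration of this section, and task-two- is a hypothetical prefix for the second executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class TwoExecutorsSketch {

    /** Builds a thread factory that names its threads prefix1, prefix2, and so on. */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + counter.incrementAndGet());
    }

    /** Runs one task on the given pool and returns the name of the thread that ran it. */
    static String threadNameOn(ExecutorService executor) {
        try {
            return executor.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Creates two separate pools and runs one task on each. */
    static String[] demo() {
        ExecutorService one = Executors.newFixedThreadPool(2, named("task-one-"));
        ExecutorService two = Executors.newFixedThreadPool(2, named("task-two-")); // hypothetical prefix
        try {
            return new String[] {threadNameOn(one), threadNameOn(two)};
        } finally {
            one.shutdown();
            two.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println(names[0] + ", " + names[1]); // prints task-one-1, task-two-1
    }
}
```

Distinct thread name prefixes make it easy to tell from the logs which executor ran a given method, and separate pools keep a burst of slow tasks on one executor from starving the other.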

5.1 Importing Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-29-async-demo</artifactId>

    <dependencies>
        <!-- Spring Boot dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Used to write unit tests later -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
  • These are the same dependencies introduced in the earlier sections.

5.2 Applying a Configuration File


In application.yml, add the Spring Task executor configuration as follows:

spring:
  task:
    # Mapped to the TaskExecutionProperties configuration class. This executor is used for Spring asynchronous tasks.
    execution-one:
      thread-name-prefix: task-one- # Prefix of the thread names in the pool. The default is task-. Set it according to your application.
      pool: # Thread pool settings
        core-size: 8 # Number of core threads. The default is 8.
        max-size: 20 # Maximum number of threads in the pool. Threads beyond the core size are created only after the buffer queue is full. The default is Integer.MAX_VALUE.
        keep-alive: 60s # Idle time after which threads beyond the core size are destroyed. The default is 60 seconds.
        queue-capacity: 200 # Capacity of the queue used to buffer tasks. The default is Integer.MAX_VALUE.
        allow-core-thread-timeout: true # Whether core threads may time out, which lets the pool grow and shrink dynamically. The default is true.
      shutdown:
        await-termination: true # Whether to wait for submitted tasks to complete when the application shuts down. The default is false. Setting it to true is recommended.
        await-termination-period: 60s # Maximum time to wait for tasks to complete (60 seconds here). The default is 0 (no waiting). Set it according to your application.
    # Mapped to the TaskExecutionProperties configuration class. This executor is used for Spring asynchronous tasks.
    execution-two:
      thread-name-prefix: task-two- # Prefix of the thread names in the pool. The default is task-. Set it according to your application.
      pool: # Thread pool settings
        core-size: 8 # Number of core threads. The default is 8.
        max-size: 20 # Maximum number of threads in the pool. Threads beyond the core size are created only after the buffer queue is full. The default is Integer.MAX_VALUE.
        keep-alive: 60s # Idle time after which threads beyond the core size are destroyed. The default is 60 seconds.
        queue-capacity: 200 # Capacity of the queue used to buffer tasks. The default is Integer.MAX_VALUE.
        allow-core-thread-timeout: true # Whether core threads may time out, which lets the pool grow and shrink dynamically. The default is true.
      shutdown:
        await-termination: true # Whether to wait for submitted tasks to complete when the application shuts down. The default is false. Setting it to true is recommended.
        await-termination-period: 60s # Maximum time to wait for tasks to complete (60 seconds here). The default is 0 (no waiting). Set it according to your application.
  • Under the spring.task configuration, we added two executor configurations, execution-one and execution-two. Their format is kept consistent with the spring.task.execution configuration seen in “2.7 Applying profiles”, so that we can reuse the TaskExecutionProperties configuration class for property binding.
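
The interplay of core-size, queue-capacity and max-size is easy to misread, so here is a minimal sketch using the JDK's ThreadPoolExecutor, which is the class that ThreadPoolTaskExecutor wraps. The tiny values (1 core thread, a queue of 1, at most 2 threads) and the class name PoolSizingDemo are assumptions chosen only to make the behavior visible.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingDemo {

    // Submits the given number of blocking tasks and returns the resulting pool size.
    static int poolSizeAfter(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        // core-size = 1, max-size = 2, keep-alive = 60s, queue-capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release)); // each task blocks until released
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfter(1)); // 1: the first task starts the core thread
        System.out.println(poolSizeAfter(2)); // 1: the second task waits in the queue
        System.out.println(poolSizeAfter(3)); // 2: the queue is full, so an extra thread starts
    }
}
```

With these values, a fourth blocking task would be rejected under the JDK's default policy. With queue-capacity: 200 and max-size: 20 as configured above, threads beyond the 8 core threads appear only once 200 tasks are already queued.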

5.3 AsyncConfig


Create the AsyncConfig class and configure two actuators. The code is as follows:

import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync // Enable @Async support
public class AsyncConfig
{

    public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
    public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";

    @Configuration
    public static class ExecutorOneConfiguration
    {

        @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
        @Primary
        @ConfigurationProperties(prefix = "spring.task.execution-one")
        // Bind the spring.task.execution-one properties to a TaskExecutionProperties object
        public TaskExecutionProperties taskExecutionProperties()
        {
            return new TaskExecutionProperties();
        }

        @Bean(name = EXECUTOR_ONE_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor()
        {
            // Create a TaskExecutorBuilder object
            TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
            // Create a ThreadPoolTaskExecutor object
            return builder.build();
        }

    }

    @Configuration
    public static class ExecutorTwoConfiguration
    {

        @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
        @ConfigurationProperties(prefix = "spring.task.execution-two")
        // Bind the spring.task.execution-two properties to a TaskExecutionProperties object
        public TaskExecutionProperties taskExecutionProperties()
        {
            return new TaskExecutionProperties();
        }

        @Bean(name = EXECUTOR_TWO_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor()
        {
            // Create a TaskExecutorBuilder object
            TaskExecutorBuilder builder = createTaskExecutorBuilder(this.taskExecutionProperties());
            // Create a ThreadPoolTaskExecutor object
            return builder.build();
        }

    }

    private static TaskExecutorBuilder createTaskExecutorBuilder(TaskExecutionProperties properties)
    {
        // Pool properties
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        // Shutdown properties
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        // Other basic properties
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        // builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
        // builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }

}
  • Following Spring Boot's TaskExecutionAutoConfiguration auto-configuration class, we create the ExecutorOneConfiguration and ExecutorTwoConfiguration configuration classes, which create two executor beans named executor-one and executor-two.
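
The shutdown settings applied to each executor correspond to a familiar JDK pattern: stop accepting new tasks, then wait a bounded time for submitted ones to finish. The sketch below contrasts that with an abrupt shutdown using a plain ExecutorService. The class name GracefulShutdownDemo, the 50 ms task duration and the 5 second wait are assumptions for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownDemo {

    // Submits three short tasks, shuts the pool down, and returns how many tasks completed.
    static int completedTasks(boolean waitForTasks) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50);
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    // Interrupted by an abrupt shutdown: give up without counting.
                    Thread.currentThread().interrupt();
                }
            });
        }
        if (waitForTasks) {
            pool.shutdown();    // stop accepting tasks, keep running the queued ones
        } else {
            pool.shutdownNow(); // interrupt the running task and discard the queued ones
        }
        // Bounded wait, comparable in spirit to await-termination-period.
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("graceful: " + completedTasks(true));
        System.out.println("abrupt:   " + completedTasks(false));
    }
}
```

In the graceful case all three tasks complete. In the abrupt case, tasks that were still queued or sleeping are lost, which is why setting await-termination to true is recommended.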

5.4 DemoService


@Service
public class DemoService
{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
    public Integer execute01()
    {
        logger.info("[execute01]");
        return 1;
    }

    @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
    public Integer execute02()
    {
        logger.info("[execute02]");
        return 2;
    }

}
  • On the @Async annotation, we set the bean name of the executor that the method should use.

5.5 Simple Test


@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest
{

    @Autowired
    private DemoService demoService;

    @Test
    public void testExecute() throws InterruptedException
    {
        demoService.execute01();
        demoService.execute02();

        // Sleep 1 second to ensure that the asynchronous calls have executed
        Thread.sleep(1000);
    }

}

Run the unit test with the following execution log:

2020-06-08 15:38:28.846  INFO 12020 --- [     task-one-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2020-06-08 15:38:28.846  INFO 12020 --- [     task-two-1] c.i.s.l.asynctask.service.DemoService    : [execute02]
  • From the logs, we can see that the #execute01() method runs in the executor-one executor, while the #execute02() method runs in the executor-two executor.
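
Conceptually, selecting an executor by bean name amounts to looking up a named thread pool and submitting the task to it, and the thread-name prefix then reveals which pool did the work. The plain JDK sketch below illustrates only that idea. NamedExecutorDemo and its helper methods are hypothetical names, and this is not Spring's actual lookup mechanism.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedExecutorDemo {

    // Creates a small pool whose threads are named <prefix>1, <prefix>2, ...
    static ExecutorService newPool(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(2,
                runnable -> new Thread(runnable, prefix + counter.incrementAndGet()));
    }

    // Runs a task on the executor registered under the given name and returns the thread name.
    static String runOn(String executorName) throws Exception {
        Map<String, ExecutorService> executors = new HashMap<>();
        executors.put("executor-one", newPool("task-one-"));
        executors.put("executor-two", newPool("task-two-"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executors.get(executorName).submit(task).get(5, TimeUnit.SECONDS);
        } finally {
            executors.values().forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOn("executor-one")); // task-one-1
        System.out.println(runOn("executor-two")); // task-two-1
    }
}
```

Giving each pool a distinct thread-name prefix, as done with task-one- and task-two- in the configuration, makes it straightforward to tell from the logs which executor handled a call.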