1. Introduction

Before reading this article, you have probably used the @Async annotation to process tasks asynchronously. In project development, tasks that are outside the main flow, not real-time, or time-consuming are often handled asynchronously, so that they do not affect the main flow and the main flow responds faster.

When using the @Async annotation, you may have run into a number of problems: tasks that do not actually execute asynchronously, tasks that interfere with each other because they share a thread pool, or exceptions in asynchronous tasks that you do not know how to handle. Today we will look at how @Async really works, so that the next time you hit one of these problems you do not have to panic.

2. Exploration

2.1 Implementation Principles

2.1.1 Finding the asynchronous annotation post-processor

To use the @Async annotation for asynchronous tasks in your project, you first need to enable asynchronous support manually by adding @EnableAsync:

@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
}

Since the @EnableAsync annotation is what enables the asynchronous functionality, it is our entry point.

Open the @EnableAsync annotation and you will see another familiar annotation, @Import, which brings the configuration class for this feature into the application:

@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}

Open the imported AsyncConfigurationSelector and you will find the ProxyAsyncConfiguration configuration class:

public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}

Go to the ProxyAsyncConfiguration configuration class

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    bpp.configure(this.executor, this.exceptionHandler);
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
        bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
}

As you can see, the ProxyAsyncConfiguration configuration class declares an AsyncAnnotationBeanPostProcessor Bean. Judging by its name, this Bean should be the protagonist of asynchronous processing, so let's see what work it actually does.

Open AsyncAnnotationBeanPostProcessor and you will see that the class implements two interfaces, BeanFactoryAware and BeanPostProcessor, both of which are closely tied to the Bean life cycle. From the Bean life cycle we know that the BeanFactoryAware method is invoked before the BeanPostProcessor methods.

2.1.2 BeanFactoryAware implementation

2.1.2.1 Define the aspect

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    // Define the aspect (advisor)
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

The setBeanFactory() implementation defines the aspect object. The word "aspect" immediately brings two concepts to mind: pointcut and advice.

  • Pointcut: declares the targets to cut into
  • Advice (also translated as "notification"): the processing applied to the targets that the pointcut matches

2.1.2.2 Define the pointcut

Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
    asyncAnnotationTypes.add((Class<? extends Annotation>)
            ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        // Pointcut for @Async / @Asynchronous declared on the class
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        // Pointcut for @Async / @Asynchronous declared on a method
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}
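The matching rule built above, "annotation on the class OR annotation on the method", can be sketched with plain reflection. This is only an illustration under stated assumptions: DemoAsync, ReportService, MailService and PointcutSketch are hypothetical names, DemoAsync stands in for @Async so that the sketch needs nothing beyond the JDK, and the inherited and meta-annotation lookups that Spring's AnnotationMatchingPointcut performs are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for Spring's @Async
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface DemoAsync {}

@DemoAsync
class ReportService {
    // Matches because the class itself is annotated
    public void export() {}
}

class MailService {
    // Matches because the method is annotated
    @DemoAsync
    public void send() {}

    // Does not match: neither the class nor the method is annotated
    public void ping() {}
}

class PointcutSketch {
    // Union of the class-level check and the method-level check
    static boolean matches(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return type.isAnnotationPresent(DemoAsync.class)
                    || method.isAnnotationPresent(DemoAsync.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(ReportService.class, "export")); // true
        System.out.println(matches(MailService.class, "send"));     // true
        System.out.println(matches(MailService.class, "ping"));     // false
    }
}
```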

2.1.2.3 Define the advice

protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    // Define the advice
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

The advice is where the asynchronous behavior is ultimately implemented, so it is worth looking at its implementation in detail:

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // Resolve the thread pool that will run this asynchronous task
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // Wrap the target method in a Callable
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        ...
        return null;
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // The asynchronous task's return type is CompletableFuture
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    // The asynchronous task's return type is ListenableFuture
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // The asynchronous task's return type is Future
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    // Any other return type: submit the task and return null
    else {
        executor.submit(task);
        return null;
    }
}

The advice works as follows:

  • First, resolve the thread pool that will execute the asynchronous task
  • Then, wrap the target method in a Callable
  • Finally, submit the task to the thread pool and handle the result according to the method's return type
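The steps above can be sketched with JDK classes alone. This is a simplified illustration, not Spring's code: SubmitSketch and its methods are hypothetical names, and the ListenableFuture branch is omitted because that type belongs to Spring rather than the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {
    // Chooses how to hand the task to the pool based on the declared return type
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            // void, String, ...: the task still runs, but the caller receives null
            executor.submit(task);
            return null;
        }
    }

    // Runs the same task under a given declared return type and reports what the caller ends up with
    static Object demo(Class<?> declaredReturnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> task = () -> "done";
            Object handed = submit(task, executor, declaredReturnType);
            return (handed instanceof Future) ? ((Future<?>) handed).get() : handed;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(CompletableFuture.class)); // done
        System.out.println(demo(Future.class));            // done
        System.out.println(demo(String.class));            // null
    }
}
```

The last call shows the practical consequence of the final branch: with a non-Future return type the work is still submitted, but its result never reaches the caller.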

The advice shows how asynchronous tasks are ultimately executed, but you may wonder: how does the advice get invoked in the first place?

Remember the BeanPostProcessor interface mentioned above? Let's take a look at its implementation.

2.1.3 BeanPostProcessor implementation

When you hear BeanPostProcessor, you should immediately expect that its processing methods do something to the Bean, such as generating a proxy.

With that in mind, let's look at the post-processing implementation here:

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    // If the current bean matches the pointcut defined earlier, generate a proxy for it
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}

The BeanPostProcessor generates a proxy for every Bean that matches the pointcut, and the advice's invoke() method is executed whenever a target method is called through that proxy.
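To make the proxy step concrete, here is a minimal sketch that uses a JDK dynamic proxy. It is an analogy rather than Spring's implementation: Greeter, GreeterImpl and ProxySketch are hypothetical names, and Spring's ProxyFactory may create either a JDK proxy or a CGLIB subclass depending on configuration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical business interface (JDK proxies can only implement interfaces)
interface Greeter {
    Future<String> threadName();
}

class GreeterImpl implements Greeter {
    @Override
    public Future<String> threadName() {
        // Reports which thread executed the method body
        return CompletableFuture.completedFuture(Thread.currentThread().getName());
    }
}

class ProxySketch {
    // Plays the role of postProcessAfterInitialization(): hand out a proxy instead of the bean
    static Greeter wrap(Greeter target, ExecutorService executor) {
        // Plays the role of invoke(): run the target method on the pool
        InvocationHandler advice = (proxy, method, args) -> {
            Callable<Object> task = () -> ((Future<?>) method.invoke(target, args)).get();
            return executor.submit(task);
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, advice);
    }

    // Returns {thread of the direct call, thread of the proxied call}
    static String[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-demo"));
        try {
            Greeter target = new GreeterImpl();
            String direct = target.threadName().get();
            String proxied = wrap(target, executor).threadName().get();
            return new String[] {direct, proxied};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("direct call ran on:  " + names[0]);
        System.out.println("proxied call ran on: " + names[1]);
    }
}
```

Only calls that go through the proxy reach the advice. A call made directly on the target object, for example from another method of the same class via this, bypasses the proxy, which is one common reason an @Async method ends up running synchronously.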

That concludes the implementation principle of asynchronous execution, and it is quite simple. We define a pointcut and an advice; to enhance the target method, the natural choice is a dynamic proxy; and to replace the original Bean with that proxy, we rely on the BeanPostProcessor interface, which is tied to the Bean life cycle.

2.2 Thread Pool Usage

The thread pool deserves close attention: improper use can lead to unexpected problems such as out-of-memory errors, unbounded thread creation, and business tasks interfering with one another.

 * <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise.

According to the official documentation, Spring looks in the context for a unique TaskExecutor Bean, or otherwise for an Executor Bean named "taskExecutor", and uses it as the thread pool. In a Spring Boot application, the default thread pool is defined in the TaskExecutionAutoConfiguration auto-configuration class.

In that default thread pool, both the queue capacity and the maximum number of threads are Integer.MAX_VALUE, which obviously puts the system at risk. Therefore, we should define a dedicated thread pool for each kind of asynchronous task and specify the Bean name of that thread pool in the @Async annotation.
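As a rough illustration of what a bounded pool buys you, the sketch below builds a plain java.util.concurrent.ThreadPoolExecutor with a fixed queue capacity and shows that it refuses work once it is saturated. The class name, the thread name and all the sizes are made up for the example. In a Spring application you would typically register such an executor (or Spring's ThreadPoolTaskExecutor) as a Bean and put that Bean's name in @Async.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Illustrative sizes only; tune them for the actual workload
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2, 4,                                   // core and maximum threads
                60, TimeUnit.SECONDS,                   // idle timeout for threads above core
                new ArrayBlockingQueue<>(10),           // bounded queue
                r -> new Thread(r, "report-async"),     // recognizable thread name
                new ThreadPoolExecutor.AbortPolicy());  // reject instead of queueing without limit
    }

    // Submits blocking tasks until the pool refuses one; returns how many were accepted
    static int acceptedBeforeRejection() {
        ThreadPoolExecutor pool = newBoundedPool();
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            return accepted;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 4 running tasks + 10 queued tasks
        System.out.println(acceptedBeforeRejection()); // 14
    }
}
```

With an unbounded queue the loop above would never be rejected; it would simply keep consuming memory, which is the risk described for the default configuration.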

2.3 Exception Handling

By default, an exception thrown by an asynchronous task is only logged; no further handling takes place. The official documentation says so as well:

 * Besides, annotated methods having a
 * {@code void} return type cannot transmit any exception back to the caller. By default,
 * such uncaught exceptions are only logged.

SimpleAsyncUncaughtExceptionHandler is the default exception handler for asynchronous tasks. If you want custom exception handling, you only need to implement the AsyncConfigurer interface.
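The idea behind such a handler can be sketched without Spring: a void task has no way to hand an exception back to its caller, so the failure is routed to a callback instead. Everything in this block (ExceptionHandlerSketch, guarded, the task name, the BiConsumer callback) is hypothetical. In Spring itself the hook is AsyncConfigurer.getAsyncUncaughtExceptionHandler(), which returns an AsyncUncaughtExceptionHandler whose handleUncaughtException method receives the exception, the method and its arguments.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

class ExceptionHandlerSketch {
    // Wraps a void task so that any failure is passed to the handler instead of being lost
    static Runnable guarded(String taskName, Runnable task, BiConsumer<String, Throwable> handler) {
        return () -> {
            try {
                task.run();
            } catch (Throwable ex) {
                handler.accept(taskName, ex);
            }
        };
    }

    // Runs a failing task and returns what the custom handler recorded
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            Runnable failing = () -> {
                throw new IllegalStateException("smtp down");
            };
            executor.execute(guarded("sendMail", failing,
                    (name, ex) -> seen.set(name + ": " + ex.getMessage())));
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // sendMail: smtp down
    }
}
```

A custom handler is the place to add alerting, metrics or compensation logic that plain logging does not provide.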

2.4 Return value type

First, let’s take a look at the official instructions for return value types

 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@link java.util.concurrent.Future}. In the latter case, you may declare the
 * more specific {@link org.springframework.util.concurrent.ListenableFuture} or
 * {@link java.util.concurrent.CompletableFuture} types which allow for richer
 * interaction with the asynchronous task and for immediate composition with
 * further processing steps.

From the official instructions, you can see that only four types of return value are supported:

  • void
  • Future
  • ListenableFuture
  • CompletableFuture

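The doSubmit() method shown earlier makes this concrete: it has one branch each for CompletableFuture, ListenableFuture and Future, and a final else branch that submits the task and returns null.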

Asynchronous tasks support only these four return types, which also explains why an @Async method declared to return String always gives the caller null.