1. Introduction

Before reading this article, you have probably used the @Async annotation to process tasks asynchronously. In project development, tasks that are off the main flow, not real-time, and time-consuming are often handled asynchronously, so that they do not block the main flow and its response time improves.

When using the @Async annotation, you may have run into a number of problems: tasks that do not actually run asynchronously, tasks that interfere with each other because they share a thread pool, and uncertainty about how to handle exceptions in asynchronous tasks. Today we will look at how @Async really works, so that the next time you hit a problem you do not have to panic.

2. Exploration

2.1 Implementation Principles

2.1.1 Finding the asynchronous annotation post-processor

As you know, to use the @Async annotation to run asynchronous tasks in your project, you need to enable asynchronous support manually by adding @EnableAsync:

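import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync // turns on processing of @Async methods
public class SpringBootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
}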


Since the @EnableAsync annotation turns on asynchronous support, it is our entry point.

Open the @EnableAsync annotation and you will see another familiar annotation, @Import, which brings the configuration class for this feature into the application:

@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}


Open the imported AsyncConfigurationSelector and you can see the ProxyAsyncConfiguration configuration class:

public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}


Go to the ProxyAsyncConfiguration configuration class

@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    // Pass on the executor and exception handler
    bpp.configure(this.executor, this.exceptionHandler);
    // Use a custom annotation type only if @EnableAsync overrides the default
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
        bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
}


As you can see, ProxyAsyncConfiguration declares a Bean of type AsyncAnnotationBeanPostProcessor. Judging by its name, this Bean should be the protagonist of asynchronous processing, so let's see what work it actually does.

Open AsyncAnnotationBeanPostProcessor and you can see that the class implements two interfaces, BeanFactoryAware and BeanPostProcessor, both of which are closely tied to the Bean life cycle. From the Bean life cycle we know that the BeanFactoryAware callback runs before the BeanPostProcessor callbacks.

2.1.2 BeanFactoryAware implementation

2.1.2.1 Define the aspect

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    // Define the aspect (advisor)
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}


The setBeanFactory() implementation defines the aspect (advisor) object. The word “aspect” immediately brings two concepts to mind: pointcut and advice.

  • Pointcut: declares the targets to cut into
  • Advice: the processing applied to the matched targets

2.1.2.2 Define the pointcut

Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
    asyncAnnotationTypes.add((Class<? extends Annotation>)
            ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        // Pointcut for classes annotated with @Async or @Asynchronous
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        // Pointcut for methods annotated with @Async or @Asynchronous
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}
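
The matching rule above (a class-level pointcut unioned with a method-level pointcut) can be illustrated without Spring. The following is only a minimal sketch using plain reflection: the @MyAsync annotation, the two service classes, and the matches() helper are hypothetical names invented for the illustration, and the sketch ignores the inherited and meta-annotation lookups that AnnotationMatchingPointcut performs.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class PointcutSketch {

    // Hypothetical stand-in for @Async; this is not a Spring annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface MyAsync {}

    @MyAsync // class-level annotation: every method of the class matches
    static class ReportService {
        public void export() {}
    }

    static class MailService {
        @MyAsync // method-level annotation: only this method matches
        public void send() {}

        public void validate() {}
    }

    // Union of the class-level check and the method-level check.
    static boolean matches(Class<?> targetClass, String methodName) {
        try {
            Method method = targetClass.getMethod(methodName);
            return targetClass.isAnnotationPresent(MyAsync.class)
                    || method.isAnnotationPresent(MyAsync.class);
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(ReportService.class, "export")); // true
        System.out.println(matches(MailService.class, "send"));     // true
        System.out.println(matches(MailService.class, "validate")); // false
    }
}
```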

2.1.2.3 Define the advice

protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    // Define the advice
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}


The advice is where the work ultimately happens, so it is worth looking at its implementation:

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // Get the thread pool for the asynchronous task
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
            "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // Wrap the target method call in a Callable
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            // If the target method returned a Future, unwrap its value
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        // ...
        return null;
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // The return value type of the asynchronous task is CompletableFuture
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    // The return value type of the asynchronous task is ListenableFuture
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // The return value type of the asynchronous task is Future
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    // Any other return type: submit the task and return null to the caller
    else {
        executor.submit(task);
        return null;
    }
}


The advice works as follows:

  • First, obtain the thread pool that will execute the asynchronous task
  • Wrap the target method call in a Callable
  • Submit the task to the thread pool and handle the result according to the return value type
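
The steps above can be sketched with the JDK alone. This is a simplified illustration, not Spring's code: the AdviceSketch class, its invokeAsync() and demo() methods, and the "async-worker" thread name are assumptions made for the example. It shows in particular why a target method that returns an already completed Future still delivers its value to the caller: the Callable unwraps it on the worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AdviceSketch {

    // Steps 2 and 3: wrap the target call in a Callable and submit it to the pool.
    static Future<Object> invokeAsync(Callable<Object> targetMethod, ExecutorService executor) {
        Callable<Object> task = () -> {
            Object result = targetMethod.call();
            // The target returned a Future: unwrap it on the worker thread.
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
            return null;
        };
        return executor.submit(task);
    }

    static String demo() {
        // Step 1: obtain the thread pool (here a single named worker thread).
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            Future<Object> future = invokeAsync(
                    () -> CompletableFuture.completedFuture(
                            "done on " + Thread.currentThread().getName()),
                    executor);
            return (String) future.get(5, TimeUnit.SECONDS);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // done on async-worker
    }
}
```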

The advice shows how asynchronous tasks are ultimately executed. You may still wonder: how does the advice get triggered in the first place?

Remember the BeanPostProcessor interface mentioned above? Let's take a look at its implementation.

2.1.3 BeanPostProcessor implementation

Speaking of the BeanPostProcessor interface, you should immediately realize that its processing methods must do something to the Bean, such as generating a proxy.

With that in mind, let's look at the post-processing implementation here:

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    // If the current Bean matches the previously defined pointcut, generate a proxy object
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}


The BeanPostProcessor generates a proxy for each Bean that matches the pointcut, and the invoke() method of the advice is executed whenever the target method is called through that proxy.
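
To make the proxy step concrete, here is a rough sketch built on the JDK's java.lang.reflect.Proxy rather than Spring's ProxyFactory. All names in it (ProxySketch, Notifier, EmailNotifier, postProcess()) are hypothetical. The sketch proxies every method of the interface and skips the eligibility check, so it only illustrates the idea that the caller talks to a proxy whose handler hands the real call to a thread pool.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ProxySketch {

    interface Notifier {
        void send(String message);
    }

    static class EmailNotifier implements Notifier {
        final BlockingQueue<String> log = new LinkedBlockingQueue<>();

        @Override
        public void send(String message) {
            // Record which thread actually executed the call.
            log.add(message + "@" + Thread.currentThread().getName());
        }
    }

    // Plays the role of the post-processor: replace the bean with a proxy.
    static <T> T postProcess(T bean, Class<T> iface, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Plays the role of the advice: run the target method on the pool.
            Callable<Object> task = () -> method.invoke(bean, args);
            executor.submit(task);
            return null; // the caller does not wait for the result
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            EmailNotifier target = new EmailNotifier();
            Notifier proxied = postProcess(target, Notifier.class, executor);
            proxied.send("hello"); // returns immediately
            return target.log.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello@async-worker
    }
}
```

Only calls that go through the proxy reach the handler; a call made directly on the EmailNotifier instance would run on the caller's thread.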

This concludes the asynchronous implementation principle, which is quite simple. We define a pointcut and an advice; enhancing the target method naturally calls for a dynamic proxy; and to swap the original Bean for its proxy, we rely on the BeanPostProcessor interface, which is part of the Bean life cycle.

2.2 Thread Pool Usage

The thread pool deserves careful attention: improper use can lead to unexpected problems such as out-of-memory errors, unbounded thread creation, and tasks from different businesses interfering with each other.

* <p>By default, Spring will be searching for an associated thread pool definition:
* either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
* or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. 

According to the official documentation, Spring looks in the context for a unique TaskExecutor Bean, or otherwise for an Executor Bean named “taskExecutor”, to use as the thread pool. In Spring Boot, the default thread pool is defined in the TaskExecutionAutoConfiguration auto-configuration class.

With the default configuration, both the queue capacity and the maximum number of threads are Integer.MAX_VALUE, which obviously puts the system at risk. We should therefore define a dedicated thread pool for each kind of asynchronous task and specify the name of the corresponding thread pool Bean in the @Async() annotation.
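
The difference a bounded pool makes can be seen with the JDK's ThreadPoolExecutor alone. The sketch below is an illustration, not a recommended configuration: the class name, the sizes (1 core thread, 2 maximum threads, queue capacity 1), and the choice of AbortPolicy are assumptions picked to keep the numbers easy to follow. In a Spring application the same limits would typically be set on a ThreadPoolTaskExecutor Bean whose name is then referenced in @Async("...").

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    // Submits `attempts` tasks that all block, and counts how many were accepted.
    static int submitBlockingTasks(int attempts) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                                  // core and maximum threads
                30, TimeUnit.SECONDS,                  // idle time for the extra thread
                new ArrayBlockingQueue<>(1),           // bounded queue
                new ThreadPoolExecutor.AbortPolicy()); // reject when saturated
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException ex) {
                    // Threads and queue are full: the overload surfaces here
                    // instead of piling up in memory.
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 2 running tasks + 1 queued task are accepted, the rest are rejected.
        System.out.println(submitBlockingTasks(5)); // 3
    }
}
```

With an unbounded queue and an unbounded thread limit, all five submissions would be accepted, and so would five million.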

2.3 Exception Handling

By default, an exception in an asynchronous task is only logged, with no further handling. The official documentation says as much:

Besides, annotated methods having a
* {@code void} return type cannot transmit any exception back to the caller. By default,
* such uncaught exceptions are only logged.

SimpleAsyncUncaughtExceptionHandler is the default exception handler for asynchronous tasks. If you want custom exception handling, you just need to implement the AsyncConfigurer interface.
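
Why a handler is needed at all can be shown with a small JDK-only sketch. The caller of a void task has nothing to inspect, so the exception must be routed somewhere on the worker thread. The UncaughtHandler interface below is a hypothetical, simplified stand-in loosely modeled on the idea of Spring's AsyncUncaughtExceptionHandler; it is not the Spring interface, and the method name "sendMail" is made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ExceptionSketch {

    // Hypothetical handler contract for exceptions thrown by void tasks.
    interface UncaughtHandler {
        void handle(Throwable ex, String methodName);
    }

    // Runs a void task on the pool and routes any exception to the handler.
    static void runAsync(Runnable body, String methodName,
                         ExecutorService executor, UncaughtHandler handler) {
        executor.submit(() -> {
            try {
                body.run();
            } catch (Throwable ex) {
                // The caller has already moved on, so handle the error here.
                handler.handle(ex, methodName);
            }
        });
    }

    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<String> alerts = new LinkedBlockingQueue<>();
        try {
            runAsync(
                    () -> { throw new IllegalStateException("boom"); },
                    "sendMail",
                    executor,
                    (ex, method) -> alerts.add(method + " failed: " + ex.getMessage()));
            return alerts.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // sendMail failed: boom
    }
}
```

A custom handler would usually do more than log, for example raise an alert or record the failed task for a retry.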

2.4 Return value type

First, let’s take a look at the official instructions for return value types

* <p>In terms of target method signatures, any parameter types are supported.
* However, the return type is constrained to either {@code void} or
* {@link java.util.concurrent.Future}. In the latter case, you may declare the
* more specific {@link org.springframework.util.concurrent.ListenableFuture} or
* {@link java.util.concurrent.CompletableFuture} types which allow for richer
* interaction with the asynchronous task and for immediate composition with
* further processing steps.

From the official instructions, you can see that only four types of return value are supported:

  • void
  • Future
  • ListenableFuture
  • CompletableFuture

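The doSubmit() source code shown earlier confirms this: each of the three Future types has its own branch, and every other return type falls through to the branch that returns null.

Both the official description and the source code analysis show that asynchronous tasks support only these four return types. So there is no need to wonder why a method declared with a String return type returns null.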
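
The effect of the declared return type can be reproduced with a stripped-down dispatch written against the JDK only. This sketch is an assumption-laden simplification: it omits the ListenableFuture branch (a Spring type), the task returns its value directly instead of unwrapping a Future, and the names ReturnTypeSketch and callAs() are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ReturnTypeSketch {

    // Simplified dispatch on the declared return type of the target method.
    static Object doSubmit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            // void, String, or anything else: nothing is handed back.
            executor.submit(task);
            return null;
        }
    }

    // Returns what a caller would eventually observe for a given declared type.
    static Object callAs(Class<?> declaredReturnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object returned = doSubmit(() -> "payload", executor, declaredReturnType);
            return (returned instanceof Future)
                    ? ((Future<?>) returned).get(5, TimeUnit.SECONDS)
                    : returned;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAs(String.class));            // null
        System.out.println(callAs(Future.class));            // payload
        System.out.println(callAs(CompletableFuture.class)); // payload
    }
}
```

The task still runs in the String case; its result is simply never delivered, because the caller has already received null.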