1. Introduction
Before reading this article, you have probably used the @Async annotation to run tasks asynchronously. In project development, tasks that are outside the main flow, not real-time, or time-consuming are often processed asynchronously, so that they do not block the main flow and its response time improves.
When using @Async, you may have run into a number of problems: tasks that do not execute asynchronously, tasks that interfere with each other because they share a thread pool, and uncertainty about how to handle exceptions thrown by asynchronous tasks. Today I'll walk you through how it really works, so that the next time you hit a problem you don't have to panic.
2. Exploration
2.1 Implementation Principles
2.1.1 Finding the asynchronous annotation post-processor
As you know, to use the @Async annotation for asynchronous tasks in your project, you must enable asynchronous processing explicitly by adding @EnableAsync:
@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
}
Since @EnableAsync is what enables the asynchronous functionality, this annotation is our entry point.
Open the @EnableAsync annotation and you'll see another familiar annotation, @Import, which imports the configuration class for this feature:
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}
Open the imported AsyncConfigurationSelector and you can see that, for the PROXY advice mode, it selects the ProxyAsyncConfiguration configuration class:
public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
        case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
        default:
            return null;
    }
}
Go to the ProxyAsyncConfiguration configuration class:
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    // Pass on the configured thread pool and exception handler
    bpp.configure(this.executor, this.exceptionHandler);
    // Use a custom asynchronous annotation if @EnableAsync declares one
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
        bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
}
As you can see, the ProxyAsyncConfiguration configuration class declares an AsyncAnnotationBeanPostProcessor bean. Judging from its name, this bean is the protagonist of asynchronous processing, so let's see what work it actually does.
Open AsyncAnnotationBeanPostProcessor and you can see that the class implements two interfaces, BeanFactoryAware and BeanPostProcessor, both of which are closely tied to the bean lifecycle. From the bean lifecycle we know that the BeanFactoryAware callback is invoked before the BeanPostProcessor callbacks.
2.1.2 BeanFactoryAware implementation
2.1.2.1 Define the aspect
@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    // Define the aspect (advisor)
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
The setBeanFactory() implementation defines the aspect object (the advisor). The word "aspect" immediately brings two concepts to mind: pointcut and advice.
- Pointcut: declares the targets to cut into
- Advice: the processing applied to the matched targets
2.1.2.2 Define the pointcut
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
    asyncAnnotationTypes.add((Class<? extends Annotation>)
            ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        // Pointcut for classes annotated with @Async or @Asynchronous
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        // Pointcut for methods annotated with @Async or @Asynchronous
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}
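The matching rule that buildPointcut() assembles can be illustrated without Spring. The following is a minimal sketch, not Spring's implementation: MyAsync is a hypothetical stand-in for @Async, the service classes are invented for the example, and unlike AnnotationMatchingPointcut with checkInherited set to true, it does not look at superclasses or interfaces.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class PointcutSketch {
    /** True if the class or the method carries the marker annotation (union of both pointcuts). */
    static boolean matches(Class<?> targetClass, Method method) {
        return targetClass.isAnnotationPresent(MyAsync.class)
                || method.isAnnotationPresent(MyAsync.class);
    }

    /** Convenience lookup by method name that hides the checked reflection exception. */
    static boolean matches(Class<?> targetClass, String methodName) {
        try {
            return matches(targetClass, targetClass.getMethod(methodName));
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(ReportService.class, "export"));
        System.out.println(matches(MailService.class, "send"));
        System.out.println(matches(MailService.class, "validate"));
    }
}

// Hypothetical stand-in for Spring's @Async, so the sketch needs no Spring dependency.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface MyAsync {}

// Hypothetical service annotated at class level: every method matches.
@MyAsync
class ReportService {
    public void export() {}
}

// Hypothetical service annotated at method level: only send() matches.
class MailService {
    @MyAsync
    public void send() {}

    public void validate() {}
}
```

A bean becomes a proxy candidate as soon as one of its methods matches, which is why both the class-level and the method-level pointcut are merged into a single union.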
2.1.2.3 Define the advice
protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    // Define the advice
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}
The advice is where the behavior is ultimately implemented, so it is the most important part. Let's look at its implementation:
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    // Get the asynchronous task thread pool
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }
    // Wrap the target method invocation in a Callable
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        // ... (exception handling omitted in this excerpt)
        return null;
    };
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // The return type of the asynchronous method is CompletableFuture
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    // The return type of the asynchronous method is ListenableFuture
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // The return type of the asynchronous method is Future
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    // Any other return type: submit the task and return null to the caller
    else {
        executor.submit(task);
        return null;
    }
}
The advice works as follows:
- First, obtain the thread pool that will execute the asynchronous task
- Wrap the target method invocation in a Callable
- Submit the task to the thread pool and handle the result according to the method's return type
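The wrapping step is easy to miss: the Callable unwraps the Future returned by the target method, so that the Future handed back by the thread pool carries the plain value. Here is a minimal sketch of that step using only java.util.concurrent. The class, the method targetMethod() and the thread name "async-1" are assumptions made for the example, and the thread pool is simply passed in rather than looked up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AdviceSketch {
    // Hypothetical target method: it returns its value wrapped in an
    // already-completed Future and records the thread it ran on.
    static Future<String> targetMethod() {
        return CompletableFuture.completedFuture("ran on " + Thread.currentThread().getName());
    }

    // Wrap the call in a Callable, then hand the Callable to the pool.
    static Future<Object> invokeAsync(ExecutorService executor) {
        Callable<Object> task = () -> {
            Object result = targetMethod();          // plays the role of invocation.proceed()
            if (result instanceof Future) {
                return ((Future<?>) result).get();   // unwrap the completed Future
            }
            return null;
        };
        return executor.submit(task);
    }

    // Runs the sketch on a single pool thread with a fixed, hypothetical name.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        try {
            return (String) invokeAsync(executor).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints "ran on async-1"
    }
}
```

Because targetMethod() runs inside the Callable, its body executes on the pool thread, not on the thread that called invokeAsync().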
The advice shows how asynchronous tasks are ultimately executed, but you might wonder: how is the advice triggered in the first place?
Remember the BeanPostProcessor interface mentioned above? Let's take a look at its implementation.
2.1.3 BeanPostProcessor implementation
Speaking of the BeanPostProcessor interface, you should immediately realize that its callback methods do something to the bean, such as generating a proxy.
With that in mind, let's look at the post-processing implementation here:
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    // Check whether the current bean matches the previously defined pointcut; if so, generate a proxy object
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }
    // No proxy needed.
    return bean;
}
The BeanPostProcessor generates a proxy for each bean that matches the pointcut, and the advice's invoke() method is executed when the target method is called through that proxy.
This concludes the asynchronous implementation principle, which is quite simple. All we need are a pointcut and an advice; enhancing target methods naturally calls for dynamic proxies; and to swap the original bean for its proxy, we hook into the bean lifecycle through the BeanPostProcessor interface.
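To see the proxy idea end to end, here is a minimal sketch that uses a plain JDK dynamic proxy instead of Spring's ProxyFactory. It is an illustration under stated assumptions, not Spring's code: MyAsync, Notifier, RecordingNotifier and the thread name "async-1" are hypothetical, only void methods are handled, and the annotation is looked up on the implementation class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ProxySketch {
    /** Wraps the target in a proxy that runs annotated methods on the executor. */
    static <T> T wrap(T target, Class<T> iface, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Look the annotation up on the implementation, not on the interface method.
            Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            if (!impl.isAnnotationPresent(MyAsync.class)) {
                return method.invoke(target, args);      // pointcut does not match: plain call
            }
            Callable<Object> task = () -> method.invoke(target, args);
            executor.submit(task);                       // pointcut matches: run on the pool
            return null;                                 // void method: nothing to hand back
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    /** Returns the thread names used by sendSlow() and sendFast(), in that order. */
    static String[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        try {
            RecordingNotifier target = new RecordingNotifier();
            Notifier proxy = wrap(target, Notifier.class, executor);
            proxy.sendSlow();
            proxy.sendFast();
            target.slowDone.await(5, TimeUnit.SECONDS);  // wait for the asynchronous call
            return new String[] {target.slowThread, target.fastThread};
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("sendSlow ran on " + threads[0]);
        System.out.println("sendFast ran on " + threads[1]);
    }
}

// Hypothetical stand-in for Spring's @Async.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MyAsync {}

// Hypothetical business interface, needed because JDK proxies are interface-based.
interface Notifier {
    void sendSlow();
    void sendFast();
}

// Hypothetical implementation that records which thread executed each method.
class RecordingNotifier implements Notifier {
    final CountDownLatch slowDone = new CountDownLatch(1);
    volatile String slowThread;
    volatile String fastThread;

    @MyAsync
    public void sendSlow() {
        slowThread = Thread.currentThread().getName();
        slowDone.countDown();
    }

    public void sendFast() {
        fastThread = Thread.currentThread().getName();
    }
}
```

The asynchronous behavior lives entirely in the proxy: only calls that go through the object returned by wrap() reach the handler, which mirrors how the post-processor hands out the proxy in place of the original bean.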
2.2 Thread Pool Usage
The thread pool deserves attention: improper use can cause unexpected problems such as out-of-memory errors, unbounded thread creation, and unrelated business tasks interfering with each other.
* <p>By default, Spring will be searching for an associated thread pool definition:
* either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
* or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise.
According to the official documentation, Spring looks in the context for a unique TaskExecutor bean, or otherwise for an Executor bean named "taskExecutor", and uses it as the thread pool. In a Spring Boot application, the default thread pool is defined in the TaskExecutionAutoConfiguration auto-configuration class.
With its default configuration, that thread pool's queue capacity and maximum number of threads are both Integer.MAX_VALUE, which obviously puts the system at risk. We should therefore define custom thread pools for our asynchronous tasks and specify the name of the corresponding thread pool bean in the @Async annotation.
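A dedicated pool could be configured along the following lines. This is a sketch rather than a recommended configuration: the bean name "mailExecutor", the class names, the pool sizes and the rejection policy are all assumptions chosen for illustration, and it requires Spring on the classpath.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Configuration
@EnableAsync
class AsyncPoolConfig {

    // Hypothetical bean name; it is referenced by @Async("mailExecutor") below.
    @Bean("mailExecutor")
    public Executor mailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);            // illustrative sizes, tune for your workload
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(200);         // bounded queue instead of Integer.MAX_VALUE
        executor.setThreadNamePrefix("mail-async-");
        // When pool and queue are full, run the task on the calling thread instead of dropping it
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

@Service
class WelcomeMailService {

    // Runs on the "mailExecutor" pool rather than on the default thread pool
    @Async("mailExecutor")
    public void sendWelcomeMail(String address) {
        System.out.println("Sending welcome mail to " + address
                + " on " + Thread.currentThread().getName());
    }
}
```

Giving each group of tasks its own named pool keeps a burst of slow tasks in one area from starving tasks in another.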
2.3 Exception Handling
By default, an exception thrown by an asynchronous task is only logged; no further handling is performed. The official documentation says as much:
* Besides, annotated methods having a
* {@code void} return type cannot transmit any exception back to the caller. By default,
* such uncaught exceptions are only logged.
SimpleAsyncUncaughtExceptionHandler is the default exception handler for asynchronous tasks. To customize exception handling, implement the AsyncConfigurer interface.
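A custom handler might look like the sketch below. The class name and the log format are assumptions made for the example; a real handler would more likely report to a logging framework or an alerting system than print to standard error. It requires Spring on the classpath.

```java
import java.util.Arrays;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
class AsyncExceptionConfig implements AsyncConfigurer {

    // Replaces the default SimpleAsyncUncaughtExceptionHandler
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> System.err.printf(
                "Async method %s(%s) failed: %s%n",
                method.getName(), Arrays.toString(params), ex);
    }
}
```

As the quoted documentation indicates, this handler concerns methods with a void return type, since they have no other way to transmit an exception back to the caller.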
2.4 Return value type
First, let's look at the official description of return types:
* <p>In terms of target method signatures, any parameter types are supported.
* However, the return type is constrained to either {@code void} or
* {@link java.util.concurrent.Future}. In the latter case, you may declare the
* more specific {@link org.springframework.util.concurrent.ListenableFuture} or
* {@link java.util.concurrent.CompletableFuture} types which allow for richer
* interaction with the asynchronous task and for immediate composition with
* further processing steps.
From the official description, you can see that only four return types are supported:
- void
- Future
- ListenableFuture
- CompletableFuture
The source code confirms this: the doSubmit() method shown earlier branches on exactly these types.
Both the official description and the source code show that asynchronous tasks support only these four return types. For any other declared return type, such as String, the caller simply receives null.
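The effect of the declared return type can be reproduced with plain java.util.concurrent. The sketch below loosely mirrors the branching in doSubmit() under two simplifications: an ExecutorService stands in for Spring's AsyncTaskExecutor, and the ListenableFuture branch is left out because that type belongs to Spring. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ReturnTypeSketch {
    /** Chooses how to submit the task based on the method's declared return type. */
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            executor.submit(task);   // the task still runs, but its result is dropped
            return null;
        }
    }

    /** Returns what a caller would see for Future, CompletableFuture and String return types. */
    static Object[] demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<Object> task = () -> "done";
            Object viaFuture = ((Future<?>) submit(task, executor, Future.class)).get();
            Object viaCompletable =
                    ((CompletableFuture<?>) submit(task, executor, CompletableFuture.class)).join();
            Object viaString = submit(task, executor, String.class);
            return new Object[] {viaFuture, viaCompletable, viaString};
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] results = demo();
        System.out.println("Future            -> " + results[0]);
        System.out.println("CompletableFuture -> " + results[1]);
        System.out.println("String            -> " + results[2]);
    }
}
```

The last line prints null for the String case: the task produced "done" on the pool, but nothing carries that value back to the caller.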