1. Preface

Hello, everyone.

I have recently been organizing my notes on DDD, and my team is also carrying out a DDD migration.

DDD blog series:

  1. A penny will take you to DDD
  2. Event-driven model for DDD implementation
  3. DDD landed storage
  4. The architecture layer of DDD landing

While sorting through the old business logic code, I found that MANY interface implementations are annotated with @Async. I rarely use this annotation myself; I am used to writing a custom ThreadPoolExecutor utility class for asynchronous logic. So let's walk through the code and see what the @Async annotation really does.

My favorite thread pool utility class: I used this thread pool to capture the hearts of back-end girls

This article interprets the source code step by step, starting from the @Async annotation, analyzes the various pitfalls met in practice, and goes on to show how Sleuth link tracing and thread-local variables can be put to good use.

Unless otherwise specified, the Spring Boot version is 2.3.10.RELEASE.

2. Getting started

In Spring, a method annotated with @Async is called an asynchronous method. It is executed in a separate thread, and the caller can carry on without waiting for it to finish. This is the asynchronous invocation Spring provides out of the box.

2.1. Mode of use

Making a method asynchronous with @Async is particularly simple.

  1. Annotate the startup class, or a configuration class that the startup class can scan, with @EnableAsync.
  2. Annotate the method of a Spring-managed bean with @Async.
  3. Make sure the caller and the called method are not in the same bean.

Look closely at these three conditions. If any one of them is not met, @Async will not take effect.
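The third condition follows from how proxy-based interception works: only calls that pass through the proxy are intercepted. The sketch below is a JDK-only illustration of that idea, not Spring's implementation. The names RunAsync, Greeter, GreeterImpl and proxyOf are hypothetical, and unlike @Async the proxy here waits for the result so that the thread name can be observed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SelfInvocationDemo {

    // Hypothetical stand-in for @Async, used only in this sketch.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RunAsync {}

    interface Greeter {
        String outer();   // not annotated, calls inner() through "this"

        @RunAsync
        String inner();   // annotated, should run on the pool
    }

    static class GreeterImpl implements Greeter {
        @Override
        public String outer() {
            // Self-invocation: "this" is the raw object, so the proxy never sees the call.
            return inner();
        }

        @Override
        public String inner() {
            return Thread.currentThread().getName();
        }
    }

    // Runs annotated methods on the pool; everything else runs on the caller's thread.
    static Greeter proxyOf(Greeter target, ExecutorService pool) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                (proxy, method, args) -> {
                    if (!method.isAnnotationPresent(RunAsync.class)) {
                        return method.invoke(target, args);
                    }
                    Callable<Object> task = () -> method.invoke(target, args);
                    // Waits for the result only to make the thread name visible.
                    return pool.submit(task).get();
                });
    }

    // Returns {thread used via the proxy, thread used via self-invocation}.
    static String[] run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "task-1"));
        try {
            Greeter greeter = proxyOf(new GreeterImpl(), pool);
            return new String[] {greeter.inner(), greeter.outer()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println("through the proxy: " + names[0]);
        System.out.println("self-invocation:   " + names[1]);
    }
}
```

Calling inner() on the proxy runs it on the pool thread, while the same method reached through outer() stays on the caller's thread, which is the behaviour the third condition guards against.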

2.2. The simplest demo

Start class definition

    @SpringBootApplication
    @EnableAsync
    public class DemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }

Controller layer method definition

    @RestController
    @RequestMapping
    public class TestController {

        @Autowired
        TestService testService;

        @GetMapping()
        public void test() {
            for (int i = 0; i < 5; i++) {
                testService.testAsync();
            }
        }
    }

Service method definition

    @Service
    @Slf4j
    public class TestServiceImpl implements TestService {

        @Override
        @Async
        public void testAsync() {
            log.info("hee hee");
        }
    }

Log output

    2021-09-15 19:39:54.300, [http-nio-8088-exec-5], com.examp.controller.TestController - hey
    2021-09-15 19:36:54.302, [task-5], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 19:36:54.302, [task-4], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 19:36:54.302, [task-1], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 19:36:54.302, [task-2], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 19:36:54.302, [task-3], com.examp.service.impl.TestServiceImpl - hee hee

The log shows that the Controller method and the Service method print on different threads.

It really is simple to use!

2.3. Pitfalls and questions

As the previous two points show, making a method asynchronous with @Async really is simple, but there are all kinds of pitfalls hidden in it.

Here are some questions to think about first:

  1. Why does Alibaba advise against using @Async directly?
  2. Is an @Async annotated method transactionally consistent?
  3. In the same class, A calls B and B is annotated with @Async. If the class injects itself so that the call goes through a bean reference, does the asynchronous call succeed?
  4. Can an @Async annotated method read ThreadLocal variables?
  5. Can the return value of an @Async annotated method be obtained?
  6. Can the Sleuth traceId be propagated into the thread pool?

3. Source code analysis

Without further ado, let's look at the source.

3.1. @Async

    /**
     * Marks a method as a candidate for asynchronous execution. It can also be
     * placed on a class, in which case all methods of the class are executed
     * asynchronously.
     *
     * The return type is limited to void or Future (including the
     * ListenableFuture interface and the CompletableFuture class). The Future
     * returned by the proxy is an actual asynchronous Future that tracks the
     * result of the asynchronous method. Inside the method itself you can return
     * an AsyncResult (which implements ListenableFuture; Spring or EJB) or a
     * CompletableFuture.
     */
    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface Async {

        // Qualifier (bean name) of the executor that should run the method
        String value() default "";
    }

3.2. @EnableAsync

    /**
     * Enables Spring's asynchronous method execution, similar to the task
     * namespace configuration in XML. It has to be used together with
     * @Configuration: as noted at the beginning of this article, it must be
     * placed on the startup class or on a configuration class that the startup
     * class can scan.
     *
     * By default Spring searches for a bean of type TaskExecutor, or an Executor
     * bean named "taskExecutor". If neither exists, a SimpleAsyncTaskExecutor is
     * used. You can implement the AsyncConfigurer interface and override
     * getAsyncExecutor() to supply the asynchronous executor, and
     * getAsyncUncaughtExceptionHandler() to supply the handler for uncaught
     * exceptions thrown by asynchronous methods.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(AsyncConfigurationSelector.class)
    public @interface EnableAsync {

        // Supports a user-defined asynchronous annotation. By default Spring's
        // @Async and EJB 3.1's @javax.ejb.Asynchronous are detected.
        Class<? extends Annotation> annotation() default Annotation.class;

        // Whether CGLIB subclass proxies should be created; only applies when
        // mode = AdviceMode.PROXY. Note that when set to true, other
        // Spring-managed beans that need proxying are also switched to CGLIB
        // subclass proxies.
        boolean proxyTargetClass() default false;

        // How the advice is applied: PROXY (the default) or ASPECTJ
        AdviceMode mode() default AdviceMode.PROXY;

        // The order in which the asynchronous annotation bean post-processor is applied
        int order() default Ordered.LOWEST_PRECEDENCE;
    }

3.3. AsyncConfigurationSelector

@EnableAsync is clearly an enabling annotation; almost all of the Spring annotations that switch on a configuration class start with @Enable. Inside it we meet an old friend, @Import.

For the @Import annotation, see this article: The past and present life of @SpringBootApplication

    // Selects the AbstractAsyncConfiguration implementation to import, based on
    // the AdviceMode declared by the @EnableAsync on the @Configuration class
    public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

        private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
                "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

        @Override
        @Nullable
        public String[] selectImports(AdviceMode adviceMode) {
            switch (adviceMode) {
                case PROXY:
                    // Proxy-based advice (JDK interface proxy by default)
                    return new String[] {ProxyAsyncConfiguration.class.getName()};
                case ASPECTJ:
                    // AspectJ weaving-based advice
                    return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
                default:
                    return null;
            }
        }
    }

This class is relatively simple: it only decides which advice mode to use.

3.4. ProxyAsyncConfiguration

Let's take the default PROXY mode (JDK interface proxy) as an example.

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

        @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
            // Check that the annotation metadata is present
            Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
            // Create the bean post-processor for the asynchronous annotation
            AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
            // Set the executor and the exception handler
            bpp.configure(this.executor, this.exceptionHandler);
            // Register a custom asynchronous annotation, if one was declared
            Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
            if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
                bpp.setAsyncAnnotationType(customAsyncAnnotation);
            }
            // Whether to switch to CGLIB subclass proxies; off by default
            bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
            // Order of the post-processor
            bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
            return bpp;
        }
    }

What the configuration class does is fairly clear: it extends the AbstractAsyncConfiguration abstract class, creates a new AsyncAnnotationBeanPostProcessor post-processor, and initializes its parameters.

3.5. AbstractAsyncConfiguration

    @Configuration
    public abstract class AbstractAsyncConfiguration implements ImportAware {

        // Attributes of the @EnableAsync annotation
        @Nullable
        protected AnnotationAttributes enableAsync;

        // Supplier of the asynchronous executor
        @Nullable
        protected Supplier<Executor> executor;

        // Supplier of the handler for uncaught exceptions
        @Nullable
        protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

        @Override
        public void setImportMetadata(AnnotationMetadata importMetadata) {
            this.enableAsync = AnnotationAttributes.fromMap(
                    importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
            if (this.enableAsync == null) {
                throw new IllegalArgumentException(
                        "@EnableAsync is not present on importing class " + importMetadata.getClassName());
            }
        }

        // Collect the user-defined AsyncConfigurer, if any
        @Autowired(required = false)
        void setConfigurers(Collection<AsyncConfigurer> configurers) {
            if (CollectionUtils.isEmpty(configurers)) {
                return;
            }
            if (configurers.size() > 1) {
                throw new IllegalStateException("Only one AsyncConfigurer may exist");
            }
            AsyncConfigurer configurer = configurers.iterator().next();
            this.executor = configurer::getAsyncExecutor;
            this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
        }
    }

3.6. AsyncAnnotationBeanPostProcessor

As you can see from the dependency diagram, this is a bean post-processor.

    public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

        // Part of the code omitted

        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            super.setBeanFactory(beanFactory);
            // Initialize the advisor (the aspect)
            AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
            if (this.asyncAnnotationType != null) {
                advisor.setAsyncAnnotationType(this.asyncAnnotationType);
            }
            advisor.setBeanFactory(beanFactory);
            this.advisor = advisor;
        }
    }

3.7. AsyncAnnotationAdvisor

And here it is: this is the advisor (aspect) for the asynchronous annotation. It defines how @Async is parsed into a pointcut; the actual wrapping happens in the AbstractAdvisingBeanPostProcessor.postProcessAfterInitialization method.

3.8. The interceptor in the aspect

The invoke method of the interceptor interface

Set a breakpoint on the interface method and step through, and you arrive at the AsyncExecutionInterceptor class.

    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // Determine which executor runs this method
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        // Define the task
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        // Submit the task to the executor
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

Look at the final core method, doSubmit

    @Nullable
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        // Handling when the return type is CompletableFuture
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        // Handling when the return type is ListenableFuture
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        // Handling when the return type is Future
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            // Breakpoint here
            executor.submit(task);
            return null;
        }
    }

Run the demo again, with the breakpoint on the executor.submit(task) line above.

We find that the task is submitted to a task executor named applicationTaskExecutor.

Thread pool parameters:

Number of core threads: 8

The queue capacity and the maximum pool size are both Integer.MAX_VALUE.

3.9. Summary

Initialization of the classes behind the @Async annotation

Parsing @Async and weaving the pointcut into execution

4. Reviewing the questions

4.1. Why does Alibaba advise against using @Async directly?

As you can see at the end of 3.8, the default executor used for @Async has a maximum pool size and a queue capacity of 2147483647 (Integer.MAX_VALUE).

In other words, if a method is annotated with @Async and its call volume suddenly spikes, the system load spikes with it, and in the worst case the machine goes down.
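To make those defaults concrete, here is a minimal JDK-only sketch, assuming a plain ThreadPoolExecutor configured with the same numbers (8 core threads, maximum pool size and queue capacity of Integer.MAX_VALUE). It is not the Spring Boot executor itself, and the class and method names are hypothetical. It shows that with an effectively unbounded queue nothing ever pushes back on the caller: every task beyond the core threads simply waits in memory.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    // Returns {pool size, queued tasks} observed while every submitted task is still blocked.
    static int[] submitBurst(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                8, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(Integer.MAX_VALUE));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            // The first 8 tasks each started a core thread; the rest are waiting in the queue.
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] observed = submitBurst(1000);
        System.out.println("pool size = " + observed[0] + ", queued tasks = " + observed[1]);
    }
}
```

With a burst of 1000 blocked tasks the pool stays at 8 threads and the other 992 tasks sit in the queue, so a sustained spike grows the backlog without any limit.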

So does that mean the @Async annotation cannot be used at all?

The answer is definitely no.

As stated in the class-level comment of @EnableAsync, a custom thread pool bean can replace the system default thread pool.

Searching for applicationTaskExecutor leads to TaskExecutionAutoConfiguration.

    @ConditionalOnClass(ThreadPoolTaskExecutor.class)
    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties(TaskExecutionProperties.class)
    public class TaskExecutionAutoConfiguration {

        /**
         * Bean name of the application {@link TaskExecutor}.
         */
        public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

        @Bean
        @ConditionalOnMissingBean
        public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
                ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
                ObjectProvider<TaskDecorator> taskDecorator) {
            TaskExecutionProperties.Pool pool = properties.getPool();
            TaskExecutorBuilder builder = new TaskExecutorBuilder();
            builder = builder.queueCapacity(pool.getQueueCapacity());
            builder = builder.corePoolSize(pool.getCoreSize());
            builder = builder.maxPoolSize(pool.getMaxSize());
            builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
            builder = builder.keepAlive(pool.getKeepAlive());
            Shutdown shutdown = properties.getShutdown();
            builder = builder.awaitTermination(shutdown.isAwaitTermination());
            builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
            builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
            builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
            builder = builder.taskDecorator(taskDecorator.getIfUnique());
            return builder;
        }

        // When no Executor bean exists in the application, a bean named
        // applicationTaskExecutor is registered by default. This is the bean
        // that actually executes the tasks submitted by the aspect.
        @Lazy
        @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
                AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
        @ConditionalOnMissingBean(Executor.class)
        public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
            return builder.build();
        }
    }

So if you want to use @Async, be sure to register a bean for a custom thread pool.

For example:

    @Configuration
    public class ThreadPoolTaskConfig {

        /** Number of core threads */
        private static final int CORE_POOL_SIZE = 5;
        /** Maximum number of threads */
        private static final int MAX_POOL_SIZE = 10;
        /** Idle time allowed for non-core threads, in seconds */
        private static final int KEEP_ALIVE_TIME = 10;
        /** Queue size */
        private static final int QUEUE_CAPACITY = 200;
        /** Thread name prefix */
        private static final String THREAD_NAME_PREFIX = "Async-Service-";

        @Bean
        public ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(CORE_POOL_SIZE);
            executor.setMaxPoolSize(MAX_POOL_SIZE);
            // Fixed: the queue capacity and keep-alive arguments were swapped
            executor.setQueueCapacity(QUEUE_CAPACITY);
            executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
            executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
            // When the pool and the queue are both full, run the task on the calling thread
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    }
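The effect of combining a bounded queue with CallerRunsPolicy can be sketched with the JDK's ThreadPoolExecutor alone. The numbers below are scaled down to one thread and one queue slot purely so that the overflow is easy to trigger; they are assumptions for illustration, not the values from the configuration above, and the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Returns the name of the thread that ran the task which did not fit into the pool.
    static String overflowThread() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker thread
            pool.execute(() -> { });                   // fills the only queue slot
            // No thread and no queue slot left: the rejection policy runs this on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("overflow task ran on: " + overflowThread());
    }
}
```

Because the overflow task runs on the submitting thread, a spike slows the callers down instead of piling up an unbounded backlog. The trade-off is that an "asynchronous" call can occasionally block its caller.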

Note also that Spring Boot versions prior to 2.1 do not configure the applicationTaskExecutor thread pool by default. @Async then falls back to SimpleAsyncTaskExecutor, whose submit method creates a new thread for every task. That also consumes a lot of resources, so using it directly is not recommended either.

4.2. Is an @Async annotated method transactionally consistent?

From the analysis above, we know that the class containing the @Async annotated method is proxied and that the method is executed on a thread-pool thread.

For example, take the call chain A -> B -> C, where B is annotated with @Async.

A, B and C cannot be transactionally consistent, because B runs on a different thread from A and Spring binds a transaction to the thread that started it.

B and C are not transactionally consistent by default either. To make them so, wrap the internal logic of B once more, so that B's logic and the call to C run inside a single transactional method.

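    // Option 1: the asynchronous method delegates to a transactional wrapper.
    // Like @Async, @Transactional only takes effect when the call goes through
    // the proxy, so test1() should be invoked on a proxied bean.
    @Async
    public void test() {
        test1();
    }

    // Option 2 (alternative to option 1, keep only one of the two test() methods):
    // make the asynchronous method itself transactional.
    @Async
    @Transactional(rollbackFor = Exception.class)
    public void test() {
        test2();
        test3();
    }

    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        test2();
        test3();
    }

    public void test2() {
    }

    public void test3() {
    }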

4.3. In the same class, A calls B and B is annotated with @Async. If the class injects itself, does the asynchronous call succeed?

Take a look at the test code:

    @Service
    @Slf4j
    public class UserServiceImpl implements UserService {

        // The class injects itself
        @Autowired
        UserService userService;

        @Override
        public void testAsync() {
            userService.testAsync1();
        }

        @Override
        @Async
        public void testAsync1() {
            log.info("hee hee");
        }
    }

We know that Spring resolves circular dependencies for us by default.

Circular dependency: class A has class B injected as a property, and class B has class A injected as a property. A class that injects itself also counts as a circular dependency. The three-level cache that Spring uses by default resolves this situation for us. If you are interested in the three-level cache, you can find analyses of it on Baidu, and I will find time to write an analysis article later.

Let's start the application. It fails to start, with the following error:

Caused by: org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'userServiceImpl': Bean with name 'userServiceImpl' has been injected into other beans [userServiceImpl] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example.

This means that a circular dependency was detected that Spring could not resolve consistently, and startup failed!

Hey, wasn't Spring supposed to resolve circular dependencies? Why does adding the @Async annotation make startup fail?

Since some readers are not familiar with the three-level cache, here is the conclusion in plain text. If you are interested, you can verify it against the source above with a debugger.

Conclusion:

When UserService is placed into the second-level cache, Spring checks whether an early proxy is needed, but an early proxy is only produced by post-processors of type SmartInstantiationAwareBeanPostProcessor. As section 3.6 shows, AsyncAnnotationBeanPostProcessor, the post-processor that handles @Async, is only a plain BeanPostProcessor, so the early reference that gets exposed is the raw object, and UserService ends up injected with its own raw object. Later, AsyncAnnotationBeanPostProcessor wraps UserService in a proxy during post-processing. The final consistency check therefore finds that the UserService held in the bean's own field is the raw bean, which is not the same object as the final proxy. The self-check then kicks in and reports the error.

The application cannot even start, let alone make the call.

Therefore, for the asynchronous invocation to succeed, the caller and the called method must belong to two different beans; in other words, the method annotated with @Async must be reachable through the proxy created by the aspect.
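The mismatch described in the conclusion can be reproduced in miniature without Spring. The following is only a sketch of the sequence of events, assuming a hand-rolled "container" in a single method; it reuses the names UserService and UserServiceImpl from the test code, and everything else is hypothetical.

```java
import java.lang.reflect.Proxy;

class EarlyReferenceDemo {

    interface UserService {
        UserService injectedSelf();
    }

    static class UserServiceImpl implements UserService {
        UserService self; // stands in for the @Autowired field

        @Override
        public UserService injectedSelf() {
            return self;
        }
    }

    // Returns true when the injected reference is not the bean that ends up in the container.
    static boolean injectedReferenceIsStale() {
        UserServiceImpl raw = new UserServiceImpl();

        // Step 1: no early proxy is created, so the self-injection receives the raw object.
        raw.self = raw;

        // Step 2: a later post-processing step wraps the bean in a proxy.
        UserService finalBean = (UserService) Proxy.newProxyInstance(
                UserService.class.getClassLoader(),
                new Class<?>[] {UserService.class},
                (proxy, method, args) -> method.invoke(raw, args));

        // Step 3: compare what was injected with what the container would hand out.
        return finalBean.injectedSelf() != finalBean;
    }

    public static void main(String[] args) {
        System.out.println("stale raw reference injected: " + injectedReferenceIsStale());
    }
}
```

Spring refuses to start in this situation precisely because the field would otherwise keep pointing at an object that bypasses the proxy.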

4.4. Can an @Async annotated method read ThreadLocal variables?

Not out of the box. If a thread variable has to be passed from the main thread to a thread in the thread pool, you need TransmittableThreadLocal, Alibaba's open-source thread-local implementation.
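Why a special implementation is needed can be seen with the JDK classes alone. This sketch does not use TransmittableThreadLocal (an external library); it only shows, under the assumption of a single-thread pool and hypothetical names, that a plain ThreadLocal is invisible to a pooled thread and that InheritableThreadLocal is copied once, when the pooled thread is created, and then goes stale.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalPoolDemo {

    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    static final InheritableThreadLocal<String> INHERITED = new InheritableThreadLocal<>();

    // Returns {plain value seen in pool, inherited value on first task, inherited value on second task}.
    static String[] run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> readPlain = PLAIN::get;
        Callable<String> readInherited = INHERITED::get;
        try {
            PLAIN.set("user-A");
            INHERITED.set("user-A");

            // The pooled thread is created by this first submit and copies INHERITED now.
            String plainSeen = pool.submit(readPlain).get();
            String firstSeen = pool.submit(readInherited).get();

            // The caller moves on to another user, but the pooled thread is reused as-is.
            INHERITED.set("user-B");
            String secondSeen = pool.submit(readInherited).get();

            return new String[] {plainSeen, firstSeen, secondSeen};
        } finally {
            PLAIN.remove();
            INHERITED.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        System.out.println("plain ThreadLocal in pool:  " + seen[0]);
        System.out.println("inherited, first task:      " + seen[1]);
        System.out.println("inherited, after an update: " + seen[2]);
    }
}
```

The plain value never reaches the pool, and the inherited value stays at whatever it was when the worker thread was created, which is why values have to be captured at submit time instead.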

Threadlocal’s past and present lives are revealed in one article

4.5. Can the return value of an @Async annotated method be obtained?

As the invoke method in 3.8 shows, the return value of the method cannot be obtained directly and will be null; it can only be obtained through a Future.
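The rule can be sketched with the JDK alone. The submit method below is a simplified stand-in for the dispatch in doSubmit from 3.8 (the ListenableFuture branch is left out because it needs Spring), and ReturnTypeDemo is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ReturnTypeDemo {

    // Only methods declared to return a Future get a handle back; everything else gets null.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            executor.submit(task); // the result is dropped
            return null;
        }
    }

    // Returns {what a String-returning method yields, what a Future-returning method yields}.
    static Object[] run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> task = () -> "hee hee";
            Object direct = submit(task, pool, String.class);
            Future<?> future = (Future<?>) submit(task, pool, CompletableFuture.class);
            return new Object[] {direct, future.get()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] results = run();
        System.out.println("declared as String:            " + results[0]);
        System.out.println("declared as CompletableFuture: " + results[1]);
    }
}
```

A method declared to return String hands back null immediately, while the same work declared as a CompletableFuture yields the value once the future completes.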

4.6. Can the Sleuth traceId be propagated into the thread pool?

Yes, it can.

Add the dependency to the pom:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
        <version>2.2.5.RELEASE</version>
    </dependency>

Run it again:

    2021-09-15 23:13:13.502, INFO, [7f6570e5ad87cebe,7f6570e5ad87cebe,], [http-nio-8088-exec-2], com.examp.controller.TestController - hey
    2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,6c4ef07be0f53434,7f6570e5ad87cebe], [Async-Service-2], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,8ff5d5ce00c93add,7f6570e5ad87cebe], [Async-Service-1], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,99b553b90241b79e,7f6570e5ad87cebe], [Async-Service-3], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,42c54ecd36df454c,7f6570e5ad87cebe], [Async-Service-4], com.examp.service.impl.TestServiceImpl - hee hee
    2021-09-15 23:13:13.517, INFO, [7f6570e5ad87cebe,176b3679aa72038e,7f6570e5ad87cebe], [Async-Service-5], com.examp.service.impl.TestServiceImpl - hee hee

Go back to the breakpoint in the doSubmit method from 3.8.

The executor's type has become LazyTraceThreadPoolTaskExecutor, a subclass of ThreadPoolTaskExecutor.

Following the call chain of LazyTraceThreadPoolTaskExecutor shows that the ExecutorBeanPostProcessor post-processor proxies the beans of type ThreadPoolTaskExecutor in the system. In other words, the executor that actually runs the task is still your custom ThreadPoolTaskExecutor; ExecutorBeanPostProcessor merely enhances it with link tracing.
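The general idea of wrapping an executor so that tracing data follows the task can be sketched with the JDK alone. This is not Sleuth's implementation: TracingExecutor and TRACE_ID are hypothetical names, and the trace id is just a string held in a ThreadLocal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TracingExecutorDemo {

    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Delegates to the real pool, carrying the submitter's trace id into the worker thread.
    static final class TracingExecutor implements Executor {
        private final Executor delegate;

        TracingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            String traceId = TRACE_ID.get(); // captured on the submitting thread
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(traceId);
                try {
                    task.run();
                } finally {
                    // Restore the worker's previous state so nothing leaks between tasks.
                    if (previous == null) {
                        TRACE_ID.remove();
                    } else {
                        TRACE_ID.set(previous);
                    }
                }
            });
        }
    }

    // Returns the trace id that the pooled task observed.
    static String traceSeenInPool(String traceId) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set(traceId);
            CompletableFuture<String> seen = new CompletableFuture<>();
            new TracingExecutor(pool).execute(() -> seen.complete(TRACE_ID.get()));
            return seen.get(5, TimeUnit.SECONDS);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("trace id in pool: " + traceSeenInPool("7f6570e5ad87cebe"));
    }
}
```

The pool that runs the task is unchanged; only the submission path is decorated, which matches the observation that the custom ThreadPoolTaskExecutor is still doing the work.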

5. Summary

This article started with the basic use of @Async, analyzed its source code, and finally summarized the problems you may run into when using it. One more point: it is not a good idea to put all of a system's asynchronous tasks into a single thread pool. When different businesses each produce a large number of asynchronous tasks, try to give them separate thread pools.
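As a rough JDK-only sketch of that last point, each business can get its own bounded pool with its own thread name prefix. The prefixes "order-async-" and "mail-async-" and the sizes are made-up examples. In Spring, the value attribute of @Async shown in 3.1 is what selects an executor bean by name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolPerBusinessDemo {

    // Builds a bounded pool whose threads carry a business-specific name prefix.
    static ExecutorService namedPool(String prefix, int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                threads, threads, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                r -> new Thread(r, prefix + counter.incrementAndGet()),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    static String threadNameOf(ExecutorService pool) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        return pool.submit(task).get();
    }

    // Returns the thread names used by the two hypothetical businesses.
    static String[] run() throws Exception {
        ExecutorService orderPool = namedPool("order-async-", 2, 100);
        ExecutorService mailPool = namedPool("mail-async-", 2, 100);
        try {
            return new String[] {threadNameOf(orderPool), threadNameOf(mailPool)};
        } finally {
            orderPool.shutdown();
            mailPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println(names[0] + " / " + names[1]);
    }
}
```

With separate pools, a backlog in one business cannot starve the other, and the thread name prefix makes it obvious in the logs which pool ran a task.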

6. Reference

Asynchronous tasks: Spring @Async annotation source code analysis

7. Contact me

If anything in this article is incorrect, corrections are welcome. Writing articles is not easy, so please give it a like!

DingTalk: louyanfeng25

WeChat: baiyan_lou

Public account: Uncle Baiyan