Synchronous & asynchronous

  • Synchronization event

    In a thread, all business methods are executed sequentially, with a dependency between the top and the bottom, where one link takes too long or blocks, affecting subsequent links and the overall increase in time.

  • Asynchronous events

    In one thread, one business method or logic is executed, and other business methods or logic are executed in parallel through asynchronous threads, independent of each other, which can make full use of the advantages of multi-threading to improve concurrency and reduce overall time consumption.

Realize the principle of

The interaction process

  • Publisher Event publisher, here is the publishing entry for the event object
  • Listener Event listener, which is the final object handled by the event object
  • Event The event object, the carrier of event data
  • Multicast applicationEventMultiCaster events, it is a bridge connecting the publishing and listener and transit route, responsible for distribute the event object to a specific listener

The event mechanism in Spring is implemented through the observer mode, which supports both synchronous and asynchronous modes.

The event mechanism of Spring provides an elegant programming method that can achieve business decoupling. The implementation is stripped off, making the implementation details more specific and focused, which is convenient for subsequent maintenance to a certain extent and has good scalability.

In general, we do this asynchronously, otherwise it’s not much different from everyday synchronous calls and doesn’t take full advantage of the parallelism that asynchronous threads bring.

The characteristics of role
Observer model The decoupling
asynchronous parallelization

Initialize the

As for the initialization of Spring event mechanism, in fact, it is mainly based on the observer mode to bind and establish the routing relationship between publisher and listener. Event multicast also relies on the mapping relationship to distribute event objects and process specific listeners.

The Spring event mechanism is only a part of the whole Spring environment. It is not difficult to understand the initialization mode and working principle of the Spring environment before analyzing the initialization of the event mechanism. We know AbstractApplicationContext is environment of the abstract base class in Spring, it’s the refresh () method covers all the Spring initialization process, It contains initialization method of multicast initApplicationEventMulticaster () and listener binding method registerListeners ().

 public void refresh(a) throws BeansException, IllegalStateException {
        / /...
	    // omit others
        synchronized(this.startupShutdownMonitor) {
            / /...
            // omit others
            try {
                / /...
                // omit others
                this.initApplicationEventMulticaster();
                this.registerListeners();
                // omit others
                / /...
            } catch (BeansException var9) {
                / /...
                // omit others
            } finally {
                / /...
                // omit others}}}Copy the code

See below initialization method of multicast initApplicationEventMulticaster (), mainly from the BeanFactory to judge whether the created multicast, If does not create to create the default SimpleApplicationEventMulticaster as multicast.

protected void initApplicationEventMulticaster(a) {
		// Get the current BeanFactory to get the multicast Bean instance from the Bean factory
        ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
        // If there is a multicast, get it directly
        if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
            this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); }}/ / if there is no multicast, the Bean directly to create the default SimpleApplicationEventMulticaster multicast (CCM) registration
		else {
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            beanFactory.registerSingleton("applicationEventMulticaster".this.applicationEventMulticaster);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Unable to locate ApplicationEventMulticaster with name 'applicationEventMulticaster': using default [" + this.applicationEventMulticaster + "]"); }}}Copy the code

Methods of registering listeners

protected void registerListeners(a) {
		// Get iterators for all applicationListeners
        Iterator var1 = this.getApplicationListeners().iterator();
		// Register all listeners with the current multicast
        while(var1.hasNext()) { ApplicationListener<? > listener = (ApplicationListener)var1.next();this.getApplicationEventMulticaster().addApplicationListener(listener);
        }

        String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true.false);
        String[] var7 = listenerBeanNames;
        int var3 = listenerBeanNames.length;

        for(int var4 = 0; var4 < var3; ++var4) {
            String listenerBeanName = var7[var4];
            this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }
		// If it is an early event, execute the publish event object directly
        Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
        this.earlyApplicationEvents = null;
        if(earlyEventsToProcess ! =null) {
            Iterator var9 = earlyEventsToProcess.iterator();

            while(var9.hasNext()) {
                ApplicationEvent earlyEvent = (ApplicationEvent)var9.next();
                this.getApplicationEventMulticaster().multicastEvent(earlyEvent); }}}Copy the code

Publish event

Publishing events through AbstractApplicationContext publishEvent released () method

public void publishEvent(Object event) {
	publishEvent(event, null);
}

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
	Assert.notNull(event, "Event must not be null");

	// Check whether the event type is ApplicationEvent. If not, encapsulate it as PayloadApplicationEvent
	ApplicationEvent applicationEvent;
	if (event instanceof ApplicationEvent) {
		applicationEvent = (ApplicationEvent) event;
	}
	else {
		applicationEvent = new PayloadApplicationEvent<>(this, event);
		if (eventType == null) {
			eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
		}
	}

	// If it is an early event of Spring initialization, it needs to be added to the early event and published immediately
	if (this.earlyApplicationEvents ! =null) {
		this.earlyApplicationEvents.add(applicationEvent);
	}
	else {
		// If it is not an early event, the event is published immediately via multicast
		getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
	}
	
	// Publish the event if the parent context exists
	if (this.parent ! =null) {
		if (this.parent instanceof AbstractApplicationContext) {
			((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
		}
		else {
			this.parent.publishEvent(event); }}}Copy the code

Then look at SimpleApplicationEventMulticaster multicastEvent () method, this method is multicast broadcast events

public void multicastEvent(ApplicationEvent event) {
	// Parse the Event type to publish the Event
	multicastEvent(event, resolveDefaultEventType(event));
}

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
	// Get the parse type of the eventResolvableType type = (eventType ! =null ? eventType : resolveDefaultEventType(event));
	// Get the current thread pool of the multicast
	Executor executor = getTaskExecutor();
	// Gets a collection of ApplicationListeners currently in the application that match the given event type. Listeners that do not match are excluded. Recycle execution
	for(ApplicationListener<? > listener : getApplicationListeners(event, type)) {// If the thread pool is not empty, it is executed asynchronously through the thread pool
		if(executor ! =null) {
			executor.execute(() -> invokeListener(listener, event));
		}
		else {
			// Otherwise, the current thread executesinvokeListener(listener, event); }}}Copy the code

The event processing

Event processing by invokeListener () method for processing, the method to do the try and catch a layer encapsulation, practical methods in doInvokeListener () method of the listener. OnApplicationEvent (event)

protected void invokeListener(ApplicationListener
        listener, ApplicationEvent event) {
	ErrorHandler errorHandler = getErrorHandler();
	if(errorHandler ! =null) {
		try {
			doInvokeListener(listener, event);
		}
		catch(Throwable err) { errorHandler.handleError(err); }}else{ doInvokeListener(listener, event); }}@SuppressWarnings({"rawtypes", "unchecked"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
	try {
		// The actual listener accepts the event and processes it
		listener.onApplicationEvent(event);
	}
	catch (ClassCastException ex) {
		String msg = ex.getMessage();
		if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
			Log logger = LogFactory.getLog(getClass());
			if (logger.isTraceEnabled()) {
				logger.trace("Non-matching event type for listener: "+ listener, ex); }}else {
			throwex; }}}Copy the code

Relationship between matching

After an event object is published, the listeners are retrieved by the getApplicationListeners method. The returned listeners are traversed and type resolved, and qualified listeners are found.

protectedCollection<ApplicationListener<? >> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) {// The object that originally sent the eventObject source = event.getSource(); Class<? > sourceType = (source ! =null ? source.getClass() : null);
	// Encapsulates the given event type with the source type
	ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
	// Quick check for existing entry on ConcurrentHashMap...
	ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
	if(retriever ! =null) {
		return retriever.getApplicationListeners();
	}

	if (this.beanClassLoader == null ||
			(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
					(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
		// Fully synchronize build and cache the ListenerRetriever
		synchronized (this.retrievalMutex) {
			retriever = this.retrieverCache.get(cacheKey);
			if(retriever ! =null) {
				return retriever.getApplicationListeners();
			}
			retriever = new ListenerRetriever(true);
			// Actually retrieves the application listeners of the given event and source type, returning the filtered collection of listenersCollection<ApplicationListener<? >> listeners = retrieveApplicationListeners(eventType, sourceType, retriever);this.retrieverCache.put(cacheKey, retriever);
			returnlisteners; }}else {
		// No ListenerRetriever caching -> no synchronization necessary
		return retrieveApplicationListeners(eventType, sourceType, null); }}Copy the code

Through retrieveApplicationListeners () method to actually retrieve a given event listener and source type application implementation. The supportsEvent() method is used to determine whether the listener supports a given event.

privateCollection<ApplicationListener<? >> retrieveApplicationListeners( ResolvableType eventType,@NullableClass<? > sourceType,@NullableListenerRetriever retriever) { List<ApplicationListener<? >> allListeners =new ArrayList<>();
	// A collection of listeners in the current applicationSet<ApplicationListener<? >> listeners;// The BeanName collection of listeners in the current application
	Set<String> listenerBeans;
	synchronized (this.retrievalMutex) {
		listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
		listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
	}
	for(ApplicationListener<? > listener : listeners) {// Determine whether the listener supports the given event
		if (supportsEvent(listener, eventType, sourceType)) {
			if(retriever ! =null) { retriever.applicationListeners.add(listener); } allListeners.add(listener); }}if(! listenerBeans.isEmpty()) { BeanFactory beanFactory = getBeanFactory();for (String listenerBeanName : listenerBeans) {
			try {
				// Get the type based on the listener's BeanNameClass<? > listenerType = beanFactory.getType(listenerBeanName);if (listenerType == null|| supportsEvent(listenerType, eventType)) { ApplicationListener<? > listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);// Determine whether the listener supports the given event
					if(! allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {if(retriever ! =null) {
							if (beanFactory.isSingleton(listenerBeanName)) {
								retriever.applicationListeners.add(listener);
							}
							else{ retriever.applicationListenerBeans.add(listenerBeanName); } } allListeners.add(listener); }}}catch (NoSuchBeanDefinitionException ex) {
				// Singleton listener instance (without backing bean definition) disappeared -
				// probably in the middle of the destruction phase
			}
		}
	}
	AnnotationAwareOrderComparator.sort(allListeners);
	if(retriever ! =null && retriever.applicationListenerBeans.isEmpty()) {
		retriever.applicationListeners.clear();
		retriever.applicationListeners.addAll(allListeners);
	}
	return allListeners;
}
Copy the code

Configuration mode

Open the asynchronous

  • Annotation way
/** * created by guanjian on 2021/4/7 9:36 */
@Configuration
@EnableAsync
public class AsyncConfig{
	// More options can be configured here, such as asynchronous thread pools, etc
}
Copy the code
  • XML way

      
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd">

    <! Async thread pool Async thread pool Async thread pool Async thread pool Async thread pool Async
    <task:annotation-driven />

</beans>
Copy the code

< Task :annotation-driven “/> enables asynchrony, which is equivalent to the @async annotation.

Multicast definition

<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
        <property name="taskExecutor" ref="executor" />
</bean>
Copy the code

If there is no definition can also, you will be in the initialization phase create SimpleApplicationEventMulticaster to the multicast device as the default.

Asynchronous thread definition

  • Annotation way
/** * created by guanjian on 2021/4/7 9:36 */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    /** * the asynchronous thread pool is manually injected */
    @Override
    public Executor getAsyncExecutor(a) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Number of core thread pools
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        // Maximum number of threads
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 5);
        // Queue capacity of the thread pool
        executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() * 2);
        // Prefix of thread name
        executor.setThreadNamePrefix("async-executor-");
        executor.initialize();
        return executor;
    }

    /** * where asynchronous unknown error capture processing is injected */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(a) {
        return newSimpleAsyncUncaughtExceptionHandler(); }}Copy the code

Configuration, relative to the implementation of AsyncConfigurer interface to complete an asynchronous call getAsyncExecutor is to configure the asynchronous thread pool, getAsyncUncaughtExceptionHandler is to configure the asynchronous unknown error trapping process.

  • XML way

      
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd">

    <! Enable @aspectJ AOP proxy
    <aop:aspectj-autoproxy proxy-target-class="true" expose-proxy="true"/>
    <! Async thread pool Async thread pool Async thread pool Async thread pool Async thread pool Async
    <task:annotation-driven executor="executor" proxy-target-class="true"/>

    <! -- Configuration 1: Asynchronous thread pool -->
    <task:executor id="executor" pool-size="10" keep-alive="3000" queue-capacity="200" rejection-policy="CALLER_RUNS"/>
    <! -- Configuration 2: Asynchronous thread pool -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5"/>
        <property name="keepAliveSeconds" value="3000"/>
        <property name="maxPoolSize" value="50"/>
        <property name="queueCapacity" value="200"/>
    </bean>
</beans>
Copy the code

Async is enabled, which is equivalent to the @async annotation. The Executor property is used to specify an asynchronous thread pool. The above example configures two asynchronous thread pools, either of which can be specified. A consistent configuration style is clearer with < Task :executor />, which is thread pooling’s special schema support for asynchronous event mechanisms with limited parameters. Use the standard Bean org. Springframework. Scheduling. Concurrent. ThreadPoolTaskExecutor configured, can support the most comprehensive configuration properties.

With regard to the configuration of asynchronous thread pools, the details can be seen through code analysis, with the conclusion first.

  • When async is enabled, the internal implementation is automatically called without configuration and without specifying any thread poolSimpleAsyncTaskExecutorThe thread pool performs asynchronous execution
  • If configuredorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutorThread pool, which is used by default for asynchronous execution even if not specified
  • If an asynchronous thread pool is specified, asynchronous execution is performed according to the specified asynchronous thread pool and is isolated from other thread pools

An event definition

/** * created by guanjian on 2021/4/6 17:46 */
public class EventObject extends ApplicationEvent {

    public EventObject(Object source) {
        super(source); }}Copy the code

Extend the event object by inheriting the ApplicationEvent class, which is used to pass event object data, and publish the event to a specified Listener.

Event publishing

/** * created by guanjian on 2021/4/6 17:40 */
@Component
public class AsyncPublisher implements ApplicationEventPublisherAware {

    private final static Logger LOGGER = LoggerFactory.getLogger(AsyncPublisher.class);

    @Resource
    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    public void publish(EventObject eventObject) {
        LOGGER.info(Thread={} Publish event source={},Thread.currentThread().getName(), eventObject.getSource());
        this.publisher.publishEvent(eventObject); }}Copy the code

Method, implementing the ApplicationEventPublisherAware interface will ApplicationEventPublisher injection and through it to invoke publishEvent event publishing () method.

Event listeners

/** * created by guanjian on 2021/4/6 17:42 */
@Component
public class AsyncListener implements ApplicationListener<EventObject> {

    private final static Logger LOGGER = LoggerFactory.getLogger(AsyncListener.class);

    @Async
    @Override
    public void onApplicationEvent(EventObject eventObject) {
        LOGGER.info("Thread={} listen for event source={}", Thread.currentThread().getName(), eventObject.getSource()); }}Copy the code

The @async annotation enables the method to execute asynchronously or synchronously

Note: Both Publisher and listener must be included in Spring’s management to take effect

The failure reason

  • Don’t pass@EnableAsyncAnnotations turn on asynchronous support or are not configured through XML<task:annotation-driven />Enabling Asynchronous Support
  • Listener methods are not added@Asyncannotations
  • Both publishers and listeners are involved in Spring management to check for forgotten information@ComponentAnnotation tag

Practical example

MQ message based asynchronous distribution based on user information change events

Relationship between the topology

The directory structure

│ mqEventPublisher. Java │ mqEventListener. Java │ mqEventPublisher. Java │ mqEventListener. Java │ mqEventPublisher. Java │ mqEventPublisher. Java │ MqEventPublisher ├ ─ handler │ UserInfoInvalidEventHandler. Java / / failure event handler │ UserInfoModifyEventHandler. │ Java / / modify the event handler UserInfoValidEventHandler. Java/effective/event handlers │ ├ ─ holder │ MqEventHolder. Java / / events enumerated relation with event handler container │ └ ─ vo UserInfoInValidEventVo. Java / / failure event object UserInfoModifyEventVo. Java / / modify UserInfoValidEventVo event object. The Java/effective/event object

Advantages & Disadvantages

advantages

  • The business of decoupling
  • Asynchronous processing to reduce synchronous execution blocking

disadvantages

  • There are many knowledge blind spots, so you need to have a deep understanding of the implementation mechanism and principle, and use it in combination with Spring configuration. If you ignore the underlying implementation, it will not work, but will have the opposite effect
  • Because the asynchronous thread is parallel processing, it is not suitable for the business logic before and after the dependency, nor is it suitable for the logical processing of transaction characteristics operation

extension

CompletableFuture

reference

Spring mechanism explanation of the Spring events mechanism, a Spring asynchronous programming | your @ Async is really an asynchronous? Asynchronous adventures and adventures