Synchronous & asynchronous
- Synchronization event
In a thread, all business methods are executed sequentially, with a dependency between the top and the bottom, where one link takes too long or blocks, affecting subsequent links and the overall increase in time.
- Asynchronous events
In one thread, one business method or logic is executed, and other business methods or logic are executed in parallel through asynchronous threads, independent of each other, which can make full use of the advantages of multi-threading to improve concurrency and reduce overall time consumption.
Realize the principle of
The interaction process
- Publisher Event publisher, here is the publishing entry for the event object
- Listener Event listener, which is the final object handled by the event object
- Event The event object, the carrier of event data
- Multicast applicationEventMultiCaster events, it is a bridge connecting the publishing and listener and transit route, responsible for distribute the event object to a specific listener
The event mechanism in Spring is implemented through the observer mode, which supports both synchronous and asynchronous modes.
The event mechanism of Spring provides an elegant programming method that can achieve business decoupling. The implementation is stripped off, making the implementation details more specific and focused, which is convenient for subsequent maintenance to a certain extent and has good scalability.
In general, we do this asynchronously, otherwise it’s not much different from everyday synchronous calls and doesn’t take full advantage of the parallelism that asynchronous threads bring.
The characteristics of | role |
---|---|
Observer model | The decoupling |
asynchronous | parallelization |
Initialize the
As for the initialization of Spring event mechanism, in fact, it is mainly based on the observer mode to bind and establish the routing relationship between publisher and listener. Event multicast also relies on the mapping relationship to distribute event objects and process specific listeners.
The Spring event mechanism is only a part of the whole Spring environment. It is not difficult to understand the initialization mode and working principle of the Spring environment before analyzing the initialization of the event mechanism. We know AbstractApplicationContext is environment of the abstract base class in Spring, it’s the refresh () method covers all the Spring initialization process, It contains initialization method of multicast initApplicationEventMulticaster () and listener binding method registerListeners ().
public void refresh(a) throws BeansException, IllegalStateException {
/ /...
// omit others
synchronized(this.startupShutdownMonitor) {
/ /...
// omit others
try {
/ /...
// omit others
this.initApplicationEventMulticaster();
this.registerListeners();
// omit others
/ /...
} catch (BeansException var9) {
/ /...
// omit others
} finally {
/ /...
// omit others}}}Copy the code
See below initialization method of multicast initApplicationEventMulticaster (), mainly from the BeanFactory to judge whether the created multicast, If does not create to create the default SimpleApplicationEventMulticaster as multicast.
protected void initApplicationEventMulticaster(a) {
// Get the current BeanFactory to get the multicast Bean instance from the Bean factory
ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
// If there is a multicast, get it directly
if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); }}/ / if there is no multicast, the Bean directly to create the default SimpleApplicationEventMulticaster multicast (CCM) registration
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton("applicationEventMulticaster".this.applicationEventMulticaster);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Unable to locate ApplicationEventMulticaster with name 'applicationEventMulticaster': using default [" + this.applicationEventMulticaster + "]"); }}}Copy the code
Methods of registering listeners
protected void registerListeners(a) {
// Get iterators for all applicationListeners
Iterator var1 = this.getApplicationListeners().iterator();
// Register all listeners with the current multicast
while(var1.hasNext()) { ApplicationListener<? > listener = (ApplicationListener)var1.next();this.getApplicationEventMulticaster().addApplicationListener(listener);
}
String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true.false);
String[] var7 = listenerBeanNames;
int var3 = listenerBeanNames.length;
for(int var4 = 0; var4 < var3; ++var4) {
String listenerBeanName = var7[var4];
this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// If it is an early event, execute the publish event object directly
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if(earlyEventsToProcess ! =null) {
Iterator var9 = earlyEventsToProcess.iterator();
while(var9.hasNext()) {
ApplicationEvent earlyEvent = (ApplicationEvent)var9.next();
this.getApplicationEventMulticaster().multicastEvent(earlyEvent); }}}Copy the code
Publish event
Publishing events through AbstractApplicationContext publishEvent released () method
public void publishEvent(Object event) {
publishEvent(event, null);
}
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Check whether the event type is ApplicationEvent. If not, encapsulate it as PayloadApplicationEvent
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// If it is an early event of Spring initialization, it needs to be added to the early event and published immediately
if (this.earlyApplicationEvents ! =null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// If it is not an early event, the event is published immediately via multicast
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish the event if the parent context exists
if (this.parent ! =null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event); }}}Copy the code
Then look at SimpleApplicationEventMulticaster multicastEvent () method, this method is multicast broadcast events
public void multicastEvent(ApplicationEvent event) {
// Parse the Event type to publish the Event
multicastEvent(event, resolveDefaultEventType(event));
}
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// Get the parse type of the eventResolvableType type = (eventType ! =null ? eventType : resolveDefaultEventType(event));
// Get the current thread pool of the multicast
Executor executor = getTaskExecutor();
// Gets a collection of ApplicationListeners currently in the application that match the given event type. Listeners that do not match are excluded. Recycle execution
for(ApplicationListener<? > listener : getApplicationListeners(event, type)) {// If the thread pool is not empty, it is executed asynchronously through the thread pool
if(executor ! =null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
// Otherwise, the current thread executesinvokeListener(listener, event); }}}Copy the code
The event processing
Event processing by invokeListener () method for processing, the method to do the try and catch a layer encapsulation, practical methods in doInvokeListener () method of the listener. OnApplicationEvent (event)
protected void invokeListener(ApplicationListener
listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if(errorHandler ! =null) {
try {
doInvokeListener(listener, event);
}
catch(Throwable err) { errorHandler.handleError(err); }}else{ doInvokeListener(listener, event); }}@SuppressWarnings({"rawtypes", "unchecked"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// The actual listener accepts the event and processes it
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: "+ listener, ex); }}else {
throwex; }}}Copy the code
Relationship between matching
After an event object is published, the listeners are retrieved by the getApplicationListeners method. The returned listeners are traversed and type resolved, and qualified listeners are found.
protectedCollection<ApplicationListener<? >> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) {// The object that originally sent the eventObject source = event.getSource(); Class<? > sourceType = (source ! =null ? source.getClass() : null);
// Encapsulates the given event type with the source type
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap...
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if(retriever ! =null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// Fully synchronize build and cache the ListenerRetriever
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if(retriever ! =null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
// Actually retrieves the application listeners of the given event and source type, returning the filtered collection of listenersCollection<ApplicationListener<? >> listeners = retrieveApplicationListeners(eventType, sourceType, retriever);this.retrieverCache.put(cacheKey, retriever);
returnlisteners; }}else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null); }}Copy the code
Through retrieveApplicationListeners () method to actually retrieve a given event listener and source type application implementation. The supportsEvent() method is used to determine whether the listener supports a given event.
privateCollection<ApplicationListener<? >> retrieveApplicationListeners( ResolvableType eventType,@NullableClass<? > sourceType,@NullableListenerRetriever retriever) { List<ApplicationListener<? >> allListeners =new ArrayList<>();
// A collection of listeners in the current applicationSet<ApplicationListener<? >> listeners;// The BeanName collection of listeners in the current application
Set<String> listenerBeans;
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}
for(ApplicationListener<? > listener : listeners) {// Determine whether the listener supports the given event
if (supportsEvent(listener, eventType, sourceType)) {
if(retriever ! =null) { retriever.applicationListeners.add(listener); } allListeners.add(listener); }}if(! listenerBeans.isEmpty()) { BeanFactory beanFactory = getBeanFactory();for (String listenerBeanName : listenerBeans) {
try {
// Get the type based on the listener's BeanNameClass<? > listenerType = beanFactory.getType(listenerBeanName);if (listenerType == null|| supportsEvent(listenerType, eventType)) { ApplicationListener<? > listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);// Determine whether the listener supports the given event
if(! allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {if(retriever ! =null) {
if (beanFactory.isSingleton(listenerBeanName)) {
retriever.applicationListeners.add(listener);
}
else{ retriever.applicationListenerBeans.add(listenerBeanName); } } allListeners.add(listener); }}}catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
AnnotationAwareOrderComparator.sort(allListeners);
if(retriever ! =null && retriever.applicationListenerBeans.isEmpty()) {
retriever.applicationListeners.clear();
retriever.applicationListeners.addAll(allListeners);
}
return allListeners;
}
Copy the code
Configuration mode
Open the asynchronous
- Annotation way
/** * created by guanjian on 2021/4/7 9:36 */
@Configuration
@EnableAsync
public class AsyncConfig{
// More options can be configured here, such as asynchronous thread pools, etc
}
Copy the code
- XML way
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd">
<! Async thread pool Async thread pool Async thread pool Async thread pool Async thread pool Async
<task:annotation-driven />
</beans>
Copy the code
< Task :annotation-driven “/> enables asynchrony, which is equivalent to the @async annotation.
Multicast definition
<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<property name="taskExecutor" ref="executor" />
</bean>
Copy the code
If there is no definition can also, you will be in the initialization phase create SimpleApplicationEventMulticaster to the multicast device as the default.
Asynchronous thread definition
- Annotation way
/** * created by guanjian on 2021/4/7 9:36 */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
/** * the asynchronous thread pool is manually injected */
@Override
public Executor getAsyncExecutor(a) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Number of core thread pools
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
// Maximum number of threads
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 5);
// Queue capacity of the thread pool
executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() * 2);
// Prefix of thread name
executor.setThreadNamePrefix("async-executor-");
executor.initialize();
return executor;
}
/** * where asynchronous unknown error capture processing is injected */
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(a) {
return newSimpleAsyncUncaughtExceptionHandler(); }}Copy the code
Configuration, relative to the implementation of AsyncConfigurer interface to complete an asynchronous call getAsyncExecutor is to configure the asynchronous thread pool, getAsyncUncaughtExceptionHandler is to configure the asynchronous unknown error trapping process.
- XML way
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd">
<! Enable @aspectJ AOP proxy
<aop:aspectj-autoproxy proxy-target-class="true" expose-proxy="true"/>
<! Async thread pool Async thread pool Async thread pool Async thread pool Async thread pool Async
<task:annotation-driven executor="executor" proxy-target-class="true"/>
<! -- Configuration 1: Asynchronous thread pool -->
<task:executor id="executor" pool-size="10" keep-alive="3000" queue-capacity="200" rejection-policy="CALLER_RUNS"/>
<! -- Configuration 2: Asynchronous thread pool -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5"/>
<property name="keepAliveSeconds" value="3000"/>
<property name="maxPoolSize" value="50"/>
<property name="queueCapacity" value="200"/>
</bean>
</beans>
Copy the code
With regard to the configuration of asynchronous thread pools, the details can be seen through code analysis, with the conclusion first.
- When async is enabled, the internal implementation is automatically called without configuration and without specifying any thread pool
SimpleAsyncTaskExecutor
The thread pool performs asynchronous execution- If configured
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
Thread pool, which is used by default for asynchronous execution even if not specified- If an asynchronous thread pool is specified, asynchronous execution is performed according to the specified asynchronous thread pool and is isolated from other thread pools
An event definition
/** * created by guanjian on 2021/4/6 17:46 */
public class EventObject extends ApplicationEvent {
public EventObject(Object source) {
super(source); }}Copy the code
Extend the event object by inheriting the ApplicationEvent class, which is used to pass event object data, and publish the event to a specified Listener.
Event publishing
/** * created by guanjian on 2021/4/6 17:40 */
@Component
public class AsyncPublisher implements ApplicationEventPublisherAware {
private final static Logger LOGGER = LoggerFactory.getLogger(AsyncPublisher.class);
@Resource
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void publish(EventObject eventObject) {
LOGGER.info(Thread={} Publish event source={},Thread.currentThread().getName(), eventObject.getSource());
this.publisher.publishEvent(eventObject); }}Copy the code
Method, implementing the ApplicationEventPublisherAware interface will ApplicationEventPublisher injection and through it to invoke publishEvent event publishing () method.
Event listeners
/** * created by guanjian on 2021/4/6 17:42 */
@Component
public class AsyncListener implements ApplicationListener<EventObject> {
private final static Logger LOGGER = LoggerFactory.getLogger(AsyncListener.class);
@Async
@Override
public void onApplicationEvent(EventObject eventObject) {
LOGGER.info("Thread={} listen for event source={}", Thread.currentThread().getName(), eventObject.getSource()); }}Copy the code
The @async annotation enables the method to execute asynchronously or synchronously
Note: Both Publisher and listener must be included in Spring’s management to take effect
The failure reason
- Don’t pass
@EnableAsync
Annotations turn on asynchronous support or are not configured through XML<task:annotation-driven />
Enabling Asynchronous Support - Listener methods are not added
@Async
annotations - Both publishers and listeners are involved in Spring management to check for forgotten information
@Component
Annotation tag
Practical example
MQ message based asynchronous distribution based on user information change events
Relationship between the topology
The directory structure
│ mqEventPublisher. Java │ mqEventListener. Java │ mqEventPublisher. Java │ mqEventListener. Java │ mqEventPublisher. Java │ mqEventPublisher. Java │ MqEventPublisher ├ ─ handler │ UserInfoInvalidEventHandler. Java / / failure event handler │ UserInfoModifyEventHandler. │ Java / / modify the event handler UserInfoValidEventHandler. Java/effective/event handlers │ ├ ─ holder │ MqEventHolder. Java / / events enumerated relation with event handler container │ └ ─ vo UserInfoInValidEventVo. Java / / failure event object UserInfoModifyEventVo. Java / / modify UserInfoValidEventVo event object. The Java/effective/event object
Advantages & Disadvantages
advantages
- The business of decoupling
- Asynchronous processing to reduce synchronous execution blocking
disadvantages
- There are many knowledge blind spots, so you need to have a deep understanding of the implementation mechanism and principle, and use it in combination with Spring configuration. If you ignore the underlying implementation, it will not work, but will have the opposite effect
- Because the asynchronous thread is parallel processing, it is not suitable for the business logic before and after the dependency, nor is it suitable for the logical processing of transaction characteristics operation
extension
CompletableFuture
reference
Spring mechanism explanation of the Spring events mechanism, a Spring asynchronous programming | your @ Async is really an asynchronous? Asynchronous adventures and adventures