Background
During development, a stand-alone program needed to decouple some operations and run them asynchronously, one after another. The operations do not need to be persisted: after a restart, the program simply works from the latest data again. Several options were considered:
- MQ.
- Spring events or EventBus.
Because the program is stand-alone and not complicated, bringing in an MQ would be too heavyweight: it adds system complexity, and message persistence is not needed, so that option was ruled out. Both Spring events and EventBus are based on the Observer pattern. They are easy to develop with and do not add system complexity, but on their own they do not meet the requirement that events be executed asynchronously and in order. After reading the EventBus code, I decided to build on it.
Dependencies
The EventBus functionality used here is provided by Guava, so the Guava package is required:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
How EventBus executes events
The method EventBus uses to publish an event is EventBus.post:
public void post(Object event) {
  // Get all subscribers registered for this event
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    // Hand the event and its subscribers to the dispatcher
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}
The dispatcher is fixed when the bus is constructed. AsyncEventBus uses LegacyAsyncDispatcher by default, which is the asynchronous dispatcher provided by EventBus. Its dispatch method is as follows:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
The main logic is to queue the event once for every subscriber and then call Subscriber.dispatchEvent on each queued entry in turn.
The Subscriber.dispatchEvent method:
final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}
As you can see, every subscriber invocation is submitted through executor.execute.
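The flow described so far (queue one delivery per subscriber, drain the queue, hand each delivery to an executor) can be sketched with JDK types only. This is a simplified model for illustration, not Guava's implementation: MiniAsyncBus, Delivery, and the use of Consumer as a listener are all hypothetical names chosen for this sketch.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, simplified model of the post -> dispatch -> execute flow.
class MiniAsyncBus {

    // One queued delivery: an event paired with the listener that should receive it.
    private static final class Delivery {
        final Object event;
        final Consumer<Object> listener;

        Delivery(Object event, Consumer<Object> listener) {
            this.event = event;
            this.listener = listener;
        }
    }

    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();
    private final Queue<Delivery> queue = new ConcurrentLinkedQueue<>();
    private final Executor executor;

    MiniAsyncBus(Executor executor) {
        this.executor = executor;
    }

    void register(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void post(Object event) {
        // Step 1: enqueue one delivery per registered listener.
        for (Consumer<Object> listener : listeners) {
            queue.add(new Delivery(event, listener));
        }
        // Step 2: drain the queue; the executor decides where and when each listener runs.
        Delivery next;
        while ((next = queue.poll()) != null) {
            final Delivery current = next;
            executor.execute(() -> current.listener.accept(current.event));
        }
    }

    public static void main(String[] args) {
        // Runnable::run executes each task on the calling thread, which keeps the demo deterministic.
        MiniAsyncBus bus = new MiniAsyncBus(Runnable::run);
        bus.register(e -> System.out.println("listener 1 got " + e));
        bus.register(e -> System.out.println("listener 2 got " + e));
        bus.post("a");
    }
}
```

The point of the sketch is that the bus itself imposes no threading model: whatever Executor is supplied determines whether listeners run concurrently or one at a time.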
The executor is the thread pool handed to the asynchronous EventBus; it is what ultimately runs the subscriber methods.
So, to get asynchronous but ordered execution, set the thread pool's maximum size to 1: a single worker thread taking tasks from a FIFO queue runs them one at a time, in the order they were submitted.
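The ordering claim can be checked with a small JDK-only sketch. It assumes a single submitting thread and a queue large enough that no task is rejected; SequentialExecutorDemo and runInOrder are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a pool capped at one thread runs tasks in submission order.
class SequentialExecutorDemo {

    static List<Integer> runInOrder(int taskCount) {
        // Queue sized so that every task fits (capacity must be at least 1).
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(Math.max(1, taskCount));
        // corePoolSize = 0, maximumPoolSize = 1: at most one worker thread ever exists.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.HOURS, queue);

        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            executor.execute(() -> seen.add(id)); // returns immediately; work happens on the worker
        }

        executor.shutdown(); // already queued tasks still run
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```

Raising the maximum pool size above 1 would remove this guarantee, because two workers could then take adjacent tasks from the queue and finish them in either order.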
Therefore, when creating an AsyncEventBus, pass in a custom Executor:
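// ---- EventConsume.java ----
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EventConsume {
    /**
     * Identifier of the event bus this handler consumes from.
     */
    String identifier();
}

// ---- EventHandler.java ----
import com.google.common.eventbus.Subscribe;

public interface EventHandler<T> {
    @Subscribe
    boolean process(T data);
}

// ---- BlockingPolicy.java ----
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BlockingPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.error("Thread pool [{}] queue is full, blocking until space is available", executor.toString());
        if (!executor.isShutdown()) {
            try {
                // Block the submitting thread instead of dropping the task
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Blocking policy exception", e);
            }
        }
    }
}

// ---- EventThreadPoolFactory.java ----
import cn.hutool.core.thread.ThreadFactoryBuilder; // create()/setNamePrefix() match Hutool's builder
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class EventThreadPoolFactory {
    private static final int DEFAULT_CORE_SIZE = 0;
    // A single worker thread is what guarantees sequential execution
    private static final int DEFAULT_MAX_SIZE = 1;
    private static final int DEFAULT_TIMEOUT = 1;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS;
    private static final int DEFAULT_QUEUE_SIZE = 5000;

    public static Executor buildDefaultExecutor(String identifier) {
        // Create a new queue for each executor. A single static queue shared by all pools
        // would let one pool's thread run another pool's tasks and break per-bus ordering.
        return new ThreadPoolExecutor(DEFAULT_CORE_SIZE, DEFAULT_MAX_SIZE, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT,
                new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
                ThreadFactoryBuilder.create().setNamePrefix(String.format("%s-", identifier)).build(),
                new BlockingPolicy());
    }
}

// ---- EventHub.java ----
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on newer Spring versions
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class EventHub {
    // One AsyncEventBus (and therefore one single-threaded executor) per identifier
    private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @PostConstruct
    public void onStart() {
        this.loadEventHandler();
    }

    public void loadEventHandler() {
        Map<String, EventHandler> eventHandlerMap = applicationContext.getBeansOfType(EventHandler.class);
        for (EventHandler eventHandler : eventHandlerMap.values()) {
            this.register(eventHandler);
        }
    }

    private void register(EventHandler eventHandler) {
        EventConsume eventConsume = eventHandler.getClass().getAnnotation(EventConsume.class);
        if (eventConsume == null || StrUtil.isBlank(eventConsume.identifier())) {
            log.error("EventHandler [{}] has no identifier configured", eventHandler.getClass().getSimpleName());
            return;
        }
        String identifier = eventConsume.identifier();
        AsyncEventBus eventBus = eventBusMap.get(identifier);
        if (eventBus == null) {
            AsyncEventBus asyncEventBus = new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier));
            // putIfAbsent returns the existing bus if another thread registered one first
            eventBus = ObjectUtil.defaultIfNull(eventBusMap.putIfAbsent(identifier, asyncEventBus), asyncEventBus);
        }
        eventBus.register(eventHandler);
    }

    public EventBus getEventBus(String identifier) {
        return eventBusMap.get(identifier);
    }
}

// ---- EventProducer.java ----
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class EventProducer {
    @Autowired
    private EventHub eventHub;

    public <T> void post(String identifier, T data) {
        EventBus eventBus = eventHub.getEventBus(identifier);
        if (eventBus == null) {
            log.error("Identifier [{}] has no event listener", identifier);
            return;
        }
        eventBus.post(data);
    }
}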
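To see how a blocking rejection policy like BlockingPolicy behaves when the producer is faster than the single worker, here is a minimal JDK-only sketch. It assumes one submitting thread; BlockingPolicyDemo, BLOCK_WHEN_FULL, and submitMoreThanQueueHolds are hypothetical names, and the handler is a simplified stand-in rather than the class above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of back-pressure: a full queue blocks the submitter instead of dropping tasks.
class BlockingPolicyDemo {

    // Simplified stand-in for BlockingPolicy: wait for queue space when a task is rejected.
    static final RejectedExecutionHandler BLOCK_WHEN_FULL = (task, pool) -> {
        if (!pool.isShutdown()) {
            try {
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    };

    static List<Integer> submitMoreThanQueueHolds(int taskCount, int queueSize) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueSize);
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(0, 1, 1, TimeUnit.HOURS, queue, BLOCK_WHEN_FULL);

        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                pool.execute(() -> {
                    try {
                        Thread.sleep(1); // slow consumer, so the small queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    seen.add(id);
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        // 20 tasks through a queue of 2: none are lost and the order is kept.
        System.out.println(submitMoreThanQueueHolds(20, 2));
    }
}
```

The trade-off is that the thread calling post may stall while the queue is full, which is usually acceptable for a stand-alone program but worth keeping in mind if events are posted from a latency-sensitive thread.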