Preface
EventBus is Guava's event handling mechanism and an implementation of the Observer pattern (a producer/consumer model).
The Observer pattern is widely used in everyday development. For example, in an order system, a change in order status or logistics information triggers app push notifications and SMS messages to users and notifies sellers and buyers. In an approval system, each step in the flow of an approval request is reported to the user who initiated it and to the managers who approve it.
The Observer pattern is also built into the JDK: java.util.Observer and java.util.Observable have been there since version 1.0. Their usage has not changed even as Java has evolved rapidly, and many libraries now provide simpler implementations, for example Guava EventBus, RxJava, and greenrobot EventBus.
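For comparison, here is a minimal sketch of the JDK's built-in approach. It uses java.util.Observable and java.util.Observer, which still ship with the JDK but have been deprecated since Java 9. The OrderStatus class and the status strings are hypothetical names chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Observable;
import java.util.Observer;

@SuppressWarnings("deprecation") // Observable and Observer are deprecated since Java 9
public class JdkObserverSketch {

    /** Hypothetical subject that publishes order status changes. */
    static class OrderStatus extends Observable {
        void change(String newStatus) {
            setChanged();               // without this call, notifyObservers does nothing
            notifyObservers(newStatus); // invokes update(...) on every registered observer
        }
    }

    /** Registers one observer, publishes two changes, and returns what the observer saw. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        OrderStatus status = new OrderStatus();

        // Observer has a single method, update(Observable, Object), so a lambda works.
        Observer smsNotifier = (source, arg) -> received.add("SMS: " + arg);
        status.addObserver(smsNotifier);

        status.change("PAID");
        status.change("SHIPPED");
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Note that the subject has to extend Observable and every observer receives an untyped Object argument, which is part of what the libraries mentioned above simplify.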
Why use the Observer pattern, and what are the benefits of EventBus?
EventBus advantages
- Simpler and more convenient to program than the JDK Observer
- Synchronous or asynchronous dispatch and custom exception handling can be configured through constructor parameters
- It runs within a single process, so it is not affected by network issues
Disadvantages
- It can only be used within a single process
- Messages are not persisted, so they can be lost if the application restarts or exits abnormally
If distributed use is required, use a message queue (MQ) instead.
2. How to use EventBus
1. Import the library
Gradle
implementation group: 'com.google.guava', name: 'guava', version: '29.0-jre'
Maven
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>
After adding the dependency, the main class we use is com.google.common.eventbus.EventBus. It provides the register, unregister, and post methods for subscribing, unsubscribing, and publishing messages.
public void register(Object object);
public void unregister(Object object);
public void post(Object event);
2. Use it synchronously
1. Create an EventBus first
EventBus eventBus = new EventBus();
2. Create a subscriber
In Guava EventBus, subscriptions are matched by parameter type. Each subscriber method takes exactly one parameter and is marked with the @Subscribe annotation.
import com.google.common.eventbus.Subscribe;

class EventListener {

    /** Listens for messages of type Integer. */
    @Subscribe
    public void listenInteger(Integer param) {
        System.out.println("EventListener#listenInteger ->" + param);
    }

    /** Listens for messages of type String. */
    @Subscribe
    public void listenString(String param) {
        System.out.println("EventListener#listenString ->" + param);
    }
}
3. Register with EventBus and post messages
EventBus eventBus = new EventBus();
eventBus.register(new EventListener());
eventBus.post(1);
eventBus.post(2);
eventBus.post("3");
The output is
EventListener#listenInteger ->1
EventListener#listenInteger ->2
EventListener#listenString ->3
We can create as many subscribers as needed. If a type has more than one subscriber, all matching subscriber methods are executed.
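The following sketch illustrates that behavior with two subscribers for the same type, and also shows unregister. It assumes Guava is on the classpath. The Recorder class and its labels are hypothetical names used only for this example. The order in which the two subscribers are called is not something the example relies on.

```java
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.ArrayList;
import java.util.List;

public class MultiSubscriberSketch {

    /** Hypothetical subscriber that records each String event under its own label. */
    static class Recorder {
        private final String label;
        private final List<String> log;

        Recorder(String label, List<String> log) {
            this.label = label;
            this.log = log;
        }

        @Subscribe
        public void onString(String event) {
            log.add(label + " got " + event);
        }
    }

    /** Posts with two subscribers registered, then again after removing one. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        EventBus eventBus = new EventBus();
        Recorder first = new Recorder("first", log);
        Recorder second = new Recorder("second", log);
        eventBus.register(first);
        eventBus.register(second);

        eventBus.post("hello");      // delivered to both subscribers

        eventBus.unregister(second); // "second" stops receiving events
        eventBus.post("again");      // delivered to "first" only
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```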
Why is this synchronous?
Guava's EventBus actually dispatches events to subscribers through an Executor. The source shows that when an EventBus is created with the default constructor, the executor is MoreExecutors.directExecutor(), whose implementation calls Runnable#run directly. Subscribers therefore run on the same thread that called post, so the default behavior is synchronous. This approach has its own advantages: publishers and subscribers are decoupled, yet everything still executes on one thread, which is convenient for work that must stay on the same thread, such as transaction processing.
Part of the EventBus source
public class EventBus {
    private static final Logger logger = Logger.getLogger(EventBus.class.getName());
    private final String identifier;
    private final Executor executor;
    private final SubscriberExceptionHandler exceptionHandler;
    private final SubscriberRegistry subscribers;
    private final Dispatcher dispatcher;

    public EventBus() {
        this("default");
    }

    public EventBus(String identifier) {
        this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
    }

    public EventBus(SubscriberExceptionHandler exceptionHandler) {
        this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
    }

    EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
        this.subscribers = new SubscriberRegistry(this);
        this.identifier = (String)Preconditions.checkNotNull(identifier);
        this.executor = (Executor)Preconditions.checkNotNull(executor);
        this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
        this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
    }
}
Part of the DirectExecutor source
enum DirectExecutor implements Executor {
    INSTANCE;

    private DirectExecutor() {}

    public void execute(Runnable command) {
        command.run();
    }

    public String toString() {
        return "MoreExecutors.directExecutor()";
    }
}
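To see why an executor that simply calls run() keeps everything synchronous, the JDK-only sketch below compares such an executor with a thread pool. It does not use Guava: the `Runnable::run` executor is a stand-in written for this example that behaves like the DirectExecutor excerpt above, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DirectExecutorSketch {

    /** Runs a task on the given executor and reports whether it ran on the calling thread. */
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        CompletableFuture<Thread> worker = new CompletableFuture<>();
        executor.execute(() -> worker.complete(Thread.currentThread()));
        return worker.join() == caller; // join() waits until the task has run
    }

    public static void main(String[] args) {
        // Stand-in for a direct executor: execute(command) just calls command.run().
        Executor direct = Runnable::run;
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("direct executor runs on caller thread: " + runsOnCallerThread(direct)); // true
            System.out.println("thread pool runs on caller thread: " + runsOnCallerThread(pool));       // false
        } finally {
            pool.shutdown();
        }
    }
}
```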
3. Use it asynchronously
For asynchronous use, Guava provides AsyncEventBus, a subclass of EventBus that dispatches events through the Executor you pass in
EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
This enables asynchronous use
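As a rough illustration, the sketch below posts one event to an AsyncEventBus and checks that the subscriber ran on a pool thread rather than on the posting thread. It assumes Guava is on the classpath. ThreadRecorder and handledOnAnotherThread are hypothetical names used only for this example.

```java
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncEventBusSketch {

    /** Hypothetical subscriber that remembers which thread handled the event. */
    static class ThreadRecorder {
        final CompletableFuture<Thread> handlerThread = new CompletableFuture<>();

        @Subscribe
        public void onString(String event) {
            handlerThread.complete(Thread.currentThread());
        }
    }

    /** Posts one event and reports whether it was handled off the posting thread. */
    static boolean handledOnAnotherThread() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            EventBus eventBus = new AsyncEventBus(pool);
            ThreadRecorder recorder = new ThreadRecorder();
            eventBus.register(recorder);

            eventBus.post("hello"); // hands the event to the pool instead of running the subscriber here

            // join() waits until the subscriber has actually run.
            return recorder.handlerThread.join() != Thread.currentThread();
        } finally {
            pool.shutdown(); // otherwise idle pool threads keep the JVM alive for a while
        }
    }

    public static void main(String[] args) {
        System.out.println("handled on another thread: " + handledOnAnotherThread());
    }
}
```

Because subscribers now run concurrently, any state they share needs to be thread-safe, and the application is responsible for shutting the executor down.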
AsyncEventBus source
public class AsyncEventBus extends EventBus {
    public AsyncEventBus(String identifier, Executor executor) {
        super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
    }

    public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
        super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
    }

    public AsyncEventBus(Executor executor) {
        super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
    }
}
4. Exception handling
What happens if an exception is thrown while an event is being handled? As the source shows, both EventBus and AsyncEventBus accept a custom SubscriberExceptionHandler, which is called when a subscriber throws. The exception parameter carries the exception itself, and the context parameter gives access to the event that was being handled, so the failure can be dealt with specifically.
Its interface is declared as
public interface SubscriberExceptionHandler {
    /** Handles exceptions thrown by subscribers. */
    void handleException(Throwable exception, SubscriberExceptionContext context);
}
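A minimal sketch of a custom handler might look like the following. It assumes Guava is on the classpath. FailingListener and the "order-42" event are hypothetical, and collecting failures into a list stands in for whatever logging, retrying, or alerting a real application would do.

```java
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.eventbus.SubscriberExceptionHandler;
import java.util.ArrayList;
import java.util.List;

public class ExceptionHandlerSketch {

    /** Hypothetical subscriber that always fails. */
    static class FailingListener {
        @Subscribe
        public void onString(String event) {
            throw new IllegalStateException("cannot handle " + event);
        }
    }

    /** Posts one event to a failing subscriber and returns what the handler recorded. */
    static List<String> demo() {
        List<String> failures = new ArrayList<>();

        // The handler receives the thrown exception plus a context describing
        // the event and the subscriber method that was processing it.
        SubscriberExceptionHandler handler = (exception, context) ->
                failures.add(context.getSubscriberMethod().getName()
                        + " failed on event '" + context.getEvent()
                        + "': " + exception.getMessage());

        EventBus eventBus = new EventBus(handler);
        eventBus.register(new FailingListener());
        eventBus.post("order-42"); // the exception goes to the handler, not to the caller of post
        return failures;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The same handler can be passed to AsyncEventBus through its (Executor, SubscriberExceptionHandler) constructor. In that case it is invoked on an executor thread, so the list used here would need to be thread-safe.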
Conclusion
Building on the above, we can define our own message types to listen for and handle different kinds of messages, and implement SubscriberExceptionHandler to deal with failures, in both synchronous and asynchronous use.
References
- Github.com/google/guav…
- Github.com/greenrobot/…
- Github.com/ReactiveX/R…