Recently, I came across the following business scenario during a code review. After a certain business process completes, the reviewers need to be notified by SMS and email, so the code looked roughly like this:

// validate the request
validate();
// handle business logic
doBusiness();
// send mail or another type of message
sendMsg();

Is this right?

For this common business scenario, the first thing to consider is whether to send synchronously or asynchronously.

Sending synchronously adds to the interface's response time (RT) and couples notification to the business logic, so it is not a good idea.

In general, we do it asynchronously, either by publishing a message to MQ and consuming it ourselves, or by handing the work to a thread pool. Neither affects the main business logic, and both improve performance and keep the code decoupled.
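The thread-pool variant can be sketched with nothing but the JDK. This is a minimal sketch, not the reviewed code: `AsyncNotifyDemo`, `process`, `runOnce`, and the in-memory `sent` list are hypothetical names that stand in for the real business method and mail/SMS client.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncNotifyDemo {
    // Stand-in for a real mail/SMS client (assumption): just records what was "sent".
    static final List<String> sent = new CopyOnWriteArrayList<>();

    static void sendMsg(long businessId) {
        sent.add("msg for " + businessId);
    }

    // Business logic runs on the caller thread; the notification is handed to the pool.
    static String process(long businessId, ExecutorService pool) {
        String result = "business done for " + businessId; // placeholder for doBusiness()
        pool.execute(() -> {
            try {
                sendMsg(businessId);
            } catch (RuntimeException e) {
                // A failed notification must not break the main flow.
                System.err.println("notify failed for " + businessId + ": " + e);
            }
        });
        return result;
    }

    // Runs one request, then waits for the pool to drain so the result can be inspected.
    static List<String> runOnce(long businessId) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            process(businessId, pool);
        } finally {
            pool.shutdown(); // already submitted tasks still run
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(1L));
    }
}
```

In a real service the pool would be created once and shared, not created per request as `runOnce` does for the sake of the demo.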

Then there is the question of data consistency: must the email be delivered successfully?

In fact, most of the time we do not require 100% successful email delivery. If a send fails, it fails; what matters is that monitoring and alerting are in place and the failure rate stays below a threshold. Besides, once the messaging service has accepted a request, it is that service's job to ensure the message gets delivered.

So in general, publishing to MQ and consuming the message ourselves, or processing asynchronously in a thread pool, combined with our own compensation logic, is enough to handle this kind of problem.
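What "compensation logic" means depends on the system. One possible shape is a bounded retry that parks the business ID for a later re-send when every attempt fails. The sketch below assumes that shape; `CompensationSketch`, `sendWithRetry`, and `pendingCompensation` are hypothetical names, and the in-memory queue stands in for whatever durable store a real system would use.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongPredicate;

class CompensationSketch {
    // IDs whose notification failed after all retries; a later job would re-send them.
    // Not thread-safe: a real system would use a durable table or queue instead.
    static final Queue<Long> pendingCompensation = new ArrayDeque<>();

    // Calls the sender up to maxAttempts times and returns true on the first success.
    static boolean sendWithRetry(long businessId, LongPredicate sender, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (sender.test(businessId)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // An exception counts as a failed attempt; fall through and retry.
            }
        }
        pendingCompensation.add(businessId);
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // This sender fails twice and succeeds on the third call.
        boolean ok = sendWithRetry(1L, id -> ++calls[0] >= 3, 3);
        System.out.println(ok + " after " + calls[0] + " attempts");

        // This sender always fails, so the ID is parked for compensation.
        boolean failed = sendWithRetry(2L, id -> false, 3);
        System.out.println(failed + ", pending=" + pendingCompensation);
    }
}
```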

So today we're going to talk about an option other than these two: EventBus.

Using EventBus

As the name suggests, EventBus is an event bus. It is a tool in the Google Guava library that decouples code within a process, based on the observer pattern.

In the example above, introducing MQ would be heavier than necessary, and EventBus can do the same job. Unlike MQ, it only provides in-process message and event delivery, but that is sufficient for our business scenario.

Let's first look at how to use EventBus. The first step is to create an EventBus instance.

private static EventBus eventBus = new EventBus();

The second step is to create an event subscriber, which is as simple as adding @Subscribe to the method that should handle the event.

The method must take exactly one parameter; zero parameters, or more than one, results in an error.

public class EmailMsgHandler {

    @Subscribe
    public void handle(Long businessId) {
        System.out.println("send email msg" + businessId);
    }
}
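To see why the one-parameter rule exists, it helps to picture how a bus could discover handler methods by reflection: the single parameter's type is what identifies the event. The sketch below is an illustration written against the JDK only, not Guava's code. The `@Listen` annotation and the `SubscriberScanSketch` class are hypothetical, and the exact exception Guava raises may differ by version.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class SubscriberScanSketch {
    // Hypothetical marker annotation, standing in for @Subscribe.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {}

    static class GoodHandler {
        @Listen
        public void handle(Long businessId) {}
    }

    static class BadHandler {
        @Listen
        public void handle() {} // zero parameters: rejected below
    }

    // Collects the event type of each annotated method, rejecting any method
    // that does not declare exactly one parameter.
    static List<Class<?>> eventTypes(Object listener) {
        List<Class<?>> types = new ArrayList<>();
        for (Method method : listener.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Listen.class)) {
                continue;
            }
            if (method.getParameterCount() != 1) {
                throw new IllegalArgumentException(
                        method.getName() + " must have exactly 1 parameter, found "
                                + method.getParameterCount());
            }
            types.add(method.getParameterTypes()[0]);
        }
        return types;
    }

    public static void main(String[] args) {
        System.out.println(eventTypes(new GoodHandler()));
        try {
            eventTypes(new BadHandler());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```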

Step 3, register the subscriber.

eventBus.register(new EmailMsgHandler());

Step 4, send the event.

eventBus.post(1L);

This is a very simple example of how to use EventBus, so let’s take a look at it in combination with the example above.

Applying it in practice

Take the scenarios above, such as user registration and order placement, which require sending SMS messages and emails to the user.

EventBus does not force us to use a singleton, because instances are cheap to create and destroy; whether to share one depends on our business scenario and context.

public class UserService {

    private static EventBus eventBus = new EventBus();

    public void regist() {
        // business logic
        Long userId = 1L;
        eventBus.register(new EmailMsgHandler());
        eventBus.register(new SmsMsgHandler());
        eventBus.post(userId);
    }
}

public class BookingService {

    private static EventBus eventBus = new EventBus();

    public void booking() {
        // business logic
        Long bookingId = 2L;
        eventBus.register(new EmailMsgHandler());
        eventBus.register(new SmsMsgHandler());
        eventBus.post(bookingId);
    }
}

After the business logic has been processed, the two event subscribers, one for email and one for SMS, are registered.

public class EmailMsgHandler {

    @Subscribe
    public void handle(Long businessId) {
        System.out.println("send email msg" + businessId);
    }
}

public class SmsMsgHandler {

    @Subscribe
    public void handle(Long businessId) {
        System.out.println("send sms msg" + businessId);
    }
}

Finally, we post the event: when a user registers we post the user ID, and when an order succeeds we post the order ID.

Write a test class that creates the two services and calls each method.

public class EventBusTest {

    public static void main(String[] args) {
        UserService userService = new UserService();
        userService.regist();

        BookingService bookingService = new BookingService();
        bookingService.booking();
    }
}

Running the test class shows that each of our subscriber methods is executed:

send email msg1
send sms msg1
send email msg2
send sms msg2

You’ll find it very simple to use, and EventBus is perfect for those who want to decouple in a lightweight and simple way.

Pitfalls to watch out for

First, notice that the events in this example are of type Long. Subscribers are matched by the type of the event, so if an event of a different type is posted, it is not received. For example, if we change the order ID posted when placing an order to a String, nothing consumes it, because we have not defined a subscriber method that takes a String.

public class BookingService {

    private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));

    public void booking() {
        // business logic
        String bookingId = "2";
        eventBus.register(new EmailMsgHandler());
        eventBus.register(new SmsMsgHandler());
        eventBus.post(bookingId);
    }
}

// Output (only the registration event is consumed):
// send email msg1
// send sms msg1

If we add a subscriber method that takes a String to both EmailMsgHandler and SmsMsgHandler, the event is received.

// in EmailMsgHandler
@Subscribe
public void handle(String businessId) {
    System.out.println("send email msg for string" + businessId);
}

// in SmsMsgHandler
@Subscribe
public void handle(String businessId) {
    System.out.println("send sms msg for string" + businessId);
}

// Output:
// send sms msg1
// send email msg1
// send email msg for string2
// send sms msg for string2
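The matching rule behind this pitfall can be sketched without Guava. The toy bus below delivers an event to every handler whose declared type the event is an instance of, which is also how Guava's EventBus is documented to route events: a subscriber declared for a supertype such as Object receives events of its subtypes too. `TypeRoutingSketch` and its methods are hypothetical names for illustration, not Guava's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TypeRoutingSketch {
    // One registered handler together with the event type it accepts.
    private static final class Entry {
        final Class<?> type;
        final Consumer<Object> handler;

        Entry(Class<?> type, Consumer<Object> handler) {
            this.type = type;
            this.handler = handler;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        entries.add(new Entry(type, event -> handler.accept(type.cast(event))));
    }

    // Delivers the event to every handler whose type matches; returns how many ran.
    int post(Object event) {
        int delivered = 0;
        for (Entry entry : entries) {
            if (entry.type.isInstance(event)) {
                entry.handler.accept(event);
                delivered++;
            }
        }
        return delivered;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TypeRoutingSketch bus = new TypeRoutingSketch();
        bus.subscribe(Long.class, id -> log.add("long:" + id));
        bus.subscribe(String.class, id -> log.add("string:" + id));
        bus.subscribe(Object.class, any -> log.add("object:" + any));
        bus.post(1L);   // reaches the Long and Object handlers, not the String one
        bus.post("2");  // reaches the String and Object handlers, not the Long one
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [long:1, object:1, string:2, object:2]
    }
}
```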

In addition, we can define a subscriber for DeadEvent to handle this case: if no subscriber matches the type of a posted event, EventBus posts a DeadEvent by default.

Define a default handler.

public class DefaultEventHandler {

    @Subscribe
    public void handle(DeadEvent event) {
        System.out.println("no subscriber," + event);
    }
}

Add a pay() method to BookingService for paying after an order is placed, and register our default handler in it.

public void pay() {
    // business logic
    eventBus.register(new DefaultEventHandler());
    eventBus.post(new Payment(UUID.randomUUID().toString()));
}

@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Payment {
    private String paymentId;
}

Run bookingService.pay() and look at the output:

no subscriber,DeadEvent{source=AsyncEventBus{default}, event=Payment(paymentId=255da942-7128-4bd1-baca-f0a8e569ed88)}

Source code analysis

OK, that's enough of a brief introduction. Note that a plain EventBus invokes its subscribers synchronously, which is not quite what we want; we are better off with asynchronous processing.

Let's look at the source code to see how it is implemented.

@Beta
public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());

  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;

  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  private final Dispatcher dispatcher;

  public EventBus() {
    this("default");
  }

  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }
}

identifier is a name, a tag; the default is "default".

executor is the Executor that runs subscriber methods. It defaults to MoreExecutors.directExecutor(), and how subscriber methods are executed depends on the executor supplied.

exceptionHandler handles exceptions thrown by subscribers. By default it is LoggingHandler.INSTANCE.

subscribers is the registry of our consumers, the subscribers.

dispatcher is used to dispatch events to subscribers.

The default executor is MoreExecutors.directExecutor(). Looking at its call to command.run(), you can see that it executes synchronously on the calling thread.

public static Executor directExecutor() {
  return DirectExecutor.INSTANCE;
}

private enum DirectExecutor implements Executor {
  INSTANCE;

  @Override
  public void execute(Runnable command) {
    command.run();
  }

  @Override
  public String toString() {
    return "MoreExecutors.directExecutor()";
  }
}
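The difference between a direct executor and a pooled one is easy to observe with the JDK alone. In this minimal sketch, `Runnable::run` plays the role of the direct executor (it does nothing but call command.run()), and `ExecutorThreadDemo` and `runOn` are hypothetical names used only for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorThreadDemo {
    // Submits a task and returns the name of the thread that actually ran it.
    static String runOn(Executor executor) {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        executor.execute(() -> ranOn.complete(Thread.currentThread().getName()));
        return ranOn.join();
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // equivalent to calling command.run() in place
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            System.out.println("caller: " + Thread.currentThread().getName());
            System.out.println("direct: " + runOn(direct)); // same thread as the caller
            System.out.println("pool:   " + runOn(pool));   // a pool worker thread
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct executor, a slow subscriber therefore blocks whoever called post(); with a pool, the caller only pays for handing the task over.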

Synchronous execution is still not ideal: we want asynchronous execution as well as decoupling. For that, Guava provides AsyncEventBus, to which we simply pass our own Executor.

public class AsyncEventBus extends EventBus {

  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
 
  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
    super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
  }

  public AsyncEventBus(Executor executor) {
    super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
}

Let’s make the above code asynchronous, so that we can actually use our own thread pool to handle it.

private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));

As for DeadEvent: if the current event has no subscribers, post() wraps it in a DeadEvent and posts that instead.

public void post(Object event) {
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}
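The same fallback can be reproduced in a few lines to see why the instanceof check matters: without it, an unhandled wrapper would be wrapped and re-posted forever. This is a simplified sketch under stated assumptions, not Guava's implementation: `DeadEventSketch` and `Unhandled` are hypothetical names, and handlers are looked up by exact class only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class DeadEventSketch {
    // Hypothetical stand-in for DeadEvent: wraps an event nobody subscribed to.
    static final class Unhandled {
        final Object event;

        Unhandled(Object event) {
            this.event = event;
        }
    }

    private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

    void subscribe(Class<?> type, Consumer<Object> handler) {
        handlers.computeIfAbsent(type, k -> new ArrayList<>()).add(handler);
    }

    void post(Object event) {
        List<Consumer<Object>> matched = handlers.get(event.getClass());
        if (matched != null) {
            matched.forEach(handler -> handler.accept(event));
        } else if (!(event instanceof Unhandled)) {
            // No subscriber: wrap and re-post. The instanceof guard stops the
            // recursion when nobody listens for Unhandled either.
            post(new Unhandled(event));
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        DeadEventSketch bus = new DeadEventSketch();
        bus.subscribe(Long.class, e -> log.add("handled " + e));
        bus.subscribe(Unhandled.class, e -> log.add("no subscriber for " + ((Unhandled) e).event));
        bus.post(1L);  // has a subscriber
        bus.post("2"); // has none, so it arrives wrapped in Unhandled
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [handled 1, no subscriber for 2]
    }
}
```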

Conclusion

OK, both the usage and the source code are relatively simple. If you are interested, go and take a look at the source yourself; it won't take much time.

Overall, EventBus provides us with a more elegant way to decouple code, and you can definitely use it in the real world!