This is the fifth day of my participation in the November Gwen Challenge. Check out the details: The last Gwen Challenge 2021

Use a background

Some service provider identifies the message sent by users through NLP to determine whether it has special meaning, such as an inquiry information, labels it and pushes it to our side, and then does some business processing. Of course, there are many types of messages, not only the inquiry type. Consider using the observer mode to handle many different push messages. In Guava, EventBus fits the bill and is easy to use.

Method of use

The first is the event processing center

Very simple, just an EventBus instance

public class EventBusCenter {
    public static final EventBus eventBus = new EventBus();
}
Copy the code
Define different event types
@Data
@Builder
public class AEvent {
    private String name;
    private Integer id;
}
Copy the code
@Data
@Builder
public class BEvent {
    private String name;
    private Integer id;
}
Copy the code
@Data
@Builder
public class CEvent {
    private String name;
    private Integer id;
}
Copy the code
Define listeners for each event type

Note that each listener’s event handling method must be annotated **@Subscribe, public void, and has only one argument **.

public class AEventListener { @Subscribe public void handlerAEvent(AEvent aEvent) { System.out.println("handle A Event"); }}Copy the code
public class BEventListener { @Subscribe public void handlerAEvent(BEvent bEvent) { System.out.println("handle B Event"); }}Copy the code
public class CEventListener { @Subscribe public void handlerCEvent(CEvent cEvent) { System.out.println("handle C Event"); }}Copy the code
test

The register method registers listeners, and the POST method pushes specific events to eventBus.

public class MainTest {
    public static void main(String[] args) {
        EventBusCenter.eventBus.register(new AEventListener());
        EventBusCenter.eventBus.register(new BEventListener());
        EventBusCenter.eventBus.register(new CEventListener());
        EventBusCenter.eventBus.post(AEvent.builder().id(1).name("A").build());
        EventBusCenter.eventBus.post(BEvent.builder().id(2).name("B").build());
        EventBusCenter.eventBus.post(CEvent.builder().id(3).name("C").build());
    }
}
Copy the code

The way to use it is simple. But it doesn’t make sense to apply it to specific scenarios. For example, an exception occurs when handling event A

@Subscribe
public void handlerAEvent(AEvent aEvent) {
    System.out.println("handle A Event");
    throw new RuntimeException();
}
Copy the code

Exceptions are thrown up and eventually caught by Subcriber. But when applied to a specific scenario, I don’t have to throw an exception, I have a bottom of the barrel. You can use the eventbus events in exception handling SubscriberExceptionHandler. You only need to add parameters when creating the EventBus.

public class EventBusCenter {
    public static final EventBus eventBus = new EventBus(((exception, context) -> {
        System.out.println("error do something");
        System.out.println(context.getEvent());
        System.out.println(context.getEventBus());
        System.out.println(context.getSubscriber());
    }));
}
Copy the code

Another problem is how to inform the front end to handle the event. My handler specifically writes a listener that subscribes to events to complete notifications, because the existing architecture communicates with the front end through webSockets and does not need to notify the front end through return values.

public class MainTest { public static void main(String[] args) { EventBusCenter.eventBus.register(new AEventListener(EventBusCenter.eventBus)); EventBusCenter.eventBus.register(new NoticeListener()); EventBusCenter.eventBus.register(new BEventListener()); EventBusCenter.eventBus.register(new CEventListener()); EventBusCenter.eventBus.post(AEvent.builder().id(1).name("A").build()); EventBusCenter.eventBus.post(BEvent.builder().id(2).name("B").build()); EventBusCenter.eventBus.post(CEvent.builder().id(3).name("C").build()); }}Copy the code
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Notice {
    String content;
}
Copy the code
public class NoticeListener { @Subscribe public void handleNotice(Notice notice) { System.out.println(notice.getContent()); // Notify the front end via webcocket}}Copy the code
public class AEventListener { private EventBus eventBus; public AEventListener(EventBus eventBus) { this.eventBus = eventBus; } @Subscribe public void handlerAEvent(AEvent aEvent) { System.out.println("handle A Event"); EventBus. Post (new Notice("handle an Event success")); }}Copy the code

Refer to a simple EventBus implemented by a video

Project directory

MyBus
public interface MyBus {
    
    void register(Object subscriber);

    void post(Object event,String topic);

    void post(Object event);

    String getBusName();
}
Copy the code
MyEventBus
public class MyEventBus implements MyBus { private String busName; private MyEventExceptionHandler handler; private MyDispatcher dispatcher; private Executor executor; public MyEventBus(String busName) { this.busName = busName; } public MyEventBus(String busName, MyEventExceptionHandler handler) { this(busName); this.handler = handler; } public MyEventBus() { this(Constant.DEFAULT_BUS_NAME, null); } private final MyRegister register = new MyRegister(); @Override public void register(Object subscriber) { this.register.bind(subscriber); } @Override public void post(Object event, String topic) { this.dispatcher.dispatcher(ExecutorParam.builder() .event(event) .bus(this) .topic(topic) .handler(handler) .build()); } @Override public void post(Object event) { this.post(event, Constant.DEFAULT_TOPIC_NAME); } @Override public String getBusName() { return this.busName; }}Copy the code
MyRegister
Class MyRegister {private final ConcurrentHashMap<String, ConcurrentLinkedQueue<MySubscriber>> registerTable = new ConcurrentHashMap<>(); public ConcurrentLinkedQueue<MySubscriber> getMySubscriberQueue(String topic) { return registerTable.get(topic); } public void bind(Object subscribe) { List<Method> subscriberMethod = this.getSubscriberMethod(subscribe); subscriberMethod.forEach(v -> this.helper(v, subscribe)); } public void unbind(Object subscribe) { registerTable.forEach((k, q) -> { q.forEach(v -> { if (v.getSubcriber() == subscribe) { v.setDisable(true); }}); }); } private void helper(Method method, Object subscribe) { MySubscribe mySubscribe = method.getDeclaredAnnotation(MySubscribe.class); String topic = mySubscribe.topic(); registerTable.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>()); ConcurrentLinkedQueue<MySubscriber> mySubscribers = registerTable.get(topic); mySubscribers.add(new MySubscriber(subscribe, method, false)); } private List<Method> getSubscriberMethod(Object subscribe) { Class<? > aClass = subscribe.getClass(); Method[] methods = aClass.getDeclaredMethods(); List<Method> res = Arrays.stream(methods).filter(v -> { return v.isAnnotationPresent(MySubscribe.class) && v.getParameterCount() == 1 && v.getModifiers() == Modifier.PUBLIC; }).collect(Collectors.toList()); return res; }}Copy the code
MyDispatcher
public class MyDispatcher { private final MyRegister register = new MyRegister(); public void dispatcher(ExecutorParam param) { ConcurrentLinkedQueue<MySubscriber> mySubscriberQueue = register.getMySubscriberQueue(param.getTopic()); if (Objects.isNull(mySubscriberQueue)) { if (Objects.isNull(param.getHandler())) { param.getHandler().handle(new IllegalArgumentException (" topic illegal "), MyEventBusContext.builder() .event(param.getEvent()) .myEventBus(param.getBus()) .eventBusName(param.getBus().getBusName()) .build()); return; } else { throw new IllegalArgumentException(); } } mySubscriberQueue.stream().filter(v -> { return ! v.getDisable(); }).filter(s -> { Method method = s.getSubcriberMethods(); Class<? > classz = method.getParameterTypes()[0]; return classz.isAssignableFrom(param.getEvent().getClass()); }).forEach(v -> { this.executorMethod(v, param); }); } private void executorMethod(MySubscriber mySubscriber, ExecutorParam param) { Object subscriber = mySubscriber.getSubcriber(); Method method = mySubscriber.getSubcriberMethods(); param.getExecutor().execute(() -> { try { method.invoke(subscriber, param.getEvent()); } catch (Exception e) { if (Objects.nonNull(param.getHandler())) { param.getHandler().handle(new Exception("failed"), MyEventBusContext.builder() .event(param.getEvent()) .myEventBus(param.getBus()) .eventBusName(param.getBus().getBusName()) .build()); } else { e.printStackTrace(); }}}); }}Copy the code
MySubscriber
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MySubscriber {

    private Object subcriber;

    private Method subcriberMethods;

    private Boolean disable;
}
Copy the code
MyEventExceptionHandler
Public interface MyEventExceptionHandler {void handle(Throwable e, MyEventBusContext context); }Copy the code
ExecutorParam
@Data @Builder public class ExecutorParam { private Executor executor = DefaultExecutor.DEFAULT_EXECUTOR; private MyEventExceptionHandler handler; private MyRegister register; private MyBus bus; private Object event; private String topic; private static class DefaultExecutor implements Executor { private static final DefaultExecutor DEFAULT_EXECUTOR = new DefaultExecutor(); @Override public void execute(Runnable command) { command.run(); }}}Copy the code
Annotations MySubscribe
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MySubscribe {
    String topic() default "default_topic";
}
Copy the code
Constant
public class Constant {
    public final static String DEFAULT_BUS_NAME = "default";

    public final static String DEFAULT_TOPIC_NAME = "default_topic";
}
Copy the code