When writing in-process Android or Java applications, Guava’s EventBus is a common choice for passing messages, because it keeps code loosely coupled and decouples business logic. Today I (Half Smoke) will briefly analyze how a message bus works and then write a simple one. Put plainly, a message bus is a typical application of the observer pattern.

General use of EventBus

Flowchart and general principles

The publisher (the observed party) sends an event message. EventBus finds the appropriate subscribers (the observers) based on the @Subscribe annotation and the event type, and then executes each subscriber’s handler method via Java reflection.
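For comparison, here is a minimal sketch of the plain observer pattern that a message bus generalizes. All names (OrderListener, OrderPublisher, ClassicObserverDemo) are hypothetical and not part of EventBus. It only illustrates the coupling a bus removes: the publisher has to know the listener interface and loop over the listeners itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface: every observer must implement it.
interface OrderListener {
    void onOrderCreated(String orderId);
}

// Hypothetical publisher that keeps its own list of observers.
class OrderPublisher {
    private final List<OrderListener> listeners = new ArrayList<>();

    void addListener(OrderListener listener) {
        listeners.add(listener);
    }

    // The publisher notifies each listener directly, in registration order.
    void createOrder(String orderId) {
        for (OrderListener listener : listeners) {
            listener.onOrderCreated(orderId);
        }
    }
}

class ClassicObserverDemo {
    // Registers two observers, publishes one order, and returns what they recorded.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        OrderPublisher publisher = new OrderPublisher();
        publisher.addListener(id -> received.add("mail:" + id));
        publisher.addListener(id -> received.add("stock:" + id));
        publisher.createOrder("A-1");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a bus, the publisher would only post an event object, and the lookup of matching handlers would move into the bus.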

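Usage steps

In general, we use the following steps in a project; for the details, readers can search online. Note that the EventBus.getDefault() call shown below is the greenrobot EventBus API; with Guava you would create the bus with new EventBus() instead.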

  1. Define an event class

    public class AnyEventType {
        public AnyEventType() {}
    }

  2. Register the observer

    EventBus.getDefault().register(this);

  3. Declare the observer's subscriber method

    @Subscribe
    public void onEvent(AnyEventType event) { /* Do something */ }

  4. The publisher posts an event

    EventBus.getDefault().post(event);

Analyzing EventBus and designing the core classes

Now that I’ve covered the basic uses of EventBus, I’ll look at the core classes of EventBus.

From the usage above, EventBus has two core methods, register() and post(). register() registers a subscriber, and post() is how a publisher sends a message. Once you understand these two methods, you understand the heart of EventBus. Here are two diagrams to illustrate the two core methods:

As you can see from the figures, the most critical data structure is the observer registry, which records the mapping between message types and the functions that can receive them. When register() is called to register an observer, EventBus builds the registry by parsing the @Subscribe annotations. When post() is called to send a message, EventBus looks up the matching receiving functions in the registry and then invokes them on the registered observer objects through Java reflection. In synchronous blocking mode, EventBus executes the functions one after another on the calling thread. In asynchronous non-blocking mode, EventBus executes them through a thread pool.
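The difference between the two dispatch modes can be sketched with java.util.concurrent.Executor alone. This is a minimal illustration, not the bus itself: the class DispatchModeDemo and its method handlerThread are hypothetical, and using a direct executor (Runnable::run) for the synchronous case is an assumption made for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchModeDemo {
    // Runs a handler through the given executor and returns the name of the thread it ran on.
    static String handlerThread(Executor executor) {
        String[] threadName = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            threadName[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        // Synchronous blocking: a direct executor runs the handler on the calling thread.
        Executor direct = Runnable::run;
        System.out.println("sync ran on caller thread: " + handlerThread(direct).equals(caller));

        // Asynchronous non-blocking: a thread pool runs the handler on a worker thread.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("async ran on caller thread: " + handlerThread(pool).equals(caller));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the synchronous case the publisher waits for every handler to finish. In the asynchronous case post() can return before the handlers have run, so the publisher must not assume their side effects are already visible.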

With the principle understood, we can start coding. The implementation consists of five classes: EventBus, AsyncEventBus, Subscribe, ObserverAction, and ObserverRegistry.

Code implementation

Let's implement the five core classes: EventBus, AsyncEventBus, Subscribe, ObserverAction, and ObserverRegistry.

1.Subscribe

Subscribe is an annotation that marks which methods of an observer can receive messages.

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {}
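
The RUNTIME retention policy matters here because the registry discovers handlers through reflection. The following sketch compares two hypothetical annotations (RuntimeMark and ClassMark, not part of the article's code) that differ only in retention; RetentionDemo and isMarked are likewise illustrative names.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation kept at runtime, like Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RuntimeMark {}

// Hypothetical annotation stored in the class file but not visible to reflection.
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.METHOD)
@interface ClassMark {}

class RetentionDemo {
    @RuntimeMark
    public void visible(String event) {}

    @ClassMark
    public void invisible(String event) {}

    // Returns true when reflection can see the given annotation on the named method.
    static boolean isMarked(String methodName, Class<? extends Annotation> annotation) {
        try {
            Method method = RetentionDemo.class.getDeclaredMethod(methodName, String.class);
            return method.isAnnotationPresent(annotation);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("RUNTIME retention visible: " + isMarked("visible", RuntimeMark.class));
        System.out.println("CLASS retention visible: " + isMarked("invisible", ClassMark.class));
    }
}
```

If Subscribe used the default CLASS retention, isAnnotationPresent(Subscribe.class) would return false for every method and no handler would ever be registered.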
2.ObserverAction

The ObserverAction class represents one @Subscribe-annotated method of an observer and is what the ObserverRegistry class stores. target is the observer object, and method is the method to execute.

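import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ObserverAction {

    private Object target;
    private Method method;

    public ObserverAction(Object target, Method method) {
        this.target = target;
        this.method = method;
        // Allow invocation even if the method is not public
        this.method.setAccessible(true);
    }

    // Invoke the observer's method with the event as its only argument
    public void execute(Object event) {
        try {
            method.invoke(target, event);
        } catch (IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}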
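
One detail of Method.invoke worth seeing in isolation: an exception thrown inside the handler does not propagate directly but arrives wrapped in an InvocationTargetException. The sketch below uses hypothetical names (FailingObserver, InvokeDemo, invokeAndCaptureCause) and a handler that fails on purpose.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical observer whose private handler fails on purpose.
class FailingObserver {
    private void onEvent(String event) {
        throw new IllegalStateException("cannot handle " + event);
    }
}

class InvokeDemo {
    // Invokes the private handler reflectively and returns the exception the handler threw,
    // or null if the handler completed normally.
    static Throwable invokeAndCaptureCause(String event) {
        try {
            Method method = FailingObserver.class.getDeclaredMethod("onEvent", String.class);
            method.setAccessible(true); // required because onEvent is private to another class
            method.invoke(new FailingObserver(), event);
            return null;
        } catch (InvocationTargetException e) {
            return e.getCause(); // the handler's own exception
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflection failed", e);
        }
    }

    public static void main(String[] args) {
        Throwable cause = invokeAndCaptureCause("order-created");
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```

This is why execute() catches InvocationTargetException: a failing handler is reported there instead of breaking the publisher. Whether to print, log, or rethrow the cause is a design choice left to the bus.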
3.ObserverRegistry

The ObserverRegistry class is used to register observers and uses a lot of Java reflection. The core logic includes:

1. Register the @Subscribe-annotated methods, keyed by event type, in a map

2. Look up the matching observer methods for a given event

To avoid concurrency conflicts, the registry uses ConcurrentHashMap and CopyOnWriteArraySet. When a single observer is registered, its event types and methods are first collected into a local map and only then merged into the global concurrent container. This keeps the logic clear and avoids concurrency conflicts.
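
As a rough illustration of why these two containers make registration safe, the sketch below registers many handlers for the same event type from several threads at once. It is not the registry itself: the class ConcurrentRegistryDemo is hypothetical, strings stand in for ObserverAction objects, and it uses computeIfAbsent, an alternative to a putIfAbsent-then-get sequence with the same effect.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConcurrentRegistryDemo {
    // Registers handlerCount distinct handler names for String events from a pool of
    // threads and returns how many handlers ended up in the registry.
    static int registerConcurrently(int handlerCount) {
        ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<String>> registry = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        CountDownLatch done = new CountDownLatch(handlerCount);
        for (int i = 0; i < handlerCount; i++) {
            String handlerName = "handler-" + i;
            pool.execute(() -> {
                // The set for a key is created at most once, even when threads race.
                registry.computeIfAbsent(String.class, type -> new CopyOnWriteArraySet<>())
                        .add(handlerName);
                done.countDown();
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("registration timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        CopyOnWriteArraySet<String> handlers = registry.get(String.class);
        return handlers == null ? 0 : handlers.size();
    }

    public static void main(String[] args) {
        System.out.println("registered handlers: " + registerConcurrently(200));
    }
}
```

With a plain HashMap and HashSet the same loop could lose registrations or corrupt the map. CopyOnWriteArraySet copies its backing array on every write, so it suits a registry that is written rarely and read on every post.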

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class ObserverRegistry {

    private ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();

    /** Register an observer */
    public void register(Object observer) {
        // Collect the annotated methods, grouped by the event type they handle
        Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
        // Merge this observer's handlers into the global map, using concurrent containers
        for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<ObserverAction> eventActions = entry.getValue();
            CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
            if (registeredEventActions == null) {
                registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
                registeredEventActions = registry.get(eventType);
            }
            registeredEventActions.addAll(eventActions);
        }
    }

    /** Collect the annotated methods of an observer, grouped by event type */
    private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
        Class<?> clazz = observer.getClass();
        List<Method> methodList = getAnnotateMethods(clazz);
        Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
        for (Method method : methodList) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            Class<?> eventType = parameterTypes[0];
            if (!observerActions.containsKey(eventType)) {
                observerActions.put(eventType, new ArrayList<>());
            }
            observerActions.get(eventType).add(new ObserverAction(observer, method));
        }
        return observerActions;
    }

    /** Get the methods of the observer class that carry the @Subscribe annotation */
    private List<Method> getAnnotateMethods(Class<?> clazz) {
        List<Method> annotateMethods = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Subscribe.class)) {
                // Added check: a handler must take exactly one parameter (the event),
                // otherwise parameterTypes[0] above would fail
                if (method.getParameterTypes().length != 1) {
                    throw new IllegalArgumentException(
                            "@Subscribe method " + method.getName() + " must have exactly one parameter");
                }
                annotateMethods.add(method);
            }
        }
        return annotateMethods;
    }

    /** Get the observer methods that match the posted event */
    public List<ObserverAction> getMatchedObserverActions(Object event) {
        List<ObserverAction> matchedObservers = new ArrayList<>();
        Class<?> postedEventType = event.getClass();
        for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<ObserverAction> eventActions = entry.getValue();
            // Match when the posted event's type is the registered type or a subclass of it,
            // i.e. when the event can be cast to the registered type
            if (eventType.isAssignableFrom(postedEventType)) {
                matchedObservers.addAll(eventActions);
            }
        }
        return matchedObservers;
    }
}
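
The isAssignableFrom check means that a handler registered for a parent type also receives events of its subclasses. A small sketch with a hypothetical event hierarchy (OrderEvent, PaidOrderEvent) and a hypothetical helper class MatchingDemo makes the matching rule concrete; the handler names in the map are placeholders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event hierarchy used only for this illustration.
class OrderEvent {}

class PaidOrderEvent extends OrderEvent {}

class MatchingDemo {
    // Returns the registered event types whose handlers would receive the posted event.
    static List<Class<?>> matchedTypes(Object event) {
        // Registered event type -> placeholder handler name, in insertion order.
        Map<Class<?>, String> registry = new LinkedHashMap<>();
        registry.put(Object.class, "logEverything");
        registry.put(OrderEvent.class, "onOrder");
        registry.put(PaidOrderEvent.class, "onPaidOrder");
        registry.put(String.class, "onText");

        List<Class<?>> matched = new ArrayList<>();
        for (Class<?> registeredType : registry.keySet()) {
            // True when the event could be assigned to a variable of the registered type.
            if (registeredType.isAssignableFrom(event.getClass())) {
                matched.add(registeredType);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchedTypes(new PaidOrderEvent()));
        System.out.println(matchedTypes(new OrderEvent()));
    }
}
```

Posting a PaidOrderEvent reaches the handlers for Object, OrderEvent, and PaidOrderEvent, while posting a plain OrderEvent does not reach the PaidOrderEvent handler. Note the direction of the call: the registered type is the receiver and the posted type is the argument.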
4.EventBus

EventBus is the entry point of the message bus. It provides register() for registering observers and post() for publishers to send messages.

import java.util.List;
import java.util.concurrent.Executor;

public class EventBus {

    private ObserverRegistry registry = new ObserverRegistry();

    private Executor executor;

    public EventBus() {}

    public EventBus(Executor executor) {
        this.executor = executor;
    }

    /** Register an observer */
    public void register(Object observer) {
        registry.register(observer);
    }

    /** Publisher: send a message */
    public void post(Object event) {
        List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
        for (ObserverAction observerAction : observerActions) {
            if (executor == null) {
                // Synchronous: run the handler on the calling thread
                observerAction.execute(event);
            } else {
                // Asynchronous: hand the handler off to the executor
                executor.execute(() -> observerAction.execute(event));
            }
        }
    }
}
5.AsyncEventBus

AsyncEventBus is the entry class of the asynchronous message bus.

import java.util.concurrent.Executor;

public class AsyncEventBus extends EventBus {
    public AsyncEventBus(Executor executor) {
        super(executor);
    }
}
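
To see the pieces work together in one runnable file, here is a condensed sketch. It is not the five classes above: MiniSubscribe and MiniBus are hypothetical stand-ins that fold the annotation, registry, and synchronous bus into a few lines, and UserRegistered, WelcomeMailer, and MiniBusDemo are illustrative names. Unlike ObserverAction, this sketch rethrows handler failures instead of printing them.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the Subscribe annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MiniSubscribe {}

// Hypothetical condensed bus: registry and synchronous dispatch in one class.
class MiniBus {
    // One registered handler: the observer instance plus its annotated method.
    private static final class Handler {
        final Object target;
        final Method method;

        Handler(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    private final List<Handler> handlers = new CopyOnWriteArrayList<>();

    void register(Object observer) {
        for (Method method : observer.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(MiniSubscribe.class) && method.getParameterCount() == 1) {
                method.setAccessible(true);
                handlers.add(new Handler(observer, method));
            }
        }
    }

    void post(Object event) {
        for (Handler handler : handlers) {
            // Deliver only to handlers whose parameter type accepts the event.
            if (handler.method.getParameterTypes()[0].isAssignableFrom(event.getClass())) {
                try {
                    handler.method.invoke(handler.target, event);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("handler failed", e);
                }
            }
        }
    }
}

// Hypothetical event and observer.
class UserRegistered {
    final String name;

    UserRegistered(String name) {
        this.name = name;
    }
}

class WelcomeMailer {
    final List<String> sent = new ArrayList<>();

    @MiniSubscribe
    public void onUserRegistered(UserRegistered event) {
        sent.add("welcome " + event.name);
    }
}

class MiniBusDemo {
    // Registers one observer, posts a matching and a non-matching event, returns what was handled.
    static List<String> run() {
        MiniBus bus = new MiniBus();
        WelcomeMailer mailer = new WelcomeMailer();
        bus.register(mailer);
        bus.post(new UserRegistered("alice"));
        bus.post("no handler accepts a String, so this event is ignored");
        return mailer.sent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The publisher and the observer never reference each other; both only know the bus and the event class, which is the decoupling the article is after.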
Sort code

The complete code

Gitee.com/yclxiao/spe…

Conclusion

In general, EventBus is a message bus built on the observer pattern; it keeps code loosely coupled and decouples business logic.

The purpose of the framework is to hide implementation details, ease development, reuse code, decouple business code, and let developers focus on business development.
