1. The EventBus usage

// Register for EventBus
 EventBus.getDefault().register(this);
 
// Deregistration of EventBus
EventBus.getDefault().unregister(this);

// Send events
EventBus.getDefault().post(new Object());

// Receive events
@Subscribe(threadMode = ThreadMode.MAIN)
public void onOtherDisplay(Object obj) {}Copy the code

2. Instantiate EventBus

    public static EventBus getDefault(a) {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = newEventBus(); }}}return defaultInstance;
    }

    EventBus(EventBusBuilder builder) {
      ......
      
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        stickyEvents = newConcurrentHashMap<>(); mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport ! =null ? mainThreadSupport.createPoster(this) : null;
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this); indexCount = builder.subscriberInfoIndexes ! =null ? builder.subscriberInfoIndexes.size() : 0;
        subscriberMethodFinder = newSubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); . executorService = builder.executorService; }Copy the code
  • The singleton pattern of double checklock is used to obtain an instance of EventBus
  • **subscriptionsByEventType: **HashMap< Event type, Subscription collection >
  • **typesBySubscriber: **HashMap< The registration class for storing keys, the HashMap of the event type set > to determine whether an object has been registered and prevent repeated registration
  • **mainThreadPoster: ** Queues events with thread models MAIN and Main_ordere
  • **backgroundPoster: ** Adds events whose thread model is background to the queue
  • **asyncPoster: ** Adds thread model ASYNC events to the queue

3. Registration of EventBus

  • Find the subscription method of the registered class in the cache, return it directly, find all subscription methods in the cache, put it in the cache
  • Get all the methods in the registered class through reflection, loop through these methods, get the annotations on the methods, and get the event type and thread mode as the first parameter of the method. Encapsulate the method, event model, thread model, priority and whether it is stickiness time into the SubscriberMethod object. It is then added to subscriberMethods in the FindState class
  • Encapsulate registered classes and Subscription methods into Subscription, ordering subscriptions according to the priority of events. This will then be de-registered
  • subscriptionByEventType: Get eventType eventType, search subscriptionByEventType through eventType to find whether there is a SubScrition set, create and add subscriptionByEventType if there is not
  • TypesBySubscriber: A HashMap whose storage key is a registration class (corresponding to EventBusFirstActivity in the example) and value is a collection of event types
  • If the stickiness event is, it is taken out from stickEvents and processed. According to the thread mode, whether the thread needs to be switched is determined. If not, the subscription method is directly called through reflection. If necessary, switch to the specified thread through Handler or thread pool
    public void register(Object subscriber) { Class<? > subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);// See [1]
        synchronized (this) {
          // See [2]
            for(SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); }}}Copy the code
【 1 】 findSubscriberMethods ()
  private static finalMap<Class<? >, List<SubscriberMethod>> METHOD_CACHE =new ConcurrentHashMap<>();
    private final boolean ignoreGeneratedIndex;
    
   List<SubscriberMethod> findSubscriberMethods(Class
        subscriberClass) {
     // Check METHOD_CACHE to see if subscribed methods on the registered class are already cached
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if(subscriberMethods ! =null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
          // Find all subscription methods
            subscriberMethods = findUsingInfo(subscriberClass);// See [3]
        } 
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
          // If all subscription methods are found, they are cached in the METHOD_CACHE collection
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            returnsubscriberMethods; }}Copy the code

Conclusion:

    1. Finds the subscription method of the registered class in the cache, which is present in the cache, and returns directly
    1. None in the cache. Search for all subscription methods and store them in the cache
[3] findUsingInfo ()
    private List<SubscriberMethod> findUsingInfo(Class
        subscriberClass) {
       // Use FindState to assist in looking up subscription methods
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while(findState.clazz ! =null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            if(findState.subscriberInfo ! =null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if(findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); }}}else {
            // Get all the methods of the registered class by reflection
                findUsingReflectionInSingleClass(findState);// See [5]
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

Copy the code
[5] findUsingReflectionInSingleClass ()

    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
             Reflection gets all the methods in the registered class
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Reflection gets all methods in the registered class
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        // Loop through all methods
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            if((modifiers & Modifier.PUBLIC) ! =0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<? >[] parameterTypes = method.getParameterTypes();if (parameterTypes.length == 1) {
                // Get annotations on methods
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if(subscribeAnnotation ! =null) {
                 // Gets the first argument on the method, the event typeClass<? > eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                  // Get the thread mode
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                  // Encapsulate the method, event model, thread model, priority, stickiness time into the SubscriberMethod object and add it to subscriberMethods in the FindState class
                            findState.subscriberMethods.add(newSubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); }}}... }}Copy the code

Conclusion:

Get all the methods in the registered class through reflection, loop through these methods, get the annotations on the methods, and get the event type and thread mode as the first parameter of the method. Encapsulate the method, event model, thread model, priority and whether it is stickiness time into the SubscriberMethod object. It is then added to subscriberMethods in the FindState class

【2】Subscribe ()
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // Get the event typeClass<? > eventType = subscriberMethod.eventType;// Encapsulate registered classes and Subscription methods in Subscription
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // Add subscriptionByEventType to subscriptionByEventType if there is no SubScrition set
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else{... }// Subscriptions are added according to the priority in newSubscription in the subscriptionclass
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break; }}// Assign typesBySubscriber to a HashMap that stores key as a registration class (corresponding to EventBusFirstActivity in the example) and value as a collection of event typesList<Class<? >> subscribedEvents = typesBySubscriber.get(subscriber);if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
   PostToSubscription (newSubscription, stickyEvent, isMainThread())) postToSubscription(newSubscription, stickyEvent, isMainThread())
        if (subscriberMethod.sticky) {
            if(eventInheritance) { Set<Map.Entry<Class<? >, Object>> entries = stickyEvents.entrySet();for(Map.Entry<Class<? >, Object> entry : entries) { Class<? > candidateEventType = entry.getKey();if(eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}else{ Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); }}}Copy the code

Conclusion:

  • Encapsulate registered classes and Subscription methods in Subscription
  • Subscriptions based on the priorities of events
  • subscriptionByEventType: Get eventType eventType, search subscriptionByEventType through eventType to find whether there is a SubScrition set, create and add subscriptionByEventType if there is not
  • TypesBySubscriber: A HashMap whose storage key is a registration class (corresponding to EventBusFirstActivity in the example) and value is a collection of event types
  • If it’s a sticky event, take it out of stickEvents and process it

4. Deregister EventBus

All subscription method information is stored in the subsciptionByEventType collection and all event types are stored in the typesBySubscriber collection at registration time. Unbinding removes the contents saved in both collections

 public synchronized void unregister(Object subscriber) {
      // Gets the event type stored in the typesBySubscriber collection based on the registered classList<Class<? >> subscribedTypes = typesBySubscriber.get(subscriber);if(subscribedTypes ! =null) {
            for(Class<? > eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType);// See [6]
            }
          // Remove the set of event types saved in typeBySubcriber
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: "+ subscriber.getClass()); }}Copy the code
[6] unsubscribeByEventType(subscriber, eventType)
private void unsubscribeByEventType(Object subscriber, Class
        eventType) {
      // Get the Subscription collection saved in the subScriptionByEventType collection based on the event type
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if(subscriptions ! =null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                  // Remove from subscription collectionsubscriptions.remove(i); i--; size--; }}}}Copy the code
5. Send the EVENT POST
  • Basically an event is taken from the event queue and sent. All event types, including the current event, events in the parent class and interfaces, are sent the NoSubscriberEvent () event if no subscriber is found

Extract all subscription methods from subscriptionsByEventType set, and then determine whether to switch threads according to thread mode, if not, call subscription methods directly through reflection; If necessary, switch to the specified thread through Handler or thread pool.

   private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue(a) {
            return newPostingThreadState(); }};public void post(Object event) {
  / / currentPostingThreadState is a ThreadLocal object, which stores the PostingThreadState object,
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if(! postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting =true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while(! eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); // See [7]}}finally {
                postingState.isPosting = false;
                postingState.isMainThread = false; }}}Copy the code
[7] postSingleEvent ()
  private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<? > eventClass = event.getClass();// Whether a subscriber is found
        boolean subscriptionFound = false;
        // (1) indicates whether events in the parent class and interface need to be sent
        if (eventInheritance) {
            // (2) Find all event types, including the current event, events in the parent class, and events in the interface.List<Class<? >> eventTypes = lookupAllEventTypes(eventClass);int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) { Class<? > clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);// See [8]}}else {
            // Only events of the currently registered class are sent based on the event type, ignoring parent classes and interfaces.
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if(! subscriptionFound) {if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if(sendNoSubscriberEvent && eventClass ! = NoSubscriberEvent.class && eventClass ! = SubscriberExceptionEvent.class) {// if no subscriber is found, send a NoSubscriberEvent event
                post(new NoSubscriberEvent(this, event)); }}}Copy the code
[8] postSingleEventForEventType ()
  private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class
        eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
          // Retrieve the SubScription from the subscriptionsByEventType collection that was saved when it was registered, and iterate through the collection to retrieve the SubScription
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if(subscriptions ! =null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread); // See [9]
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break; }}return true;
        }
        return false;
    }


Copy the code
[9] postToSubscription ()
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event); // See [10]
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if(mainThreadPoster ! =null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: "+ subscription.subscriberMethod.threadMode); }}Copy the code

Depending on the thread model, which thread executes the subscription method

[10] invokeSubscriber ()
 void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e); }}Copy the code

Called directly through reflection

Main

If thread mode is MAIN, events are sent on the MAIN thread and the subscription method is executed on the MAIN thread. Otherwise, the event is enqueued, then the main thread is switched to by Handler and the invokeSubscriber () method is executed

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if(! handlerActive) { handlerActive =true;
                if(! sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message"); }}}}@Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                       
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if(! sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return; }}}finally{ handlerActive = rescheduled; }}}Copy the code

MAIN_ORDERED

If thread mode is MAIN_ORDERED, events will be queued in whichever thread sends them, and then switched to the main thread by Handler for execution

BACKGROUND

The Subscription and event are wrapped into a PendingPost object, which is then added to the queue. The difference here is that instead of sending messages using a Handler, they are executed through a thread pool. So, if the thread mode is BACKGROUND, then the event is sent in the child thread, then the subscription method is executed in the child thread, otherwise the event is queued and then executed through the thread pool

final class BackgroundPoster implements Runnable.Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if(! executorRunning) { executorRunning =true;
                eventBus.getExecutorService().execute(this); }}}@Override
    public void run(a) {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                         
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return; } } } eventBus.invokeSubscriber(pendingPost); }}catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); }}finally {
            executorRunning = false; }}}Copy the code

ASYNC

Regardless of which thread sends an event, the event is first queued and then executed through the thread pool.

class AsyncPoster implements Runnable.Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run(a) {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }}Copy the code