Preface

Sorry. For various reasons, the next installment on OkHttp and Okio will be delayed. This week we start with something simpler.

EventBus is an event publish/subscribe framework based on the observer pattern. It lets developers communicate between modules with very little code, instead of building a separate communication bridge out of layer upon layer of callback interfaces. This reduces the tight coupling that chains of callbacks create between modules and avoids a large number of inner classes. It can be used to communicate between activities, fragments, and background threads without the complexity of intents or handlers.

The observer pattern itself has a drawback: it can lead to interface bloat. When a program needs many different kinds of notification and lacks a good abstraction, the code fills up with interfaces, and the growing number of interfaces brings a whole host of problems with naming, comments, and so on. In essence, observers have to implement event generation, distribution, and processing from scratch, which requires every participant to understand the whole notification process well. That is a reasonable demand for a moderately sized code base, but it becomes a burden when the program grows large.

EventBus is an Android event distribution bus based on the observer pattern.


Basic use of EventBus

1. Define the MessageEvent, that is, create the event type

public class MessageEvent {
    public final String message;

    public MessageEvent(String message) {
        this.message = message;
    }
}

2. Register observers and subscribe to events

Choose the subscriber that should receive the event. For example, an Activity registers itself in onCreate() by calling the register method of EventBus:

 EventBus.getDefault().register(this);


When the subscriber no longer needs to receive events, it unregisters:

EventBus.getDefault().unregister(this);

The subscriber uses the @Subscribe annotation to tell EventBus which method handles the event.

@Subscribe
public void onMessageEvent(MessageEvent event) {
    Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
}

Note that subscriber methods must be public. Since EventBus 3.0 the method name can be chosen freely; earlier versions required the onEvent naming convention.

3. Send events

The event we want to deliver is sent with the post method of EventBus.

EventBus.getDefault().post(new MessageEvent("HelloEveryone"));


The registered Activity receives the event, and its onMessageEvent method is invoked.

EventBus source code parsing

With the basic usage covered, we now follow the calls made during that usage to understand how EventBus is implemented and what its source code does in detail.

Registering an observer

EventBus.getDefault().register(this);
  • getDefault()

EventBus.getDefault() returns a singleton, implemented as follows:

public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

This double-checked locking ensures that getDefault() always returns the same default EventBus instance within a single app process.
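The double-checked locking idiom is only safe in Java when the instance field is declared volatile. The following is a minimal standalone sketch of the idiom; `Bus` is a hypothetical stand-in class, not the EventBus source.

```java
// Hypothetical stand-in for EventBus, used only to illustrate the idiom.
final class Bus {
    // volatile ensures other threads never observe a partially constructed instance
    private static volatile Bus defaultInstance;

    private Bus() {
    }

    static Bus getDefault() {
        Bus local = defaultInstance;          // one volatile read on the fast path
        if (local == null) {
            synchronized (Bus.class) {
                local = defaultInstance;      // check again while holding the lock
                if (local == null) {
                    local = new Bus();
                    defaultInstance = local;
                }
            }
        }
        return local;
    }
}
```

The first check avoids taking the lock once the instance exists; the second check prevents two threads that both saw null from creating two instances.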

  • register(Object subscriber)
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

The register method first gets the class of the subscriber instance, then calls findSubscriberMethods on the SubscriberMethodFinder instance to find the subscriber methods of that class, and finally calls subscribe for each of those methods. Registration therefore raises two questions: how are the subscriber methods found, and how are they stored for later invocation?

How does SubscriberMethodFinder find the subscriber methods of a class?

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    // Look in the cache first
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }
    // Find the subscriber methods
    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    // Add the resulting subscriber methods to the cache
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

The cache is searched first, using the Class as the key. On a cache miss, findUsingReflection or findUsingInfo is called to collect the subscriber methods of that class, and the result is added to the cache. If no subscriber method is found at all, an EventBusException is thrown.

The data structure of the cache is as follows:

Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
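The caching step can be sketched independently of EventBus. In the sketch below, `HandlerCache`, `methodsOf`, and `scanCount` are hypothetical names, and it uses `ConcurrentHashMap.computeIfAbsent` instead of the separate get and put calls shown above, so it illustrates the idea rather than the library's code.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache, not the EventBus class: maps a class to its declared methods.
final class HandlerCache {
    private static final Map<Class<?>, List<Method>> CACHE = new ConcurrentHashMap<>();

    // Counts how many real reflection scans were performed (for demonstration only)
    static final AtomicInteger scanCount = new AtomicInteger();

    static List<Method> methodsOf(Class<?> type) {
        // The scan runs only when the class is not in the cache yet
        return CACHE.computeIfAbsent(type, HandlerCache::scan);
    }

    private static List<Method> scan(Class<?> type) {
        scanCount.incrementAndGet();
        List<Method> result = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            result.add(m);
        }
        return result;
    }
}
```

Calling `HandlerCache.methodsOf` twice for the same class returns the same list object, and the reflection scan runs only the first time.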

The subscribe method

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // Get the event type
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // Find the subscriptions already registered for this event type
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    // Create the list if the event type is new; throw if the subscriber is already registered
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    int size = subscriptions.size();
    // Insert the new subscription at the position determined by its priority
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    // Record the event type under this subscriber
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    // Sticky event handling
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

The subscribe method looks up the subscriptions for the event type and checks whether this subscriber is already registered for it; if so, it throws an exception. Otherwise it stores the subscription twice: once in subscriptionsByEventType, keyed by event type and ordered by priority, and once in typesBySubscriber, keyed by the subscriber instance.
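The two indexes and the priority insertion can be sketched in plain Java. `PriorityRegistry`, its `Entry` class, and the `handlerNames` helper are hypothetical and heavily simplified: a handler here is just a name with a priority. The `unregister` method shows one way the second index can be used, as an assumption about why it is kept.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified registry; not the EventBus implementation.
final class PriorityRegistry {
    static final class Entry {
        final Object subscriber;
        final String name;
        final int priority;

        Entry(Object subscriber, String name, int priority) {
            this.subscriber = subscriber;
            this.name = name;
            this.priority = priority;
        }
    }

    // Index 1: event type -> handlers, highest priority first
    private final Map<Class<?>, CopyOnWriteArrayList<Entry>> byEventType = new HashMap<>();
    // Index 2: subscriber -> event types it subscribed to
    private final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();

    synchronized void subscribe(Object subscriber, Class<?> eventType, String name, int priority) {
        CopyOnWriteArrayList<Entry> entries =
                byEventType.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>());
        int size = entries.size();
        for (int i = 0; i <= size; i++) {
            // Insert before the first entry with a strictly lower priority
            if (i == size || priority > entries.get(i).priority) {
                entries.add(i, new Entry(subscriber, name, priority));
                break;
            }
        }
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    synchronized void unregister(Object subscriber) {
        List<Class<?>> types = typesBySubscriber.remove(subscriber);
        if (types == null) {
            return;
        }
        for (Class<?> type : types) {
            // Only the lists this subscriber appears in need to be touched
            byEventType.get(type).removeIf(e -> e.subscriber == subscriber);
        }
    }

    synchronized List<String> handlerNames(Class<?> eventType) {
        List<String> names = new ArrayList<>();
        CopyOnWriteArrayList<Entry> entries = byEventType.get(eventType);
        if (entries != null) {
            for (Entry e : entries) {
                names.add(e.name);
            }
        }
        return names;
    }
}
```

Entries with equal priority keep their registration order, because a new entry is only inserted ahead of entries whose priority is strictly lower.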

Sending an event

To send an event, the post method is called.

  • post(Object event)

public void post(Object event) {
    // Get the posting state of the current thread and add the event to its queue
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    // Only start dispatching if this thread is not already posting
    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // Drain the event queue, calling postSingleEvent for each event
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

The post method obtains the event queue from the PostingThreadState of the current thread and adds the posted event to it. If the thread is not already posting, it drains the queue, calling postSingleEvent for each event.

currentPostingThreadState is a ThreadLocal that holds one PostingThreadState per thread.

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

PostingThreadState contains the event queue and several flags. The class is defined as follows.

final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}
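The effect of the per-thread queue and the isPosting flag is easiest to see when a handler posts another event while it is running. The sketch below is a hypothetical single-handler bus (`QueueingBus` and `PostingState` are illustrative names) that keeps only those two pieces of the mechanism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-handler bus showing the per-thread queue and the isPosting guard.
final class QueueingBus {
    private static final class PostingState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
    }

    // Each thread gets its own PostingState, so no locking is needed here
    private final ThreadLocal<PostingState> state = ThreadLocal.withInitial(PostingState::new);
    private final Consumer<Object> handler;

    QueueingBus(Consumer<Object> handler) {
        this.handler = handler;
    }

    void post(Object event) {
        PostingState s = state.get();
        s.eventQueue.add(event);
        if (!s.isPosting) {               // a nested post only enqueues and returns
            s.isPosting = true;
            try {
                while (!s.eventQueue.isEmpty()) {
                    handler.accept(s.eventQueue.remove(0));
                }
            } finally {
                s.isPosting = false;
            }
        }
    }
}
```

With this structure, an event posted from inside a handler is delivered only after the current handler has returned, instead of being handled recursively in the middle of it.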
  • postSingleEvent

postSingleEvent is implemented as follows.

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

lookupAllEventTypes(eventClass) returns the event's class together with its superclasses and the interfaces they implement; postSingleEventForEventType is then called for each of these types. The actual delivery of the event happens in postSingleEventForEventType.
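Collecting a class together with its superclasses and interfaces can be sketched with plain reflection. `EventTypes.allTypesOf` below is a hypothetical helper written for illustration; it is not the body of lookupAllEventTypes, which is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, written for illustration only.
final class EventTypes {
    // Returns the class itself, then each superclass, each followed by its not-yet-seen interfaces.
    static List<Class<?>> allTypesOf(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Class<?> c = eventClass; c != null; c = c.getSuperclass()) {
            types.add(c);
            addInterfaces(types, c.getInterfaces());
        }
        return types;
    }

    private static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> i : interfaces) {
            if (!types.contains(i)) {
                types.add(i);
                addInterfaces(types, i.getInterfaces()); // interfaces can extend other interfaces
            }
        }
    }
}
```

For example, `EventTypes.allTypesOf(Integer.class)` starts with `Integer` and also contains `Number`, `Object`, `Comparable`, and `java.io.Serializable`, which is why a subscriber for a supertype can receive an event of a subtype when event inheritance is enabled.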

  • postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

The list of subscriptions for eventClass is taken from subscriptionsByEventType and iterated; postToSubscription is called for each subscription, delivering the event to the subscribers one by one.

  • postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    // Handle the event differently depending on the subscriber's thread mode
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

ThreadMode determines which thread should execute the method, while invokeSubscriber calls the function through reflection.

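POSTING

The subscriber is invoked directly in the posting thread.

MAIN

If the current thread is the UI thread, the subscriber is invoked directly; otherwise mainThreadPoster.enqueue(subscription, event) is called.

MAIN_ORDERED

If a mainThreadPoster exists, the event is always enqueued through mainThreadPoster.enqueue; otherwise the subscriber is invoked directly.

BACKGROUND

If the current thread is not the UI thread, the subscriber is invoked directly; if it is the UI thread, backgroundPoster.enqueue is called.

ASYNC

asyncPoster.enqueue is always called.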
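The routing rules can be condensed into a small decision table. The sketch below mirrors the switch in postToSubscription but returns a label instead of invoking anything; `Mode`, `Route`, `Router`, and the `hasMainPoster` parameter (which models the check `mainThreadPoster != null`) are hypothetical names.

```java
// Hypothetical decision table mirroring the switch in postToSubscription.
enum Mode { POSTING, MAIN, MAIN_ORDERED, BACKGROUND, ASYNC }

enum Route { INVOKE_DIRECTLY, MAIN_QUEUE, BACKGROUND_QUEUE, ASYNC_QUEUE }

final class Router {
    // hasMainPoster models the check "mainThreadPoster != null"
    static Route route(Mode mode, boolean isMainThread, boolean hasMainPoster) {
        switch (mode) {
            case POSTING:
                return Route.INVOKE_DIRECTLY;
            case MAIN:
                return isMainThread ? Route.INVOKE_DIRECTLY : Route.MAIN_QUEUE;
            case MAIN_ORDERED:
                return hasMainPoster ? Route.MAIN_QUEUE : Route.INVOKE_DIRECTLY;
            case BACKGROUND:
                return isMainThread ? Route.BACKGROUND_QUEUE : Route.INVOKE_DIRECTLY;
            case ASYNC:
                return Route.ASYNC_QUEUE;
            default:
                throw new IllegalStateException("Unknown thread mode: " + mode);
        }
    }
}
```

Only ASYNC never runs the subscriber on the posting thread; MAIN and BACKGROUND hand the event to a queue only when the posting thread is the wrong kind of thread.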

The following sections analyze each of these delivery paths.

  • invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

The subscriber method is invoked directly via reflection.
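The reason for catching InvocationTargetException separately is that Method.invoke wraps any exception thrown by the target method, and getCause() recovers the original one. A small standalone sketch, with a hypothetical `Greeter` subscriber and a hypothetical `ReflectiveInvoker` helper:

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical subscriber used only for this illustration.
class Greeter {
    public String last;

    public void onText(String text) {
        if (text.isEmpty()) {
            throw new IllegalArgumentException("empty text");
        }
        last = text;
    }
}

final class ReflectiveInvoker {
    // Looks up a public one-parameter method, converting the checked exception.
    static Method find(Class<?> type, String name, Class<?> eventType) {
        try {
            return type.getMethod(name, eventType);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Invokes the method; returns the exception thrown by the subscriber, or null on success.
    static Throwable invoke(Object target, Method method, Object event) {
        try {
            method.invoke(target, event);
            return null;
        } catch (InvocationTargetException e) {
            return e.getCause(); // the exception thrown inside the subscriber method
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
}
```

Returning the cause instead of rethrowing it keeps a failing subscriber from interrupting the caller, which is the role handleSubscriberException appears to play in the code above.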

  • mainThreadPoster.enqueue
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;

mainThreadPoster is created by mainThreadSupport.createPoster:

public Poster createPoster(EventBus eventBus) {
    return new HandlerPoster(eventBus, looper, 10);
}

This returns a HandlerPoster instance.

HandlerPoster.enqueue builds a PendingPost from the Subscription and the event and adds it to a PendingPostQueue. If the handler is not already active, it calls sendMessage, which causes handleMessage to be called back.

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

Message processing

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

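When the message arrives, handleMessage loops, taking PendingPost objects from the queue and passing each one to invokeSubscriber. Once the loop has been running for at least maxMillisInsideHandleMessage, it sends itself another message and returns, so that a long queue does not occupy the handler's thread in one uninterrupted run.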

void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}

Before invoking the subscriber, this method calls releasePendingPost to return the PendingPost to a pool:

static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // Don't let the pool grow indefinitely
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}

To avoid repeated object allocation, PendingPost maintains a list of released PendingPost instances so that they can be reused.

List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();


New PendingPost objects are obtained through the obtainPendingPost method, which can hand out an instance from this pool instead of allocating a new one.
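The pooling pattern can be sketched in a few lines. `PooledPost` below is a hypothetical class modelled on the description above; its `obtain` method is an assumption about how such a pool is typically read, since the body of obtainPendingPost is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pooled object; not the EventBus source.
final class PooledPost {
    private static final List<PooledPost> POOL = new ArrayList<>();
    private static final int MAX_POOL_SIZE = 10000;

    Object event;

    private PooledPost(Object event) {
        this.event = event;
    }

    // Reuses a pooled instance when one is available, otherwise allocates a new one.
    static PooledPost obtain(Object event) {
        synchronized (POOL) {
            int size = POOL.size();
            if (size > 0) {
                PooledPost post = POOL.remove(size - 1); // removing the last element avoids shifting
                post.event = event;
                return post;
            }
        }
        return new PooledPost(event);
    }

    // Clears the reference so the pool does not keep the event alive, then pools the instance.
    static void release(PooledPost post) {
        post.event = null;
        synchronized (POOL) {
            if (POOL.size() < MAX_POOL_SIZE) { // don't let the pool grow indefinitely
                POOL.add(post);
            }
        }
    }
}
```

A caller must not touch an instance after releasing it, because the next `obtain` call may hand the same object to someone else. This is why invokeSubscriber above copies the event and subscription into local variables before releasing the PendingPost.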

  • asyncPoster.enqueue
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    queue.enqueue(pendingPost);
    eventBus.getExecutorService().execute(this);
}

The PendingPost is added to the queue, and a thread from the thread pool takes it from the queue and executes it.

@Override
public void run() {
    PendingPost pendingPost = queue.poll();
    if(pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    eventBus.invokeSubscriber(pendingPost);
}
  • backgroundPoster.enqueue

In contrast to asyncPoster, backgroundPoster ensures that the queued events are executed sequentially. Using a synchronized block and the executorRunning flag, it guarantees that only one thread at a time is taking events from the queue and executing them.

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!executorRunning) {
            executorRunning = true;
            eventBus.getExecutorService().execute(this);
        }
    }
}

public void run() {
    try {
        try {
            while (true) {
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for the next event: stop draining
        }
    } finally {
        executorRunning = false;
    }
}
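The "one drain task at a time" idea can be sketched without EventBus. `SerialPoster` below is a hypothetical simplification: it queues plain Runnable tasks, has no timed poll, and changes the running flag only while holding the lock.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;

// Hypothetical serial worker: many producers, at most one draining task at a time.
final class SerialPoster implements Runnable {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final ExecutorService executor;
    private boolean running; // guarded by "this"

    SerialPoster(ExecutorService executor) {
        this.executor = executor;
    }

    void enqueue(Runnable task) {
        synchronized (this) {
            queue.add(task);
            if (!running) {          // start a drain task only if none is active
                running = true;
                executor.execute(this);
            }
        }
    }

    @Override
    public void run() {
        while (true) {
            Runnable task;
            synchronized (this) {
                task = queue.poll();
                if (task == null) {  // queue empty: clear the flag under the same lock
                    running = false;
                    return;
                }
            }
            try {
                task.run();          // run outside the lock so producers are not blocked
            } catch (RuntimeException e) {
                // Keep draining; a real implementation would report the failure
                System.err.println("Task failed: " + e);
            }
        }
    }
}
```

Because the flag is checked and cleared under the same lock that guards the queue, a task enqueued just as the worker finishes either is picked up by that worker or starts a new one, and tasks therefore run in the order they were enqueued even on a multi-threaded executor.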

Scanning for subscriber methods

The register method calls findSubscriberMethods on SubscriberMethodFinder, which has two ways to find the methods: findUsingInfo and findUsingReflection. The reflection-based one is as follows.

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    // Start with the subscriber class itself, then move up to its superclasses
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

First, a FindState instance is obtained; it holds intermediate variables and the final result of the search. The search starts in the subscriber class itself and then moves up to its superclass, skipping Java and Android system classes, and continues until no class is left.

The core of the search is implemented in findUsingReflectionInSingleClass.

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // Get the methods declared in this class only, not the inherited ones
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    // Accept only public methods with exactly one parameter and a @Subscribe annotation,
    // and call checkAdd before adding them to the subscriber methods
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                // Annotated, but does not have exactly one parameter
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            // Annotated, but not public, or static or abstract
        }
    }
}

The methods of a class are scanned according to the following rules:

1. The method is neither static nor abstract.
2. The method is public.
3. The method has exactly one parameter.
4. The method has an @Subscribe annotation.
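The four rules can be expressed with the standard reflection API. In the sketch below, the `@Handles` annotation stands in for @Subscribe, and `HandlerScanner` and `SampleSubscriber` are hypothetical names; the scan looks at a single class only and returns method names rather than SubscriberMethod objects.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation standing in for @Subscribe.
@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@Target(ElementType.METHOD)
@interface Handles {
}

final class HandlerScanner {
    // Returns the names of the methods declared in type that satisfy all four rules.
    static List<String> scan(Class<?> type) {
        List<String> found = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            int mod = m.getModifiers();
            boolean isPublic = Modifier.isPublic(mod);
            boolean isInstance = !Modifier.isStatic(mod) && !Modifier.isAbstract(mod);
            boolean oneParam = m.getParameterTypes().length == 1;
            boolean annotated = m.isAnnotationPresent(Handles.class);
            if (isPublic && isInstance && oneParam && annotated) {
                found.add(m.getName());
            }
        }
        return found;
    }
}

// Sample class: only onText passes every rule.
class SampleSubscriber {
    @Handles public void onText(String text) { }

    @Handles public void noArgs() { }

    @Handles public static void onStatic(String text) { }

    @Handles void notPublic(String text) { }

    public void notAnnotated(String text) { }
}
```

Each of the rejected methods in `SampleSubscriber` violates exactly one rule, which makes it easy to see which check filters it out.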

Even when a method satisfies these rules, it is not added to the list directly; it must first be validated by checkAdd.

boolean checkAdd(Method method, Class<?> eventType) {
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                throw new IllegalStateException();
            }
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}

// Second-level check based on the method signature
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());

    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        // Revert the put, old class is further down the class hierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}

checkAdd validates each scanned method. If no method has been recorded for the event type yet, the method is accepted and the second-level check is skipped. The second level checks the full method signature: the method name and the event class name are joined to form the methodKey. If subscriberClassByMethodKey does not yet contain this methodKey, the check returns true. If the same methodKey already exists, a subclass has overridden a subscriber method of its superclass; in that case the subclass's method is kept and the superclass's method is ignored. Because the scan proceeds from subclass to superclass, methodClassOld is retained and methodClass is ignored. When the search is finished, the FindState releases its resources.
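The signature check can be tried out on a two-class hierarchy. In this sketch, `BaseSubscriber`, `DerivedSubscriber`, and `SignatureFilter` are hypothetical names; `accept` follows the same put-then-revert logic as checkAddWithMethodSignature, while `countAccepted` is an illustrative driver that walks from the subclass to its superclasses.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical classes: DerivedSubscriber overrides BaseSubscriber.onText.
class BaseSubscriber {
    public void onText(String text) { }
}

class DerivedSubscriber extends BaseSubscriber {
    @Override
    public void onText(String text) { }
}

final class SignatureFilter {
    private final Map<String, Class<?>> classByMethodKey = new HashMap<>();

    // Returns true if the method should be kept. Call it for the subclass first.
    boolean accept(Method method, Class<?> eventType) {
        String key = method.getName() + '>' + eventType.getName();
        Class<?> declaring = method.getDeclaringClass();
        Class<?> previous = classByMethodKey.put(key, declaring);
        if (previous == null || previous.isAssignableFrom(declaring)) {
            return true;                     // first sighting, or declared in a more specific class
        }
        classByMethodKey.put(key, previous); // revert: the subclass version wins
        return false;
    }

    // Walks from type up to (but excluding) Object and counts accepted one-parameter methods.
    static int countAccepted(Class<?> type) {
        SignatureFilter filter = new SignatureFilter();
        int accepted = 0;
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Method m : c.getDeclaredMethods()) {
                Class<?>[] params = m.getParameterTypes();
                if (params.length == 1 && filter.accept(m, params[0])) {
                    accepted++;
                }
            }
        }
        return accepted;
    }
}
```

Scanning `DerivedSubscriber` visits onText twice, once per class, but only the subclass's version is accepted, so the overridden method is not registered a second time.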

The lookup described here uses reflection at runtime, which is relatively slow. Newer versions of EventBus introduce an annotation processor that generates a subscriber method index at compile time to improve efficiency.

Sticky event processing

Sticky events are events that EventBus stores when they are posted, so that an observer that registers later still receives them. They are kept in an internal data structure:

Map<Class<?>, Object> stickyEvents;

This map stores the most recently posted sticky event for each event type.

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

postSticky saves the event in stickyEvents and then posts it. If observers are already registered, the event is handled like a normal event. If no observer is registered, postSingleEvent converts the event into a NoSubscriberEvent and posts that instead, so that it can still be subscribed to and handled. When an observer registers later, the event is taken from stickyEvents and delivered to the newly registered observer.

if (subscriberMethod.sticky) {
    if (eventInheritance) {
        Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
        for (Map.Entry<Class<?>, Object> entry : entries) {
            Class<?> candidateEventType = entry.getKey();
            if (eventType.isAssignableFrom(candidateEventType)) {
                Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    } else {
        Object stickyEvent = stickyEvents.get(eventType);
        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
    }
}

This sticky-event code first checks eventInheritance to decide whether stored events whose type is a subclass of the subscribed event type should also be delivered, and then calls checkPostStickyEventToSubscription for each matching sticky event. After a null check, checkPostStickyEventToSubscription passes the event to the observer through postToSubscription, following the same path as an ordinary event.

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {	       
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}
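The store-then-replay behaviour of sticky events can be sketched as follows. `StickyBus` is a hypothetical, deliberately small class: it matches event types exactly (no event inheritance), calls handlers on the calling thread, and uses `Consumer` callbacks instead of annotated methods.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of sticky delivery; exact type match only.
final class StickyBus {
    private final Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    void postSticky(Object event) {
        stickyEvents.put(event.getClass(), event); // remember only the latest event per type
        List<Consumer<Object>> current =
                handlers.getOrDefault(event.getClass(), Collections.emptyList());
        for (Consumer<Object> h : current) {
            h.accept(event);
        }
    }

    void registerSticky(Class<?> eventType, Consumer<Object> handler) {
        handlers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(handler);
        Object last = stickyEvents.get(eventType);
        if (last != null) {
            handler.accept(last);                  // replay the stored event to the late subscriber
        }
    }
}
```

A handler registered after two sticky posts of the same type receives only the second event, because the map keeps a single entry per event class.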

Summary

This weekly wheels series is now in its fourth week. Next week will bring a more detailed dissection of OkHttp, followed by Okio.