Preface
Sorry: for various reasons, the next installment on OkHttp and Okio is delayed. This week we start with something simpler.
EventBus is an event publish/subscribe framework based on the observer pattern. It lets developers communicate between modules with very little code, instead of building a dedicated communication bridge out of layer upon layer of callback interfaces. This reduces the tight coupling between modules that chained callbacks cause, and it avoids creating a large number of inner classes. It can be used to communicate between activities, fragments, and background threads, avoiding the complexity of intents or handlers. Its drawback is possible interface bloat. When a program needs many different kinds of notification and lacks a good abstraction, the code fills up with interfaces, and the growing number of interfaces brings a host of problems with naming, comments, and so on. In essence, observers have to implement event generation, distribution, and handling from scratch, which requires every participant to understand the whole notification process well. That is a reasonable demand while the code base is moderate in size, but it becomes a burden once the program grows large.
EventBus: an Android event distribution bus based on the observer pattern.
Basic use of EventBus
1. Define the MessageEvent, that is, create the event type
public class MessageEvent {
    public final String message;

    public MessageEvent(String message) {
        this.message = message;
    }
}
2. Register observers and subscribe to events
Choose the subscriber that should receive the event. An Activity, for example, registers itself in onCreate() by calling the register method of EventBus.
EventBus.getDefault().register(this);
When events no longer need to be received, unregister the subscriber:
EventBus.getDefault().unregister(this);
In the subscriber, the @Subscribe annotation tells EventBus which method handles the event.
@Subscribe
public void onMessageEvent(MessageEvent event) {
Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();
}
Note that subscriber methods must be public. Since EventBus 3.0 the method name can be chosen freely; earlier versions required names beginning with onEvent.
3. Send events
Send the event we want to deliver through the post method of EventBus.
EventBus.getDefault().post(new MessageEvent("HelloEveryone"));
The registered Activity receives the event, and its onMessageEvent method is invoked.
EventBus source code parsing
With the basic usage in mind, we now follow the calls made above to see how EventBus is implemented and what the source code looks like in detail.
Registering an observer
EventBus.getDefault().register(this);
- getDefault()
EventBus.getDefault() returns a lazily created singleton, implemented as follows:
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}
This double-checked locking ensures that getDefault() returns the same EventBus instance everywhere within a single app process.
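The double-checked locking idiom used by getDefault() can be tried out with the JDK alone. The following is a minimal sketch, not the EventBus code: DefaultHolder and its CREATED counter are hypothetical names for illustration, and it assumes the instance field is declared volatile, which the idiom needs to be safe under the Java memory model.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a class with a lazily created default instance.
final class DefaultHolder {
    // Counts constructor calls so a caller can verify that only one instance is built.
    static final AtomicInteger CREATED = new AtomicInteger();

    // volatile: other threads must never observe a partially constructed object.
    private static volatile DefaultHolder defaultInstance;

    DefaultHolder() {
        CREATED.incrementAndGet();
    }

    static DefaultHolder getDefault() {
        DefaultHolder instance = defaultInstance;   // first check, without the lock
        if (instance == null) {
            synchronized (DefaultHolder.class) {
                instance = defaultInstance;         // second check, under the lock
                if (instance == null) {
                    instance = new DefaultHolder();
                    defaultInstance = instance;
                }
            }
        }
        return instance;
    }
}
```

Calling DefaultHolder.getDefault() repeatedly returns the same object, and the lock is only taken while the instance has not been created yet.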
- register(Object subscriber)
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}
The register method first gets the class of the subscriber instance, then calls findSubscriberMethods on the SubscriberMethodFinder instance to find the subscriber methods declared in that class, and finally calls subscribe for each of them. Registration therefore raises two questions: how are the subscriber methods found, and how are they stored for later invocation?
How does the SubscriberMethodFinder find the associated registration methods from the instance?
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    // Look in the cache first
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }
    // Find the subscriber methods
    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    // Add the resulting subscriber methods to the cache
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}
The cache is searched first, with the Class as the key. On a cache miss, findUsingReflection or findUsingInfo is called to collect the subscriber methods of that class, and the result is added to the cache. If no subscriber method is found at all, an EventBusException is thrown.
The data structure of the cache is as follows:
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
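To make the cache-then-scan flow concrete, here is a small JDK-only sketch. It is an illustration under assumptions rather than the EventBus implementation: the @Handles annotation, CachedMethodFinder, and CacheDemoSubscriber are hypothetical names, and the scan is reduced to "annotated methods with exactly one parameter".

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical marker annotation standing in for @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {}

final class CachedMethodFinder {
    private final Map<Class<?>, List<Method>> cache = new ConcurrentHashMap<>();
    int scans; // demo counter: number of reflective scans actually performed

    List<Method> find(Class<?> type) {
        List<Method> cached = cache.get(type);
        if (cached != null) {
            return cached; // cache hit: no reflection needed
        }
        scans++;
        List<Method> found = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Handles.class) && m.getParameterCount() == 1) {
                found.add(m);
            }
        }
        if (found.isEmpty()) {
            throw new IllegalArgumentException(type + " has no @Handles methods");
        }
        cache.put(type, found);
        return found;
    }
}

// Hypothetical subscriber with one annotated and one plain method.
class CacheDemoSubscriber {
    @Handles
    public void onText(String text) {}

    public void unrelated(String text) {}
}
```

Looking up CacheDemoSubscriber twice scans the class only once; the second call is served from the map.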
The subscribe method
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // Get the event type
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // Find the subscriptions registered for this event type
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    // If the event type has no list yet, create one; if the subscriber is already in it, throw
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }
    // Insert the new subscription at the position given by its priority
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    // Record the event type under the subscriber
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);
    // Sticky event handling
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}
The subscribe method looks up the subscriptions for the event type and checks whether this subscriber is already registered for it. If it is, an exception is thrown. Otherwise the subscription is stored twice: once in a priority-ordered list keyed by the event type, and once under the subscriber instance as the key. For sticky subscriber methods, any stored sticky event is then delivered.
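The priority-ordered insertion and the two indexes can be sketched in isolation. This is a simplified illustration with hypothetical names (DemoSubscription, SubscriptionRegistry); subscribers are plain strings here, and duplicate detection and sticky handling are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified subscription record: a subscriber name plus a priority.
final class DemoSubscription {
    final String subscriber;
    final int priority;

    DemoSubscription(String subscriber, int priority) {
        this.subscriber = subscriber;
        this.priority = priority;
    }
}

final class SubscriptionRegistry {
    // Index 1: event type -> subscriptions, highest priority first.
    final Map<Class<?>, CopyOnWriteArrayList<DemoSubscription>> byEventType = new HashMap<>();
    // Index 2: subscriber -> event types it listens to (useful when unregistering).
    final Map<String, List<Class<?>>> typesBySubscriber = new HashMap<>();

    synchronized void subscribe(String subscriber, Class<?> eventType, int priority) {
        CopyOnWriteArrayList<DemoSubscription> list =
                byEventType.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>());
        DemoSubscription added = new DemoSubscription(subscriber, priority);
        int size = list.size();
        for (int i = 0; i <= size; i++) {
            // Insert before the first entry with a strictly lower priority;
            // entries with equal priority keep their registration order.
            if (i == size || priority > list.get(i).priority) {
                list.add(i, added);
                break;
            }
        }
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    // Returns the subscriber names for an event type in delivery order.
    synchronized List<String> order(Class<?> eventType) {
        List<String> names = new ArrayList<>();
        List<DemoSubscription> list = byEventType.get(eventType);
        if (list != null) {
            for (DemoSubscription s : list) {
                names.add(s.subscriber);
            }
        }
        return names;
    }
}
```

Registering priorities 0, 10, 5, 0 in that order yields the delivery order 10, 5, 0, 0, with the two equal-priority subscribers in registration order.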
Sending an event
Events are sent by calling the post method.
- post(Object event)
public void post(Object event) {
    // Get the event queue of the current thread and add the event to it
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);
    // Only start delivering if this thread is not already posting
    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // Drain the event queue, calling postSingleEvent for each event
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}
The post method takes the event queue from the current thread's PostingThreadState and adds the posted event to it. It then checks whether the current thread is already posting; if not, it drains the queue, calling postSingleEvent to deliver each event.
currentPostingThreadState is a ThreadLocal that holds one PostingThreadState per thread.
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};
PostingThreadState contains an eventQueue and several flags. The structure of the class is as follows.
final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}
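One consequence of the per-thread queue plus the isPosting flag is that a post made from inside a subscriber is only queued and is delivered after the current event has been handled. The sketch below demonstrates this with the JDK alone; ReentrantPoster, its State class, and the hard-coded "A posts B" handler are hypothetical and exist only for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

final class ReentrantPoster {
    // Hypothetical per-thread state, mirroring the eventQueue/isPosting pair described above.
    private static final class State {
        final List<Object> queue = new ArrayList<>();
        boolean posting;
    }

    private final ThreadLocal<State> state = ThreadLocal.withInitial(State::new);
    // Records the order of delivery; only touched from one thread in this demo.
    final List<String> log = new ArrayList<>();

    void post(Object event) {
        State s = state.get();
        s.queue.add(event);
        if (s.posting) {
            return; // nested post: only enqueue, the outer loop will deliver it
        }
        s.posting = true;
        try {
            while (!s.queue.isEmpty()) {
                deliver(s.queue.remove(0));
            }
        } finally {
            s.posting = false;
        }
    }

    // Stand-in for a subscriber: handling "A" posts "B" from inside the handler.
    private void deliver(Object event) {
        log.add("start " + event);
        if ("A".equals(event)) {
            post("B");
        }
        log.add("end " + event);
    }
}
```

Posting "A" produces the log start A, end A, start B, end B: the nested event is handled only after the first handler has returned, instead of recursing into it.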
- postSingleEvent
postSingleEvent is implemented as follows.
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}
When eventInheritance is enabled, lookupAllEventTypes(eventClass) returns the class of the event together with its superclasses and the interfaces they implement, and postSingleEventForEventType is called for each of these types in turn. Otherwise it is called once, for the event's own class. The actual delivery of the event happens in postSingleEventForEventType.
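The article does not show lookupAllEventTypes itself, so the following is a hedged sketch of what such a lookup has to do: walk up the superclass chain and collect every interface along the way. EventTypeLookup and the small event hierarchy are hypothetical names, and this version does not cache its results.

```java
import java.util.ArrayList;
import java.util.List;

final class EventTypeLookup {
    // Collects the class itself, every superclass, and every interface reachable from them.
    static List<Class<?>> allTypes(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Class<?> c = eventClass; c != null; c = c.getSuperclass()) {
            types.add(c);
            addInterfaces(types, c.getInterfaces());
        }
        return types;
    }

    private static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> i : interfaces) {
            if (!types.contains(i)) {
                types.add(i);
                addInterfaces(types, i.getInterfaces()); // interfaces can extend other interfaces
            }
        }
    }
}

// Hypothetical event hierarchy used only for the demo.
interface DemoTagged {}
class DemoBaseEvent implements DemoTagged {}
class DemoLoginEvent extends DemoBaseEvent {}
```

For DemoLoginEvent the lookup yields DemoLoginEvent, DemoBaseEvent, DemoTagged, and Object, so a subscriber registered for any of these types would receive the event when inheritance is enabled.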
- postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}
The list of subscriptions for eventClass is fetched from subscriptionsByEventType and iterated over, and postToSubscription is called to deliver the event to each subscription in turn. Delivery stops early if a subscriber cancels the event.
- postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    // Dispatch according to the thread mode of the subscriber method
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
threadMode determines on which thread the subscriber method runs, while invokeSubscriber calls the method through reflection.
POSTING
The method is invoked directly on the posting thread.
MAIN
If the current thread is the UI thread, the method is invoked directly; otherwise mainThreadPoster.enqueue(subscription, event) is called.
MAIN_ORDERED
The event is always enqueued on mainThreadPoster when one exists; otherwise the method is invoked directly.
BACKGROUND
If the current thread is not the UI thread, the method is invoked directly; if it is the UI thread, backgroundPoster.enqueue is called.
ASYNC
asyncPoster.enqueue is always called.
These delivery paths are analyzed in turn below.
- invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}
The subscriber method is called directly through reflection.
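The two catch blocks matter: Method.invoke wraps anything thrown by the target method in an InvocationTargetException, so the real cause has to be unwrapped. A small JDK-only sketch follows; ReflectiveInvoker, its handler helper, and InvokerDemoSubscriber are hypothetical names, and returning the cause stands in for whatever error handling the caller prefers.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectiveInvoker {
    // Looks up a public one-argument method; wraps the checked exception for convenience.
    static Method handler(Class<?> type, String name, Class<?> eventType) {
        try {
            return type.getMethod(name, eventType);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Invokes handler(event) on target. Returns the exception thrown by the handler, or null on success.
    static Throwable invoke(Object target, Method handler, Object event) {
        try {
            handler.invoke(target, event);
            return null;
        } catch (InvocationTargetException e) {
            return e.getCause(); // the exception raised inside the handler itself
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
}

// Hypothetical subscriber whose handler fails on empty input.
class InvokerDemoSubscriber {
    String last;

    public void onText(String text) {
        if (text.isEmpty()) {
            throw new IllegalArgumentException("empty");
        }
        last = text;
    }
}
```

A successful call returns null and updates the subscriber; a failing call returns the handler's own IllegalArgumentException rather than the reflective wrapper.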
- mainThreadPoster.enqueue
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
mainThreadPoster is created through mainThreadSupport.createPoster.
public Poster createPoster(EventBus eventBus) {
    return new HandlerPoster(eventBus, looper, 10);
}
createPoster returns a HandlerPoster instance.
Its enqueue method builds a PendingPost from the Subscription and the event and adds it to the PendingPostQueue. If the handler is not already active, sendMessage is called, which later triggers the handleMessage callback.
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}
Message processing
@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}
When the message arrives, handleMessage loops, taking PendingPost objects from the queue and executing each one with invokeSubscriber. Once the loop has run for at least maxMillisInsideHandleMessage, it sends itself a new message and returns, so that a long queue does not block the main thread.
void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}
Before invoking the subscriber, this method calls releasePendingPost:
static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // Don't let the pool grow indefinitely
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}
To avoid creating objects over and over, PendingPost maintains a list of released PendingPost instances for reuse.
List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
A PendingPost is obtained through the obtainPendingPost method, which can draw on this pool.
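The obtain/release pair is a classic object pool. The article does not list obtainPendingPost, so the sketch below is an assumption about how such a pool can work, written against the JDK only; PooledPost and its fields are hypothetical names, and the cap of 10000 simply mirrors the limit seen in releasePendingPost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pooled holder, modeled on the obtain/release pair described above.
final class PooledPost {
    private static final List<PooledPost> POOL = new ArrayList<>();
    private static final int MAX_POOL_SIZE = 10000;

    Object event;

    private PooledPost(Object event) {
        this.event = event;
    }

    static PooledPost obtain(Object event) {
        synchronized (POOL) {
            int size = POOL.size();
            if (size > 0) {
                PooledPost reused = POOL.remove(size - 1); // removing the last element is cheap
                reused.event = event;
                return reused;
            }
        }
        return new PooledPost(event); // pool empty: allocate a fresh instance
    }

    static void release(PooledPost post) {
        post.event = null; // drop the reference so the event can be garbage collected
        synchronized (POOL) {
            if (POOL.size() < MAX_POOL_SIZE) { // cap the pool so it cannot grow without bound
                POOL.add(post);
            }
        }
    }
}
```

After a release, the next obtain hands back the same instance with the new event set, which is exactly the allocation the pool is meant to save.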
- asyncPoster.enqueue
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
asyncPoster adds the PendingPost to its queue and submits itself to the thread pool; each run then takes one PendingPost from the queue and executes it.
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
- backgroundPoster.enqueue
In contrast to asyncPoster, backgroundPoster executes queued events sequentially: a synchronized block and the executorRunning flag ensure that only one thread at a time takes events from the queue and executes them.
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!executorRunning) {
            executorRunning = true;
            eventBus.getExecutorService().execute(this);
        }
    }
}
public void run() {
    try {
        try {
            while (true) {
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
        }
    } finally {
        executorRunning = false;
    }
}
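The "single active worker" idea behind backgroundPoster can be reproduced with a queue, a flag, and any Executor. The sketch below is a simplification under assumptions: SerialPoster is a hypothetical class, tasks are plain Runnables, and the timed wait of queue.poll(1000) is omitted, so the worker exits as soon as the queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

final class SerialPoster implements Runnable {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final Executor executor;
    private boolean running;   // guarded by "this": true while a worker is draining the queue
    int workersStarted;        // demo counter: how many times a worker was scheduled
    final List<RuntimeException> failures = new ArrayList<>();

    SerialPoster(Executor executor) {
        this.executor = executor;
    }

    void enqueue(Runnable task) {
        boolean start = false;
        synchronized (this) {
            queue.add(task);
            if (!running) {       // no worker active: this call must start one
                running = true;
                workersStarted++;
                start = true;
            }
        }
        if (start) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        while (true) {
            Runnable task;
            synchronized (this) {
                task = queue.poll();
                if (task == null) {
                    running = false; // cleared under the same lock that enqueue uses
                    return;
                }
            }
            try {
                task.run(); // run outside the lock so producers are not blocked by slow tasks
            } catch (RuntimeException e) {
                synchronized (this) {
                    failures.add(e); // keep draining: one failing task must not stall the queue
                }
            }
        }
    }
}
```

With a real pool, for example new SerialPoster(Executors.newCachedThreadPool()), tasks still run one at a time and in order, because a second worker is only scheduled after the first has observed an empty queue and cleared the flag.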
Scanning for subscriber methods
The register method calls findSubscriberMethods on SubscriberMethodFinder, which finds the methods in one of two ways: findUsingInfo or findUsingReflection. Here we look at findUsingReflection.
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    // Search the current class, then move to its superclass and keep searching
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}
First, an instance of FindState is obtained; it holds intermediate variables and the final result of the search. The search starts with the subscriber methods of the current class and then moves on to its superclass, and so on up the hierarchy; Java and Android system classes are filtered out automatically along the way.
The core of the search is implemented in findUsingReflectionInSingleClass.
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // Get all methods declared in this class, not including inherited ones
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    // Accept only public methods with exactly one parameter and a @Subscribe annotation,
    // and call checkAdd before adding them to the subscriber methods
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                // Annotated method that does not have exactly one parameter
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            // Annotated method that is not public, or is static or abstract
        }
    }
}
A method is picked up by the scan only if it satisfies all of the following rules:
1. it is neither static nor abstract;
2. it is public;
3. it has exactly one parameter;
4. it carries the @Subscribe annotation.
Even when these rules are met, the method is not added to the result list directly; it first has to pass the checkAdd validation.
boolean checkAdd(Method method, Class<?> eventType) {
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                throw new IllegalStateException();
            }
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}
// Check against the full method signature
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        // Revert the put, old class is further down the class hierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}
The scanned methods are validated in two levels, and the FindState releases its resources once the search is finished. The first level, in checkAdd, looks only at the event type: if no method is listening for this event type yet, the method is accepted and the second level is skipped. The second level checks the full method signature: the method name and the name of the event class are joined into a methodKey. If subscriberClassByMethodKey has no entry for that methodKey, the check returns true and ends. If the same methodKey already exists, a subclass has overridden the subscriber method of its parent class; the subclass's method must be kept and the parent's ignored. Because the scan proceeds from subclass to parent, methodClassOld is kept and methodClass is ignored.
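The second-level check is easy to exercise on its own. The sketch below keeps only the methodKey logic; SignatureCheck and the two screen classes are hypothetical names, and the method name and declaring class are passed in directly instead of being read from a java.lang.reflect.Method.

```java
import java.util.HashMap;
import java.util.Map;

final class SignatureCheck {
    private final Map<String, Class<?>> declaringClassByKey = new HashMap<>();

    // Returns true if the method should be kept, false if a subclass already provided it.
    boolean checkAdd(String methodName, Class<?> eventType, Class<?> declaringClass) {
        String key = methodName + '>' + eventType.getName();
        Class<?> old = declaringClassByKey.put(key, declaringClass);
        if (old == null || old.isAssignableFrom(declaringClass)) {
            return true; // first sighting, or the new class is the old one or a subclass of it
        }
        declaringClassByKey.put(key, old); // revert: the old class is further down the hierarchy
        return false;
    }
}

// Hypothetical hierarchy: the child overrides the parent's handler.
class ParentScreen {
    public void onEvent(String e) {}
}

class ChildScreen extends ParentScreen {
    @Override
    public void onEvent(String e) {}
}
```

Scanning ChildScreen first and ParentScreen second, as the subclass-to-parent order dictates, accepts the child's onEvent and rejects the parent's, so an overridden handler is registered only once.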
The reflection-based lookup described above processes annotations at run time, which is relatively slow. Newer versions of EventBus add an annotation processor that generates a subscriber method index at compile time to improve efficiency.
Sticky event handling
Sticky events are stored by EventBus so that an event posted before an observer registers can still be delivered once that observer registers. They are kept in an internal data structure:
Map<Class<?>, Object> stickyEvents
It holds the most recently posted sticky event for each event type.
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
postSticky saves the event in stickyEvents and then posts it. If observers are already registered, delivery is the same as for a normal event. If no observer is registered, postSingleEvent posts a NoSubscriberEvent instead, which can itself be subscribed to and handled. When an observer registers later, the event is taken from stickyEvents and delivered to that observer.
if (subscriberMethod.sticky) {
    if (eventInheritance) {
        Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
        for (Map.Entry<Class<?>, Object> entry : entries) {
            Class<?> candidateEventType = entry.getKey();
            if (eventType.isAssignableFrom(candidateEventType)) {
                Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    } else {
        Object stickyEvent = stickyEvents.get(eventType);
        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
    }
}
In this sticky-event code, EventBus first checks eventInheritance. If it is enabled, every stored sticky event whose class is the subscribed event type or a subclass of it is passed to checkPostStickyEventToSubscription; otherwise only the event stored under the exact event type is. checkPostStickyEventToSubscription performs a null check and then hands the event to the observer through postToSubscription, the same path a normal post takes.
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}
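The whole sticky mechanism fits in a few lines once threading is left out. The following sketch is an illustration with hypothetical names (StickyBus and its Consumer-based handlers); unlike EventBus, its post method matches only the exact event class, and its handler lists are not thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class StickyBus {
    // Latest sticky event per event class.
    private final Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();
    // Handlers per exact event class (simplification: no inheritance on post).
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    void postSticky(Object event) {
        stickyEvents.put(event.getClass(), event); // remember first, then deliver normally
        post(event);
    }

    void post(Object event) {
        List<Consumer<Object>> list =
                handlers.getOrDefault(event.getClass(), Collections.emptyList());
        for (Consumer<Object> h : list) {
            h.accept(event);
        }
    }

    void register(Class<?> eventType, boolean sticky, Consumer<Object> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
        if (sticky) {
            // Replay every stored event whose class is eventType or a subclass of it.
            for (Map.Entry<Class<?>, Object> entry : stickyEvents.entrySet()) {
                if (eventType.isAssignableFrom(entry.getKey())) {
                    handler.accept(entry.getValue());
                }
            }
        }
    }
}
```

A handler registered as sticky for CharSequence after two sticky String posts receives only the latest one, while a non-sticky handler registered afterwards receives nothing until the next post.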
Summary
This weekly series on open-source "wheels" is now in its fourth week. Next week brings a more detailed dissection of OkHttp, followed by Okio.