preface

At present, EventBus and RxBus are still commonly used in the market. I once wrote source code analysis of EventBus in the early days. No matter which of these two frameworks, developers need to consider the life cycle processing. Meituan provides a solution, through LiveData to realize its own lifecycle awareness of the event bus framework. This is our own event bus framework.

The principle of LiveData

When we use LiveData as an event bus, we always need to know what it is and why we can use it to implement an event bus.

LiveData can observe data and has LifeCycle awareness, which means that LiveData will only perform observation actions when the LifeCycle is in an inActive state and its ability to do so cannot be removed from LifeCycle.

First of all, we can take a look at the UML diagram of LiveData to get a general understanding of it

LiveData
mVersion
ObserverWrapper
mLastVersion

To subscribe to

There is a mObservers inside LiveData that holds all the observers of the binding. We can subscribe via LiveData# Observe and LiveData#oberveForever. If we need to bind to the lifecycle, we need to pass in the LifecycleOwner object, register our LiveData data Observer wrapper with the lifecycle Observer to receive changes to the lifecycle and make timely updates. We can take a look at the LiveData subscription method code

@MainThread
   public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
       assertMainThread("observe");
       // The current subscription request will be ignored when the currently bound component is in a DESTROYED state
       if (owner.getLifecycle().getCurrentState() == DESTROYED) {
           return;
       }
       // Convert to an observer wrapper class with lifecycle awareness
       LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
       ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
       // The corresponding observer can be bound to only one owner
       if(existing ! =null && !existing.isAttachedTo(owner)) {
           throw new IllegalArgumentException("Cannot add the same observer"
                   + " with different lifecycles");
       }
       if(existing ! =null) {
           return;
       }
       Registration / / lifecycle
       owner.getLifecycle().addObserver(wrapper);
   }
Copy the code

For the observer we need to monitor the lifecycle, LiveData wraps it as a LifecycleBoundObserver object, which inherits from ObserverWrapper, and ultimately implements the GenericLifecycleObserver interface, The lifecycle state change event is obtained by implementing the GenericLifecycleObserver#onStateChanged method.

Send a message

The difference between LiveData#setValue and LiveData#postValue is that one sends a message in the main thread, while the other sends a message in the child thread. Post eventually calls setValue through the Handler that specifies the main thread. So I’m going to focus on LiveData#setValue

@MainThread
    protected void setValue(T value) {
        assertMainThread("setValue");
        // Send version +1
        mVersion++;
        mData = value;
        // Information distribution
        dispatchingValue(null);
    }
Copy the code

When setValue is called, it is equivalent to the change of observable data maintained inside LiveData, which directly triggers event distribution

void dispatchingValue(@Nullable ObserverWrapper initiator) {
        // mDispatchingValue is used to handle concurrent calls to dispatchingValue
        // If new data changes occur during execution, the observer will not be notified again
        // So the execution within the observer should not do time-consuming work
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if(initiator ! =null) {
                considerNotify(initiator);
                initiator = null;
            } else {
                for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break; }}}}while (mDispatchInvalidated);
        mDispatchingValue = false;
    }
Copy the code

Eventually, we’ll come to the notice method, and let the observer be active and his subscriptions be considered less than the number of sendings, finally triggering the observation method we’ve implemented.

private void considerNotify(ObserverWrapper observer) {
        if(! observer.mActive) {return;
        }
        if(! observer.shouldBeActive()) { observer.activeStateChanged(false);
            return;
        }
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //noinspection unchecked
        observer.mObserver.onChanged((T) mData);
    }
Copy the code

Note that LiveData#dispatchingValue is not only triggered when we actively update data, but also notified when our observer status changes (inactive->active), thus LiveData must support sticky events

class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver {
  @Override
       public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
           if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
               removeObserver(mObserver);
               return; } activeStateChanged(shouldBeActive()); }}private abstract class ObserverWrapper {
  void activeStateChanged(boolean newActive) {
            if (newActive == mActive) {
                return;
            }
            // When the observer status changes from active->inactive, or inactive-> Active, follow the following process
            mActive = newActive;
            boolean wasInactive = LiveData.this.mActiveCount == 0;
            LiveData.this.mActiveCount += mActive ? 1 : -1;
            if (wasInactive && mActive) {
                onActive();
            }
            if (LiveData.this.mActiveCount == 0 && !mActive) {
              // The onInactive null method is triggered when the current liveData observer is inactive and the current observer is active->inactive
              // We can overwrite onInactive to determine if all liveData observers fail, such as freeing some large memory objects
                onInactive();
            }
            // When the observer is from inactive->active
            // The observer needs to be notified
            if (mActive) {
                dispatchingValue(this); }}}Copy the code

The principle of summary

In summary, we can know as follows about LiveData:

  1. LiveDataThe observer may or may not be associated with the lifecycle
  2. LiveDataThe observer can only be with oneLifecycleOwnerBind, otherwise an exception is thrown
  3. When the observer’s active status changes
  4. Active -> Inactive: If LiveCycler tells OnDestroy to remove the corresponding observer, onInactive will be triggered if all observers are inactive
  5. Inactive-> active: Notifies the observer of the most recent data update (sticky message)
  6. In addition to receiving notifications of data updates when the observer’s status changes, there is also the possibility of receiving notifications of data updates when the developer is actively updating the data.

Implementation of event bus based on LiveData

As you can see, LiveData itself has observable data updates, and we can get a basic event bus by maintaining a hash table of EventName-LiveData

class LiveDataBus {
    internal val liveDatas by lazy { mutableMapOf<String, LiveData<*>>() }

    @Synchronized
    private fun <T>bus(channel: String): LiveData<T>{
        return liveDatas.getOrPut(channel){
            LiveDataEvent<T>(channel)
        } as LiveData<T>
    }

    fun <T> with(channel: String): LiveData<T>{
        return bus(channel)
    }

    companion object{
        private val INSTANCE by lazy { LiveDataBus() }
        @JvmStatic
        fun get(a) = INSTANCE
    }
}
Copy the code

But in addition to sticky events, we need the support of non-sticky events, and there are two ways to do that.

Meituan is according to overwrite observe method, reflection for ObserverWrapper. MLastVersion, at the time of subscription of initialization ObserverWrapper. MLastVersion equals LiveData. MVersion, Makes sticky messages impossible to implement (see the article in Reference 1 for details)

I’m using a different approach here, where sticky messages end up being called Observer#onChanged, so we’ll just wrap it up again and internally maintain the actual number of subscribed messages to determine if the actual onChanged method needs to be triggered

internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>): Observer<T>{
    // When creating a new observer wrapper class, the actual internal version is directly equal to the version of LiveData
    private var mLastVersion = liveData.version
    override fun onChanged(t: T) {
        if(mLastVersion >= liveData.version){
            return
        }
        mLastVersion = liveData.version
        observer.onChanged(t)
    }
}
Copy the code

We need to override the observe method to pass in our wrapped observer

internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>(){
    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        super.observe(owner, ExternalObserverWrapper(observer, this, owner))
    }

}
Copy the code

It should be noted that after the observer set maintained by LiveData is changed into the one wrapped by us, we also need to repackage the incoming method of removing the observer, and maintain an additional hash object of the real observer and the wrapped observer. And delete the corresponding memory object when the observer is removed to prevent memory leakage. The final code is as follows

internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>(){
    internal var mObservers = mutableMapOf<Observer<in T>, ExternalObserverWrapper<T>>()

    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        val exist = mObservers.getOrPut(observer){
            LifecycleExternalObserver(observer, this, owner).apply {
                mObservers[observer] = this
                owner.lifecycle.addObserver(this)}}super.observe(owner, exist)
    }

    @MainThread
    override fun observeForever(observer: Observer<in T>) {
        val exist = mObservers.getOrPut(observer){
            AlwaysExternalObserver(observer, this).apply { mObservers[observer] = this}}super.observeForever(exist)
    }

    @MainThread
    fun observeSticky(owner: LifecycleOwner, observer: Observer<in T>) {
        super.observe(owner, observer)
    }

    @MainThread
    fun observeStickyForever(observer: Observer<in T>){
        super.observeForever(observer)
    }

    @MainThread
    override fun removeObserver(observer: Observer<in T>) {
        valexist = mObservers.remove(observer) ? : observersuper.removeObserver(exist)
    }

    @MainThread
    override fun removeObservers(owner: LifecycleOwner) {
        mObservers.iterator().forEach { item->
            if(item.value.isAttachedTo(owner)){
                mObservers.remove(item.key)
            }
        }
        super.removeObservers(owner)
    }

    override fun onInactive(a) {
        super.onInactive()
        if(! hasObservers()){// When the corresponding liveData has no associated observer
            // Remove the LiveData being maintained
            LiveDataBus.get().liveDatas.remove(key)
        }
    }
}

internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>): Observer<T>{

    private var mLastVersion = liveData.version
    override fun onChanged(t: T) {
        if(mLastVersion >= liveData.version){
            return
        }
        mLastVersion = liveData.version
        observer.onChanged(t)
    }

    open fun isAttachedTo(owner: LifecycleOwner) = false
}

/** * Always active observer wrapper class *@param T
 * @constructor* /
internal class AlwaysExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>):
    ExternalObserverWrapper<T>(observer, liveData)

/** * Bind the lifecycle observer wrapper class *@param T
 * @property owner LifecycleOwner
 * @constructor* /
internal class LifecycleExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>, val owner: LifecycleOwner): ExternalObserverWrapper<T>(
    observer,
    liveData
), LifecycleObserver{
    /** * Remove the internal maintained observer */ when the bound lifecycle is destroyed
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy(a){
        liveData.mObservers.remove(observer)
        owner.lifecycle.removeObserver(this)}override fun isAttachedTo(owner: LifecycleOwner): Boolean {
        return owner == this.owner
    }
}
Copy the code

Constraints of events

As mentioned in the improvement article discussed in meituan later, the current EventBus (either EventBus or LiveEventBus) does not constrain the event. If student A defines the event name as “event1” string and sends the event, but student B does not subscribe to the event as “eventl” string, Then the event will never be received. In addition, when upstream deletes the sent event-related code, subscribers are not aware of it. Based on this, the definition of events is implemented by the event bus framework itself through dynamic proxies, referring to Retrofit’s approach to dynamic proxies for requests

class LiveDataBus {
  fun <E> of(clz: Class<E>): E {
        if(! clz.isInterface){throw IllegalArgumentException("API declarations must be interfaces.")}if(0 < clz.interfaces.size){
            throw IllegalArgumentException("API interfaces must not extend other interfaces.")}return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _->
            return@InvocationHandler get().with(
                // The event name is defined by the collection class name _ event method name
                // To ensure that the event is unique
                "${clz.canonicalName}_${method.name}",
                (method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
        }) as E
    }
}
Copy the code

Developers need to define an event before they can send and subscribe to it.

interface LiveEvents {
    /** * define an event *@returnLiveEventObserver<Boolean> Event type */
    fun event1(a): LiveEventObserver<Boolean>
    fun event2(a): LiveEventObserver<MutableList<String>>
}

Copy the code

Developers can then send and subscribe in the following ways

private fun sendEvent(a){
        LiveDataBus
            .get()
            .of(LiveEvents::class.java)
            .event1()
            .post(true)}private fun observe(a){
    LiveDataBus
        .get()
        .of(LiveEvents::class.java)
        .event1()
        .observe(this, Observer {
            Log.i(LOG, it.toString())
        })
}
Copy the code

reference

  1. Evolution of Android message Bus: Replace RxBus and EventBus with LiveDataBus
  2. Android componentization scheme and component message bus Modular – Event actual combat
  3. Implement an event bus with LiveData