Preface
EventBus and RxBus are still widely used today, and I wrote a source code analysis of EventBus some time ago. With either framework, developers have to handle lifecycle concerns themselves. Meituan proposed a solution: build the event bus on LiveData so that the bus itself is lifecycle-aware. This article describes our own event bus framework built on that idea.
The principle of LiveData
Before using LiveData as an event bus, we need to understand what it is and why it can serve as one.
LiveData is an observable data holder with lifecycle awareness: it only notifies observers whose Lifecycle is in an active state (STARTED or RESUMED), and this ability is inseparable from the Lifecycle component.
First, let's look at the UML diagram of LiveData to get a general understanding of it.
[Figure: UML diagram of LiveData, showing LiveData (field mVersion) and ObserverWrapper (field mLastVersion)]
Subscribing
LiveData keeps an internal mObservers collection that holds all bound observers. We can subscribe via LiveData#observe and LiveData#observeForever. To bind to a lifecycle, we pass in a LifecycleOwner; LiveData then wraps our Observer and registers the wrapper as a lifecycle observer, so that it receives lifecycle changes and updates its state in time. Here is the code of the LiveData subscription method:
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
    assertMainThread("observe");
    // Ignore the subscription request if the bound component is already DESTROYED
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        return;
    }
    // Wrap the observer in a lifecycle-aware wrapper class
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    // An observer can be bound to only one owner
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer"
                + " with different lifecycles");
    }
    if (existing != null) {
        return;
    }
    // Register the wrapper with the lifecycle
    owner.getLifecycle().addObserver(wrapper);
}
For an observer that needs lifecycle monitoring, LiveData wraps it in a LifecycleBoundObserver, which extends ObserverWrapper and implements the GenericLifecycleObserver interface. Lifecycle state changes are received through the GenericLifecycleObserver#onStateChanged method.
Sending a message
The difference between LiveData#setValue and LiveData#postValue is that setValue must be called on the main thread, while postValue can be called from any thread. postValue eventually calls setValue through a Handler bound to the main thread, so I will focus on LiveData#setValue.
@MainThread
protected void setValue(T value) {
    assertMainThread("setValue");
    // Increment the data version
    mVersion++;
    mData = value;
    // Dispatch the new value
    dispatchingValue(null);
}
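The relationship between postValue and setValue can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: PostingValue and PostValueDemo are hypothetical names, a plain queue stands in for the main-thread Handler, and nothing here is the actual AndroidX implementation. It also shows the documented behavior that several posts made before the main thread runs deliver only the last value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified stand-in for LiveData's setValue/postValue; not the AndroidX class.
class PostingValue<T> {
    private static final Object NOT_SET = new Object();

    // Hypothetical "main thread" task queue, playing the role of the main-thread Handler.
    private final Queue<Runnable> mainQueue;
    private final Object lock = new Object();
    private Object pending = NOT_SET;
    final List<T> delivered = new ArrayList<>();

    PostingValue(Queue<Runnable> mainQueue) {
        this.mainQueue = mainQueue;
    }

    // Meant to run on the "main thread": delivers the value immediately.
    void setValue(T value) {
        delivered.add(value);
    }

    // May be called from any thread: stores the value and schedules at most one task.
    void postValue(T value) {
        boolean schedule;
        synchronized (lock) {
            schedule = pending == NOT_SET;
            pending = value;
        }
        if (schedule) {
            mainQueue.add(this::flushPending);
        }
    }

    // Runs on the "main thread" and forwards the latest pending value to setValue.
    @SuppressWarnings("unchecked")
    private void flushPending() {
        Object value;
        synchronized (lock) {
            value = pending;
            pending = NOT_SET;
        }
        setValue((T) value);
    }
}

class PostValueDemo {
    public static void main(String[] args) {
        Queue<Runnable> mainQueue = new ConcurrentLinkedQueue<>();
        PostingValue<String> data = new PostingValue<>(mainQueue);
        data.postValue("a");
        data.postValue("b"); // overwrites "a" before the queue is drained
        while (!mainQueue.isEmpty()) {
            mainQueue.poll().run(); // simulate the main thread running its tasks
        }
        System.out.println(data.delivered); // prints [b]
    }
}
```

For an event bus this matters: events sent with a post-style call in quick succession may be coalesced, so callers that need every event delivered should send on the main thread.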
Calling setValue changes the observable data held inside LiveData, which directly triggers event dispatch:
void dispatchingValue(@Nullable ObserverWrapper initiator) {
    // mDispatchingValue guards against re-entrant calls to dispatchingValue.
    // If the data changes while a dispatch is in progress, the current pass is
    // marked invalid and restarted with the latest value,
    // so observer callbacks should not do time-consuming work.
    if (mDispatchingValue) {
        mDispatchInvalidated = true;
        return;
    }
    mDispatchingValue = true;
    do {
        mDispatchInvalidated = false;
        if (initiator != null) {
            considerNotify(initiator);
            initiator = null;
        } else {
            for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                    mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                considerNotify(iterator.next().getValue());
                if (mDispatchInvalidated) {
                    break;
                }
            }
        }
    } while (mDispatchInvalidated);
    mDispatchingValue = false;
}
Eventually we reach considerNotify. It invokes the onChanged method we implemented only if the observer is active and the version it last received (mLastVersion) is lower than the current version of the LiveData (mVersion).
private void considerNotify(ObserverWrapper observer) {
    if (!observer.mActive) {
        return;
    }
    if (!observer.shouldBeActive()) {
        observer.activeStateChanged(false);
        return;
    }
    if (observer.mLastVersion >= mVersion) {
        return;
    }
    observer.mLastVersion = mVersion;
    //noinspection unchecked
    observer.mObserver.onChanged((T) mData);
}
Note that LiveData#dispatchingValue is triggered not only when we actively update the data, but also when an observer's state changes from inactive to active. As a result, LiveData inherently behaves like a sticky event source.
class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver {
    @Override
    public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
        if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
            removeObserver(mObserver);
            return;
        }
        activeStateChanged(shouldBeActive());
    }
}

private abstract class ObserverWrapper {
    void activeStateChanged(boolean newActive) {
        if (newActive == mActive) {
            return;
        }
        // The observer state changed (active -> inactive or inactive -> active)
        mActive = newActive;
        boolean wasInactive = LiveData.this.mActiveCount == 0;
        LiveData.this.mActiveCount += mActive ? 1 : -1;
        if (wasInactive && mActive) {
            onActive();
        }
        if (LiveData.this.mActiveCount == 0 && !mActive) {
            // onInactive (an empty hook by default) is called when this observer goes
            // active -> inactive and the LiveData has no active observers left.
            // Override it to react to that, e.g. to release large in-memory objects.
            onInactive();
        }
        // When the observer goes inactive -> active,
        // it needs to be notified of the current value
        if (mActive) {
            dispatchingValue(this);
        }
    }
}
Summary of the principle
In summary, we know the following about LiveData:
- A LiveData observer may or may not be associated with a lifecycle.
- A LiveData observer can be bound to only one LifecycleOwner; otherwise an exception is thrown.
- When the observer's active state changes:
  - Active -> inactive: if the Lifecycle reports onDestroy, the corresponding observer is removed; onInactive is triggered once all observers are inactive.
  - Inactive -> active: the observer is notified of the most recent data update (sticky message).
- Besides being notified when its own state changes, an observer is also notified when the developer actively updates the data.
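The version bookkeeping summarized above can be modeled in a few lines of plain Java. This is a simplified sketch, not the AndroidX source: MiniLiveData, setActive, and StickyDemo are hypothetical names, and setActive stands in for the lifecycle callbacks that move an observer between active and inactive.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified model of LiveData's version bookkeeping; not the AndroidX class.
class MiniLiveData<T> {
    private static final int START_VERSION = -1;

    // Mirrors ObserverWrapper: remembers the last version this observer received.
    private final class Wrapper {
        final Consumer<T> observer;
        boolean active;
        int lastVersion = START_VERSION;

        Wrapper(Consumer<T> observer) {
            this.observer = observer;
        }
    }

    private final Map<Consumer<T>, Wrapper> observers = new LinkedHashMap<>();
    private T data;
    private int version = START_VERSION;

    void observe(Consumer<T> observer) {
        observers.putIfAbsent(observer, new Wrapper(observer));
    }

    // Stand-in for lifecycle changes (for example the owner starting or stopping).
    void setActive(Consumer<T> observer, boolean active) {
        Wrapper w = observers.get(observer);
        if (w == null || w.active == active) {
            return;
        }
        w.active = active;
        if (active) {
            considerNotify(w); // inactive -> active: replay the latest value
        }
    }

    void setValue(T value) {
        version++;
        data = value;
        for (Wrapper w : new ArrayList<>(observers.values())) {
            considerNotify(w);
        }
    }

    private void considerNotify(Wrapper w) {
        if (!w.active) {
            return; // inactive observers are skipped
        }
        if (w.lastVersion >= version) {
            return; // this observer has already seen the current value
        }
        w.lastVersion = version;
        w.observer.accept(data);
    }
}

class StickyDemo {
    public static void main(String[] args) {
        MiniLiveData<String> liveData = new MiniLiveData<>();
        liveData.setValue("sent before anyone subscribed");

        List<String> received = new ArrayList<>();
        Consumer<String> late = received::add;
        liveData.observe(late);
        liveData.setActive(late, true);
        System.out.println(received); // prints [sent before anyone subscribed]
    }
}
```

The late subscriber still receives the earlier value because its lastVersion starts below the holder's version. That is the sticky behavior an event bus usually has to switch off.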
Implementation of event bus based on LiveData
As we have seen, LiveData already provides observable data updates, so we can get a basic event bus simply by maintaining a hash map from event name to LiveData:
class LiveDataBus {
    internal val liveDatas by lazy { mutableMapOf<String, LiveData<*>>() }

    @Synchronized
    private fun <T> bus(channel: String): LiveData<T> {
        return liveDatas.getOrPut(channel) {
            LiveDataEvent<T>(channel)
        } as LiveData<T>
    }

    fun <T> with(channel: String): LiveData<T> {
        return bus(channel)
    }

    companion object {
        private val INSTANCE by lazy { LiveDataBus() }

        @JvmStatic
        fun get() = INSTANCE
    }
}
Besides sticky events, we also need to support non-sticky events, and there are two ways to do that.
Meituan overrides the observe method and uses reflection to access ObserverWrapper.mLastVersion, setting it equal to LiveData.mVersion when the subscription is created, so that the sticky message is never delivered (see the first article in the references for details).
I use a different approach here. A sticky message ultimately arrives through Observer#onChanged, so we wrap the observer once more and let the wrapper track the version it has actually received, which tells it whether the real onChanged method should be triggered.
internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>) : Observer<T> {
    // When a new observer wrapper is created, its internal version starts at the current version of the LiveData
    private var mLastVersion = liveData.version

    override fun onChanged(t: T) {
        if (mLastVersion >= liveData.version) {
            return
        }
        mLastVersion = liveData.version
        observer.onChanged(t)
    }
}
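The wrapper idea can be tried out without Android. The following is a minimal Java sketch under stated assumptions: VersionedData, NonStickyObserver, and NonStickyDemo are hypothetical names, and VersionedData is a deliberately tiny holder that is sticky by default and exposes its version, much as ExternalLiveData is assumed to expose version in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal holder that exposes its version; not the AndroidX class.
class VersionedData<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T data;
    private int version = -1;

    int getVersion() {
        return version;
    }

    // Sticky by default: a new observer immediately receives the latest value.
    void observeSticky(Consumer<T> observer) {
        observers.add(observer);
        if (version >= 0) {
            observer.accept(data);
        }
    }

    // Non-sticky: wrap the observer so it ignores anything sent before it subscribed.
    void observe(Consumer<T> observer) {
        observeSticky(new NonStickyObserver<>(observer, this));
    }

    void setValue(T value) {
        version++;
        data = value;
        for (Consumer<T> o : new ArrayList<>(observers)) {
            o.accept(value);
        }
    }
}

// Wrapper that starts at the holder's current version, so older values are dropped.
class NonStickyObserver<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private final VersionedData<T> source;
    private int lastVersion;

    NonStickyObserver(Consumer<T> delegate, VersionedData<T> source) {
        this.delegate = delegate;
        this.source = source;
        this.lastVersion = source.getVersion();
    }

    @Override
    public void accept(T value) {
        if (lastVersion >= source.getVersion()) {
            return; // value was sent before this observer subscribed
        }
        lastVersion = source.getVersion();
        delegate.accept(value);
    }
}

class NonStickyDemo {
    public static void main(String[] args) {
        VersionedData<String> data = new VersionedData<>();
        data.setValue("old");

        List<String> sticky = new ArrayList<>();
        List<String> plain = new ArrayList<>();
        data.observeSticky(sticky::add);
        data.observe(plain::add);
        data.setValue("new");

        System.out.println(sticky); // prints [old, new]
        System.out.println(plain);  // prints [new]
    }
}
```

Compared with the reflection approach, the wrapper needs no access to private fields of the holder, at the cost of one extra object per subscription.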
We need to override the observe method to pass in our wrapped observer
internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>() {
    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        // Register the wrapper instead of the original observer
        super.observe(owner, ExternalObserverWrapper(observer, this))
    }
}
Note that once the observers held by LiveData are our wrappers, we must also override the methods that remove observers and maintain an extra map from each original observer to its wrapper. The map entry must be deleted when the observer is removed, to prevent memory leaks. The final code is as follows:
internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>() {
    // Maps each original observer to the wrapper that was actually registered
    internal var mObservers = mutableMapOf<Observer<in T>, ExternalObserverWrapper<T>>()

    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        val exist = mObservers.getOrPut(observer) {
            LifecycleExternalObserver(observer, this, owner).apply {
                mObservers[observer] = this
                owner.lifecycle.addObserver(this)
            }
        }
        super.observe(owner, exist)
    }

    @MainThread
    override fun observeForever(observer: Observer<in T>) {
        val exist = mObservers.getOrPut(observer) {
            AlwaysExternalObserver(observer, this).apply { mObservers[observer] = this }
        }
        super.observeForever(exist)
    }

    @MainThread
    fun observeSticky(owner: LifecycleOwner, observer: Observer<in T>) {
        super.observe(owner, observer)
    }

    @MainThread
    fun observeStickyForever(observer: Observer<in T>) {
        super.observeForever(observer)
    }

    @MainThread
    override fun removeObserver(observer: Observer<in T>) {
        // Fall back to the observer itself for sticky (unwrapped) subscriptions
        val exist = mObservers.remove(observer) ?: observer
        super.removeObserver(exist)
    }

    @MainThread
    override fun removeObservers(owner: LifecycleOwner) {
        // Remove through the collection view: deleting from the map while
        // iterating it with forEach would throw ConcurrentModificationException
        mObservers.entries.removeAll { it.value.isAttachedTo(owner) }
        super.removeObservers(owner)
    }

    override fun onInactive() {
        super.onInactive()
        if (!hasObservers()) {
            // When this LiveData has no associated observers,
            // remove it from the map maintained by the bus
            LiveDataBus.get().liveDatas.remove(key)
        }
    }
}
internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>) : Observer<T> {
    private var mLastVersion = liveData.version

    override fun onChanged(t: T) {
        if (mLastVersion >= liveData.version) {
            return
        }
        mLastVersion = liveData.version
        observer.onChanged(t)
    }

    open fun isAttachedTo(owner: LifecycleOwner) = false
}
/**
 * Always-active observer wrapper class
 * @param T
 * @constructor
 */
internal class AlwaysExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>) :
    ExternalObserverWrapper<T>(observer, liveData)

/**
 * Lifecycle-bound observer wrapper class
 * @param T
 * @property owner LifecycleOwner
 * @constructor
 */
internal class LifecycleExternalObserver<T>(
    observer: Observer<in T>,
    liveData: ExternalLiveData<T>,
    val owner: LifecycleOwner
) : ExternalObserverWrapper<T>(observer, liveData), LifecycleObserver {
    /**
     * Remove the internally maintained observer when the bound lifecycle is destroyed
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy() {
        liveData.mObservers.remove(observer)
        owner.lifecycle.removeObserver(this)
    }

    override fun isAttachedTo(owner: LifecycleOwner): Boolean {
        return owner == this.owner
    }
}
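The bookkeeping between original observers and their wrappers can be sketched in isolation. The Java below is an illustration only: ObserverRegistry, Wrapper, and the method names are hypothetical, and Runnable stands in for Observer. It shows two points from the code above: removal by the original observer falls back to the observer itself when no wrapper was registered, and bulk removal per owner goes through removeIf, since deleting from a HashMap or LinkedHashMap while iterating over it typically throws ConcurrentModificationException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical registry mapping each caller-supplied observer to its wrapper.
class ObserverRegistry {
    // The wrapper remembers which owner (if any) it is bound to.
    static final class Wrapper implements Runnable {
        final Runnable original;
        final Object owner; // null means "observe forever"

        Wrapper(Runnable original, Object owner) {
            this.original = original;
            this.owner = owner;
        }

        boolean isAttachedTo(Object candidate) {
            return owner != null && owner == candidate;
        }

        @Override
        public void run() {
            original.run();
        }
    }

    private final Map<Runnable, Wrapper> wrappers = new LinkedHashMap<>();

    // Returns the existing wrapper for this observer, or creates and stores one.
    Wrapper wrap(Runnable original, Object owner) {
        return wrappers.computeIfAbsent(original, o -> new Wrapper(o, owner));
    }

    // Returns what must be removed from the underlying holder: the wrapper if one
    // was registered, otherwise the observer itself (sticky, unwrapped subscription).
    Runnable unwrap(Runnable original) {
        Wrapper w = wrappers.remove(original);
        return w != null ? w : original;
    }

    // removeIf deletes through the iterator, so it is safe during traversal.
    void removeOwner(Object owner) {
        wrappers.values().removeIf(w -> w.isAttachedTo(owner));
    }

    int size() {
        return wrappers.size();
    }
}

class ObserverRegistryDemo {
    public static void main(String[] args) {
        ObserverRegistry registry = new ObserverRegistry();
        Object screenA = new Object();
        Object screenB = new Object();
        Runnable first = () -> System.out.println("first");
        Runnable second = () -> System.out.println("second");
        Runnable third = () -> System.out.println("third");

        registry.wrap(first, screenA);
        registry.wrap(second, screenA);
        registry.wrap(third, screenB);

        registry.removeOwner(screenA);
        System.out.println(registry.size()); // prints 1
    }
}
```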
Constraints of events
As Meituan's follow-up article on improvements points out, current event buses (whether EventBus or LiveEventBus) place no constraints on events. If developer A defines and sends an event named "event1" while developer B mistakenly subscribes to "eventl", B will never receive the event. In addition, when the upstream code that sends an event is deleted, subscribers have no way of noticing. For these reasons, the event bus framework itself defines events through dynamic proxies, following the way Retrofit uses dynamic proxies for request interfaces.
class LiveDataBus {
    fun <E> of(clz: Class<E>): E {
        if (!clz.isInterface) {
            throw IllegalArgumentException("API declarations must be interfaces.")
        }
        if (0 < clz.interfaces.size) {
            throw IllegalArgumentException("API interfaces must not extend other interfaces.")
        }
        return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _ ->
            return@InvocationHandler get().with(
                // The event name is "<interface canonical name>_<event method name>"
                // to ensure that the event is unique
                "${clz.canonicalName}_${method.name}",
                (method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
        }) as E
    }
}
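The dynamic-proxy technique itself can be demonstrated with the JDK alone. The sketch below is written in Java under stated assumptions: Channel, DemoEvents, and EventApi are hypothetical stand-ins (Channel plays the role of LiveEventObserver), and the channel registry is a plain map instead of the bus. It derives the channel name from the interface name and method name, as described above, and reads the payload type from the method's generic return type.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical channel type standing in for LiveEventObserver<T>.
final class Channel<T> {
    final String name;
    final Type payloadType;

    Channel(String name, Type payloadType) {
        this.name = name;
        this.payloadType = payloadType;
    }
}

// Hypothetical event declaration: one method per event.
interface DemoEvents {
    Channel<Boolean> loginChanged();
}

final class EventApi {
    private static final Map<String, Channel<?>> CHANNELS = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <E> E of(Class<E> api) {
        if (!api.isInterface()) {
            throw new IllegalArgumentException("API declarations must be interfaces.");
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // Keep equals/hashCode/toString usable on the proxy itself.
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return api.getName() + " proxy";
                }
            }
            // Channel name: "<interface canonical name>_<method name>".
            String name = api.getCanonicalName() + "_" + method.getName();
            // Payload type: the T in the declared return type Channel<T>.
            Type payload = ((ParameterizedType) method.getGenericReturnType())
                    .getActualTypeArguments()[0];
            return CHANNELS.computeIfAbsent(name, n -> new Channel<Object>(n, payload));
        };
        return (E) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[] {api}, handler);
    }
}

class EventApiDemo {
    public static void main(String[] args) {
        Channel<Boolean> channel = EventApi.of(DemoEvents.class).loginChanged();
        System.out.println(channel.name + " carries " + channel.payloadType);
    }
}
```

Because the event name is derived from a method on a shared interface, a misspelled event no longer compiles, and deleting an event method breaks every subscriber at build time instead of silently at run time.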
Developers need to define an event before they can send and subscribe to it.
interface LiveEvents {
    /**
     * Define an event
     * @return LiveEventObserver<Boolean> event type
     */
    fun event1(): LiveEventObserver<Boolean>
    fun event2(): LiveEventObserver<MutableList<String>>
}
Developers can then send and subscribe in the following ways
private fun sendEvent() {
    LiveDataBus
        .get()
        .of(LiveEvents::class.java)
        .event1()
        .post(true)
}

private fun observe() {
    LiveDataBus
        .get()
        .of(LiveEvents::class.java)
        .event1()
        .observe(this, Observer {
            Log.i(LOG, it.toString())
        })
}
References
- Evolution of Android message Bus: Replace RxBus and EventBus with LiveDataBus
- Android componentization scheme and component message bus Modular – Event actual combat
- Implement an event bus with LiveData