The previous article in this series, "Android Jetpack series: LiveData", introduced the basic usage of LiveData. This article covers a more advanced use: implementing a message bus on top of LiveData.

The message bus

Passing data between pages (especially across several pages) is a common task in Android development. It can be done with a Handler, interface callbacks, and so on, but none of these is very elegant; a message bus is a cleaner way to pass data.

The biggest advantage of a message bus is decoupling: it avoids strong coupling between classes. A message bus is typically implemented in one of the following ways:

  • EventBus: github.com/greenrobot/…
  • RxBus: a message bus implemented on top of RxJava
  • LiveDataBus: a message bus based on LiveData from Jetpack, and the implementation this article focuses on

EventBus

The overall idea of EventBus is as follows:

EventBus is based on the publish/subscribe model. A publisher and its subscribers have a one-to-many relationship: one publisher posts an event, and several subscribers may receive it. The publisher hands the data to a dispatch center, which finds the subscribers registered for that event and passes the data to each of them in turn, completing the transfer. If there are no subscribers, nothing is delivered. Publishers and subscribers do not need to know about each other, so the data transfer is decoupled.
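The publish/subscribe flow described above can be sketched in plain Java. This is only an illustration of the idea, not the EventBus library's API: the class `SimpleEventBus` and its `subscribe`/`publish` methods are hypothetical names, and dispatching by the event's exact class is an assumption made to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical minimal dispatch center; not the EventBus library API.
class SimpleEventBus {
    // Subscribers grouped by the event type they registered for
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new HashMap<>();

    // Register a subscriber for one event type
    <E> void subscribe(Class<E> type, Consumer<E> subscriber) {
        subscribers.computeIfAbsent(type, k -> new ArrayList<>())
                .add(event -> subscriber.accept(type.cast(event)));
    }

    // Deliver the event to each subscriber of its exact type, in registration order.
    // Returns how many subscribers received it (0 means nothing was delivered).
    int publish(Object event) {
        List<Consumer<Object>> targets = subscribers.get(event.getClass());
        if (targets == null) {
            return 0;
        }
        for (Consumer<Object> target : targets) {
            target.accept(event);
        }
        return targets.size();
    }
}
```

The publisher only calls `publish()` and the subscribers only call `subscribe()`; neither side holds a reference to the other, which is the decoupling the paragraph describes.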

RxBus

RxBus builds on RxJava. RxJava provides Subject, a special type that is both an Observable and an Observer, so it can act as a bridge or proxy. There are four types of Subject:

  • AsyncSubject: the Observer receives only the last item emitted before onComplete(), regardless of when it subscribes. onComplete() must be called for anything to be delivered.
  • BehaviorSubject: the Observer receives the most recent event emitted before it subscribed, followed by all events emitted after the subscription.
  • PublishSubject: the Observer receives only events emitted after it subscribes.
  • ReplaySubject: the Observer receives all events emitted by the ReplaySubject, regardless of when it subscribes.

For details, see "The use of Subject and Processor in RxJava".

A message bus can be implemented with Subject. Since that is not the focus of this article, the code is not included here; implementations are easy to find by searching.

LiveDataBus

LiveDataBus is built on LiveData, whose usage and benefits were described in detail in the previous article:

  • Ensures the UI matches the data state: LiveData follows the observer pattern. LiveData notifies the Observer when the data changes, and the UI can be updated in the Observer's callback, so the UI is data-driven.

  • No memory leaks: observers are bound to Lifecycle objects and clean up after themselves when their associated Lifecycle is destroyed (for example, when an Activity reaches the DESTROYED state).

  • If the observer's lifecycle is inactive (such as an Activity in the back stack), it does not receive any LiveData events.

  • No more manual lifecycle handling: UI components simply observe the relevant data and never stop or resume the observation. LiveData manages all of this automatically because it is aware of the associated lifecycle state changes while observing.

  • Data is always up to date: if a lifecycle becomes inactive, it receives the latest data when it becomes active again. For example, an Activity that was in the background receives the latest data as soon as it returns to the foreground.

  • Proper handling of configuration changes: if an Activity or Fragment is recreated due to a configuration change (such as a device rotation), it immediately receives the latest available data.

  • Sharing resources: a LiveData object can be extended using the singleton pattern to wrap system services so that they can be shared within the app. The LiveData object connects to the system service once, and then any observer that needs the resource only has to observe the LiveData object.

How it works

  • Message: sent by the publisher and received by subscribers. A message can be a primitive type or a custom type.
  • Message channel: LiveData plays the role of the message channel. Channels are distinguished by name; the name is a String, and the LiveData message channel is looked up by that name.
  • Message bus: implemented as a singleton, with the message channels stored in a HashMap.
  • Subscribe: the subscriber obtains the message channel through get() and calls observe() to subscribe to messages on that channel.
  • Publish: the publisher obtains the message channel through get() and calls setValue() to publish a message.
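The roles listed above can be sketched in plain Java. To keep the example runnable without Android, the hypothetical class `Channel` stands in for LiveData (it has no lifecycle awareness and no stickiness), and `BusSketch` is an illustrative name, not the class used later in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for LiveData so the sketch runs without Android (hypothetical class).
class Channel<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    // Subscribe to messages on this channel
    void observe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Publish a message to every current observer
    void setValue(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}

// Singleton message bus: channels are stored in a HashMap keyed by name.
final class BusSketch {
    private static final BusSketch INSTANCE = new BusSketch();
    private final Map<String, Channel<?>> channels = new HashMap<>();

    private BusSketch() {}

    // Look up the channel by name, creating it on first use.
    // The caller is trusted to use one message type per channel name.
    @SuppressWarnings("unchecked")
    static <T> Channel<T> get(String name) {
        return (Channel<T>) INSTANCE.channels.computeIfAbsent(name, k -> new Channel<Object>());
    }
}
```

A subscriber would call `BusSketch.<String>get("login").observe(...)` and a publisher `BusSketch.<String>get("login").setValue("user-1")`; both reach the same channel because the name is the only key.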

Implementing a message bus with LiveData

Benefits of implementing a message bus with LiveData

Compared with EventBus and RxBus, implementing a message bus with LiveData has the following advantages:

  • EventBus, RxBus, and LiveDataBus all require observers to be registered and unregistered. Unlike EventBus and RxBus, which must be unregistered manually, LiveData manages the lifecycle automatically, so unregistration is automatic too. This avoids memory leaks caused by forgetting to unregister.
  • LiveData is simple to build on, and as an important member of the officially released Jetpack it is better supported.
  • Compared with EventBus and RxBus, LiveData involves fewer classes and a smaller package.

Pitfalls of implementing a message bus with LiveData

LiveData messages are sticky by default

As the previous article showed, LiveData delivers sticky messages: an observer that subscribes after a value has been published still receives that value. Here is the subscription logic of LiveData again:

@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
    assertMainThread("observe");
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        // The owner is already in the DESTROYED state: return
        return;
    }
    // Wrap the observer in a LifecycleBoundObserver
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    // ObserverWrapper is the parent class of LifecycleBoundObserver
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    // If the Observer is already in mObservers with a different LifecycleOwner,
    // throw: an Observer can be attached to only one LifecycleOwner
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer"
                + " with different lifecycles");
    }
    // If the Observer already exists with the same LifecycleOwner, return
    if (existing != null) {
        return;
    }
    // Register the wrapper as a lifecycle observer
    owner.getLifecycle().addObserver(wrapper);
}

Finally, addObserver() registers the observer with LifecycleRegistry, which triggers onStateChanged(). That method ends up calling dispatchingValue() (which setValue/postValue also eventually call), and then the method we care about most, considerNotify():

void dispatchingValue(@Nullable ObserverWrapper initiator) {
    if (mDispatchingValue) {
        mDispatchInvalidated = true;
        return;
    }
    mDispatchingValue = true;
    do {
        mDispatchInvalidated = false;
        if (initiator != null) {
            considerNotify(initiator);
            initiator = null;
        } else {
            for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                    mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                considerNotify(iterator.next().getValue());
                if (mDispatchInvalidated) {
                    break;
                }
            }
        }
    } while (mDispatchInvalidated);
    mDispatchingValue = false;
}

private void considerNotify(ObserverWrapper observer) {
    if (!observer.mActive) {
        // The observer is not active: return
        return;
    }
    // With observe(), the observer is active only in the STARTED or RESUMED state;
    // with observeForever(), it is always considered active
    if (!observer.shouldBeActive()) {
        observer.activeStateChanged(false);
        return;
    }
    // The observer has already seen this version: return
    if (observer.mLastVersion >= mVersion) {
        return;
    }
    observer.mLastVersion = mVersion;
    // Invoke the Observer's onChanged() callback with the data
    observer.mObserver.onChanged((T) mData);
}

considerNotify() contains the following logic:

if (observer.mLastVersion >= mVersion) {
    return;
}

mVersion is a version number. The sender (the LiveData) holds mVersion and each subscriber (the observer wrapper) holds mLastVersion; both default to -1. Each time the sender sends a message, mVersion is incremented by 1. Each time an Observer successfully receives a message, the sender's current mVersion is assigned to that Observer's mLastVersion. If an Observer's mLastVersion >= the sender's mVersion, the message is not delivered to that Observer, which prevents the same message from being delivered twice.

Therefore, if the sender's mVersion is no longer the default -1, the LiveData has already sent a message. If LiveData.observe() is called at that point, the new Observer's mLastVersion (still the default -1) is less than the sender's mVersion, so the message is not intercepted and the Observer receives the previously sent message. This is the sticky message.
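The version check can be reproduced in a small plain-Java model. This is a simplified sketch, not the AndroidX class: `VersionedData` is a hypothetical name, `Consumer` stands in for `Observer`, and lifecycle and active-state handling are left out so that only the `mVersion`/`mLastVersion` comparison remains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of LiveData's version check (hypothetical class).
class VersionedData<T> {
    static final int START_VERSION = -1;

    private int version = START_VERSION;     // plays the role of mVersion
    private T data;
    private final List<Entry<T>> observers = new ArrayList<>();

    private static final class Entry<T> {
        final Consumer<T> observer;
        int lastVersion = START_VERSION;      // plays the role of mLastVersion

        Entry(Consumer<T> observer) {
            this.observer = observer;
        }
    }

    // Sending bumps the version and offers the value to every observer
    void setValue(T value) {
        version++;
        data = value;
        for (Entry<T> entry : observers) {
            considerNotify(entry);
        }
    }

    // Registering triggers a dispatch to the new observer only
    void observe(Consumer<T> observer) {
        Entry<T> entry = new Entry<>(observer);
        observers.add(entry);
        considerNotify(entry);
    }

    private void considerNotify(Entry<T> entry) {
        if (entry.lastVersion >= version) {
            return; // this observer has already seen the current version
        }
        entry.lastVersion = version;
        entry.observer.accept(data);
    }

    int getVersion() {
        return version;
    }
}
```

In this model an observer registered before any `setValue()` call gets nothing at registration (-1 >= -1), while an observer registered after `setValue("first")` is immediately handed "first" (-1 < 0), which is the sticky behavior.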

LiveData.postValue() may lose messages

If postValue() is called many times in quick succession, observers registered with LiveData.observe() may miss some of the messages. Why? Look at the internal implementation of postValue():

// LiveData.java
// postValue sends data; it can be called from a child thread
protected void postValue(T value) {
    boolean postTask;
    synchronized (mDataLock) {
        // mPendingData defaults to NOT_SET, so postTask is true for the first call
        postTask = mPendingData == NOT_SET;
        // Store the value to be sent in mPendingData
        mPendingData = value;
    }
    // postTask is true for the first message and false while that message is
    // still pending, so later calls return here; they only overwrite the pending value
    if (!postTask) {
        return;
    }
    ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}

// setValue sends data; it must be called on the main thread
protected void setValue(T value) {
    assertMainThread("setValue");
    mVersion++;
    mData = value;
    dispatchingValue(null);
}

private final Runnable mPostValueRunnable = new Runnable() {
    @SuppressWarnings("unchecked")
    @Override
    public void run() {
        Object newValue;
        synchronized (mDataLock) {
            // Take the value from mPendingData and reset mPendingData to NOT_SET
            newValue = mPendingData;
            mPendingData = NOT_SET;
        }
        setValue((T) newValue);
    }
};

The main reason is that postValue() checks whether the previous message has been processed. If it has not, the new value simply overwrites the pending one (the pending message is stored in mPendingData). So when postValue() is called many times in quick succession, the Observer receives only the most recent value. Presumably this design targets the use of LiveData in an MVVM architecture, where only the latest data matters for updating the UI. For a message bus, however, silently losing messages is a hidden danger we do not want. The fix is to send messages with setValue() instead of postValue(); to send from a child thread, create your own Handler and post the message to the main thread.
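The overwrite behavior can be traced with a single-threaded plain-Java model. This is a sketch under simplifying assumptions, not the AndroidX code: `PendingSlotModel` is a hypothetical class, `mainQueue` stands in for the main thread's message queue, and appending to `delivered` stands in for `setValue()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified single-threaded model of postValue (hypothetical class).
class PendingSlotModel {
    private static final Object NOT_SET = new Object();

    private final Queue<Runnable> mainQueue = new ArrayDeque<>();
    private final List<Object> delivered = new ArrayList<>();
    private Object pendingData = NOT_SET;

    void postValue(Object value) {
        boolean postTask = pendingData == NOT_SET;
        pendingData = value;             // later calls overwrite the single slot
        if (!postTask) {
            return;                      // a task is already queued
        }
        mainQueue.add(() -> {
            Object newValue = pendingData;
            pendingData = NOT_SET;
            delivered.add(newValue);     // stands in for setValue()
        });
    }

    // Run everything the modeled main thread has queued so far
    void drainMainQueue() {
        Runnable task;
        while ((task = mainQueue.poll()) != null) {
            task.run();
        }
    }

    List<Object> delivered() {
        return delivered;
    }
}
```

Calling `postValue("a")`, `postValue("b")`, `postValue("c")` and only then draining the queue delivers just "c" in this model: one task was queued, and by the time it runs the slot holds the last value.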

The solution

Support for sticky and non-sticky messages

Since LiveData messages are sticky by default, we only need to add support for non-sticky messages. LiveData's mVersion is private, and its getVersion() accessor is package-private. Other classes could read it through reflection, but that is relatively slow. Alternatively, a subclass placed in the androidx.lifecycle package can access the version without reflection, as follows:

// package androidx.lifecycle
open class ExternalLiveData<T> : MutableLiveData<T>() {

    companion object {
        // Living in the androidx.lifecycle package gives access to
        // LiveData.START_VERSION without reflection
        const val START_VERSION = LiveData.START_VERSION
    }

    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
            // ignore
            return
        }
        try {
            val wrapper = ExternalLifecycleBoundObserver(owner, observer)
            val existing =
                callMethodPutIfAbsent(observer, wrapper) as? LiveData<*>.LifecycleBoundObserver
            require(!(existing != null && !existing.isAttachedTo(owner))) {
                "Cannot add the same observer with different lifecycles"
            }
            if (existing != null) return
            owner.lifecycle.addObserver(wrapper)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    override fun getVersion(): Int {
        return super.getVersion()
    }

    internal inner class ExternalLifecycleBoundObserver(
        owner: LifecycleOwner,
        observer: Observer<in T>?
    ) : LifecycleBoundObserver(owner, observer) {
        override fun shouldBeActive(): Boolean {
            return mOwner.lifecycle.currentState.isAtLeast(observerActiveLevel())
        }
    }

    /**
     * @return the minimum Lifecycle.State at which an observer counts as active
     */
    protected open fun observerActiveLevel(): Lifecycle.State {
        return Lifecycle.State.STARTED
    }

    // The value of LiveData's mObservers field for this instance, read through reflection
    private val fieldObservers: Any
        get() {
            val field = LiveData::class.java.getDeclaredField("mObservers")
            field.isAccessible = true
            return field.get(this)
        }

    /**
     * Calls putIfAbsent on LiveData's mObservers through reflection
     */
    private fun callMethodPutIfAbsent(observer: Any, wrapper: Any): Any? {
        val mObservers = fieldObservers
        val putIfAbsent = mObservers.javaClass
            .getDeclaredMethod("putIfAbsent", Any::class.java, Any::class.java)
        putIfAbsent.isAccessible = true
        return putIfAbsent.invoke(mObservers, observer, wrapper)
    }
}

This exposes mVersion to outside code. The overall idea is then to control the Observer with the decorator pattern:

/**
 * Observer decorator
 */
class ObserverWrapper<T>(
    private val observer: Observer<T>,
    var preventNextEvent: Boolean = false
) : Observer<T> {
    override fun onChanged(t: T) {
        if (preventNextEvent) {
            // Swallow this event once, then let later events through
            preventNextEvent = false
            return
        }
        observer.onChanged(t)
    }
}

Non-sticky messages:

val observerWrapper = ObserverWrapper(observer)
observerWrapper.preventNextEvent = liveData.version > ExternalLiveData.START_VERSION
liveData.observe(owner, observerWrapper)

Livedata. version > externallivedata. START_VERSION Indicates that a message has been sent to liveData, and the version value is not the initial value. If it is a late-registered observer, ObserverWrapper. PreventNextEvent return is true, that will block the current news, the observer does not perform; If the observer is registered first, it will not be affected, thus achieving non-sticky messages.

Sticky messages:

val observerWrapper = ObserverWrapper(observer)
liveData.observe(owner, observerWrapper)

Nothing more is needed here: LiveData is sticky by default, so no special handling is required.

Support for sending messages from child threads

Check whether the current thread is the main thread:

object ThreadUtils {
    /**
     * Whether the current thread is the main thread
     */
    fun isMainThread(): Boolean {
        return Looper.myLooper() == Looper.getMainLooper()
    }
}

Check the current thread when sending a message:

private val mainHandler = Handler(Looper.getMainLooper())

override fun post(value: T) {
    if (ThreadUtils.isMainThread()) {
        postInternal(value)
    } else {
        mainHandler.post(PostValueTask(value))
    }
}

@MainThread
private fun postInternal(value: T) {
    liveData.value = value
}

inner class PostValueTask(val newValue: T) : Runnable {
    override fun run() {
        postInternal(newValue)
    }
}

When post() is called on the main thread, the message is sent directly. When it is called on a child thread, the message is handed to the main thread through mainHandler. This is how sending messages from a child thread is supported.
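The same thread hop can be modeled in plain Java without Android. In this sketch a single-thread executor stands in for the main thread and its Handler; `ThreadHopModel`, `awaitIdle()`, and the thread name "model-main" are hypothetical names chosen for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Models post() with a single-thread executor playing the main thread (hypothetical class).
class ThreadHopModel {
    private volatile Thread mainThread;
    private final ExecutorService main = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "model-main");
        t.setDaemon(true);               // do not keep the JVM alive
        mainThread = t;
        return t;
    });
    private final List<String> log = new CopyOnWriteArrayList<>();

    boolean isMainThread() {
        return Thread.currentThread() == mainThread;
    }

    // Deliver directly on the modeled main thread, otherwise hop to it
    void post(String value) {
        if (isMainThread()) {
            postInternal(value);
        } else {
            main.execute(() -> postInternal(value));
        }
    }

    private void postInternal(String value) {
        log.add(value + "@" + Thread.currentThread().getName());
    }

    // Block until every task queued so far has run
    void awaitIdle() {
        try {
            main.submit(() -> { }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    List<String> log() {
        return log;
    }
}
```

Because each call queues its own task carrying its own value, three quick calls from a worker thread produce three deliveries in order, unlike the single pending slot used by postValue().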

References

[1] tech.meituan.com/2018/07/26/…
[2] The code is mainly from github.com/JeremyLiao/…