The shortcomings of LiveData

  • LiveData is a lifecycle-aware observable data holder class designed for Android. Its design is intentionally simple so that developers can get started easily, but it has three drawbacks:
  1. LiveData can only update its value on the main thread (postValue also switches to the main thread internally, and rapid successive calls may lose data);
  2. LiveData's operators are not powerful enough; RxJava or Flow is recommended for complex, interactive data-stream scenarios;
  3. LiveData is closely tied to the Android platform. It works well in the presentation layer, but not in the domain layer, which should ideally be platform-independent;
  • LiveData is still a viable solution for Java developers, beginners, and simple scenarios. In an MVVM architecture, the View and ViewModel can interact via LiveData (StateFlow also works), and the ViewModel and Repository can interact via Flow, as sketched below.
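A minimal sketch of that layering, assuming hypothetical NewsRepository and NewsViewModel classes (illustrations only, not from any library): the Repository exposes a cold Flow, and the ViewModel turns it into a StateFlow for the View to observe.

class NewsRepository {
    // data/domain layer: a platform-independent cold Flow
    fun latestNews(): Flow<List<String>> = flow {
        while (true) {
            emit(fetchNewsFromNetwork()) // hypothetical suspending network call
            delay(60_000)
        }
    }

    private suspend fun fetchNewsFromNetwork(): List<String> = listOf("headline")
}

class NewsViewModel(repository: NewsRepository) : ViewModel() {
    // presentation layer: expose state to the View as a StateFlow (LiveData would also work here)
    val news: StateFlow<List<String>> = repository.latestNews()
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
}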

The shortcomings of RxJava

  • RxJava is still quite powerful: chained calls based on event streams, time-consuming tasks, and thread switching make it a good library for asynchronous operations, but it has some drawbacks for Android development:
  1. Powerful also means complex, and its myriad operators are a beginner's nightmare;
  2. It is not an official library, so Google does not put much effort into promoting or optimizing it;
  3. It adds to the package size of the project;

Flow

  • A Flow is a "cold stream". With a cold data source, the producer only starts executing when a listener begins collecting (no consumption, no production, unlike LiveData, where the sender does not depend on a receiver), so each subscription creates a new data stream (when there are multiple subscribers, each gets its own independent sequence of events). Once the consumer stops listening or the producer block finishes, the data stream is closed automatically. A sketch of this behaviour follows this list.
  • Flow combines Kotlin coroutines with the reactive programming model; it supports thread switching and backpressure, and is cleaned up automatically through coroutine cancellation, so it is well suited to heavy tasks.
  • Operators such as take, first, and toList simplify testing code that uses Flow.
  • Flow itself knows nothing about the Android lifecycle and does not automatically pause and resume collectors when the lifecycle state changes. Use the LifecycleCoroutineScope extensions, such as launchWhenStarted, to launch the coroutine that collects our Flow; those collectors then pause and resume in sync with the component's Lifecycle.
  • In contrast to a Channel, a Flow's terminal operator triggers the execution of the data stream and either completes or throws depending on what happens on the producer side. A Flow therefore shuts the stream down automatically without leaking resources on the producer side, whereas if a Channel is not closed properly the producer may never clean up large resources, so Channels are more prone to resource leaks.
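A minimal sketch of the cold-stream behaviour (coldFlow is just an illustrative name; log is the same logging helper used elsewhere in this article): the producer block runs again for every collector, and terminal operators such as toList make tests straightforward.

val coldFlow = flow {
    log("producer started") // runs again for every collector
    emit(1)
    emit(2)
    emit(3)
}

lifecycleScope.launch {
    coldFlow.collect { log("first collector: $it") }  // "producer started" is logged here...
    coldFlow.collect { log("second collector: $it") } // ...and logged again here: each collection is independent
    val asList = coldFlow.toList()                    // terminal operators like toList/first keep test code short
    log("toList: $asList")
}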

Some common operators for Flow

// val flow: Flow<Int> = flow {
//     List(20) {
//         emit(it) // send data
//         delay(300)
//     }
// }
// val flow = flowOf(1, 2, 3, 4, 5)
val flow = (1..5).asFlow()
lifecycleScope.launch {
    flow.flowOn(Dispatchers.IO) // set the dispatcher the upstream runs on
        .onEach {
            log("onEach:$it")
            delay(300)
        }
        .filter { // filter
            it % 2 == 0
        }
        .map {
            log("map: $it")
            it * it
        }
        .transform<Int, String> { // one upstream value can emit several downstream values
            emit("num1 = $it")
            emit("num2 = $it")
        }
        .flowOn(Dispatchers.IO)
        .onCompletion { // invoked when the flow completes
            log("onCompletion: $it")
        }
        .catch { // catches exceptions of the flow; catch can only catch exceptions from its upstream
            log("catch: $it")
        }
        .flowOn(Dispatchers.Main)
        .collect { // consume the flow
            log("collect1_1: $it")
        }
    // collect { log("collect1_2: $it") }

    // So that the actual consumption logic does not have to sit inside the terminal operator:
    // onEach { log("onEach2:$it") }
    // withContext(Dispatchers.IO) {
    //     delay(1000)
    //     flow.collect()
    // }

    // In addition to using a child coroutine, launchIn can start the collection:
    // onEach { log("onEach2:$it") }
    //     .launchIn(CoroutineScope(Dispatchers.IO))
    //     .join() // the calling coroutine waits for the collection to finish
}

Cancelling a Flow

lifecycleScope.launch(Dispatchers.IO) {
    val flow2 = (1..10).asFlow().onEach { delay(1000) }
    val job: Job = lifecycleScope.launch {
        log("lifecycleScope.launch")
        flow2.flowOn(Dispatchers.IO).collect { log("collect: $it") }
    }
    delay(2000)
    job.cancelAndJoin() // cancelling the coroutine also cancels the flow collection
}

Flow back pressure

  • Back pressure is inherent to reactive programming, so let's first look at what it is.
  • Back pressure problems arise when the producer's production rate is higher than the consumer's processing rate. To make sure no data is lost, we usually consider adding a buffer to alleviate the problem:
flow {
    List(5) { emit(it) }
}.buffer().collect {
    log("flow buffer collect:$it")
}
  • You can also pass a capacity to buffer, as sketched below. However, if we simply add caching instead of solving the problem at its root, we will always end up with a backlog of data.
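For example, buffer can be given a capacity and an overflow strategy (a sketch only; the capacity of 3 is arbitrary):

lifecycleScope.launch {
    flow {
        List(100) { emit(it) }
    }
        // keep at most 3 elements in the buffer; when it overflows, drop the oldest one
        .buffer(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .collect {
            delay(100) // a slow consumer
            log("flow buffer(3) collect:$it")
        }
}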
  • The root cause of the problem is the mismatch between production and consumption rates. Besides directly optimizing the consumer's performance, we can also make some trade-offs.
  • The first is conflate. Like a Channel's CONFLATED mode, new data overwrites old data:
flow {
    List(10) {
        emit(it)
    }
}
.conflate()
.collect { value ->
    log("flow conflate Collecting $value")
    delay(100)
    log("$value collected flow conflate ")
}
  • The second is collectLatest. As the name suggests, only the latest data is processed. This may look the same as conflate, but it is actually quite different: it does not simply overwrite old data with new data; every value starts being processed, but if the previous value has not finished processing when the next one arrives, the handling of the previous value is cancelled. Besides collectLatest there are also mapLatest, flatMapLatest, and so on, which work the same way.
flow {
    List(10) {
        emit(it)
    }
}.collectLatest { value ->
    log("flow collectLatest Collecting $value")
    delay(100)
    log("$value collected flow collectLatest ")
}

Collecting Android UI data streams more safely

  • In Android development, use the LifecycleOwner.addRepeatingJob extension, the suspending Lifecycle.repeatOnLifecycle, or Flow.flowWithLifecycle to collect data streams safely from the UI layer. They are provided by lifecycle-runtime-ktx 2.4.0 and later.
lifecycleScope.launch {
    delay(500)
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        flow.collect { log("collect2: $it") }
    }
}
lifecycleScope.launchWhenStarted {
    delay(1000)
    flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
    delay(1500)
    flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
        .collect { log("collect4: $it") }
}

SharedFlow

  • A cold flow has a one-to-one relationship with its subscriber. When we need one stream with multiple subscribers, we need a hot flow, and SharedFlow is a hot flow.
  • Its constructor function looks like this:
public fun <T> MutableSharedFlow(
    // how many already-emitted values a new subscriber receives when it starts collecting;
    // default 0, i.e. a new subscriber gets none of the earlier data
    replay: Int = 0,
    // how many values are buffered in addition to replay; default 0
    extraBufferCapacity: Int = 0,
    // what to do when the buffer overflows:
    // BufferOverflow.SUSPEND (default), BufferOverflow.DROP_OLDEST (discard the oldest value),
    // BufferOverflow.DROP_LATEST (discard the latest value)
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
  • Simple use is as follows
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    sharedFlow.emit("aaa")
    delay(1000)
    sharedFlow.emit("bbb")
    delay(1000)
    sharedFlow.emit("ccc")
}
lifecycleScope.launch {
    delay(500)
    sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
    delay(3500)
    sharedFlow.collect { log("collect4:$it") }
}
  • Convert cold Flow to SharedFlow
lifecycleScope.launch {
    val sharedFlow = (1..5).asFlow()
        .shareIn(
            // 1. the CoroutineScope used for sharing; sharing ends when this scope is cancelled
            scope = lifecycleScope,
            // 2. the start strategy:
            // started = SharingStarted.Lazily,  // starts when the first subscriber appears, ends when the scope is cancelled
            // started = SharingStarted.Eagerly, // starts immediately, ends when the scope is cancelled
            // For operations that are executed only once, Lazily or Eagerly is fine. But for flows observed
            // from the UI, WhileSubscribed performs a subtle yet important optimisation:
            // it cancels the upstream flow when there are no collectors.
            started = SharingStarted.WhileSubscribed(
                // stopTimeoutMillis: a delay in milliseconds between the last subscriber leaving and the
                // upstream flow being stopped; default 0 (useful when the user rotates the device: the old
                // view is destroyed first and rebuilt a few seconds later)
                500,
                // replayExpirationMillis: how long the replayed data stays valid; if the user leaves the app
                // for too long and should not see stale data, lower this value
                Long.MAX_VALUE
            ),
            // 3. the number of values replayed to new subscribers
            replay = 0
        )
    sharedFlow.collect { log("shareIn collect:$it") }
}

StateFlow

  • StateFlow inherits from SharedFlow and is a special variant of SharedFlow
  • The constructor is as follows, passing in only a default value
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
  • StateFlow is essentially a SharedFlow with a replay of 1 and no buffers, so the first subscription gets the default value first
  • StateFlow only notifies collectors when its value has been updated and has actually changed; that is, the collect callback is not invoked if the updated value equals the previous one, unlike LiveData.
  • StateFlow is closest to LiveData because:
1. It always has a value.
2. Its value is unique.
3. It allows multiple observers to share it (and is therefore a shared data stream).
4. It always replays only the latest value to a subscriber, regardless of the number of active observers.
  • Simple to use
Log ("StateFlow default :111") val StateFlow = MutableStateFlow("111") lifecyclescope.launch {delay(500) stateflow.collect {delay(500) stateflow.collect { log("StateFlow collect1:$it") } } lifecycleScope.launch { delay(1500) stateFlow.collect { log("StateFlow collect2:$it") } } lifecycleScope.launch { delay(2500) stateFlow.collect { log("StateFlow collect3:$it") } } lifecycleScope.launch(Dispatchers.IO) { delay(5000) log("StateFlow re emit:111") stateFlow.emit("111") delay(1000) log("StateFlow emit:222") stateFlow.emit("222") }Copy the code
  • Converting an ordinary Flow to a StateFlow
val stateFlow2: StateFlow<Int> = flow {
    List(10) {
        delay(300)
        emit(it)
    }
}.stateIn(
    scope = lifecycleScope,
    started = SharingStarted.WhileSubscribed(5000), // if there is still no subscriber after 5 seconds, the upstream coroutine is stopped
    initialValue = 666 // the default value
)
lifecycleScope.launchWhenStarted {
    stateFlow2.collect { log("StateFlow stateIn collect:$it") }
}

Differences between StateFlow and SharedFlow

  1. StateFlow is a special version of SharedFlow. Replay is fixed to 1 and buffer size is 0 by default.
  2. StateFlow is similar to LiveData in that the current state can be read from myFlow.value; if this is required, StateFlow must be used.
  3. SharedFlow supports issuing and collecting duplicate values, while StateFlow does not call collect when a value is repeated.
  4. For new subscribers, StateFlow will replay only the current and latest values, and SharedFlow can configure the number of replay elements (default is 0, that is, no replay).
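A small sketch of these differences in code (the values are arbitrary):

// StateFlow: always has a value, readable synchronously, and conflates equal values
val state = MutableStateFlow(0)
log("current value: ${state.value}") // direct read, no collector needed
state.value = 1
state.value = 1 // equal to the previous value: collectors are not notified again

// SharedFlow: no initial value, no .value property, duplicates are delivered, replay is configurable
val shared = MutableSharedFlow<Int>(replay = 1)
lifecycleScope.launch {
    shared.emit(1)
    shared.emit(1) // the duplicate value is still delivered to collectors
}
lifecycleScope.launch {
    // a late subscriber first receives the last `replay` values (here 1), then new ones
    shared.collect { log("shared collect:$it") }
}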

Encapsulate FlowBus based on SharedFlow

Create the message class EventMessage

class EventMessage {
    /** the key of the message */
    var key: Int

    /** the body of the message */
    var message: Any? = null

    private var messageMap: HashMap<String, Any?>? = null

    constructor(key: Int, message: Any?) {
        this.key = key
        this.message = message
    }

    constructor(key: Int) {
        this.key = key
    }

    fun put(key: String, message: Any?) {
        if (messageMap == null) {
            messageMap = HashMap<String, Any?>()
        }
        messageMap?.set(key, message)
    }

    operator fun <T> get(key: String?): T? {
        if (messageMap != null) {
            try {
                return messageMap!![key] as T?
            } catch (e: ClassCastException) {
                e.printStackTrace()
            }
        }
        return null
    }
}

Create FlowBus

class FlowBus : ViewModel() {

    companion object {
        val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
    }

    // normal events
    private val events = mutableMapOf<String, Event<*>>()

    // sticky events
    private val stickyEvents = mutableMapOf<String, Event<*>>()

    fun with(key: String, isSticky: Boolean = false): Event<Any> {
        return with(key, Any::class.java, isSticky)
    }

    fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
        return with(eventType.name, eventType, isSticky)
    }

    @Synchronized
    fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
        val flows = if (isSticky) stickyEvents else events
        if (!flows.containsKey(key)) {
            flows[key] = Event<T>(key, isSticky)
        }
        return flows[key] as Event<T>
    }

    class Event<T>(private val key: String, isSticky: Boolean) {

        // private mutable shared flow
        private val _events = MutableSharedFlow<T>(
            replay = if (isSticky) 1 else 0,
            extraBufferCapacity = Int.MAX_VALUE
        )

        // publicly exposed as a read-only shared flow
        val events = _events.asSharedFlow()

        /**
         * needs to be executed on the main thread
         */
        fun observeEvent(
            lifecycleOwner: LifecycleOwner,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
            minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
            action: (t: T) -> Unit
        ) {
            lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
                override fun onDestroy(owner: LifecycleOwner) {
                    super.onDestroy(owner)
                    LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
                    val subscriptCount = _events.subscriptionCount.value
                    if (subscriptCount <= 0) instance.events.remove(key)
                }
            })
            lifecycleOwner.lifecycleScope.launch(dispatcher) {
                lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
                    events.collect {
                        try {
                            action(it)
                        } catch (e: Exception) {
                            LjyLogUtil.d("key=$key , error=${e.message}")
                        }
                    }
                }
            }
        }

        /**
         * send a value
         */
        suspend fun setValue(
            event: T,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
        ) {
            withContext(dispatcher) {
                _events.emit(event)
            }
        }
    }
}

Using FlowBus

FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
    withContext(Dispatchers.Main) {
        // instead of creating a new coroutine, switch to the main thread to register
        FlowBus.instance.with(EventMessage::class.java).observeEvent(this@EventBusActivity) {
            LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
        }
    }
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {
    val event = EventMessage(111)
    LjyLogUtil.d("FlowBus:send1_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
    delay(2000)
    FlowBus.instance.with(EventMessage::class.java).setValue(EventMessage(101))
    FlowBus.instance.with(EventMessage::class.java).setValue(EventMessage(102))
    FlowBus.instance.with(EventMessage::class.java).setValue(EventMessage(103))
    FlowBus.instance.with(EventMessage::class.java).setValue(EventMessage(104))
    FlowBus.instance.with(EventMessage::class.java).setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
    delay(4000)
    val event = EventMessage(222, "bbb")
    LjyLogUtil.d("FlowBus:send2_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
    delay(6000)
    withContext(Dispatchers.Main) {
        val event = EventMessage(333, "ccc")
        event.put("key1", 123)
        event.put("key2", "abc")
        LjyLogUtil.d("FlowBus:send3_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
    }
}

Further optimization

  • Using extension functions, a ViewModelStoreOwner, and passing in EventMessage::class.java ahead of time makes the bus simpler to use; this is what is currently used in the project.
// extension function for observing
fun LifecycleOwner.observeEvent(
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    isSticky: Boolean = false,
    action: (t: EventMessage) -> Unit
) {
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .with(EventMessage::class.java, isSticky = isSticky)
        .observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}

fun postValue(
    event: EventMessage,
    delayTimeMillis: Long = 0,
    isSticky: Boolean = false,
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
    LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .viewModelScope
        .launch(dispatcher) {
            delay(delayTimeMillis)
            ApplicationScopeViewModelProvider
                .getApplicationScopeViewModel(FlowBus::class.java)
                .with(EventMessage::class.java, isSticky = isSticky)
                .setValue(event)
        }
}

private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {

    private val eventViewModelStore: ViewModelStore = ViewModelStore()

    override fun getViewModelStore(): ViewModelStore {
        return eventViewModelStore
    }

    private val mApplicationProvider: ViewModelProvider by lazy {
        ViewModelProvider(
            ApplicationScopeViewModelProvider,
            ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
        )
    }

    fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
        return mApplicationProvider[modelClass]
    }
}

object FlowBusInitializer {
    lateinit var application: Application

    // initialize in Application
    fun init(application: Application) {
        FlowBusInitializer.application = application
    }
}
  • Usage
lifecycleScope.launch(Dispatchers.IO) {
    observeEvent {
        LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
    observeEvent(Dispatchers.IO) {
        LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }

    observeEvent(Dispatchers.Main) {
        LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    postValue(EventMessage(100))
    postValue(EventMessage(101), 1000)
    postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
    val event3 = EventMessage(103, "ccc")
    event3.put("key1", 123)
    event3.put("key2", "abc")
    postValue(event3, 2000, dispatcher = Dispatchers.Main)
}


My name is Jinyang. If you want to learn more, follow my WeChat official account "Jinyang said" to receive my latest articles.