Preface

The project uses the MVVM architecture, and Jetpack components such as DataStore return a Flow.

I am not yet familiar with LiveData, so I started with Flow directly. Since this is what Google officially recommends, I take it as the way to go, and this chapter is about Flow.

To get first-hand, accurate information, I made myself watch Google's official introduction video in English. I did not understand all of it, but the diagrams made by Google's engineers were easy to follow. Many of the pictures in this chapter are screenshots from that video.

Background

In day-to-day development, especially on the app side, we often need to launch several requests and then display the results. For example: fetch a title (the key) and display it first, load data based on that key, then classify and process the data before showing it. With plain Java callbacks, this kind of business logic turns into layer upon layer of nesting, and you have to keep switching threads.

It does not have to be that way. As the previous article explained, Kotlin coroutines make it easy to switch threads and let you write non-blocking code in a sequential, blocking style, so that problem is solved.
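
As a rough sketch of that idea, the title-then-data scenario from the previous paragraph can be written top to bottom with suspend functions. The names fetchTitle, loadData, and classify are hypothetical stand-ins that do not come from the project, and delay only simulates network latency:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext

// Hypothetical stand-ins for real requests; delay() simulates latency.
suspend fun fetchTitle(): String = withContext(Dispatchers.IO) {
    delay(100)
    "news"
}

suspend fun loadData(title: String): List<String> = withContext(Dispatchers.IO) {
    delay(100)
    listOf("$title-2", "$title-1")
}

// Post-processing is moved to a background dispatcher as well.
suspend fun classify(items: List<String>): List<String> =
    withContext(Dispatchers.Default) { items.sorted() }

// Reads like blocking code, but every call suspends instead of blocking,
// and no nested callbacks are needed.
suspend fun loadScreen(): List<String> {
    val title = fetchTitle()
    val data = loadData(title)
    return classify(data)
}
```

Calling loadScreen() from any coroutine, for example inside lifecycleScope.launch, returns the sorted list once all three steps have finished.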

That leaves data processing. You might say that LiveData has data-processing APIs too, such as map. That is a fair point, and I will come back to it later.

What’s wrong with LiveData?

What is LiveData? It is an observable data holder class based on the observer pattern. Unlike an ordinary observable, it is lifecycle-aware: it follows the lifecycle of Android UI components, which ensures that LiveData only updates observers that are in an active lifecycle state.

LiveData is therefore simple and easy to use. But precisely because it is so simple, it lacks many features compared with its predecessor RxJava, which is also built on the observer pattern, so handling more complex scenarios is more cumbersome.

Due to the simple design of LiveData, it has the following shortcomings:

  • LiveData can only update data in the main thread.
  • There are not enough LiveData operators to handle complex data.

I am new to Jetpack, but I remember that data can be updated from a background thread. Isn't there a postValue method?

Yes, but postValue still switches to the main thread to update the data. When I am handling complex business logic and get data on a background thread, I want to keep working on that thread: process the data first, then issue the next request, without switching threads back and forth. That is where Flow comes in.

Note: I have never really understood or used RxJava, so I will not compare Flow and RxJava in detail. The main points are that RxJava has a steep learning curve and requires you to handle the Android lifecycle yourself, whereas Flow also supports thread switching and, when tied to a coroutine scope, does not waste resources.
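
To make the thread-switching remark concrete, here is a minimal sketch using the flowOn operator. The function names squares and collectSquares are made up for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Everything above flowOn (the builder and map) runs on Dispatchers.Default.
// The collector keeps running in the context it was started in.
fun squares(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}
    .map { it * it }
    .flowOn(Dispatchers.Default)

// Collects the flow into a list; runBlocking is only used to keep the sketch standalone.
fun collectSquares(): List<Int> = runBlocking { squares().toList() }
```

The point of the design is that the producer decides where it runs, while the consumer never has to switch threads by hand.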

A brief introduction to Flow

Since there are few articles about Flow, I went straight to the official video introduction. I will not cover the API or the internals yet; let's start with an overview based on the official material.

The problem

Suppose three time-consuming tasks, A, B, and C, run inside a coroutine. How does the way the caller receives their results differ depending on the return type?

Using a List

As shown in the figure, calling foo switches threads.

The obvious downside is that the caller has to wait for all three tasks to complete before the results are put into a List and returned; if one task takes a long time, the caller just keeps waiting.

Sample code:

runBlocking {
    val list = foo()
    for (x in list) Log.i(TAG, "initData: $x")
}

// Builds the whole list first; the caller gets nothing until all three tasks are done.
suspend fun foo(): List<String> = buildList {
    Log.i(TAG, "foo: start sending data")
    delay(1000)
    add("A")
    delay(1000)
    add("B")
    delay(1000)
    add("C")
    Log.i(TAG, "foo: end sending data")
}

Look at the print again:

2021-08-25 14:34:45.307 15141-15141/: foo: start sending data

2021-08-25 14:34:48.309 15141-15141/: foo: ends sending data

2021-08-25 14:34:48.311 15141-15141/: initData: A

2021-08-25 14:34:48.312 15141-15141/: initData: B

2021-08-25 14:34:48.312 15141-15141/: initData: C

Here the time-consuming work runs for a little over 3s, and only then is all the data returned to the caller.

Using a Channel

A Channel is a queue used to connect coroutines, so that one coroutine can communicate with another.

The diagram shows that a Channel is a queue, but one that works like a pipe: data is produced and consumed at the same time. Most importantly, it works across coroutines.

So here’s a little example:

val channel = Channel<String>()

// Consumer: receive on an IO thread
lifecycleScope.launch(Dispatchers.IO) {
    while (true) {
        Log.i(TAG, "initData: receive: ${channel.receive()}")
    }
}

// Producer
lifecycleScope.launch { foo() }

suspend fun foo() {
    Log.i(TAG, "foo: start sending data")
    delay(1000)
    channel.send("A")
    delay(1000)
    channel.send("B")
    delay(1000)
    channel.send("C")
    Log.i(TAG, "foo: end sending data")
}

The results can be imagined:

2021-08-25 14:58:42.236 16024-16024/: foo: start sending data

2021-08-25 14:58:43.240 16024-16062/: initData: receive: A

2021-08-25 14:58:44.247 16024-16061/: initData: receive: B

2021-08-25 14:58:45.252 16024-16024/: foo: ends sending data

2021-08-25 14:58:45.254 16024-16062/: initData: receive: C

You can see that as soon as the first time-consuming task finishes and its result is sent, the consumer starts working, without waiting for all the tasks to end.

Using a Flow

A Flow can be thought of as a stream. Its design is similar to a Channel: it follows the producer-consumer model, with data produced and consumed along the way.

However, according to the official video, Flow is a cold stream: it only starts producing data, that is, emitting values, once there is a subscriber. Before collect is called, no coroutine is running inside the Flow, so no resources are leaked. Take a quick look at the official screenshot:

Sample code:

fun foo(): Flow<String> = flow {
    Log.i(TAG, "foo: start sending data")
    delay(1000)
    emit("A")
    delay(1000)
    emit("B")
    delay(1000)
    emit("C")
    Log.i(TAG, "foo: end sending data")
}

// Collect
lifecycleScope.launch(Dispatchers.IO) {
    val flow = foo()
    flow.collect { Log.i(TAG, "initData: $it") }
}

// "Send data": this only builds the Flow; nothing is emitted without collect
lifecycleScope.launch { foo() }

As expected, the result is the same as with the Channel:

2021-08-25 15:15:08.507 16419-16458/: foo: Start sending data

2021-08-25 15:15:09.515 16419-16457/: initData: A

2021-08-25 15:15:10.522 16419-16458/: initData: B

2021-08-25 15:15:11.529 16419-16457/: initData: C

2021-08-25 15:15:11.529 16419-16457/: foo: ends sending data

The output is the same as with the Channel. The difference is that with a Flow you first obtain a reference to the Flow, and the upstream only emits data once you call collect.
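
The laziness can be made visible with a small standalone sketch. It records events in order instead of logging; coldFlowDemo and the event strings are invented for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records what happened, in order, so the laziness is visible.
fun coldFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    val letters: Flow<String> = flow {
        events += "producer started"
        emit("A")
        emit("B")
    }

    // Holding a reference does not run the builder block.
    events += "flow created"

    // The producer only starts here, when collect is called.
    letters.collect { events += "collected $it" }
    events
}
```

The returned list starts with "flow created" and only then contains "producer started", which is the cold behaviour described above.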

To reinforce the impression and make the differences clear, here is a summary:

Flow in summary

1. A Flow delivers its values as a sequence, one at a time, which distinguishes it from a List. The following figure makes the difference easy to see.

2. What is a cold Flow?

As shown earlier, a Flow built with the flow builder is cold. If you want a hot flow, you can use SharedFlow. For the difference between the two, see the following figure:

(1) Cold flow

A cold flow avoids unnecessary memory waste, because the emitter's coroutine code only runs when collect is called. If there are two collectors, each one triggers its own run of the emitter when it starts collecting, so the emitter code runs once per collector, as shown in the figure:

Take a look at the code:

var foo: Flow<String>? = null

// In init, launch a coroutine that calls foo() to create the flow
lifecycleScope.launch { foo() }

// Start two coroutines that collect. Collector A starts right away
lifecycleScope.launch(Dispatchers.IO) {
    foo?.collect { Log.i(TAG, "initData: A start to collect $it") }
}

// Collector B waits 2s before it starts collecting
lifecycleScope.launch {
    delay(2000)
    foo?.collect { Log.i(TAG, "initData: B start collecting $it") }
}

The foo() function looks like this:

suspend fun foo() {
    foo = flow {
        Log.i(TAG, "foo: start sending data")
        delay(1000)
        Log.i(TAG, "foo: start sending A")
        emit("A")
        Log.i(TAG, "foo: end sending A")
        delay(1000)
        Log.i(TAG, "foo: start sending B")
        emit("B")
        Log.i(TAG, "foo: end sending B")
        delay(1000)
        Log.i(TAG, "foo: start sending C")
        emit("C")
        Log.i(TAG, "foo: end sending C")
        Log.i(TAG, "foo: end sending data")
    }
}

Then look at the print:

2021-08-30 11:49:13.862 29955-29992/com.wayeal.yunapp I/zyh: foo:
2021-08-30 11:49:14.868 29955-29992/com.wayeal.yunapp I/zyh: foo: Start sending A
2021-08-30 11:49:14.870 29955-29992/com.wayeal.yunapp I/zyh: initData:
2021-08-30 11:49:14.870 29955-29992/com.wayeal.yunapp I/zyh: foo: End Send A
2021-08-30 11:49:15.868 29955-29955/com.wayeal.yunapp I/zyh: foo:
2021-08-30 11:49:15.874 29955-29992/com.wayeal.yunapp I/zyh: foo: Start sending B
2021-08-30 11:49:15.875 29955-29992/com.wayeal.yunapp I/zyh: initData:
2021-08-30 11:49:15.875 29955-29992/com.wayeal.yunapp I/zyh: foo: End Send B
2021-08-30 11:49:16.870 29955-29955/com.wayeal.yunapp I/zyh: foo: Start sending A
2021-08-30 11:49:16.871 29955-29955/com.wayeal.yunapp I/zyh: initData:
2021-08-30 11:49:16.871 29955-29955/com.wayeal.yunapp I/zyh: foo: End send A
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: foo: Start sending C
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: initData: C
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: foo: End Send C
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: foo:
2021-08-30 11:49:17.873 29955-29955/com.wayeal.yunapp I/zyh: foo: Start sending B
2021-08-30 11:49:17.873 29955-29955/com.wayeal.yunapp I/zyh: initData:
2021-08-30 11:49:17.873 29955-29955/com.wayeal.yunapp I/zyh: foo: End Send B
2021-08-30 11:49:18.876 29955-29955/com.wayeal.yunapp I/zyh: foo: Start sending C
2021-08-30 11:49:18.877 29955-29955/com.wayeal.yunapp I/zyh: initData:
2021-08-30 11:49:18.877 29955-29955/com.wayeal.yunapp I/zyh: foo: End send C
2021-08-30 11:49:18.877 29955-29955/com.wayeal.yunapp I/zyh: foo: End send data

The output above shows that:

  • When collector A calls collect, the emitting coroutine starts working, at 13.862.
  • After waiting 2s, collector B calls collect and the emitting code starts again, at 15.868, about 2s after the first run.
  • Even though collector A is already collecting, collector B triggers a fresh run from the beginning.

(2) Hot flow

A hot flow is a one-to-many relationship: when there are several collectors and the emitter sends a value, every collector receives it. This is very similar to LiveData's observer model. Because the data is shared, it is called SharedFlow. And unlike a cold flow, whose emitter code only runs on collect, a hot flow can emit as soon as it has been created, whether or not anyone is collecting.

Also take a look at the sample code:

val _events = MutableSharedFlow<String>()

// Start a coroutine that emits into the SharedFlow
lifecycleScope.launch { foo1() }

// Collector A
lifecycleScope.launch(Dispatchers.IO) {
    _events.collect { Log.i(TAG, "initData: A start collection $it") }
}

// Collector B starts collecting after 2s
lifecycleScope.launch {
    delay(2000)
    _events.collect { Log.i(TAG, "initData: B start collecting $it") }
}

suspend fun foo1() {
    Log.i(TAG, "foo: start sending data")
    Log.i(TAG, "foo: start sending A")
    _events.emit("A")
    Log.i(TAG, "foo: end sending A")
    delay(1000)
    Log.i(TAG, "foo: start sending B")
    _events.emit("B")
    Log.i(TAG, "foo: end sending B")
    delay(1000)
    Log.i(TAG, "foo: start sending C")
    _events.emit("C")
    Log.i(TAG, "foo: end sending C")
    Log.i(TAG, "foo: end sending data")
}

Print data:

2021-08-30 14:04:53.404 8383-8383/com.wayeal.yunapp I/zyh: foo:
2021-08-30 14:04:53.404 8383-8383/com.wayeal.yunapp I/zyh: foo: Start sending A
2021-08-30 14:04:53.405 8383-8383/com.wayeal.yunapp I/zyh: foo: A
2021-08-30 14:04:54.406 8383-8383/com.wayeal.yunapp I/zyh: foo: Start sending B
2021-08-30 14:04:54.407 8383-8424/com.wayeal.yunapp I/zyh: initData: A start collection B
2021-08-30 14:04:54.407 8383-8383/com.wayeal.yunapp I/zyh: foo: End Send B
2021-08-30 14:04:55.415 8383-8383/com.wayeal.yunapp I/zyh: foo: Start sending C
2021-08-30 14:04:55.421 8383-8426/com.wayeal.yunapp I/zyh: initData: C
2021-08-30 14:04:55.423 8383-8383/com.wayeal.yunapp I/zyh: initData: C
2021-08-30 14:04:55.425 8383-8383/com.wayeal.yunapp I/zyh: foo: C
2021-08-30 14:04:55.426 8383-8383/com.wayeal.yunapp I/zyh: foo: The data is sent

The output above shows that:

  • Once the hot flow is created, data can be emitted right away.
  • Collector A only starts collecting after the emitter has already sent A, so it does not receive A.
  • Collector B starts collecting after 2s. By then both A and B have been missed, so it only receives C, and the emitting code is not run again for it.

Detailed analysis of Flow

From the code so far, we have learned what a Flow is and what cold and hot flows are. Now let's look at how Flow is used and at its common APIs, based on the official source code.

Flow's features and usage are summarized in the figure above; a few additional points are worth adding:

1. Improved Flow operators

As mentioned, a Flow is a stream of data, and various operations can be applied to it along the way. To make chained calls more convenient, some Flow APIs have been improved, for example adding a delay before each value is emitted or a delay before emission starts. An official diagram illustrates this.
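
A chained call of this kind might look like the following sketch. The specific operators chosen here (onStart, onEach, filter, map) are my own selection, and operatorChainDemo is a made-up name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun operatorChainDemo(): List<String> = runBlocking {
    flowOf(1, 2, 3, 4)
        .onStart { delay(50) }      // wait once before the first value
        .onEach { delay(10) }       // wait before each value is passed on
        .filter { it % 2 == 0 }     // keep even numbers only
        .map { "item $it" }         // transform each remaining value
        .toList()                   // terminal operator: collects everything
}
```

Each intermediate operator returns a new Flow, so nothing runs until the terminal toList() starts collecting.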

2. Back pressure

Back pressure is unavoidable in any producer-consumer model, and RxJava uses many strategies to deal with it. Flow, however, was designed with this problem in mind. First, Flow emits and collects data asynchronously; second, suspending during emission or collection relieves the pressure:

In terms of back pressure, Flow is very different from a Channel. A Channel's strategy is to buffer, whereas a Flow has back pressure built in: because it is a cold source, it does not actively emit events without a consumer.

Both collect and the flow builder block are suspending, so either side can wait. If one end is not ready, the other simply suspends, as the figure above shows. In short, nothing extra has to be done to deal with back pressure.
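
The suspension-based back pressure can be observed directly. In this sketch (backPressureDemo is an invented name), emit does not return until the slow collector has finished with the value:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun backPressureDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow {
        for (i in 1..2) {
            events += "emit $i"
            emit(i)                 // suspends until the collector has handled i
            events += "emitted $i"
        }
    }.collect { value ->
        delay(50)                   // a slow consumer
        events += "handled $value"
    }
    events
}
```

The recorded order is emit, handled, emitted for each value, so the producer can never run ahead of the consumer unless a buffer is added explicitly.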

3. Buffer operator

As the earlier summary figure shows, a Flow is processed sequentially: collect starts, A is sent, A is processed, then B is sent, B is processed, and so on. If the collecting side does time-consuming work, the total time becomes very long, as shown in the figure.

Code examples:

// Collect, but consuming also takes time, so delay 1s per value
lifecycleScope.launch(Dispatchers.IO) {
    val flow = foo()
    flow.collect {
        Log.i(TAG, "initData: consume $it")
        delay(1000)
    }
}

lifecycleScope.launch { foo() }

fun foo(): Flow<String> = flow {
    Log.i(TAG, "foo: start sending data")
    delay(1000)
    Log.i(TAG, "foo: start sending A")
    emit("A")
    Log.i(TAG, "foo: end sending A")
    delay(1000)
    Log.i(TAG, "foo: start sending B")
    emit("B")
    Log.i(TAG, "foo: end sending B")
    delay(1000)
    Log.i(TAG, "foo: start sending C")
    emit("C")
    Log.i(TAG, "foo: end sending C")
    Log.i(TAG, "foo: end sending data")
}

You can see the final execution print:

2021-08-25 15:37:44.640 16997-17035/: foo: start sending A
2021-08-25 15:37:44.640 16997-17035/: initData: Consume A
2021-08-25 15:37:45.641 16997-17036/: foo: End send A
2021-08-25 15:37:46.643 16997-17035/: foo: end send A
2021-08-25 15:37:46.643 16997-17035/: foo: Start sending B
2021-08-25 15:37:46.643 16997-17035/: initData: consume B
2021-08-25 15:37:47.643 16997-17036/: foo: End Send B
2021-08-25 15:37:48.645 16997-17035/: foo: Start send C
2021-08-25 15:37:48.645 16997-17035/: initData: C
2021-08-25 15:37:49.648 16997-17036/: foo: end sending C
2021-08-25 15:37:49.649 16997-17036/: foo: end sending data

As you can see, the whole operation took about 6s. The way to improve this is the buffer operator, which lets the emitter keep running while the collector is still busy.
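
The original example for buffer did not survive, so the following is only a sketch of the idea, scaled down to 100 ms delays. The names letters and timeCollect are assumptions of mine:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun letters(): Flow<String> = flow {
    for (s in listOf("A", "B", "C")) {
        delay(100)                  // producing a value takes 100 ms
        emit(s)
    }
}

// Returns the elapsed milliseconds for collecting all three values.
fun timeCollect(buffered: Boolean): Long = runBlocking {
    // buffer() runs the producer in its own coroutine, connected by a channel.
    val source = if (buffered) letters().buffer() else letters()
    measureTimeMillis {
        source.collect { delay(100) }   // consuming a value takes 100 ms
    }
}
```

Without buffer, producing and consuming alternate, so the three values need about 3 × (100 + 100) ms. With buffer, production overlaps with consumption and the run should be noticeably shorter. Applied to the 1s delays used in this article, that would mean roughly 4s instead of 6s.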

Detailed analysis of SharedFlow

A cold Flow is nice, but in practice SharedFlow, the hot flow, is used more often. For example, Android's MVVM architecture is data-driven; here LiveData can be replaced with SharedFlow (or with StateFlow, covered later). One flow can have several subscribers, each of which reacts to changes in the data.

Again, take a quick look at the summary chart:

Let's go through it.

(1) Constructor

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
  • replay: how many already-emitted values are sent to a new subscriber when it starts collecting. The default is 0, meaning a new subscriber does not receive values emitted before it subscribed.
  • extraBufferCapacity: how many values can be buffered in addition to replay. The default is 0.
  • onBufferOverflow: the overflow policy, that is, what to do when the buffer is full. The default is BufferOverflow.SUSPEND.
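
The effect of replay on a late subscriber can be sketched as follows. replayDemo is a made-up name, and the 100 ms timeout is only there so the replay = 0 case does not wait forever:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun replayDemo(): Pair<String?, String?> = runBlocking {
    val noReplay = MutableSharedFlow<String>()              // replay = 0
    val withReplay = MutableSharedFlow<String>(replay = 1)

    noReplay.emit("A")      // no subscribers yet: the value is lost
    withReplay.emit("A")    // no subscribers yet: kept in the replay cache

    // Late subscribers: first() subscribes and waits for one value.
    val late1 = withTimeoutOrNull(100) { noReplay.first() }
    val late2 = withTimeoutOrNull(100) { withReplay.first() }
    late1 to late2
}
```

The first element of the pair is null because nothing was replayed, while the second is "A", served from the replay cache.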

(2) The shareIn function

The most direct way to use a SharedFlow is to create a MutableSharedFlow and emit data into it. However, when a function returns an ordinary Flow, you need shareIn to convert that Flow into a SharedFlow. shareIn is an extension function on Flow.

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>
  • scope: the coroutine scope in which sharing is started. As mentioned earlier, SharedFlow is much like LiveData, with collectors acting as observers; to bound its lifetime, you pass the coroutine scope in which the upstream is collected and shared.
  • replay: the number of values replayed to a new observer when it subscribes.
  • started: the strategy that controls when sharing starts and stops. There are three strategies: SharingStarted.Eagerly, SharingStarted.Lazily, and SharingStarted.WhileSubscribed.
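
As an illustration of these parameters, the sketch below shares one cold flow between two collectors outside of Android. shareInDemo, the counter, and the separately created sharingScope are assumptions for the example; in an app the scope would typically be viewModelScope:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun shareInDemo(): Pair<Int, List<String>> = runBlocking {
    var upstreamStarts = 0
    val cold = flow {
        upstreamStarts++            // counts how often the producer block runs
        delay(100)                  // give both collectors time to subscribe
        emit("A")
        emit("B")
    }

    // A scope with its own Job, so sharing can be cancelled independently.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = cold.shareIn(sharingScope, SharingStarted.Eagerly, replay = 0)

    val first = async { shared.take(2).toList() }
    val second = async { shared.take(2).toList() }
    val received = first.await() + second.await()

    sharingScope.cancel()           // stop sharing once it is no longer needed
    upstreamStarts to received
}
```

Both collectors receive A and B, yet the producer block runs only once, which is the difference from collecting the cold flow twice.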

(3) Upstream flow operators for shareIn

The operators discussed in the previous section can be applied to the upstream here, for example to react to the upstream completing or to handle exceptions.

onCompletion

val flow = loginRepository.getLoginName()
flow
    .onCompletion { cause ->
        // cause is null when the upstream completed normally
        if (cause == null) Log.i(TAG, "completed: ")
    }
    .shareIn(viewModelScope, SharingStarted.Eagerly)
    .collect { userName.value = it }

retry

retry lets you handle certain exceptions, for example reconnecting when an IOException occurs.

val flow = loginRepository.getLoginName()
flow
    .onCompletion { cause ->
        if (cause == null) Log.i(TAG, "completed: ")
    }
    .retry(5000) {
        // Retry only on IO errors, waiting 1s before each new attempt
        val shallRetry = it is IOException
        if (shallRetry) delay(1000)
        shallRetry
    }
    .shareIn(viewModelScope, SharingStarted.Eagerly)
    .collect { userName.value = it }

onStart

onStart lets you do something before the upstream starts emitting data.

val flow = loginRepository.getLoginName()
flow
    .onStart { emit("start") }
    .onCompletion { cause ->
        if (cause == null) Log.i(TAG, "completed: ")
    }
    .retry(5000) {
        val shallRetry = it is IOException
        if (shallRetry) delay(1000)
        shallRetry
    }
    .shareIn(viewModelScope, SharingStarted.Eagerly)
    .collect { userName.value = it }
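
To try these three operators without a repository, here is a standalone sketch with a deliberately flaky flow. The failure is simulated, and retryDemo and the values are invented for the illustration:

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun retryDemo(): Triple<Int, List<String>, Boolean> = runBlocking {
    var attempts = 0
    var completedNormally = false

    // Fails with an IOException on the first two attempts, then succeeds.
    val flaky = flow<String> {
        attempts++
        if (attempts < 3) throw IOException("simulated failure")
        emit("name")
    }

    val values = flaky
        .retry(5) { cause -> cause is IOException }     // re-collect on IO errors, at most 5 times
        .onStart { emit("start") }                      // runs once, before the first value
        .onCompletion { cause -> completedNormally = (cause == null) }
        .toList()

    Triple(attempts, values, completedNormally)
}
```

Operator order matters: here onStart sits below retry, so "start" is emitted once. If onStart is placed above retry, it belongs to the part that gets restarted and would emit again on every retry.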

Detailed analysis of StateFlow

Finally, we arrive at StateFlow. Google's intention is for StateFlow to replace LiveData. So how does it do that?

(1) Features

  • StateFlow is a subtype of SharedFlow. As described above, it is a hot flow that can be observed by multiple observers, and the scope and conditions under which it stops can be configured.
  • StateFlow only holds the latest value; in other words, it behaves like a SharedFlow with replay set to 1.
  • Like LiveData, StateFlow has a value property that holds the current value and can be used to read it or, on MutableStateFlow, to set it.
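
These properties can be checked with a few lines. The state values and the name stateFlowDemo are placeholders of mine:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun stateFlowDemo(): List<String> = runBlocking {
    val state = MutableStateFlow("loading")   // an initial value is mandatory
    val seen = mutableListOf<String>()

    seen += state.value                        // read the current value synchronously
    state.value = "ready"                      // set a new value without suspending

    // A new collector immediately receives the latest value, and only that one.
    seen += state.first()
    seen
}
```

The late collector never sees "loading", because a StateFlow keeps only its most recent value.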

(2) Use

  • MutableStateFlow is used just like MutableLiveData, except that it requires an initial value.
  • You can also use the stateIn function to convert a Flow into a StateFlow.
// Use flatMapLatest instead if observeItem itself returns a Flow.
val result = userId.mapLatest { newUserId ->
    repository.observeItem(newUserId)
}.stateIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5000),
    initialValue = Result.Loading
)

From this basic usage we can see the following:

  1. Its scope is viewModelScope, so when viewModelScope is cancelled, the flow stops.
  2. Once all observers are gone, the flow stops after 5s so as not to waste resources.
  3. There is an initial value, given by initialValue.

(3) Observation

LiveData only delivers updates while the UI is active, because observe takes a LifecycleOwner. StateFlow's collect takes no LifecycleOwner, so how do we get the same behaviour?

First, observation happens inside a coroutine, and the coroutine's scope is lifecycleScope. So far so good, but should we use launch directly, or launchWhenStarted?

From this diagram we can see that with plain launch, a data update may arrive after onStop. The View cannot be updated at that point, so this causes errors. We want the same effect as LiveData, updating only while the UI is active, so launchWhenStarted looks like the right way to start the coroutine.

However, WhileSubscribed(5000) only stops the upstream when there has been no observer for 5 seconds, and launchWhenStarted merely suspends the collecting coroutine, so the observer never goes away. For WhileSubscribed to take effect, repeatOnLifecycle is required.

repeatOnLifecycle starts the coroutine when the lifecycle reaches a given state and cancels it when the lifecycle leaves that state. For example, when the app goes to the background for more than 5s, nothing is subscribed any more: the collecting coroutine has been cancelled and the upstream emission logic stops as well, which saves resources. When the app returns to the foreground, onStart runs, the observer reappears, and the upstream logic starts again. The recommended way to write it is therefore:

onCreateView(...) {
    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
            myViewModel.myUiState.collect { ... }
        }
    }
}

Conclusion

After reading about Flow, we can see that it offers powerful operators like RxJava's and also works well with Jetpack, which makes handling complex data much easier than with LiveData.