Preface

This article continues the Kotlin coroutines series. Readers interested in Kotlin coroutines can follow the links below.

Kotlin coroutine basics and principles series

  • Basic usage of Kotlin coroutines
  • Kotlin coroutine introduction to Android version in detail (ii) -> Kotlin coroutine key knowledge points preliminary explanation
  • Kotlin coroutine exception handling
  • Use Kotlin coroutine to develop Android applications
  • Network request encapsulation for Kotlin coroutines
  • Kotlin coroutine theory (1)
  • Kotlin coroutine theory (2)
  • Kotlin coroutine introduction for Android version in detail (8) -> In-depth Kotlin coroutine principle (3)
  • Kotlin coroutine introduction for Android (9) -> In-depth Kotlin coroutine principle (4)

Flow series

  • Flow Usage of Kotlin Coroutines (1)

Extended series

  • Encapsulating DataBinding saves you thousands of lines of code
  • A ViewModel wrapper for everyday use

The author is just an ordinary developer, and the designs here are not necessarily the best. Take what is useful from the article and discard the rest.

Using Flow in Kotlin coroutines

Personally, I did not think the Flow chapter needed much explanation, because it mainly combines coroutine knowledge with reactive streams, both of which we have already met through RxJava and the earlier coroutine articles. Recently, though, I found that many people are still asking about Flow.

In the spirit of teaching a man to fish rather than giving him a fish, this chapter explains not only how to use Flow but also some of how it is implemented, covering usage and implementation side by side. The article is long, so feel free to skip to the parts you need.

Thank you to each and every one of you. If it wasn’t for your daily reminders, this article might not exist.

Special thanks to the friends in the group chat for the hints you dropped, often unintentionally, while we were idly chatting: @stupid white whore @flowers fall with @ailurusfulgens @stage before listening to the rain @teenager @prime meridian @do you know who I am @Bessel curve @straight line @bonfire @a slanting classics @Ming next a nickname ~ @null @jerry J @want to, and the rest of the 365 group friends.

Asynchronous flow

By studying coroutines, we know that a suspending function can asynchronously return a single result value. For example:

fun test() {
    GlobalScope.launch {
        val withStr = withContext(Dispatchers.Default) {
            "a"
        }
        val awaitStr = async {
            "b"
        }
        val list = simple()
        Log.d("test", "withStr :$withStr")
        Log.d("test", "awaitStr :${awaitStr.await()}")
        Log.d("test", "list :$list  ")
    }
}

D/test: withStr :a
D/test: awaitStr :b
D/test: list :[1, 2, 3]  

Even if the function returns a List, we can only regard it as returning one result; that result just happens to be of type List.

What if we want to return multiple computed values asynchronously inside coroutines, in the reactive style we are used to from RxJava? One might think of using a Sequence:

public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence { iterator(block) }

A Sequence is an option because the sequence builder itself accepts a suspend lambda:

private fun simple(): Sequence<Int> = sequence {
    for (i in 1..3) {
        Thread.sleep(100)
        yield(i)
    }
}

fun test() {
    simple().forEach { value ->
        Log.d(TAG, "value :${value}")
    }
}

D/carman: value :1
D/carman: value :2
D/carman: value :3

However, we cannot call the delay suspending function inside it; we can only use Thread.sleep. This is because sequence takes an extension lambda on SequenceScope, and the SequenceScope class is annotated with @RestrictsSuspension. Classes and interfaces marked with this annotation are restricted when used as receivers of suspending extension functions: such extensions can only call other member or extension suspending functions on that particular receiver, and cannot call arbitrary suspending functions.

@RestrictsSuspension
public abstract class SequenceScope<in T> internal constructor() {
    //....
}

Without this restriction, fetching the next element could have the side effect of switching threads. Likewise, we cannot choose the thread a sequence runs on by specifying a dispatcher, because a sequence gives us no way to set a coroutine context at all.
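
To make this concrete, here is a minimal sketch that uses only the Kotlin standard library. The function name tracedSequence and the trace list are illustrative assumptions, not part of the original example. It records the order of events to show that a sequence is lazy and that producer and consumer take turns on the calling thread, which is why a blocking Thread.sleep inside the builder also blocks the caller.

```kotlin
// Hypothetical helper: builds a sequence and records the order of events.
fun tracedSequence(): List<String> {
    val trace = mutableListOf<String>()
    val numbers = sequence {
        for (i in 1..3) {
            trace += "produce $i on ${Thread.currentThread().name}"
            yield(i) // allowed: yield is a member of SequenceScope
            // delay(100) would not compile here because of @RestrictsSuspension
        }
    }
    // The builder body has not run yet: sequences are lazy.
    trace += "sequence created, nothing produced yet"
    for (n in numbers) {
        trace += "consume $n on ${Thread.currentThread().name}"
    }
    return trace
}
```

Printing the returned list shows the "sequence created" entry first, followed by alternating produce and consume entries that all carry the same thread name.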

Since Sequence has so many limitations, something new was needed to cover these cases, and that is where Flow comes in.

Flow is different from RxJava

For those familiar with Reactive Streams or reactive frameworks such as RxJava, the design of Flow may look familiar, especially since the operators look much the same.

Flow is also inspired by Reactive Streams and its various implementations. But the main goal of Flow is to keep the design as simple as possible and to be friendly to Kotlin coroutines. See Reactive Streams and Kotlin Flows for the story behind Flow.

Although it differs in these ways, a Flow is conceptually still a reactive stream. As with RxJava, there are hot and cold flows. Thread switching in Flow is also simpler than in RxJava.

There are also related reactive modules (for example, kotlinx-coroutines-reactive for Reactive Streams, and kotlinx-coroutines-rx2 / kotlinx-coroutines-rx3 for RxJava2 / RxJava3). These modules allow conversion between Flow and the other implementations.

Flow itself is an interface that defines a single suspending function, collect, which receives a FlowCollector object. The FlowCollector interface in turn declares a suspending function, emit. So how do these two implement a reactive stream?

public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}
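
As a rough illustration of how these two interfaces cooperate, the sketch below rebuilds the idea with simplified stand-ins. MiniFlow, MiniCollector, miniFlow and collectMiniFlow are hypothetical names invented for this example; they are not the library's classes, and the real implementation adds safety checks that are discussed later.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-ins for FlowCollector and Flow.
interface MiniCollector<in T> {
    suspend fun emit(value: T)
}

interface MiniFlow<out T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Builder: only stores the block. Nothing runs until collect is called.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    object : MiniFlow<T> {
        // Collecting means running the block with the collector as receiver,
        // so every emit(...) in the block calls the collector directly.
        override suspend fun collect(collector: MiniCollector<T>) = collector.block()
    }

fun collectMiniFlow(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val numbers = miniFlow<Int> {
        for (i in 1..3) emit(i)
    }
    numbers.collect(object : MiniCollector<Int> {
        override suspend fun emit(value: Int) {
            received += value
        }
    })
    received
}
```

Calling collectMiniFlow() returns the three emitted values in order. The point of the sketch is that emit is just a suspending call into the collector, so the producer naturally waits for the consumer.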

Creating a cold Flow

Now let's use Flow to replace the previous Sequence-based implementation.

Creating with the flow { ... } function

fun test() {
    lifecycleScope.launch {
        flow {
            for (i in 1..3) {
                delay(100)
                emit(i)
            }
        }.collect { value -> Log.d(TAG, "value :${value}") }
    }
}

Notice how the Flow version differs from the previous example. Here the flow { ... } function creates a cold flow, values are emitted with emit, and they are collected with collect. Because collect is a suspending function, it must be called from another suspending function or from a coroutine, which is why we launch a coroutine here.
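
What "cold" means can be shown with a small sketch outside Android. It assumes kotlinx.coroutines is on the classpath and uses runBlocking instead of lifecycleScope; the function name coldFlowDemo and the events list are illustrative only. The flow block does not run when the flow is created, and it runs again from the start for every collection.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records when the flow block actually executes.
fun coldFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "flow block started"
        for (i in 1..2) emit(i)
    }
    events += "flow created" // the block above has not run yet

    val first = numbers.toList()  // first collection runs the block
    events += "first collect: $first"
    val second = numbers.toList() // second collection runs it again
    events += "second collect: $second"
    events
}
```

In the returned list, "flow created" comes first and "flow block started" appears once before each of the two collection entries.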

Let's look at the source code to see how this is implemented:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

Although we call the flow { ... } function, the Flow object is actually created through the SafeFlow class. SafeFlow extends AbstractFlow, and AbstractFlow in turn implements both Flow and CancellableFlow. This means that the cold flow we created can be cancelled.

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

You can see that although we call collect, the work is actually carried out by collectSafely, which invokes the block passed to flow { ... } with a SafeCollector as its receiver. The finally block then calls SafeCollector's releaseIntercepted, whether or not an exception occurred. Let's move on to the implementation of SafeCollector.

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    //...
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    override fun invokeSuspend(result: Result<Any?>): Any {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }

    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }

    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        //...
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
}

If you have read the coroutine principle articles, this should look familiar: it is the execution, scheduling, and resumption process of a coroutine. I won't repeat it here; if you need a refresher, see Portal -> Coroutine Principle 1 and Portal -> Coroutine Principle 2.

Creating with the asFlow extension function

Besides the flow { ... } function, we can also create a Flow with asFlow, as follows:

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().collect { value -> Log.d(TAG, "value :${value}") }
    }
}

asFlow ends up calling flow { ... }. There are many asFlow extension functions; only a couple are shown here:

public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}
//....
public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}
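
A short sketch of a few of these variants, run outside Android with runBlocking (an assumption of this example; the function name asFlowVariants is also illustrative). Each call converts an ordinary value source into a cold flow, and toList collects it so the result can be inspected.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: collects flows created from different sources.
fun asFlowVariants(): List<List<Any>> = runBlocking {
    listOf(
        (1..3).asFlow().toList(),               // from an IntRange
        listOf("a", "b").asFlow().toList(),     // from an Iterable
        arrayOf(1.5, 2.5).asFlow().toList(),    // from an Array
        sequenceOf('x', 'y').asFlow().toList()  // from a Sequence
    )
}
```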

Creating with the flowOf function

flowOf accepts either a single value or a variable number of values. It, too, ends up calling flow { ... }.

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
public fun <T> flowOf(value: T): Flow<T> = flow {
    emit(value)
}

For example:

fun test() {
    lifecycleScope.launch {
        flowOf(1, 2, 2, 3).collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

As mentioned above, a Flow can be cancelled, yet Flow itself does not provide a cancel operation. So how do we cancel the execution of a Flow?

It is actually very simple. A Flow only executes when collect is called, and collect must be called inside a coroutine, so cancelling a Flow comes down to the state of the coroutine in which collect runs. To cancel the Flow, cancel the coroutine that is collecting it.

fun test() {
    val job = lifecycleScope.launch {
        flow {
            for (i in 1..3) {
                delay(100)
                emit(i)
            }
        }.collect { value -> Log.d(TAG, "value :${value}") }
    }
    job.cancel()
}
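
The example above cancels immediately, so nothing is collected. The following sketch lets the flow run part of the way before cancelling, to make the effect visible. It is a hedged illustration: it uses runBlocking instead of lifecycleScope, and the names cancelCollectingCoroutine, received and secondValueSeen are assumptions of this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: cancels the collecting coroutine after the second value.
fun cancelCollectingCoroutine(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val secondValueSeen = CompletableDeferred<Unit>()
    val job = launch {
        flow {
            for (i in 1..3) {
                delay(100)
                emit(i)
            }
        }.collect { value ->
            received += value
            if (value == 2) secondValueSeen.complete(Unit)
        }
    }
    secondValueSeen.await() // wait until 2 has been collected
    job.cancelAndJoin()     // cancelling the coroutine cancels the flow inside it
    received
}
```

The flow is suspended in delay when the job is cancelled, so the third value is never emitted and the function returns only the first two.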

Does Flow suddenly feel less difficult than you expected? It is just a further layer of encapsulation over coroutines. Now for an important point: to keep the flow's context consistent, the flow block must not emit from a different context, so switching threads around emit inside the flow block is forbidden.

fun test() {
    lifecycleScope.launch {
        flow {
            for (i in 1..3) {
                delay(100)
                if (i == 2) {
                    withContext(Dispatchers.IO) {
                        // a "clever" trick that is not allowed
                        emit(i)
                    }
                } else {
                    emit(i)
                }
            }
        }.collect { value -> Log.d(TAG, "value :${value}") }
    }
}

The compiler does not flag this code, but at run time it throws java.lang.IllegalStateException: Flow invariant is violated. So what should we do if we want to switch threads during execution?
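
The failure can be reproduced in a few lines outside Android. This is a sketch under the assumption that it runs with runBlocking on the JVM; the function name emitFromWrongContext is illustrative. It returns the first line of the exception message instead of crashing.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: emits from a different dispatcher than the collector's.
fun emitFromWrongContext(): String = runBlocking {
    try {
        flow {
            withContext(Dispatchers.Default) {
                emit(1) // emission happens in a context other than the collector's
            }
        }.collect { }
        "no exception"
    } catch (e: IllegalStateException) {
        // Keep only the first line; the rest of the message lists both contexts.
        e.message.orEmpty().lineSequence().first().trim()
    }
}
```

The returned text begins with "Flow invariant is violated", the same message quoted above.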

Switching threads in Flow

If we want to switch threads when using Flow, we need to use flowOn, an extension function on Flow.

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

flowOn changes the context in which this flow is executed to the specified context. The operator is composable. Note that flowOn only affects preceding operators that do not have a context of their own. That can be hard to grasp in the abstract, so let's first see which thread a flow runs on by default:

fun test() {
    lifecycleScope.launch {
        flow {
            for (i in 1..3) {
                Log.d(TAG, "flow :${ currentCoroutineContext()}")
                delay(100)
                emit(i)
            }
        }.collect { value ->
            Log.d(TAG, "collect:${ currentCoroutineContext()} value :${value}")
        }
    }
}

By default, the launch of lifecycleScope is executed by the main thread. According to the execution principle of the coroutine, we can confirm that all the execution operations in the above example are executed by the main thread:

D/carman: flow :[StandaloneCoroutine{Active}@78b0fe4, Dispatchers.Main.immediate]
D/carman: collect:[StandaloneCoroutine{Active}@78b0fe4, Dispatchers.Main.immediate] value :1
D/carman: flow :[StandaloneCoroutine{Active}@78b0fe4, Dispatchers.Main.immediate]
D/carman: collect:[StandaloneCoroutine{Active}@78b0fe4, Dispatchers.Main.immediate] value :2
D/carman: flow :[StandaloneCoroutine{Active}@78b0fe4, Dispatchers.Main.immediate]
D/carman: collect:[StandaloneCoroutine{Active}@78b0fe4, Dispatchers.Main.immediate] value :3

Let’s use flowOn to switch the thread and see what happens.

fun test() {
    lifecycleScope.launch {
        flow {
            for (i in 1..3) {
                Log.d(TAG, "flow :${ currentCoroutineContext()}")
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
            .collect { value ->
                Log.d(TAG, "collect:${ currentCoroutineContext()} value :${value}")
            }
    }
}

D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: collect:[ScopeCoroutine{Active}@1e865fe, Dispatchers.Main.immediate] value :1
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: collect:[ScopeCoroutine{Active}@1e865fe, Dispatchers.Main.immediate] value :2
D/carman: collect:[ScopeCoroutine{Active}@1e865fe, Dispatchers.Main.immediate] value :3

You can see that execution of the flow block has been switched to another thread, while the code in collect still runs on the main thread. So what happens if we add another flowOn?

fun test() {
    lifecycleScope.launch {
        flow {
            for (i in 1..3) {
                Log.d(TAG, "flow :${ currentCoroutineContext()}")
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
            .map {
                Log.d(TAG, "map :${ currentCoroutineContext()}")
                it
            }.flowOn(Dispatchers.Default)
            .collect { value ->
                Log.d(TAG, "collect:${ currentCoroutineContext()} value :${value}")
            }
    }
}

D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: map :[ScopeCoroutine{Active}@cc43a14, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@8b702bd, Dispatchers.Main.immediate] value :1
D/carman: flow :[ProducerCoroutine{Active}@78b0fe4, Dispatchers.IO]
D/carman: map :[ScopeCoroutine{Active}@cc43a14, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@8b702bd, Dispatchers.Main.immediate] value :2
D/carman: map :[ScopeCoroutine{Active}@cc43a14, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@8b702bd, Dispatchers.Main.immediate] value :3

Let's set aside what the map operator does for a moment and focus on the threads. You can see that flow { ... } runs on the IO dispatcher, and that the call to flowOn(Dispatchers.Default) does not change where flow { ... } runs. It only changes the execution thread of map, which had no context of its own, so the code block in map runs on the Default dispatcher. The code in collect still runs on the main thread.

If we remove flowOn(Dispatchers.IO) here, both the flow { ... } block and the map block run on the Default dispatcher.

D/carman: flow :[ProducerCoroutine{Active}@3656c4d, Dispatchers.Default]
D/carman: map :[ProducerCoroutine{Active}@3656c4d, Dispatchers.Default]
D/carman: flow :[ProducerCoroutine{Active}@3656c4d, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@840cc75, Dispatchers.Main.immediate] value :1
D/carman: map :[ProducerCoroutine{Active}@3656c4d, Dispatchers.Default]
D/carman: flow :[ProducerCoroutine{Active}@3656c4d, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@840cc75, Dispatchers.Main.immediate] value :2
D/carman: map :[ProducerCoroutine{Active}@3656c4d, Dispatchers.Default]
D/carman: collect:[ScopeCoroutine{Active}@840cc75, Dispatchers.Main.immediate] value :3

By comparing the four logs, we can draw some conclusions:

  • flowOn changes the context in which the flow is executed to the specified context.
  • flowOn calls can be combined.
  • flowOn only affects the preceding operators that do not have a context of their own. Operators that already have a context are not affected by a later flowOn.
  • No matter how flowOn switches threads, collect always runs on the dispatcher of the coroutine that calls it.
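
These conclusions can also be checked outside Android by recording thread names. The sketch below is an illustration under assumptions: it uses runBlocking in place of lifecycleScope, and the name flowOnThreads is invented for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: returns (emitter thread name, collector thread name).
fun flowOnThreads(): Pair<String, String> = runBlocking {
    var emitterThread = ""
    var collectorThread = ""
    flow {
        emitterThread = Thread.currentThread().name // upstream of flowOn
        emit(1)
    }.flowOn(Dispatchers.Default)
        .collect { collectorThread = Thread.currentThread().name } // caller's context
    emitterThread to collectorThread
}
```

The first name belongs to a Dispatchers.Default worker thread, while the second is the thread that runs runBlocking, which mirrors the main-thread collect in the logs above.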

Common Flow operators

The map operator was mentioned above, and collect is in fact an operator too; they just have different responsibilities. Based on the official documentation and my own experience, I divide Flow operators into five main types (an unofficial classification):

  • Transition operators, also called process operators: used to mark a particular stage of a flow's execution, for example onStart / onEach / onCompletion. A transition operator is applied to an upstream flow and returns a downstream flow. These operators are cold, just like flows. They are not suspending functions themselves; a call returns quickly with the definition of a new, transformed flow.
  • Exception operators: used to catch exceptions in the flow, for example catch and onErrorCollect (deprecated; catch is recommended).
  • Conversion operators: mainly perform data conversion, for example transform / map / filter / flatMapConcat.
  • Limiting operators: cancel the execution of the flow when the corresponding limit is reached, for example drop / take.
  • Terminal operators: suspending functions called on a flow to start collecting it. collect is the most basic terminal operator, but others are more convenient in some cases, for example toList, toSet, first, single, reduce, and fold.
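
As a quick, hedged illustration of the limiting and terminal operators named in the list, the sketch below runs a few of them on a flow of the numbers 1 to 5. It assumes runBlocking rather than lifecycleScope, and the function name limitingAndTerminalOperators is illustrative.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo: each entry is the result of one terminal operator.
fun limitingAndTerminalOperators(): List<Any> = runBlocking {
    val numbers = (1..5).asFlow()
    listOf(
        numbers.drop(1).take(3).toList(),       // skip one value, keep the next three
        numbers.first(),                        // first value only
        numbers.reduce { acc, v -> acc + v },   // sum of all values
        numbers.fold(10) { acc, v -> acc + v }  // sum starting from 10
    )
}
```

Because the flow is cold, each terminal operator starts its own collection from the beginning.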

Process operator

  • onStart: is called before the upstream stream starts.
  • onEach: called before each value of the upstream stream is emitted downstream.
  • onCompletion: called after the process completes or cancels, passing the cancellation exception or failure as the cause parameter of the operation.

It is important to note that when onStart is used with SharedFlow, there is no guarantee that upstream emissions occurring inside the onStart action or immediately after it will be collected. We'll cover this in the hot flow section of a later article.

fun test() {
    lifecycleScope.launch {
        flow {
            Log.d(TAG, "flow")
            emit(1)
        }.onStart {
            Log.d(TAG, "onStart ")
        }.onEach {
            Log.d(TAG, "onEach :${it}")
        }.onCompletion {
            Log.d(TAG, "onCompletion")
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: onStart 
D/carman: flow
D/carman: onEach :1
D/carman: collect :1
D/carman: onCompletion

The execution order is: onStart -> flow { ... } -> onEach -> collect -> onCompletion.

Exception operator

As mentioned above, exceptions may occur while a Flow is executing. Let's change the code to throw an exception in onEach and look at the log output when it fails:

fun test() {
    lifecycleScope.launch {
        flow {
            Log.d(TAG, "flow")
            emit(1)
        }.onStart {
            Log.d(TAG, "onStart ")
        }.onEach {
            Log.d(TAG, "onEach :${it}")
            throw NullPointerException("Null pointer")
        }.onCompletion { cause ->
            Log.d(TAG, "onCompletion catch $cause")
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: onStart 
D/carman: flow
D/carman: onEach 1
D/carman: onCompletion catch java.lang.NullPointerException: Null pointer

Process: com.example.myapplication, PID: 31145
java.lang.NullPointerException: Null pointer
...

As you can see, the exception thrown in onEach causes the coroutine to exit, so collect is not executed, yet onCompletion is. What is going on here?

Shouldn't onCompletion run after collect? Why did onCompletion run when collect did not? To answer that we need to look at the source code:

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow {
    try {
        collect(this)
    } catch (e: Throwable) {
        ThrowingCollector(e).invokeSafely(action, e)
        throw e
    }
    val sc = SafeCollector(this, currentCoroutineContext())
    try {
        sc.action(null)
    } finally {
        sc.releaseIntercepted()
    }
}

You can see that onCompletion wraps the collect call in a try/catch block. In the catch branch, the onCompletion action is executed through invokeSafely, and the exception is then rethrown. Since onCompletion rethrows the exception, what is the proper way to handle it?

In the coroutine Basics article, we talked about handling exceptions by using try/catch blocks. So let’s see how to catch an exception using try/catch.

fun test() {
    lifecycleScope.launch {
        try {
            flow {
                Log.d(TAG, "flow")
                emit(1)
                throw NullPointerException("Null pointer")
            }.onStart {
                Log.d(TAG, "onStart ")
            }.onEach {
                Log.d(TAG, "onEach ")
            }.onCompletion {
                Log.d(TAG, "onCompletion")
            }.collect { value ->
                Log.d(TAG, "collect :${value}")
            }
        } catch (e: Exception) {
            Log.d(TAG, "Exception : $e ")
        }
    }
}

We can handle exceptions with try/catch, but it is not elegant, and once an exception occurs the flow cannot continue. Wrapping emit in a try/catch inside flow { ... } and then emitting from the catch branch is not a good option either, because it violates exception transparency.

This is where the catch operator comes in: it preserves exception transparency and allows the exception handling to be encapsulated. The code block of the catch operator can analyze the exception and react in different ways depending on which exception was caught:

  • It can rethrow the exception with throw.
  • It can convert the exception into a new value and emit it with emit inside the catch block.
  • It can ignore the exception, log it, or handle it with some other code.

Now let's modify the code and remove the try/catch block. After the exception is caught by the catch operator, a new value is emitted with emit.

fun test() {
    lifecycleScope.launch {
        flow {
            Log.d(TAG, "flow")
            emit(1)
            throw NullPointerException("Null pointer")
        }.onStart {
            Log.d(TAG, "onStart ")
        }.onEach {
            Log.d(TAG, "onEach ")
        }.catch { cause ->
            Log.d(TAG, "catch $cause")
            emit(2)
        }.onCompletion { cause ->
            Log.d(TAG, "onCompletion catch $cause")
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: onStart 
D/carman: flow
D/carman: onEach 1
D/carman: catch java.lang.NullPointerException: Null pointer
D/carman: collect :2
D/carman: onCompletion catch null

As you can see, once the catch operator has handled the exception, collect only receives emitted values: the value 2 emitted inside the catch operator is collected normally, and onCompletion no longer receives an exception (its cause is null).

What if we changed the code again and added a map operator after the catch operator to throw a new exception?

fun test() {
    lifecycleScope.launch {
        flow {
            Log.d(TAG, "flow")
            emit(1)
        }.onStart {
            Log.d(TAG, "onStart ")
        }.onEach {
            Log.d(TAG, "onEach $it")
            throw NullPointerException("Null pointer")
        }.catch { cause ->
            Log.d(TAG, "catch $cause")
            emit(2)
        }.map {
            Log.d(TAG, "map")
            throw NullPointerException("New exception")
            it
        }.onCompletion { cause ->
            Log.d(TAG, "onCompletion2 catch $cause")
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: onStart 
D/carman: flow
D/carman: onEach 1
D/carman: catch java.lang.NullPointerException: Null pointer
D/carman: map
D/carman: onCompletion2 catch java.lang.NullPointerException: New exception

Process: com.example.myapplication, PID: 32168
java.lang.NullPointerException: New exception
...

The program crashed. Why? Because the catch operator only handles exceptions from its upstream flow. If an exception occurs downstream of it, we need to add another catch operator after that point to catch it.
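
A minimal sketch of that fix, run outside Android with runBlocking (an assumption of this example; the name catchOnlySeesUpstream and the exception messages are also illustrative). A second catch is placed after map, so the exception thrown in map now has a handler downstream of it.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo: each catch only handles failures from operators above it.
fun catchOnlySeesUpstream(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow { emit(1) }
        .onEach { throw IllegalStateException("first") }
        .catch { cause ->
            log += "catch #1: ${cause.message}" // handles the onEach failure
            emit(2)
        }
        .map { value ->
            if (value == 2) throw IllegalStateException("second")
            value
        }
        .catch { cause ->
            log += "catch #2: ${cause.message}" // handles the map failure
            emit(-1)
        }
        .collect { value -> log += "collect: $value" }
    log
}
```

The first catch sees only the onEach failure, the second sees the map failure, and collect receives the fallback value emitted by the second catch.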

But if the exception is thrown in the collect terminal operator, we can only handle it by wrapping the whole Flow in a try/catch, or through a CoroutineExceptionHandler in the coroutine context (you can try this yourself).
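
Here is a hedged sketch of the try/catch variant, again using runBlocking instead of lifecycleScope. The name collectWithTryCatch and the check condition are made up for the example. The catch operator is included on purpose to show that it is not triggered by a failure inside collect.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo: an exception thrown inside collect bypasses the catch operator.
fun collectWithTryCatch(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        flowOf(1, 2, 3)
            .catch { log += "catch operator: $it" } // not triggered: failure is downstream
            .collect { value ->
                check(value < 2) { "bad value $value" } // throws IllegalStateException for 2
                log += "collect $value"
            }
    } catch (e: IllegalStateException) {
        log += "try/catch: ${e.message}"
    }
    log
}
```

Only the surrounding try/catch records the failure; the flow stops at the value 2 and the third value is never collected.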

Conversion operator

Of the flow conversion operators, the most general is transform, which can be used to imitate simple transformations. There are also map, filter, zip, combine, flatMapConcat, flatMapMerge, flatMapLatest, and so on.

The transform operator

With the transform operator we can emit any value any number of times. The other conversion operators are built on transform. For example, we can emit a string before performing a long-running asynchronous request and follow it with the response.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().transform {
            emit(it)
            emit("transform $it")
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :1
D/carman: collect :transform 1
D/carman: collect :2
D/carman: collect :transform 2
D/carman: collect :3
D/carman: collect :transform 3

The map operator

We use the map operator to convert data, including the type of the emitted data:

fun test() {
    lifecycleScope.launch {
        flow {
            emit(1)
        }.map {
            Log.d(TAG, "First conversion")
            it * 5
        }.map {
            Log.d(TAG, "Value after the first conversion:$it")
            "map $it"
        }.collect { value ->
            Log.d(TAG, "Final converted value:${value}")
        }
    }
}

D/carman: First conversion
D/carman: Value after the first conversion:5
D/carman: Final converted value:map 5

You can see that the first map operator multiplies the value and the second converts its type, and that we finally receive the data after several conversion steps. The advantage is that each step keeps a single responsibility and performs only one operation, rather than lumping all of the processing together before delivering the result.

map also has a sibling operator, mapNotNull, which filters out null values and emits only non-null values.

fun test() {
    val flow = flowOf("one", "two", "three", null, "four")
    lifecycleScope.launch {
        flow.mapNotNull {
            it
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :one
D/carman: collect :two
D/carman: collect :three
D/carman: collect :four

The filter operator

As the name implies, the filter operator filters the data, returning a flow that contains only the values of the original flow that match the given predicate.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().filter {
            it < 2
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :1

filter has several sibling operators, such as filterNot / filterIsInstance / filterNotNull.

filterNot is the exact opposite of filter: it keeps the values for which the condition is false.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().filterNot { it < 2 }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :2
D/carman: collect :3

filterIsInstance keeps only the values of the specified type:

fun test() {
    val flow = flowOf("one", "two", "three", 1, "four")
    lifecycleScope.launch {
        flow.filterIsInstance<String>().collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :one
D/carman: collect :two
D/carman: collect :three
D/carman: collect :four

filterNotNull is similar to mapNotNull in that it filters out null values and emits only non-null values.

fun test() {
    val flow = flowOf("one", "two", "three", null, "four")
    lifecycleScope.launch {
        flow.filterNotNull().collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :one
D/carman: collect :two
D/carman: collect :three
D/carman: collect :four

The zip operator

The zip operator combines the corresponding values of two flows, like the zip function in RxJava:

fun test() {
    val flow1 = (1..3).asFlow()
    val flow2 = flowOf("one", "two", "three")
    lifecycleScope.launch {
        flow2.zip(flow1) { value1, value2 ->
            "$value1 :$value2"
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :one :1
D/carman: collect :two :2
D/carman: collect :three :3

The combine operator

When a flow represents the latest value of a variable or operation, we may need to perform a computation that depends on the latest values of the corresponding flows, and to recompute it whenever any upstream flow emits a value.

How should we understand this? Take an example. Suppose the numbers in the previous example are emitted every 100 milliseconds while the strings are emitted every 200 milliseconds. Combining them with zip still produces the same result, only now a line is printed every 200 milliseconds:

fun test(a) {
    val flow1 = (1.3.).asFlow().onEach { delay(100)}val flow2 = flowOf("one"."two"."three").onEach { delay(200) }
    lifecycleScope.launch {
        flow2.zip(flow1) { value1, value2 ->
            "$value1 :$value2"
        }.collect { value ->
            Log.d(TAG, "collect :${value}")}}}Copy the code
 D/carman: collect :one :1
 D/carman: collect :two :2
 D/carman: collect :three :3
Copy the code

But when we replace zip with the combine operator, the output is quite different. Because flow1 emits faster than flow2, the result is recomputed every time either upstream flow emits a value.

fun test() {
    val flow1 = (1..3).asFlow().onEach { delay(100) }
    val flow2 = flowOf("one", "two", "three").onEach { delay(200) }
    lifecycleScope.launch {
        flow2.combine(flow1) { value1, value2 ->
            "$value1 :$value2"
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :one :1
D/carman: collect :one :2
D/carman: collect :two :2
D/carman: collect :two :3
D/carman: collect :three :3
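The intermediate pairs that combine produces depend on timing, so they can differ from run to run and from device to device. What does not change is the final pair: it always joins the last value of each flow. A small sketch of this, assuming a plain JVM program with `runBlocking`; the 250 ms delay and the function name are chosen only for illustration:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects every pair produced by combine into a list.
// Which intermediate pairs appear is timing-dependent.
fun combinePairs(): List<String> = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(100) }
    val words = flowOf("one", "two", "three").onEach { delay(250) }
    words.combine(numbers) { w, n -> "$w:$n" }.toList()
}

fun main() {
    val pairs = combinePairs()
    println(pairs)        // intermediate pairs vary with timing
    println(pairs.last()) // three:3
}
```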

The flatMapConcat operator

flatMapConcat is mainly used for flattening. So what is flattening? Let's look at an example:

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().map {
            requestFlow(it) // Simulate a network request
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

D/carman: collect :kotlinx.coroutines.flow.SafeFlow@51a496c
D/carman: collect :kotlinx.coroutines.flow.SafeFlow@4185b35
D/carman: collect :kotlinx.coroutines.flow.SafeFlow@a272bca

Suppose we make a network request inside map. What we get back is a flow of flows, Flow<Flow<String>>, that is, a Flow whose elements are themselves Flows. We need to flatten it into a single Flow before processing it further, and that is what the flattening operators are for.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().onEach { delay(100) }
            .flatMapConcat { requestFlow(it) }
            .collect { value ->
                Log.d(TAG, "collect :${value}")
            }
    }
}

D/carman: collect :1: First
D/carman: collect :1: Second
D/carman: collect :2: First
D/carman: collect :2: Second
D/carman: collect :3: First
D/carman: collect :3: Second

In the output log we can clearly see the sequential execution of flatMapConcat.
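flatMapConcat is documented as a shortcut for map followed by flattenConcat, which ties it back to the Flow<Flow<String>> we got from map earlier. The sketch below checks that both spellings give the same sequence. It assumes a plain JVM program, and requestFlow is the article's helper with a shorter delay; the opt-in annotation is there because these operators are marked preview or experimental, depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same shape as the article's requestFlow, with a shorter delay.
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50)
    emit("$i: Second")
}

// Flattens in one step.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlatMapConcat(): List<String> = runBlocking {
    (1..3).asFlow().flatMapConcat { requestFlow(it) }.toList()
}

// Builds the Flow<Flow<String>> first, then flattens it sequentially.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaMapThenFlatten(): List<String> = runBlocking {
    (1..3).asFlow().map { requestFlow(it) }.flattenConcat().toList()
}

fun main() {
    println(viaFlatMapConcat())
    // [1: First, 1: Second, 2: First, 2: Second, 3: First, 3: Second]
    println(viaFlatMapConcat() == viaMapThenFlatten()) // true
}
```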

The flatMapMerge operator

flatMapMerge is another flattening mode: it collects all the incoming flows concurrently and merges their values into a single flow, so that values are emitted as soon as possible.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().onEach { delay(100) }
            .flatMapMerge { requestFlow(it) }
            .collect { value ->
                Log.d(TAG, "collect :${value}")
            }
    }
}

D/carman: collect :1: First
D/carman: collect :2: First
D/carman: collect :3: First
D/carman: collect :1: Second
D/carman: collect :2: Second
D/carman: collect :3: Second

The output log clearly shows the concurrent nature of flatMapMerge.

The flatMapLatest operator

flatMapLatest also flattens, but it differs from the two operators above: as soon as a new flow is emitted, collection of the previous flow is cancelled.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().onEach { delay(100) }
            .flatMapLatest { requestFlow(it) }
            .collect { value ->
                Log.d(TAG, "collect :${value}")
            }
    }
}

D/carman: collect :1: First
D/carman: collect :2: First
D/carman: collect :3: First
D/carman: collect :3: Second

Note that flatMapLatest cancels all the code in its block ({ requestFlow(it) } in this example) when a new value arrives. That makes no difference in this particular example, because the call to requestFlow itself is fast, does not suspend, and therefore cannot be cancelled. However, if the block called a suspending function such as delay, the difference would show up in the output.
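To make that last point concrete, here is a hedged sketch in which the block itself suspends before it returns the inner flow. It assumes a plain JVM program with `runBlocking`; the 300 ms delay and the function name are made up for illustration. Since a new value arrives every 100 ms, we would expect the blocks for 1 and 2 to be cancelled while they are still suspended in delay, so that only the request for 3 gets collected. The exact result is timing-dependent; the only thing we can rely on is that the last value comes from the last request.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same helper as in the article.
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

@OptIn(ExperimentalCoroutinesApi::class)
fun latestWithSuspendingBlock(): List<String> = runBlocking {
    (1..3).asFlow().onEach { delay(100) }
        .flatMapLatest { i ->
            delay(300)     // suspension point: the block can be cancelled here
            requestFlow(i) // only reached if no newer value arrived meanwhile
        }
        .toList()
}

fun main() {
    println(latestWithSuspendingBlock())
}
```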

Limiting operators

The take operator

The take operator returns a flow containing the first count elements. Once count elements have been emitted, it cancels the upstream flow by throwing an exception internally.

public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count > 0) { "Requested element count $count should be positive" }
    return flow {
        var consumed = 0
        try {
            collect { value ->
                if (++consumed < count) {
                    return@collect emit(value)
                } else {
                    // Last requested element: emit it, then abort the upstream
                    return@collect emitAbort(value)
                }
            }
        } catch (e: AbortFlowException) {
            e.checkOwnership(owner = this)
        }
    }
}

private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
    emit(value)
    throw AbortFlowException(this)
}

Let’s look at an example:

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().take(2)
            .collect { value ->
                Log.d(TAG, "collect :${value}")
            }
    }
}

D/carman: collect :1
D/carman: collect :2
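Because take aborts the upstream once it has enough elements, it can safely be applied to a flow that would otherwise never end. The sketch below observes this from the outside: the upstream's finally block runs as soon as the second element has been delivered. It assumes a plain JVM program with `runBlocking`; the function name and the flag are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the elements taken and whether the endless upstream was stopped.
fun firstTwoOfEndless(): Pair<List<Int>, Boolean> = runBlocking {
    var upstreamStopped = false
    val endless = flow {
        try {
            var i = 1
            while (true) emit(i++) // would loop forever without take
        } finally {
            upstreamStopped = true // runs when take aborts the upstream
        }
    }
    val taken = endless.take(2).toList()
    taken to upstreamStopped
}

fun main() {
    println(firstTwoOfEndless()) // ([1, 2], true)
}
```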

The takeWhile operator

The takeWhile operator is similar to filter, except that it stops the flow the first time the predicate returns false.

fun test() {
    lifecycleScope.launch {
        flowOf(1, 1, 1, 2, 3, 4, 4, 5, 1, 2, 2, 3, 3).map {
            delay(100)
            it
        }.takeWhile {
            it == 1
        }.collect { value ->
            Log.d(TAG, "collect :${value}")
        }
    }
}

D/carman: collect :1
D/carman: collect :1
D/carman: collect :1

As you can see, although the flow contains four 1s, only three are collected: the predicate fails on the 2 that comes before the fourth 1, so the rest of the flow is cancelled.
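The difference from filter is easiest to see by running both on the same data. A minimal sketch, assuming a plain JVM program with `runBlocking`; the function names are made up:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same data as in the example above.
fun source(): Flow<Int> = flowOf(1, 1, 1, 2, 3, 4, 4, 5, 1, 2, 2, 3, 3)

// filter inspects every element, so it also finds the fourth 1.
fun withFilter(): List<Int> = runBlocking {
    source().filter { it == 1 }.toList()
}

// takeWhile stops at the first element that fails the predicate.
fun withTakeWhile(): List<Int> = runBlocking {
    source().takeWhile { it == 1 }.toList()
}

fun main() {
    println(withFilter())    // [1, 1, 1, 1]
    println(withTakeWhile()) // [1, 1, 1]
}
```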

The drop operator

The drop operator is the opposite of take: it discards the first count elements and emits the rest.

fun test() {
    lifecycleScope.launch {
        (1..3).asFlow().drop(2)
            .collect { value ->
                Log.d(TAG, "collect :${value}")
            }
    }
}

D/carman: collect :3
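drop and take can also be chained to cut a window out of a flow. A small sketch, assuming a plain JVM program with `runBlocking`; the function names and numbers are made up for illustration:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Skips the first two elements and keeps everything after them.
fun dropTwo(): List<Int> = runBlocking {
    (1..5).asFlow().drop(2).toList()
}

// Skips three elements, then keeps the next two.
fun window(): List<Int> = runBlocking {
    (1..10).asFlow().drop(3).take(2).toList()
}

fun main() {
    println(dropTwo()) // [3, 4, 5]
    println(window())  // [4, 5]
}
```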

Terminal operators

collect is the most basic terminal operator, and we have used it in almost every example. Let's look at the other terminal operators.

The toList operator

The toList operator collects the flow into a List.

fun test() {
    lifecycleScope.launch {
        val list = (1..5).asFlow().toList()
        Log.d(TAG, "toList :${list}")
    }
}

D/carman: toList :[1, 2, 3, 4, 5]

The toSet operator

The toSet operator is similar to toList, except that it collects the flow into a Set.

fun test() {
    lifecycleScope.launch {
        val set = (1..5).asFlow().toSet()
        Log.d(TAG, "toSet :${set}")
    }
}

D/carman: toSet :[1, 2, 3, 4, 5]

The first operator

The first operator returns the first element of the flow.

fun test() {
    lifecycleScope.launch {
        val first = (1..5).asFlow().first()
        Log.d(TAG, "first :${first}")
    }
}

D/carman: first :1

The last operator

The last operator is the opposite of first: it returns the last element of the flow.

fun test() {
    lifecycleScope.launch {
        val last = (1..5).asFlow().last()
        Log.d(TAG, "last :${last}")
    }
}

D/carman: last :5

The single operator

single is a terminal operator that waits for exactly one value to be emitted. It throws NoSuchElementException for an empty flow and IllegalArgumentException for a flow that contains more than one element.

fun test() {
    lifecycleScope.launch {
        val single = (1..5).asFlow().single()
        Log.d(TAG, "single :${single}")
    }
}

Process: com.example.myapplication, PID: 27922
java.lang.IllegalArgumentException: Flow has more than one element

Because the flow emits more than one value, single throws an IllegalArgumentException.

fun test() {
    lifecycleScope.launch {
        val single = flow { emit(1) }.single()
        Log.d(TAG, "single :${single}")
    }
}

D/carman: single :1

When the flow emits exactly one value, single returns it normally.
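The three cases can be checked in one place without crashing the process. The sketch below catches whatever single throws and returns it. It assumes a plain JVM program with `runBlocking` and a kotlinx.coroutines version that behaves like the one in the log above; the helper name is made up.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Runs single() on a flow built from the given values and returns
// the exception it throws, or null if it returned normally.
fun singleErrorOf(values: List<Int>): Throwable? = runBlocking {
    runCatching { values.asFlow().single() }.exceptionOrNull()
}

fun main() {
    println(singleErrorOf(emptyList()))  // a NoSuchElementException
    println(singleErrorOf(listOf(1, 2))) // an IllegalArgumentException
    println(singleErrorOf(listOf(7)))    // null
}
```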

The reduce operator

The reduce operator accumulates values starting from the first element, applying the operation to the current accumulator value and each subsequent element.

fun test() {
    lifecycleScope.launch {
        val reduce = (1..5).asFlow().reduce { a, b -> a + b }
        Log.d(TAG, "reduce :${reduce}")
    }
}

D/carman: reduce :15

The fold operator

The fold operator works like reduce, except that it takes an initial value: it accumulates starting from that initial value, applying the operation to the current accumulator value and each element.

fun test() {
    lifecycleScope.launch {
        val fold = (1..5).asFlow().fold(10) { a, b ->
            a + b
        }
        Log.d(TAG, "fold :${fold}")
    }
}

D/carman: fold :25
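The initial value matters most when the flow turns out to be empty: fold simply returns the initial value, while reduce has no first element to start from and throws NoSuchElementException. A minimal sketch, assuming a plain JVM program with `runBlocking`; the helper names are made up:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Sums the values with reduce; throws NoSuchElementException on an empty list.
fun sumWithReduce(values: List<Int>): Int = runBlocking {
    values.asFlow().reduce { acc, v -> acc + v }
}

// Sums the values with fold, starting from 10; an empty list yields 10.
fun sumWithFold(values: List<Int>): Int = runBlocking {
    values.asFlow().fold(10) { acc, v -> acc + v }
}

fun main() {
    val numbers = listOf(1, 2, 3, 4, 5)
    println(sumWithReduce(numbers))   // 15
    println(sumWithFold(numbers))     // 25
    println(sumWithFold(emptyList())) // 10
    // sumWithReduce(emptyList()) would throw NoSuchElementException
}
```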

By now we have a clear picture of how to use Flow and which operator fits which situation. One more thing needs to be added. While a flow is running, both the upstream emission and the downstream processing of every value take time, and these times add up, so the whole flow can take a long time to finish. How can we shorten it?

Flow buffering

For example, suppose the upstream flow is slow, producing an element every 100 milliseconds, and the downstream collector is slow too, taking 300 milliseconds to process each element. Let's see how long it takes to collect three numbers from such a flow:

fun test() {
    lifecycleScope.launch {
        val time = measureTimeMillis {
            flow {
                for (i in 1..3) {
                    delay(100)
                    emit(i)
                }
            }.collect { value ->
                delay(300)
                Log.d(TAG, "collect :${value}")
            }
        }
        Log.d(TAG, "Collected in $time ms")
    }
}

D/carman: collect :1
D/carman: collect :2
D/carman: collect :3
D/carman: Collected in 1273 ms

The whole collection takes a little over 1200 milliseconds (1273 ms here; the exact figure varies by device), because each of the three numbers takes 400 milliseconds: 100 to emit plus 300 to process. In this case we can use the buffer operator to cut down the total time.

fun test() {
    lifecycleScope.launch {
        val time = measureTimeMillis {
            flow {
                for (i in 1..3) {
                    delay(100)
                    emit(i)
                }
            }.buffer().collect { value ->
                delay(300)
                Log.d(TAG, "collect :${value}")
            }
        }
        Log.d(TAG, "Collected in $time ms")
    }
}

D/carman: collect :1
D/carman: collect :2
D/carman: collect :3
D/carman: Collected in 1039 ms

The results are the same, but the flow finishes sooner with the buffer operator. buffer effectively creates a processing pipeline in which emission and collection run concurrently: we only wait 100 milliseconds for the first number and then spend 300 milliseconds processing each one, so the whole run takes about 1000 milliseconds.
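For readers who want to reproduce the comparison off-device, here is a sketch that measures both variants with the same helper. It assumes a plain JVM program with `runBlocking`; slowNumbers and collectTimed are made-up names. The measured times will differ from machine to machine, so the sketch prints them without promising exact figures.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Slow upstream: one number every 100 ms, as in the article.
fun slowNumbers(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Collects the flow with a slow collector (300 ms per value) and
// returns the collected values together with the elapsed time.
fun collectTimed(source: Flow<Int>): Pair<List<Int>, Long> = runBlocking {
    val seen = mutableListOf<Int>()
    val elapsed = measureTimeMillis {
        source.collect { value ->
            delay(300)
            seen += value
        }
    }
    seen to elapsed
}

fun main() {
    val (plainValues, plainMs) = collectTimed(slowNumbers())
    val (bufferedValues, bufferedMs) = collectTimed(slowNumbers().buffer())
    println("without buffer: $plainValues in $plainMs ms")
    println("with buffer:    $bufferedValues in $bufferedMs ms")
}
```

Without buffer the emitter waits for the collector after every value. With buffer the emitter keeps producing into a channel while the collector is still busy, which is where the saving comes from.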

That concludes the basics of Flow. The next section covers the more advanced uses of Flow on Android, StateFlow and SharedFlow.

Originality is not easy. If you like this article, you can click “like”.
