Official account: Byte Array. It shares write-ups on Android system source code, Jetpack source code, popular open source libraries, and other essential interview knowledge.

I have recently been learning Kotlin coroutines, and the best learning material is naturally the official documentation. After reading it, I decided to translate it. The work took close to a month and produced nine articles in total, which I am sharing here in the hope that they help readers. My knowledge is limited and parts of the translation are not entirely smooth, so feedback from readers is welcome.

Coroutines official documentation: Coroutines – Guide

Chinese translation of the guide: coroutines-cn-guide

Chinese translator of the official coroutines documentation: leavesC

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

1. Representing multiple values

Multiple values can be represented in Kotlin using collections. For example, we can have a function foo() that returns a List of three numbers and then print them all using forEach:

fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
    foo().forEach { value -> println(value) }
}

Output results:

1
2
3

1.1 Sequences

If we compute the numbers with some CPU-consuming blocking code (each computation taking 100 milliseconds), we can represent the numbers using a Sequence:

fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) }
}

This code outputs the same list of numbers, but waits 100 milliseconds before printing each number

1.2 Suspending functions

However, the computation in the previous section blocks the main thread that runs the code. When these values are computed by asynchronous code, we can mark the function foo with the suspend modifier, so that it can perform its work without blocking and return the result as a list:

import kotlinx.coroutines.*

suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) }
}

This code prints the numbers after waiting for one second.

1.3 Flows

Using the List<Int> result type means that we can only return all the values at once. To represent a stream of values that are being computed asynchronously, we can use the Flow<Int> type, just as we would use the Sequence<Int> type for synchronously computed values:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) }
}

This code waits 100 milliseconds before printing each number without blocking the main thread. This is verified by printing "I'm not blocked" every 100 milliseconds from a separate coroutine that runs on the main thread:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

Notice the following differences between the code with Flow and the earlier examples:

  • The builder function for the Flow type is called flow
  • Code inside the flow { ... } builder block can suspend
  • The foo function is no longer marked with the suspend modifier
  • Values are emitted from the flow using the emit function
  • Values are collected from the flow using the collect function

You can replace delay with Thread.sleep in the body of foo's flow { ... } and see that the main thread is blocked in this case.

2. Flows are cold

Flows are cold streams, similar to sequences: the code inside a flow builder does not run until the flow is collected. This becomes clear in the following example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}

Running result:

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

This is a key reason why the function foo(), which returns a flow, is not marked with the suspend modifier. The foo() call itself returns quickly and does not wait for anything. The flow starts every time it is collected, which is why we see "Flow started" when we call collect again.
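The cold behaviour can also be observed without reading log output. The following is a minimal sketch, not part of the original guide: the counter `starts` and the function name `countStarts` are hypothetical and exist only to record how often the builder body runs.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the number of times the builder body had started:
// before any collection, after the first collect, and after the second collect.
fun countStarts(): List<Int> = runBlocking {
    var starts = 0 // hypothetical counter, only here to observe the builder
    val numbers = flow {
        starts++ // runs once per collection, not when the flow is created
        emit(1)
        emit(2)
    }
    val observed = mutableListOf(starts) // nothing has run yet
    numbers.collect { }
    observed += starts
    numbers.collect { }
    observed += starts
    observed
}

fun main() {
    println(countStarts()) // [0, 1, 2]
}
```

Creating the flow leaves the counter at zero, and each call to collect runs the builder body again from the start.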

3. Flow cancellation

Flow adheres to the general cooperative cancellation of coroutines. However, the flow infrastructure does not introduce additional cancellation points; it is fully transparent to cancellation. As usual, collection of a flow can be cancelled when the flow is suspended in a cancellable suspending function (such as delay), and cannot be cancelled otherwise.

The following example shows how a flow running in a withTimeoutOrNull block is cancelled on timeout and stops executing its code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        foo().collect { value -> println(value) }
    }
    println("Done")
}

Notice that the flow in the foo() function emits only two numbers, producing the following output:

Emitting 1
1
Emitting 2
2
Done

Conversely, if you comment out the delay call in the flow and widen the range of the for loop, you can see that the flow is not cancelled, because the flow itself introduces no additional suspension points:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..Int.MAX_VALUE) {
        // delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        foo().collect { value -> println(value) }
    }
    println("Done")
}

4. Flow builders

The flow { ... } builder from the previous examples is the most basic one. There are other builders that make it easier to declare flows:

  • The flowOf builder defines a flow that emits a fixed set of values
  • Various collections and sequences can be converted to flows using the .asFlow() extension functions

Thus, the example that prints the numbers from 1 to 3 from a flow can be rewritten as:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
}
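The builders listed above can be put side by side in a short sketch. The function names `fixedValues`, `fromList` and `fromSequence` are made up for illustration; only flowOf, asFlow and toList come from the library.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// flowOf: a flow that emits a fixed set of values
fun fixedValues(): Flow<String> = flowOf("one", "two", "three")

// asFlow on a collection
fun fromList(): Flow<Int> = listOf(1, 2, 3).asFlow()

// asFlow on a sequence
fun fromSequence(): Flow<Int> = sequenceOf(10, 20).asFlow()

fun main() = runBlocking<Unit> {
    println(fixedValues().toList())  // [one, two, three]
    println(fromList().toList())     // [1, 2, 3]
    println(fromSequence().toList()) // [10, 20]
}
```

None of these functions is marked suspend: like flow { ... }, they only describe the flow, and nothing is emitted until a terminal operator such as toList is called.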

5. Intermediate flow operators

Flows can be transformed with operators, just as you would transform collections and sequences. Intermediate operators are applied to an upstream flow and return a downstream flow. These operators are cold, just like flows. A call to such an operator is not a suspending function itself. It works quickly, returning the definition of a new transformed flow without performing the transformation right away.

The basic operators have familiar names such as map and filter. The important difference from sequences is that the blocks of code inside these operators can call suspending functions.

For example, a flow of incoming requests can be mapped to results with the map operator, even when performing a request is a long-running operation implemented by a suspending function:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

This produces the following three lines, each appearing one second after the previous one:

response 1
response 2
response 3

5.1 Transform operator

Among the flow transformation operators, the most general one is called transform. It can be used to imitate simple transformations such as map and filter, as well as to implement more complex transformations. Using the transform operator, we can emit arbitrary values an arbitrary number of times.

For example, using transform we can emit a string before performing a long-running asynchronous request and follow it with the response:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}

Output:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
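The claim that transform can imitate map and filter can be checked with a small sketch. The function names `evenSquares` and `evenSquaresClassic` are hypothetical; the first uses a single transform, the second the usual filter and map chain.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// One transform that both filters (emits only for even numbers) and maps (squares them)
fun evenSquares(): Flow<Int> = (1..5).asFlow().transform { n ->
    if (n % 2 == 0) emit(n * n)
}

// The same pipeline written with the dedicated operators
fun evenSquaresClassic(): Flow<Int> = (1..5).asFlow()
    .filter { it % 2 == 0 }
    .map { it * it }

fun main() = runBlocking<Unit> {
    println(evenSquares().toList())        // [4, 16]
    println(evenSquaresClassic().toList()) // [4, 16]
}
```

Skipping the emit call plays the role of filter, and emitting a computed value plays the role of map.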

5.2 Size-limiting operators

Size-limiting intermediate operators such as take cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all resource-management code (such as try { ... } finally { ... } blocks) runs normally on cancellation:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

The output of this code clearly shows that execution of the flow { ... } body in the numbers() function stopped after emitting the second number:

1
2
Finally in numbers

6. Terminal flow operators

Terminal operators on flows are suspending functions that start the collection of the flow. The collect operator is the most basic one, but there are other terminal operators that can make this easier:

  • Conversion to various collections, such as toList and toSet
  • The first operator, which gets the first value, and the single operator, which ensures that the flow emits exactly one value
  • Reducing a flow to a single value with reduce and fold

For example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
}

This prints a single number:

55
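The other terminal operators from the list can be tried out in the same way. This is an illustrative sketch: the `Summary` class and the `summarize` function are made up for the example, while toList, first, single and fold are the library operators named above.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical holder for the results of several terminal operators
data class Summary(val all: List<Int>, val first: Int, val only: Int, val total: Int)

fun summarize(): Summary = runBlocking {
    val numbers = (1..5).asFlow()
    Summary(
        all = numbers.toList(),                      // collect everything into a list
        first = numbers.first(),                     // take the first value and stop collecting
        only = flowOf(42).single(),                  // succeeds because exactly one value is emitted
        total = numbers.fold(0) { acc, n -> acc + n } // like reduce, but with an initial value
    )
}

fun main() {
    println(summarize()) // Summary(all=[1, 2, 3, 4, 5], first=1, only=42, total=15)
}
```

Because flows are cold, each terminal operator here triggers its own independent collection of `numbers`.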

7. Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator, and no new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator.

See the following example, which filters the even integers and maps them to strings:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

Output:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

8. Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is a foo flow, then the following code runs in the context specified by the author of this code, regardless of the implementation details of the foo flow:

withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context
    }
}

This property of a flow is called context preservation.

So, by default, code in the flow { ... } builder runs in the context that is provided by the collector of the corresponding flow. For example, consider an implementation of foo that prints the thread it is called on and emits three numbers:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") }
}

Running result:

[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

Since foo().collect is called from the main thread, the body of foo's flow is also called in the main thread. This is the perfect default for fast-running or asynchronous code that does not care about the execution context and does not block the caller.

8.1 Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default, and UI-updating code might need to be executed in the context of Dispatchers.Main. Usually, withContext is used to change the context in code that uses Kotlin coroutines, but code in the flow { ... } builder has to honor the context preservation property and is not allowed to emit from a different context.

Try running the following code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) }
}

The code generates the following exception:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...

8.2 flowOn operator

The exception message refers to the flowOn function, which should be used to change the context of flow emission. The correct way to change the context of a flow is shown in the example below, which also prints the names of the corresponding threads to show how it all works:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value")
    }
}

Note that flow { ... } works in a background thread, while collection happens in the main thread.

Another thing to observe here is that the flowOn operator changes the default sequential nature of the flow. Collection now happens in one coroutine ("coroutine#1"), while emission happens in another coroutine ("coroutine#2") that runs in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for the upstream flow when it has to change the CoroutineDispatcher in its context.
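The thread switch can also be captured as data rather than log lines. This is a minimal sketch under the assumption that it is called from a thread that does not belong to Dispatchers.Default (for example the JVM main thread); `emitterThread` and `threadNames` are hypothetical names.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits the name of the thread on which the builder body runs
fun emitterThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default) // only the upstream (the builder) is moved

// Returns (emitting thread name, collecting thread name)
fun threadNames(): Pair<String, String> = runBlocking {
    val emitter = emitterThread().first()
    emitter to Thread.currentThread().name
}

fun main() {
    val (emitter, collector) = threadNames()
    println("emitted on $emitter, collected on $collector")
}
```

The emitting thread is typically one of the Default dispatcher's worker threads, while the collecting thread is whichever thread called runBlocking, so the two names differ.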

9. Buffering

Running different parts of a flow in different coroutines can reduce the overall time it takes to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case where emission by the foo() flow is slow, taking 100 milliseconds to produce an element, and the collector is also slow, taking 300 milliseconds to process an element. Let's see how long it takes to collect such a flow with three numbers:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}

The code produces output similar to the following, with the whole collection taking around 1200 milliseconds (three numbers, 400 milliseconds each):

1
2
3
Collected in 1220 ms

We can use the buffer operator on the flow to run the emitting code of foo() concurrently with the collecting code, instead of running them sequentially:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}

This produces the same numbers but runs faster, because we have effectively created a processing pipeline that waits only 100 milliseconds for the first number and then spends only 300 milliseconds processing each number. Run this way, it takes around 1000 milliseconds:

1
2
3
Collected in 1071 ms

Note that the flowOn operator uses the same buffering mechanism when the CoroutineDispatcher must be changed, but here we are explicitly requesting the buffering without changing the execution context

9.1 Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary to process each value, only the most recent ones. In this case, the conflate operator can be used to skip intermediate values when the collector is too slow to process them. Building on the previous example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}

We see that while the first number was still being processed, the second and the third were already produced, so the second one was conflated (dropped) and only the most recent one (the third) was delivered to the collector:

1
3
Collected in 758 ms

9.2 Processing the latest value

Conflation is one way to speed up processing when both the emitter and the collector are slow. It does so by dropping emitted values. Another way is to cancel the slow collector and restart it every time a new value is emitted. There is a family of xxxLatest operators that perform the same essential logic as the corresponding xxx operator, but cancel the code in their block when a new value arrives. Let's change conflate to collectLatest in the previous example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}

Since the body of collectLatest takes 300 milliseconds while new values are emitted every 100 milliseconds, we see that the block is run for every value but completes only for the last one:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

10. Composing multiple flows

There are many ways to compose multiple flows.

10.1 zip

Just like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}

Running result:

1 -> one
2 -> two
3 -> three
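Because zip pairs values by position, the result ends as soon as either flow runs out of values. The sketch below is an addition to the guide's example and uses flows of unequal length; the function name `zipped` is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun zipped(): List<String> = runBlocking {
    val nums = (1..5).asFlow()       // five numbers
    val strs = flowOf("one", "two")  // only two strings
    nums.zip(strs) { a, b -> "$a -> $b" } // pairs are formed only while both flows have values
        .toList()
}

fun main() {
    println(zipped()) // [1 -> one, 2 -> two]
}
```

The numbers 3, 4 and 5 never appear in the result, because there is no string left to pair them with.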

10.2 combine

When a flow represents the most recent value of a variable or operation (see also the related section on conflation), it may be necessary to perform a computation that depends on the most recent values of the corresponding flows and to recompute it whenever any of the upstream flows emits a value. The corresponding family of operators is called combine.

For example, if the numbers in the previous example update every 300 milliseconds but the strings update every 400 milliseconds, then zipping them with the zip operator still produces the same result, although the results are printed every 400 milliseconds.

In this example, we use the onEach intermediate operator to delay each element, which makes the code that emits the sample flows more declarative and shorter:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

However, if the combine operator is used here instead of zip:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

We get a completely different output:

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

11. Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to end up in a situation where each value triggers a request for another sequence of values. For example, we can have the following function that returns a flow of two strings 500 milliseconds apart:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

Now if we have a flow of three integers and call requestFlow for each of them like this:

(1..3).asFlow().map { requestFlow(it) }

Then we end up with a flow of flows (Flow<Flow<String>>) that needs to be flattened into a single flow for further processing. Collections and sequences have the flatten and flatMap operators for this. However, due to the asynchronous nature of flows, they call for different modes of flattening, so there is a family of flattening operators on flows.

11.1 flatMapConcat

Concatenating mode is implemented by the flatMapConcat and flattenConcat operators. They are the most direct analogues of the corresponding sequence operators. They wait for the inner flow to complete before starting to collect the next one, as the following example shows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

The sequential nature of flatMapConcat is clearly visible in the output:

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
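The relationship between flatMapConcat and flattenConcat can be sketched as follows. This version of `requestFlow` leaves out the delay so that the result is deterministic, and the function names `viaFlatMapConcat` and `viaFlattenConcat` are made up for the example. The @OptIn annotation is there on the assumption that these operators are marked as preview or experimental in the library version in use.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same shape as the guide's requestFlow, with the delay omitted for this sketch
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    emit("$i: Second")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlatMapConcat(): List<String> = runBlocking {
    (1..3).asFlow().flatMapConcat { requestFlow(it) }.toList()
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlattenConcat(): List<String> = runBlocking {
    // map produces a Flow<Flow<String>>, flattenConcat collects the inner flows one after another
    (1..3).asFlow().map { requestFlow(it) }.flattenConcat().toList()
}

fun main() {
    println(viaFlatMapConcat())
    println(viaFlattenConcat())
}
```

Both functions return the six strings in the same order, from "1: First" to "3: Second", because each inner flow is collected to completion before the next one starts.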

11.2 flatMapMerge

Another flattening mode is to collect all the incoming flows concurrently and merge their values into a single flow so that values are emitted as soon as possible. It is implemented by the flatMapMerge and flattenMerge operators. They both accept an optional concurrency parameter that limits the number of flows collected at the same time (it is equal to DEFAULT_CONCURRENCY by default):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

The concurrent nature of flatMapMerge is obvious:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

Note that flatMapMerge calls its block of code ({ requestFlow(it) } in this example) sequentially, but collects the resulting flows concurrently. This is equivalent to performing a sequential map { requestFlow(it) } first and then calling flattenMerge on the result.

11.3 flatMapLatest

Similar to the collectLatest operator described in the section "Processing the latest value", there is a corresponding "latest" flattening mode, in which collection of the previous flow is cancelled as soon as a new flow is emitted. It is implemented by the flatMapLatest operator:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

The output of this example is a good demonstration of how flatMapLatest works:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

Note that flatMapLatest cancels all the code in its block ({ requestFlow(it) } in this example) when a new value arrives. The call to requestFlow itself returns quickly; it is not a suspending function and cannot be cancelled. The flow it returns can only be cancelled at a suspension point, which is why delay is used inside it.

12. Flow exceptions

Flow collection can complete with an exception when the emitter or the code inside the operators throws one. There are several ways to handle these exceptions.

12.1 Collector try and catch

A collector can use Kotlin's try/catch block to handle exceptions:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

This code successfully catches an exception in the collect terminal operator and, as we can see, no more values are emitted after that:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

12.2 Everything is caught

The previous example actually catches any exception that happens in the emitter or in any intermediate or terminal operator. For example, let's change the code so that emitted values are mapped to strings, but the mapping code throws an exception:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

This exception is still caught and collection stops:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

13. Exception Transparency

But how does the emitter’s code encapsulate its exception-handling behavior?

Flows must be transparent to exceptions: emitting values in the flow { … } builder from inside a try/catch block violates exception transparency. This guarantees that a collector that throws an exception can always catch it using try/catch, as shown in the previous example

The emitter can use the catch operator, which preserves exception transparency and allows the emitter to encapsulate its exception handling. The body of the catch operator can analyze an exception and react to it in different ways depending on which exception was caught:

  • An exception can be rethrown using throw
  • An exception can be turned into an emitted value by calling emit from the body of catch
  • Exceptions can be ignored, logged, or processed by some other code

For example, let’s emit text when we catch an exception:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}

The output of the sample code is the same as before, even though we no longer use try/catch around the code
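The three reactions listed for the catch operator (rethrow, emit a value, ignore or log) can be combined in a single catch block. The following is a minimal sketch, not part of the official guide: failingFlow, recoverByType and collected are hypothetical names, and the mapping from exception type to reaction is an arbitrary choice made for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException

// Hypothetical source: emits one value, then fails with the exception passed in.
fun failingFlow(error: Throwable): Flow<String> = flow {
    emit("value 1")
    throw error
}

// One catch block, three reactions, chosen by the type of the upstream exception.
fun Flow<String>.recoverByType(): Flow<String> = this.catch { e ->
    when (e) {
        is IOException -> emit("fallback")                // turn the exception into a value
        is NumberFormatException -> println("Ignored $e") // log it and complete normally
        else -> throw e                                   // rethrow everything else
    }
}

// Collects the recovered flow into a list so the result is easy to inspect.
fun collected(error: Throwable): List<String> = runBlocking {
    failingFlow(error).recoverByType().toList()
}

fun main() {
    println(collected(IOException("disk")))          // [value 1, fallback]
    println(collected(NumberFormatException("bad"))) // [value 1]
    try {
        collected(UnsupportedOperationException("nope"))
    } catch (e: UnsupportedOperationException) {
        println("Rethrown to the collector: $e")
    }
}
```

Because the rethrown exception leaves the flow, the collector still has to deal with it, here with an ordinary try/catch around the collecting call.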

13.1 Transparent Catch

The catch intermediate operator honors exception transparency and catches only upstream exceptions (that is, exceptions from all the operators above catch, but not from those below it). If the block in collect { … } (placed below catch) throws an exception, the exception escapes and the program terminates:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd

A “Caught …” message is not printed, despite the catch operator being present. The exception thrown in collect escapes and ends the program

13.2. Catching Declaratively

We can combine the declarative nature of the catch operator with the desire to handle all exceptions by moving the body of the collect operator into onEach and placing it before the catch operator. Collection of this flow must then be triggered by a call to collect() without parameters:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}

Now we can see that a “Caught …” message is printed, so we catch all exceptions without explicitly using a try/catch block

14. Flow Completion

When flow collection completes (normally or exceptionally), it may need to execute an action. As you may have already noticed, this can be done in two ways: imperative or declarative

14.1. Imperative Finally Block

In addition to try/catch, the collector can also use finally to perform operations when the collection is complete

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd

This code prints the three numbers produced by the foo() flow, followed by the “Done” string

1
2
3
Done

14.2. Declarative Handling

For the declarative approach, flow has the onCompletion intermediate operator, which is invoked when the flow has been completely collected

The previous example can be rewritten using the onCompletion operator and produces the same output:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}

The main advantage of onCompletion is that its lambda takes a nullable Throwable parameter, which can be used to determine whether flow collection completed normally or exceptionally. In the following example, the foo() flow throws an exception after emitting the number 1:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd

As you might expect, it will print:

1
Flow completed exceptionally
Caught exception

Unlike the catch operator, the onCompletion operator does not handle the exception. As we can see from the sample code above, the exception still flows downstream. It is delivered to any further onCompletion operators and can be handled with the catch operator
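To make the path of the exception visible, the sketch below chains several onCompletion operators around a single catch and records what each one sees. It is an illustration only: failing and trace are hypothetical names, and the log list exists just to capture the order of calls.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that emits one value and then fails.
fun failing(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("boom")
}

// Records, in order, what each operator in the chain observes.
fun trace(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    failing()
        .onCompletion { cause -> log += "first onCompletion: ${cause?.message}" }
        .onCompletion { cause -> log += "second onCompletion: ${cause?.message}" }
        .catch { cause -> log += "catch: ${cause.message}" }   // the exception is handled here
        .onCompletion { cause -> log += "after catch: ${cause?.message}" }
        .collect { value -> log += "collected $value" }
    log
}

fun main() {
    trace().forEach { println(it) }
    // collected 1
    // first onCompletion: boom
    // second onCompletion: boom
    // catch: boom
    // after catch: null
}
```

Both onCompletion operators above catch observe the failure without consuming it. The one placed after catch receives null, because from its point of view the upstream completed normally.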

Upstream exceptions only

Just like the catch operator, onCompletion only sees exceptions from upstream, not downstream. For example, run the following code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd

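With the library version this guide was written against, the completion cause is null, yet collection fails with an exception. Later kotlinx.coroutines releases changed onCompletion so that it also observes downstream failures, so on a newer version the cause printed may be the exception instead of null: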

1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2

Imperative versus Declarative

Now we know how to collect a flow and handle its completion and exceptions in both imperative and declarative ways. The natural question here is which approach is preferred, and why. As a library, we do not advocate any particular approach; we believe that both are valid and should be chosen according to your own preferences and code style

Launching Flow

It is easy to use flows to represent asynchronous events coming from some source. In this case, we need an analogue of the addEventListener function that registers a piece of code as a reaction to incoming events and then continues with further work. The onEach operator can fill this role. However, onEach is an intermediate operator, so we also need a terminal operator to collect the flow. Otherwise, just calling onEach has no effect

If we use the collect terminal operator after onEach, the code after collect will wait until the stream is collected:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd

As you can see, it will print

Event: 1
Event: 2
Event: 3
Done

The launchIn terminal operator is useful here. By replacing collect with launchIn, we can start collecting stream data in a separate coroutine to immediately proceed with the next step of code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd

Running result:

Done
Event: 1
Event: 2
Event: 3

The required parameter of launchIn specifies the CoroutineScope in which the coroutine that collects the flow is launched. In the example above, this scope comes from runBlocking, so while the flow is running, the runBlocking scope waits for its child coroutine to complete, which keeps the main function from returning and terminating the example

In a real application, the scope would come from an entity with a limited lifetime. As soon as the lifetime of this entity ends, the corresponding scope is cancelled, which cancels the collection of the corresponding flow. In this way, onEach { … }.launchIn(scope) works like addEventListener. However, there is no need for a corresponding removeEventListener function, because cancellation and structured concurrency serve that purpose
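The following sketch illustrates the lifecycle idea under stated assumptions: entityScope is a hypothetical stand-in for a scope owned by a lifecycle-bound entity, endlessEvents is an invented event source, and yield() is used only to make the example deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical endless event source: the first event arrives at once, then one every 100 ms.
fun endlessEvents(): Flow<Int> = flow {
    var i = 1
    while (true) {
        emit(i++)
        delay(100)
    }
}

fun lifecycleDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Stand-in for a scope owned by an entity with a limited lifetime.
    // It reuses runBlocking's dispatcher but has its own Job, so it can end independently.
    val entityScope = CoroutineScope(coroutineContext + Job())

    val collector = endlessEvents()
        .onEach { event -> log += "Event: $event" }
        .launchIn(entityScope)      // plays the role of addEventListener

    yield()                         // let the collector run up to its first delay
    entityScope.cancel()            // the entity's lifetime ends
    collector.join()                // wait until the collector has actually stopped
    log += "collector cancelled: ${collector.isCancelled}"
    log
}

fun main() {
    lifecycleDemo().forEach { println(it) }
    // Event: 1
    // collector cancelled: true
}
```

No explicit unregistration step appears anywhere: cancelling the scope is enough to stop the otherwise endless collection.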

Note that launchIn also returns a Job, which can be used to cancel only the corresponding flow collection coroutine, without cancelling the whole scope, or to join it
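As a rough illustration of both uses of the returned Job, the sketch below launches two collections in the same scope, joins one and cancels the other. The names heartbeat, steps and jobDemo are hypothetical, and the delays are arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that never completes on its own.
fun heartbeat(): Flow<Int> = flow {
    var i = 1
    while (true) {
        emit(i++)
        delay(50)
    }
}

// Hypothetical short, finite flow.
fun steps(): Flow<Int> = (1..3).asFlow().onEach { delay(10) }

fun jobDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val heartbeatJob: Job = heartbeat()
        .onEach { beat -> if (beat == 1) { log += "heartbeat started" } }
        .launchIn(this)
    val stepsJob: Job = steps()
        .onEach { step -> log += "step $step" }
        .launchIn(this)

    stepsJob.join()                 // join: wait for one collection to finish
    heartbeatJob.cancelAndJoin()    // cancel only this collection, not the scope
    log += "scope still active: $isActive"
    log
}

fun main() {
    jobDemo().forEach { println(it) }
    // heartbeat started
    // step 1
    // step 2
    // step 3
    // scope still active: true
}
```

Without the cancelAndJoin call, runBlocking would never return here, because it waits for the endless heartbeat collection as one of its children.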

Flow and Reactive Streams

For those who are familiar with Reactive Streams or reactive frameworks such as RxJava and Project Reactor, the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow's main goal is to have as simple a design as possible, to be Kotlin- and suspension-friendly, and to respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the Reactive Streams and Kotlin Flows article.

While being different, conceptually, Flow is a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in corresponding reactive modules (kotlinx-coroutines-reactive for Reactive Streams, kotlinx-coroutines-reactor for Project Reactor and kotlinx-coroutines-rx2 for RxJava2). Integration modules include conversions from and to Flow, integration with Reactor’s Context and suspension-friendly ways to work with various reactive entities.
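As a small sketch of the conversions mentioned above, the example below turns a Flow into a Reactive Streams Publisher and back. It assumes the kotlinx-coroutines-reactive module (and with it the reactive-streams API) is on the classpath; roundTrip is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Converts a flow to a Publisher and back, then collects the result.
fun roundTrip(): List<Int> = runBlocking {
    val publisher: Publisher<Int> = (1..3).asFlow().asPublisher() // Flow -> Publisher
    val flow: Flow<Int> = publisher.asFlow()                      // Publisher -> Flow
    flow.toList()
}

fun main() {
    println(roundTrip()) // [1, 2, 3]
}
```

The Reactor and RxJava2 modules offer conversions in the same spirit for their own types.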