How do I represent multiple values
  • Suspended functions can asynchronously return a single value, but how do you asynchronously return multiple calculated values?
A scheme that returns multiple values asynchronously
  • A collection of
  • The sequence
  • Hang up function
  • Flow
/*suspend*/ fun simpleFlow(a) = flow {
    for(i in 1.3.){
        delay(1000)
        emit(i)
    }
}
Copy the code
Flow is different from other methods
  • A flow type builder function named flow
  • flow{… } code in the building block can be suspended
  • The function simpleFlow is no longer marked with the suspend modifier
  • Streams emit values using the emit function
  • The stream uses the collect function to collect values
Application Flow
  • On Android, file download is a very typical application of Flow

Cold flow
  • A Flow is something like a sequenceCold flowThe code in the Flow builder does not run until the flow is collected
fun simpleFlow(a) = flow {
    println("Flow started")
    for(i in 1.3.){
        delay(1000)
        emit(i)
    }
}

fun testFlowIsCode(a) = runBlocking {
    val flow = simpleFlow()
    println("Flow Collect")
    flow.collect { println(it) }
    println("Flow Collect again")
    flow.collect { println(it) }
}
Copy the code

Returns the result

Flow Collect
Flow started
1
2
3
Flow Collect again
Flow started
1
2
3

Process finished with exit code 0
Copy the code

Val flow = simpleFlow(); val flow = simpleFlow() } the code in the building block, which is executed only when collect is called, is called cold flow

Continuity of flow
  • Each individual collection of a stream is performed sequentially, unless a special operator is used
  • Each emitted value is processed by each transition operator upstream and downstream, and then passed to the end operator
fun testFlowContinuation(a) = runBlocking {
    (1.. 5).asFlow().filter {
        it%2= =0
    }.map {
        "string $it"
    }.collect {
        println(it)
    }
}
Copy the code
Flow builder
  • The flowOf builder defines a flow that emits a fixed set of values
  • Use the.asflow () extension function to convert various collections and sequences into streams
fun testFlowOf(a) = runBlocking {
    flowOf("one"."two"."three")
        .onEach {
            delay(1000)// Emit an element every 1s
        }.collect{
            println(it)
        }
}
Copy the code
Flow context
  • The collection of flows always takes place in the context of the invocation of the coroutine, and this property of flows is called context saving
  • flow{… } Code in the builder must save properties contextually and is not allowed to emit from other contexts
fun simpleFlow() = flow { println1("Flow started ${Thread.currentThread().name}") for(i in 1.. 3) { delay(1000) emit(i) } } fun testFlowContext() = runBlocking { simpleFlow().collect { value -> println1("$value ${Thread.currentThread().name}") } }Copy the code

If both methods exist on the main thread, flow{… } change the thread as follows

fun simpleFlow(a) = flow {
   withContext(Dispatchers.IO){
       println1("Flow started ${Thread.currentThread().name}")
       for(i in 1.3.) {
           delay(1000)
           emit(i)
       }
   }
}
Copy the code

Execute it, and an error will be reported

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@599ad64a, BlockingEventLoop@2cbb850d],
		but emission happened in [DispatchedCoroutine{Active}@42689e0c, Dispatchers.IO].
		Please refer to 'flow' documentation or use 'flowOn' instead
Copy the code

This proves that attributes must be saved context-wise and are not allowed to be emitted from other contexts

  • FlowOn operator, which is used to change the context in which the flow is emitted
fun simpleFlow(a) = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1.3.) {
        delay(1000)
        emit(i)
    }
}.flowOn(Dispatchers.IO)
Copy the code
Start the flow
  • Using launchIn instead of Collect we can start the collection of streams in a separate coroutine
/ / the event source
fun event(a) = (1.3.).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)

fun testEventLaunch(a) = runBlocking {
    event().onEach {event->
        println1("Event: $event  ${Thread.currentThread().name}")
    }.launchIn(CoroutineScope(Dispatchers.IO)).join()
}
Copy the code
The cancellation of the flow
  • The flow uses the same collaboration cancellation as the coroutine. As usual, the collection of streams can be cancelled when a stream is suspended in a cancelable suspend function, such as delay
fun simpleFlow1(a) = flow {
    println1("Flow started ${Thread.currentThread().name}")
    for(i in 1.3.) {
        delay(1000)
        emit(i)
        println1("Emitting $i")
    }
}.flowOn(Dispatchers.IO)

fun testCancelFlow(a) = runBlocking {
    withTimeoutOrNull(2500) {
        simpleFlow1().collect { value -> println1(value) }
    }
    println1("Done")}Copy the code
Unmonitor the flow
  • For convenience, the flow builder performs additional ensureActive monitoring on each emitted value for cancellation, which means that from the flow{… } Issuing busy loops can be cancelled
  • For performance reasons, most other flow operations do not perform other cancel monitoring on their own, and in cases where the coroutine is in a busy cycle, it must be clear whether the monitoring is canceled
  • This is done through the cancellable operator
fun testCancelFlowCheck(a) = runBlocking {
    (1.. 5).asFlow().cancellable().collect {
        println1(it)
        if(it == 3)cancel()
    }
}
Copy the code
Back pressure

The pressure the water receives in the same direction as the flow is called back pressure or the efficiency of the producer is greater than that of the consumer

  • Buffer (), the code that runs the emission element in the stream concurrently, lengthening the pipe and increasing the buffer
  • Conflate (), merges emission items without processing each value
  • CollectLates (), cancel and relaunch the last value
  • The flowOn operator uses the same caching mechanism when the CoroutineDispatcher must be changed, but the buffer function explicitly requests buffering without changing the execution context
fun simpleFlow2(a) = flow {
    for(i in 1.3.) {
        delay(100)
        emit(i)
        println1("Emitting $i")}}fun testFlowBackPressure(a) = runBlocking {
    val time = measureTimeMillis {
        simpleFlow2()
// .flowOn(Dispatchers.Default)
// .buffer(50)
// .conflate()
// .collectLatest {
/ /}
            .collect {
            delay(300)
            println1("$it")
        }
    }
    println1(time)
}
Copy the code
The transition flow operator
  • Operators can be used to transform streams, just as collections and sequences are
  • The transition operator applies to the upstream flow and returns to the downstream flow
  • These operators are also cold operators, just like streams. Such operators are not themselves suspend functions
  • It runs fast and returns the definition of the new transformation flow
suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun testTransformFlowOperator(a) = runBlocking {
    (1.. 5).asFlow().transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }.collect {
        println1(it)
    }
}
Copy the code
The length limiting operator

take

fun number() = flow {
    try {
        emit(1)
        emit(2)
        emit(3)
    } finally {
        println1("Finally in numbers")
    }
}

fun testLimitLengthOperator() = runBlocking {
    number().take(2).collect { println1(it) }
}
Copy the code
Terminal operator
  • End operators are suspended functions used on a stream to start a stream collection. Collect is the most basic end operator, but there are other handy end operators
  1. To various sets, such as toList and toSet
  2. Operators that get the first value and ensure that the stream emits a single value
  3. Reduce and fold are used to shrink streams to a single value
fun testTerminalOperator(a) = runBlocking {
    val sum = (1.. 5).asFlow().map { it * it }.reduce { a, b ->
        a+b
    }
    println1(sum)
}
Copy the code
fun testTerminalOperator(a) = runBlocking {
    val sum = (1.. 5).asFlow()
    .fold(0, {acc, i -> acc + i })// Take the sum from 1 to 5, starting with 0
    println1(sum)
}
Copy the code
Combine multiple streams
  • Like the sequence. zip extension function in the Kotlin library, streams have a ZIP operator for combining related values in the two streams
fun testZip(a) = runBlocking {
    val numbers = (1.. 5).asFlow()
    val strs = flowOf("one"."two"."three")
    numbers.zip(strs) { a, b ->
        "$a -> $b"
    }.collect { println1(it) }
}
Copy the code
Exhibition advection
  • Streams represent a sequence of values that are received asynchronously, so it’s easy to run into situations where each value triggers a request for another sequence of values. However, the asynchronous nature of streams requires different flattening methods, for which there are a number of stream flattening operators
  1. FlatmapConcat Connection mode
  2. FlatMapMerge Merge mode
  3. FlatMapLatest latest flatting mode
fun requestFlow(i: Int) = flow {
    emit("$i first")
    delay(500)
    emit("$i second")}fun testFlatMapConcat(a) = runBlocking {
    (1.3.).asFlow().onEach { delay(100)}//.map {requestFlow(it)
        .flatMapConcat { requestFlow(it) }
// .flatMapConcat { requestFlow(it) }
// .flatMapConcat { requestFlow(it) }
        .collect { println1(it) }
}
Copy the code

Return result comparison

FlatMapConcat mode 1 First 1 second 2 First 2 Second 3 First 3 Second flatMapMerge mode 1 First 2 first 3 first 1 second 2 FlatMapLatest mode 1 First 2 First 3 First 3 SecondCopy the code
Exception handling of streams
  • There are several ways to handle exceptions when they are thrown by emitters or code in an operator
  1. The try/catch block
  2. Catch the function
fun number() = flow { try { emit(1) emit(2) emit(3) } finally { println1("Finally in numbers") } }.catch { e : Emit (4)}Copy the code
The completion of flow
  • When the stream collection is complete (normal or abnormal), it may need to perform an action.
  1. Imperative finally block
  2. OnCompletion declarative processing
fun simpleFlow4(a) = (1.3.).asFlow()

fun testFlowCompleteInFinally(a) = runBlocking {
// try {
// simpleFlow4().collect { println1(it) }
// } finally {
// println1("Done")
/ /}
    simpleFlow4().onCompletion {exception-> //onCompletion also prints out upstream and downstream exception information
        println1("Done ${exception ? :""}")
    }.collect {
        println1(it)
    }
}
Copy the code