3. Flow VS Sequences

Internally, each Flow executes sequentially, much like Sequences.

The difference between a Flow and a Sequence is that a Flow does not block the main thread, while a Sequence does.

Using Flow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    launch {
        for (j in 1..5) {
            delay(100)
            println("I'm not blocked $j")
        }
    }

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.collect { println(it) }

    println("Done")
}

Execution Result:

1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5

Using Sequence:

fun main() = runBlocking {

    launch {
        for (k in 1..5) {
            delay(100)
            println("I'm blocked $k")
        }
    }

    sequence {
        for (i in 1..5) {
            Thread.sleep(100)
            yield(i)
        }
    }.forEach { println(it) }

    println("Done")
}

Execution Result:

1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5

From this it can be concluded that Flow does not block the main thread: the functions it relies on (collect and emit in this example) are suspend functions, which suspend the coroutine rather than block the thread.

4. Flow VS RxJava

The Kotlin coroutine library itself was designed with reference to RxJava, and the figure below shows how to migrate from RxJava to Kotlin coroutines. (Fire and ice figuratively represent Hot and Cold Stream.)

4.1 Cold Stream

The flow block runs only when collect() is called, just as an Observable created in RxJava runs only when subscribe() is called.
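
To make the cold behavior concrete, here is a minimal sketch (the name coldFlow() is just for illustration): the body of the flow builder does not run until collect() is called, and it runs again for every collector.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper, for illustration only
fun coldFlow() = flow {
    println("Flow started") // runs only once a collector appears
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val f = coldFlow()          // nothing is printed yet
    println("Calling collect...")
    f.collect { println(it) }   // "Flow started" is printed now
    f.collect { println(it) }   // the builder block runs again for the second collector
}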

4.2 Hot Stream

As shown above, you can implement Hot Stream with the help of the Kotlin Channel.
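
A minimal sketch of this idea (names and timings are illustrative), using a buffered Channel and the receiveAsFlow() extension from kotlinx-coroutines: the producer starts sending immediately, independently of any collector.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)

    // Producer: starts sending right away, whether or not anyone is collecting
    launch {
        for (i in 1..5) {
            delay(100)
            println("Send $i")
            channel.send(i)
        }
        channel.close()
    }

    delay(250) // values 1 and 2 are already in the channel before collection starts

    // Consumer: expose the channel as a Flow and collect it
    channel.receiveAsFlow().collect { println("Receive $it") }
}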

4.3 Completion

When a Flow completes, either normally or with an exception, any action that needs to run at that point can be expressed in two ways: imperative and declarative.

4.3.1 Imperative

Implemented with try … finally:

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}

4.3.2 Declarative

Implemented with the onCompletion() function:

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}

4.3.3 onCompleted() (implemented with an extension function)

An extension function can implement an RxJava-like onCompleted(), which is called only when the Flow completes normally:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    collect { value -> emit(value) }
    action()
}

Its usage is similar to onCompletion():

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompleted { println("Completed...") }
        .collect { println(it) }
}

However, the onCompleted() action is not executed if the Flow ends with an exception.
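
A quick sketch to illustrate (the exception is thrown only for demonstration, and the onCompleted() extension defined above is assumed to be in scope): when the upstream throws, the inner collect rethrows before action() is reached, so the callback is skipped.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    try {
        flow {
            emit(1)
            throw RuntimeException("boom")            // upstream fails after the first value
        }.onCompleted { println("Completed...") }     // never printed
            .collect { println(it) }
    } catch (e: Exception) {
        println("Caught: ${e.message}")
    }
}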

4.4 Backpressure

Backpressure is one of the defining features of reactive programming.

RxJava 2's Flowable supports the following backpressure strategies:

  • MISSING: The created Flowable has no backpressure strategy; data emitted via onNext is neither cached nor discarded.
  • ERROR: If the Flowable's asynchronous buffer pool overflows, a MissingBackpressureException is thrown.
  • BUFFER: The Flowable's asynchronous buffer pool, like an Observable's, has no fixed size; data can be added without limit, so no MissingBackpressureException is thrown, but it can lead to OOM.
  • DROP: If the Flowable's asynchronous buffer pool is full, data about to enter the pool is discarded.
  • LATEST: If the buffer pool is full, data about to enter the pool is discarded, just as with DROP, except that LATEST always forces the latest item into the buffer pool regardless of its state.

Flow's backpressure is implemented through suspending functions: by default, emit() suspends until the collector is ready for the next value.
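
Before looking at the operators, here is a minimal sketch of that default behavior with no operator applied (timings are illustrative): emit() simply suspends until the slow collector has processed the previous value, so each element takes roughly 100 + 500 ms and the whole run takes about 3000 ms.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        (1..5).asFlow()
            .onEach { delay(100) }    // producer needs ~100 ms per item
            .collect {
                delay(500)            // collector needs ~500 ms per item
                println("Collect $it")
            }
    }
    // No buffer: each emit waits for the collector, roughly 5 * (100 + 500) ms in total
    println("Cost $time ms")
}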

4.4.1 buffer() corresponds to the BUFFER strategy

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .buffer()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }

    println("Cost $time ms")
}

Execution Result:

Emit 1 (104ms) 
Collect 1 starts (108ms) 
Emit 2 (207ms) 
Emit 3 (309ms) 
Emit 4 (411ms) 
Emit 5 (513ms) 
Collect 1 ends (613ms) 
Collect 2 starts (613ms) 
Collect 2 ends (1114ms) 
Collect 3 starts (1114ms) 
Collect 3 ends (1615ms) 
Collect 4 starts (1615ms) 
Collect 4 ends (2118ms) 
Collect 5 starts (2118ms) 
Collect 5 ends (2622ms) 
Cost 2689 ms

4.4.2 conflate() corresponds to the LATEST strategy

fun main() = runBlocking {

    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .conflate()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }

    println("Cost $time ms")
}

Execution Result:

Emit 1 (106ms) 
Collect 1 starts (110ms) 
Emit 2 (213ms) 
Emit 3 (314ms) 
Emit 4 (419ms) 
Emit 5 (520ms) 
Collect 1 ends (613ms) 
Collect 5 starts (613ms) 
Collect 5 ends (1113ms) 
Cost 1162 ms

4.4.3 DROP strategy

David Karnok, an RxJava contributor, wrote the kotlin-flow-extensions library, which includes FlowOnBackpressureDrop.kt; this class supports the DROP strategy.

/**
 * Drops items from the upstream when the downstream is not ready to receive them.
 */
@FlowPreview
fun <T> Flow<T>.onBackpressureDrop(): Flow<T> = FlowOnBackpressureDrop(this)

Using this library, you can apply the DROP strategy through Flow's extension function onBackpressureDrop(), as sketched below.
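
A hedged usage sketch, assuming kotlin-flow-extensions is on the classpath and that the extension lives in the hu.akarnokd.kotlin.flow package (check the library for the exact import): a fast producer paired with a slow collector, so items produced while the collector is busy are dropped.

import hu.akarnokd.kotlin.flow.onBackpressureDrop // package name assumed, verify against the library
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class) // the extension is marked @FlowPreview
fun main() = runBlocking {
    (1..10).asFlow()
        .onEach { delay(100) }     // fast producer
        .onBackpressureDrop()      // drops items while the collector is still busy
        .collect {
            delay(500)             // slow collector
            println("Collect $it")
        }
}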

Related articles in the series:

Kotlin Coroutines Flow series (1) Basic use of Flow

Kotlin Coroutines Flow series (3) exception handling

Kotlin Coroutines Flow series (4) thread operations

Kotlin Coroutines Flow series (5) Other operators