3. Flow vs. Sequence
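Internally, each Flow executes sequentially, much like a Sequence.
The difference is that a Flow does not block the main thread, while a Sequence does: the flow builder and collect() are suspending, so the producer can wait with delay(), whereas the sequence builder cannot call arbitrary suspending functions and has to wait with a blocking call such as Thread.sleep().
Using a Flow: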
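    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        // A sibling coroutine running on the same (main) thread
        launch {
            for (j in 1..5) {
                delay(100)
                println("I'm not blocked $j")
            }
        }
        flow {
            for (i in 1..5) {
                delay(100) // suspends; the thread stays free for the coroutine above
                emit(i)
            }
        }.collect { println(it) }
        println("Done")
    }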
Execution Result:
1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5
Using a Sequence:
    import kotlinx.coroutines.*

    fun main() = runBlocking {
        // This coroutine cannot start until the main thread is released
        launch {
            for (k in 1..5) {
                delay(100)
                println("I'm blocked $k")
            }
        }
        sequence {
            for (i in 1..5) {
                Thread.sleep(100) // blocks the main thread
                yield(i)
            }
        }.forEach { println(it) }
        println("Done")
    }
Execution Result:
1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5
From this we can conclude that a Flow does not block the main thread, because it is built on suspending functions (collect and emit in this example) that suspend the coroutine instead of blocking the thread.
4. Flow vs. RxJava
The Kotlin coroutine library itself was designed with reference to RxJava, and the figure below shows how to migrate from RxJava to Kotlin coroutines. (In the figure, fire and ice represent hot and cold streams.)
4.1 Cold Stream
The flow block runs only when collect() is called, just as an Observable created in RxJava runs only when subscribe() is called.
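The cold behavior described above can be observed with a small sketch. The helper name coldFlowEvents and the event strings are hypothetical and exist only for this illustration; the sketch assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records events in the order in which they happen.
suspend fun coldFlowEvents(): List<String> {
    val events = mutableListOf<String>()
    val numbers: Flow<Int> = flow {
        events += "flow block started"
        emit(1)
        emit(2)
    }
    // Building the flow has not run the block yet.
    events += "flow created"
    // The block runs once for each call to collect().
    numbers.collect { events += "collected $it" }
    numbers.collect { events += "collected again $it" }
    return events
}

fun main() = runBlocking {
    coldFlowEvents().forEach { println(it) }
}
```

"flow created" is recorded before "flow block started", and the block runs a second time for the second collector.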
4.2 Hot Stream
As the figure above shows, a hot stream can be implemented with the help of a Kotlin Channel.
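As a minimal sketch of the hot behavior, assuming an unlimited-capacity Channel so that send() never has to wait: the producer below starts sending as soon as it is launched, whether or not anyone is receiving yet. The helper name hotChannelEvents is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: records what the producer and the consumer do, in order.
suspend fun hotChannelEvents(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val channel = Channel<Int>(Channel.UNLIMITED)
    // The producer runs on its own; it does not wait for a consumer.
    val producer = launch {
        for (i in 1..3) {
            events += "sent $i"
            channel.send(i)
        }
        channel.close()
    }
    // Wait until everything has been sent before receiving anything.
    producer.join()
    for (value in channel) {
        events += "received $value"
    }
    events
}

fun main() = runBlocking {
    hotChannelEvents().forEach { println(it) }
}
```

All three values are sent before the first one is received, which is the opposite of the cold flow block that waits for collect().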
4.3 Completion
When a Flow completes, either normally or with an exception, there are two ways to run an action at that point: imperative and declarative.
4.3.1 Imperative
Using try ... finally:
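    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        try {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.collect { println(it) }
        } finally {
            // Runs whether the flow completed normally or threw
            println("Done")
        }
    }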
4.3.2 Declarative
Using the onCompletion() function:
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun main() = runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.onCompletion { println("Done") } // invoked after the upstream finishes
            .collect { println(it) }
    }
4.3.3 onCompleted (implemented with an extension function)
An extension function can provide an RxJava-like onCompleted(), which is called only when the Flow completes normally:
    fun <T> Flow<T>.onCompleted(action: () -> Unit): Flow<T> = flow {
        // Re-emit every upstream value
        collect { value -> emit(value) }
        // Reached only if the upstream completed without an exception
        action()
    }
It is used in the same way as onCompletion():
    fun main() = runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.onCompleted { println("Completed...") }
            .collect { println(it) }
    }
However, onCompleted() is not executed if the Flow terminates with an exception.
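The difference can be checked with a small sketch that applies both operators to a flow that fails after its first value. The helper name completionEvents, the exception type and the "boom" message are hypothetical choices for this illustration; onCompleted is the extension function from this section.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The extension function from this section.
fun <T> Flow<T>.onCompleted(action: () -> Unit): Flow<T> = flow {
    collect { value -> emit(value) }
    action()
}

// Hypothetical helper: records which callbacks fire when the upstream throws.
suspend fun completionEvents(): List<String> {
    val events = mutableListOf<String>()
    val failing = flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
    try {
        failing
            .onCompleted { events += "onCompleted" }
            .onCompletion { cause -> events += "onCompletion cause=${cause?.message}" }
            .collect { events += "value $it" }
    } catch (e: IllegalStateException) {
        events += "caught ${e.message}"
    }
    return events
}

fun main() = runBlocking {
    completionEvents().forEach { println(it) }
}
```

onCompletion() still runs and receives the exception as its cause, while the action passed to onCompleted() is skipped because collect() inside it throws before action() is reached.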
4.4 Backpressure
Backpressure is one of the features of reactive programming.
RxJava 2's Flowable supports the following backpressure policies:
- MISSING: the Flowable is created without a backpressure policy; items sent via onNext are neither buffered nor dropped, and the downstream has to deal with any overflow.
- ERROR: if the downstream cannot keep up and the Flowable's asynchronous buffer overflows, a MissingBackpressureException is thrown.
- BUFFER: the Flowable's asynchronous buffer has no fixed size, like an Observable's; items can be added without limit and no MissingBackpressureException is thrown, but this can lead to an OOM.
- DROP: if the Flowable's asynchronous buffer is full, items that are about to be put into it are dropped.
- LATEST: like DROP, items are dropped when the buffer is full, except that the most recent item is always kept, regardless of the state of the buffer.
Flow implements backpressure through suspending functions.
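A minimal sketch of what this means when no buffering operator is applied: emit() does not return until the collector has finished handling the value, so a slow collector slows the producer down automatically. The helper name suspendingBackpressureEvents is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records the interleaving of producer and collector.
suspend fun suspendingBackpressureEvents(): List<String> {
    val events = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            events += "emit $i"
            emit(i) // returns only after the collector block below has finished
        }
    }.collect {
        delay(50) // slow collector
        events += "collected $it"
    }
    return events
}

fun main() = runBlocking {
    suspendingBackpressureEvents().forEach { println(it) }
}
```

Each "emit" is followed by its matching "collected" before the next value is produced; the operators in the following subsections change exactly this coupling.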
4.4.1 buffer() corresponds to the BUFFER policy
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.system.measureTimeMillis

    fun currTime() = System.currentTimeMillis()

    var start: Long = 0

    fun main() = runBlocking {
        val time = measureTimeMillis {
            (1..5)
                .asFlow()
                .onStart { start = currTime() }
                .onEach {
                    delay(100)
                    println("Emit $it (${currTime() - start}ms) ")
                }
                .buffer() // the emitter runs ahead while the collector is busy
                .collect {
                    println("Collect $it starts (${currTime() - start}ms) ")
                    delay(500)
                    println("Collect $it ends (${currTime() - start}ms) ")
                }
        }
        println("Cost $time ms")
    }
Execution Result:
Emit 1 (104ms)
Collect 1 starts (108ms)
Emit 2 (207ms)
Emit 3 (309ms)
Emit 4 (411ms)
Emit 5 (513ms)
Collect 1 ends (613ms)
Collect 2 starts (613ms)
Collect 2 ends (1114ms)
Collect 3 starts (1114ms)
Collect 3 ends (1615ms)
Collect 4 starts (1615ms)
Collect 4 ends (2118ms)
Collect 5 starts (2118ms)
Collect 5 ends (2622ms)
Cost 2689 ms
4.4.2 conflate() corresponds to the LATEST policy
    // Reuses currTime() and start from the previous example
    fun main() = runBlocking {
        val time = measureTimeMillis {
            (1..5)
                .asFlow()
                .onStart { start = currTime() }
                .onEach {
                    delay(100)
                    println("Emit $it (${currTime() - start}ms) ")
                }
                .conflate() // keep only the most recent value while the collector is busy
                .collect {
                    println("Collect $it starts (${currTime() - start}ms) ")
                    delay(500)
                    println("Collect $it ends (${currTime() - start}ms) ")
                }
        }
        println("Cost $time ms")
    }
Execution Result:
Emit 1 (106ms)
Collect 1 starts (110ms)
Emit 2 (213ms)
Emit 3 (314ms)
Emit 4 (419ms)
Emit 5 (520ms)
Collect 1 ends (613ms)
Collect 5 starts (613ms)
Collect 5 ends (1113ms)
Cost 1162 ms
4.4.3 DROP policy
David Karnok, an RxJava contributor, wrote the kotlin-flow-extensions library. It includes FlowOnBackpressureDrop.kt, a class that supports the DROP policy.
    /**
     * Drops items from the upstream when the downstream is not ready to receive them.
     */
    @FlowPreview
    fun <T> Flow<T>.onBackpressurureDrop(): Flow<T> = FlowOnBackpressureDrop(this)
With this library, Flow supports the DROP policy through the extension function onBackpressurureDrop().
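For comparison, a similar effect can be sketched without the extra library. This is not the kotlin-flow-extensions implementation: it swaps in the buffer(capacity, onBufferOverflow) overload from kotlinx.coroutines with BufferOverflow.DROP_LATEST, and it assumes that a one-slot buffer is acceptable, so one value can wait while the collector is busy and anything beyond that is dropped. The helper name collectWithDrop is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects 1..5 through a one-slot buffer that drops on overflow.
suspend fun collectWithDrop(): List<Int> {
    val received = mutableListOf<Int>()
    (1..5).asFlow()
        .onEach { delay(100) } // fast producer
        // Assumption of this sketch: DROP_LATEST discards the value being added
        // when the buffer is already full, instead of suspending the producer.
        .buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
        .collect {
            delay(500) // slow collector
            received += it
        }
    return received
}

fun main() = runBlocking {
    println(collectWithDrop())
}
```

Because the collector is five times slower than the producer, only some of the five values reach it. Exactly which ones depends on timing, so the sketch makes no claim beyond the first value being delivered.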
Related articles in the series:
Kotlin Coroutines Flow series (1) Basic use of Flow
Kotlin Coroutines Flow series (3) exception handling
Kotlin Coroutines Flow series (4) thread operations
Kotlin Coroutines Flow series (5) Other operators