I. Introduction to Kotlin Flow
Flow is part of the kotlinx.coroutines library. Its core API was promoted to stable in version 1.3.0.
The official documentation introduces it in a single sentence:
Flow: cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc.).
Going by the documentation, Flow resembles an RxJava Observable. Note, however, that an Observable can be either cold or hot.
II. Basic use of Flow
A Flow can emit multiple asynchronously computed values. For example, using the flow builder:
flow {
    for (i in 1..5) {
        delay(100)   // suspend for 100 ms before each value
        emit(i)
    }
}.collect {
    println(it)
}
The Flow interface declares just one function, collect:
public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}
If you are familiar with RxJava, you can understand that collect() corresponds to subscribe() and emit() corresponds to onNext().
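Because a flow is cold, the analogy with subscribe() goes one step further: nothing in the builder runs until collect() is called, and every call to collect() runs the builder again. The following is a minimal sketch of that behaviour. The helper name coldFlowDemo and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records when the builder body runs relative to collect().
fun coldFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "builder started"
        emit(1)
        emit(2)
    }
    events += "flow created"   // the builder body has not run yet
    numbers.collect { events += "first collector got $it" }
    numbers.collect { events += "second collector got $it" }
    events
}

fun main() {
    coldFlowDemo().forEach(::println)
}
```

"builder started" is recorded once per collector, and only after "flow created", which is what cold means here.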
2.1 Creating a flow
In addition to the flow builder just shown, there are several other ways to create a flow:
flowOf()
flowOf(1, 2, 3, 4, 5)
    .onEach {
        delay(100)
    }
    .collect {
        println(it)
    }
asFlow()
listOf(1, 2, 3, 4, 5).asFlow()
    .onEach {
        delay(100)
    }.collect {
        println(it)
    }
channelFlow()
channelFlow {
    for (i in 1..5) {
        delay(100)
        send(i)   // channelFlow uses send() where flow uses emit()
    }
}.collect {
    println(it)
}
The last of these, the channelFlow builder, differs from the flow builder in an important way.
A flow is a cold stream: unless the context is switched, the producer and the consumer run sequentially in the same coroutine, without blocking the thread. A channel is a hot stream, and channelFlow is backed by one, so its producer and consumer run concurrently, again without blocking.
The following code uses the flow builder and takes approximately 1 second, because each of the five values costs 100 ms in the producer and then another 100 ms in the collector:
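import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)   // producer: 100 ms per value
                emit(i)
            }
        }.collect {
            delay(100)       // consumer: 100 ms per value
            println(it)
        }
    }
    print("cost $time")
}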
With the channelFlow builder, the same work takes approximately 700 milliseconds, because the producer keeps sending while the collector is still handling the previous value:
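import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        channelFlow {
            for (i in 1..5) {
                delay(100)   // producer runs in its own coroutine
                send(i)
            }
        }.collect {
            delay(100)       // consumer overlaps with the producer
            println(it)
        }
    }
    print("cost $time")
}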
A flow created with the flow builder behaves much the same once flowOn moves the producer to another context. It then also takes about 700 milliseconds, similar to the channelFlow builder:
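import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)   // producer now runs on the IO dispatcher
            .collect {
                delay(100)
                println(it)
            }
    }
    print("cost $time")
}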
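The timings above vary from machine to machine, but the difference between the two builders can also be seen without a stopwatch, by recording the order of events. The sketch below is illustrative only: traceFlow and traceChannelFlow are hypothetical helper names, and sharing a plain mutable list between producer and collector is safe here only because runBlocking keeps both on a single thread.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// flow builder: producer and collector share one coroutine,
// so every emit is followed directly by its collect.
fun traceFlow(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i)
        }
    }.collect { log += "collect $it" }
    log
}

// channelFlow builder: the producer runs in its own coroutine and may get
// ahead of the collector, so only "send i before collect i" is guaranteed.
fun traceChannelFlow(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    channelFlow {
        for (i in 1..3) {
            log += "send $i"
            send(i)
        }
    }.collect { log += "collect $it" }
    log
}

fun main() {
    println(traceFlow())
    println(traceChannelFlow())
}
```

The first trace strictly alternates between emitting and collecting. The second makes no such promise, which is why the channelFlow version finishes sooner when both sides do real work.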
2.2 Switching Threads
RxJava needs both observeOn and subscribeOn to switch threads. Flow is simpler: flowOn is all you need. flowOn changes the context of everything upstream of it, so in the following example both the flow builder and the map operator run on Dispatchers.IO.
flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}.map {
    it * it
}.flowOn(Dispatchers.IO)
    .collect {
        println(it)
    }
Which thread collect() runs on depends on the coroutine context in which the flow is collected, that is, on the enclosing CoroutineScope.
For example, in the following code collect() runs on the main thread, because it is called directly inside runBlocking:
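import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)   // applies to the builder and map above
        .collect {
            // runs in runBlocking's context, i.e. on the main thread
            println("${Thread.currentThread().name}: $it")
        }
}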
Execution result:
main: 1
main: 4
main: 9
main: 16
main: 25
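The output above only shows the collecting side. To see the upstream side as well, one option is to capture the thread name inside the builder. This is a rough sketch with a hypothetical helper, threadNames. The exact thread names depend on the dispatcher and on whether coroutine debug mode is enabled, so no particular output is assumed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns (upstream thread name, collector thread name).
fun threadNames(): Pair<String, String> = runBlocking {
    var upstream = ""
    var downstream = ""
    flow {
        upstream = Thread.currentThread().name   // runs in the flowOn context
        emit(1)
    }.flowOn(Dispatchers.IO)
        .collect {
            downstream = Thread.currentThread().name   // runs in runBlocking's context
        }
    upstream to downstream
}

fun main() {
    val (upstream, downstream) = threadNames()
    println("builder ran on:   $upstream")
    println("collector ran on: $downstream")
}
```

The two names differ: the builder ran on a thread of the IO dispatcher, while the collector stayed on the thread that called runBlocking.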
Note that you must not use withContext() inside the flow builder to change the context in which values are emitted. Use flowOn instead.
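A small sketch of what goes wrong, assuming a recent kotlinx.coroutines version in which the flow builder checks the emission context and fails with an IllegalStateException. The helper names emitFromWrongContext and emitWithFlowOn are invented for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emitting from inside withContext breaks the flow's context-preservation rule.
fun emitFromWrongContext(): String = runBlocking {
    val broken = flow<Int> {
        withContext(Dispatchers.Default) {
            emit(1)   // emission happens in a different context than collection
        }
    }
    try {
        broken.collect { }
        "no error"
    } catch (e: IllegalStateException) {
        "rejected"
    }
}

// The supported way: keep the builder plain and move it with flowOn.
fun emitWithFlowOn(): List<Int> = runBlocking {
    flow { emit(1) }
        .flowOn(Dispatchers.Default)
        .toList()
}

fun main() {
    println(emitFromWrongContext())
    println(emitWithFlowOn())
}
```

Calling withContext() around code that does not emit, for example around a blocking computation whose result is emitted afterwards, is a different matter and is not covered by this sketch.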
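2.3 Cancelling a flow
Flow cancellation is cooperative, as with any coroutine: collection stops when the flow is suspended in a cancellable suspending function such as delay. Code that never reaches such a point cannot be interrupted.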
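import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    withTimeoutOrNull(2500) {   // cancel the block after 2.5 seconds
        flow {
            for (i in 1..5) {
                delay(1000)     // cancellable suspension point
                emit(i)
            }
        }.collect {
            println(it)
        }
    }
    println("Done")
}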
Execution result:
1
2
Done
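A timeout is only one way to cancel. The sketch below cancels the collecting coroutine from inside the collector once the second value has arrived, and uses a finally block in the builder to show that the producer is stopped as well. The helper name cancelAfterTwo is an assumption made for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values received and whether the builder was cleaned up.
fun cancelAfterTwo(): Pair<List<Int>, Boolean> = runBlocking {
    val received = mutableListOf<Int>()
    var cleanedUp = false
    val numbers = flow {
        try {
            for (i in 1..5) {
                delay(50)   // cancellation is detected here
                emit(i)
            }
        } finally {
            cleanedUp = true   // runs when the collecting coroutine is cancelled
        }
    }
    val job = launch {
        numbers.collect { value ->
            received += value
            if (value == 2) this@launch.cancel()   // cancel the collecting coroutine
        }
    }
    job.join()
    received to cleanedUp
}

fun main() {
    println(cancelAfterTwo())
}
```

Values 3 to 5 are never produced: the loop is abandoned at the next cancellation check after the job is cancelled.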
2.4 Terminal flow operators
The Flow API is somewhat similar to the Java Stream API: it also distinguishes between intermediate operations and terminal operations.
A terminal operator of Flow is either a suspending function, such as collect, single, reduce or toList, or the launchIn operator, which starts collecting the flow in a given CoroutineScope.
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}
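Since launchIn takes no lambda, it is usually paired with onEach, which carries the per-value work. A minimal sketch follows, with a hypothetical helper named collectInBackground:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects a flow in a separate coroutine and waits for it.
fun collectInBackground(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job: Job = flowOf(1, 2, 3)
        .onEach { seen += it }   // per-value work goes here
        .launchIn(this)          // starts collection and returns a Job right away
    job.join()                   // wait until the flow has completed
    seen
}

fun main() {
    println(collectInBackground())
}
```

The returned Job can also be cancelled, which stops the collection without affecting the rest of the scope.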
To summarize, the terminal operators of Flow are:
- collect
- single/first
- toList/toSet/toCollection
- count
- fold/reduce
- launchIn/produceIn/broadcastIn
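A few of the suspending terminal operators from this list, applied to one small flow. This is only a sketch: the helper name terminalOperators is made up, and each operator collects the cold flow again from the start.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs several terminal operators and returns their results in order.
fun terminalOperators(): List<Any> = runBlocking {
    val numbers = flowOf(1, 2, 3, 4, 5)
    listOf(
        numbers.toList(),                                  // all values as a List
        numbers.first(),                                   // first value only
        numbers.count(),                                   // number of values
        numbers.reduce { acc, value -> acc + value },      // sum without an initial value
        numbers.fold(100) { acc, value -> acc + value },   // sum starting from 100
        flowOf(42).single()                                // the only value of a one-element flow
    )
}

fun main() {
    terminalOperators().forEach(::println)
}
```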
Related articles in this series:
Kotlin Coroutines Flow Series (2) Flow vs RxJava2
Kotlin Coroutines Flow Series (3) Exception handling
Kotlin Coroutines Flow Series (4) Thread operations
Kotlin Coroutines Flow Series (5) Other operators