Other Flow operators
8.1 Transform operators
transform
Inside the transform operator, emit can be called any number of times for each upstream value. This is the main difference between transform and map, which produces exactly one output per input:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..5).asFlow()
        .transform {
            emit(it * 2)  // first value for this input
            delay(100)
            emit(it * 4)  // second value for the same input
        }
        .collect { println(it) }
}
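transform can also emit values of a different type, for example a String alongside the numbers:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..5).asFlow()
        .transform {
            emit(it * 2)      // an Int
            delay(100)
            emit("emit $it")  // a String from the same input
        }
        .collect { println(it) }
}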
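Because the block decides whether to call emit at all, transform can also drop values, which makes it behave like filter and map combined. A minimal sketch of that idea follows; the helper name evensSquared is made up for this illustration and is not part of the library:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits the square of each even input and nothing for odd inputs.
fun evensSquared(source: Flow<Int>): Flow<Int> = source.transform { value ->
    if (value % 2 == 0) emit(value * value)
}

fun main() = runBlocking {
    evensSquared((1..5).asFlow()).collect { println(it) } // prints 4, then 16
}
```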
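8.2 Size-limiting operators
take
The take operator passes on only the first n emitted values and then cancels the upstream flow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..5).asFlow()
        .take(2)  // only 1 and 2 reach the collector
        .collect { println(it) }
}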
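The cancellation of the upstream flow can be made visible with a try/finally inside the flow builder. The sketch below is only an illustration: the numbers function and its log parameter are hypothetical names introduced here to record what the upstream body actually executes.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source flow that records how far its body gets.
fun numbers(log: MutableList<String>): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        log += "after second emit" // not reached when the collector takes only 2
        emit(3)
    } finally {
        log += "upstream finished" // runs when take(2) cancels the flow
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    numbers(log).take(2).collect { println(it) } // prints 1, then 2
    println(log)                                 // prints [upstream finished]
}
```

The third emit never happens: once take has delivered its second value, the rest of the flow body is skipped and only the finally block runs.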
8.3 Terminal flow operators
The end of Kotlin Coroutines Flow series (1) Basic use of Flow listed the terminal operators of Flow. This section covers two of them, reduce and fold.
reduce
Similar to the reduce function on Kotlin collections, it accumulates the emitted values into a single result.
For example, to sum a sequence of squares:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it }
        .reduce { a, b -> a + b }  // 1 + 4 + 9 + 16 + 25
    println(sum)
}
Or to compute a factorial (here 5!):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val factorial = (1..5).asFlow().reduce { a, b -> a * b }
    println(factorial)
}
fold
Like the fold function on Kotlin collections, fold works the same way as reduce but takes an initial value.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it }
        .fold(0) { a, b -> a + b }  // accumulation starts from 0
    println(sum)
}
Starting from an initial value of 0, this produces the same sum of squares as the reduce version.
And to compute the factorial:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val factorial = (1..5).asFlow().fold(1) { a, b -> a * b }
    println(factorial)
}
Starting from an initial value of 1, this gives the same factorial as the reduce version.
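One practical difference between the two shows up on an empty flow: fold can fall back on its initial value, while reduce has nothing to start from and throws. A small sketch to illustrate, using emptyFlow as the input:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val empty = emptyFlow<Int>()

    // fold returns its initial value when nothing is emitted.
    println(empty.fold(0) { acc, value -> acc + value }) // prints 0

    // reduce fails on an empty flow; capture the exception instead of crashing.
    val failure = runCatching { empty.reduce { acc, value -> acc + value } }
    println(failure.exceptionOrNull() is NoSuchElementException) // prints true
}
```

If a flow may legitimately be empty, fold is therefore the safer choice.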
8.4 Composing flows operators
zip
zip combines two flows by pairing their values in order.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}
Execution Result:
1 and one
2 and two
3 and three
4 and four
5 and five
The zip operator pairs each item from flowA with the item at the same position in flowB. If the items of flowB are delayed with delay(), zip waits for each delayed item before emitting the pair:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }
    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }
    println("Cost $time ms")
}
Execution Result:
1 and one
2 and two
3 and three
4 and four
5 and five
Cost 561 ms
If the number of items in flowA is greater than that in flowB:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flowA = (1..6).asFlow()  // six items
    val flowB = flowOf("one", "two", "three", "four", "five")  // five items
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}
The zipped flow has as many items as the shorter of the two flows, so the extra item in flowA is dropped.
Execution Result:
1 and one
2 and two
3 and three
4 and four
5 and five
combine
combine also merges two flows, but not in the same way as zip.
With combine, each time either flow emits a new item, it is combined with the latest item from the other flow. Nothing is emitted until both flows have produced at least one item.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}
Execution Result:
1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
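flattenConcat and flattenMerge
These operators do not pair items from several flows. Instead, they flatten a flow of flows into a single flow. flattenConcat collects the inner flows one after another:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five")
    flowOf(flowA, flowB)
        .flattenConcat()  // flowA is collected completely before flowB starts
        .collect { println(it) }
}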
Execution Result:
1
2
3
4
5
one
two
three
four
five
To see flowA and flowB executing as a single flow more clearly, add delays and switch to flattenMerge, which collects the inner flows concurrently:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) }
    flowOf(flowA, flowB)
        .flattenMerge(2)  // collect up to 2 inner flows at the same time
        .collect { println(it) }
}
Execution Result:
1
one
2
3
two
4
5
three
four
five
8.5 Flattening flows operators
flatMapConcat and flatMapMerge are similar to the concatMap and flatMap operators in RxJava.
flatMapConcat
flatMapConcat is implemented with the map and flattenConcat operators.
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()
With flatMapConcat, each inner flow is collected to completion before the next upstream value is processed.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun currTime() = System.currentTimeMillis()
var start: Long = 0

fun main() = runBlocking {
    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapConcat {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}
Execution Result:
1: First at 114 ms from start
1: Second at 619 ms from start
2: First at 719 ms from start
2: Second at 1224 ms from start
3: First at 1330 ms from start
3: Second at 1830 ms from start
4: First at 1932 ms from start
4: Second at 2433 ms from start
5: First at 2538 ms from start
5: Second at 3041 ms from start
flatMapMerge
flatMapMerge is implemented with the map and flattenMerge operators.
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
flatMapMerge calls its transform block sequentially, but collects the resulting inner flows concurrently.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun currTime() = System.currentTimeMillis()
var start: Long = 0

fun main() = runBlocking {
    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapMerge {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}
Execution Result:
1: First at 116 ms from start
2: First at 216 ms from start
3: First at 319 ms from start
4: First at 422 ms from start
5: First at 525 ms from start
1: Second at 618 ms from start
2: Second at 719 ms from start
3: Second at 822 ms from start
4: Second at 924 ms from start
5: Second at 1030 ms from start
The flatMapMerge operator has a concurrency parameter, which defaults to DEFAULT_CONCURRENCY and limits how many inner flows are collected at the same time. Changing it gives a more intuitive view of the parallelism of flatMapMerge. For example, with a value of 2 the output differs from the result above.
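To try this out, the example can be wrapped in a function that takes the limit as an argument. This is only a sketch: pairsWithConcurrency is a hypothetical helper name, and the opt-in annotation is included because these operators are marked as preview or experimental depending on the library version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the flatMapMerge example with an explicit concurrency limit.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun pairsWithConcurrency(limit: Int): Flow<String> =
    (1..5).asFlow()
        .onEach { delay(100) }
        .flatMapMerge(concurrency = limit) { value ->
            flow {
                emit("$value: First")
                delay(500)
                emit("$value: Second")
            }
        }

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    pairsWithConcurrency(2).collect {
        println("$it at ${System.currentTimeMillis() - start} ms from start")
    }
}
```

With a limit of 2, a third inner flow cannot start until one of the first two has completed, so "3: First" only shows up after "1: Second". The exact timings vary from run to run.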
flatMapLatest
When a new value is emitted upstream, the inner flow created for the previous value is cancelled.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun currTime() = System.currentTimeMillis()
var start: Long = 0

fun main() = runBlocking {
    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapLatest {
            flow {
                emit("$it: First")
                delay(500)  // cancelled here whenever the next value arrives first
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${System.currentTimeMillis() - start} ms from start")
        }
}
Execution Result:
1: First at 114 ms from start
2: First at 220 ms from start
3: First at 321 ms from start
4: First at 422 ms from start
5: First at 524 ms from start
5: Second at 1024 ms from start
Flow vs. Reactive Streams
Built-in multiplatform support
Because the Kotlin language itself is multiplatform, Flow can be used on multiple platforms as well.
Interoperability
Flow still belongs to the reactive family. The Flow.asPublisher() and Publisher.asFlow() extension functions in the kotlinx-coroutines-reactive module make it easy to interoperate between Flow and Reactive Streams.
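As a rough illustration of that interoperability, the sketch below converts a Flow into a Reactive Streams Publisher and back again. It assumes the kotlinx-coroutines-reactive artifact (and with it the reactive-streams API) is on the classpath; the variable names are made up for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

fun main() = runBlocking {
    // Flow -> Publisher: can be handed to any Reactive Streams consumer.
    val publisher: Publisher<Int> = flowOf(1, 2, 3).asPublisher()

    // Publisher -> Flow: regular Flow operators are available again.
    val roundTripped = publisher.asFlow().map { it * 10 }.toList()
    println(roundTripped) // prints [10, 20, 30]
}
```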
Related articles in the series:
Kotlin Coroutines Flow series (1) Basic use of Flow
Kotlin Coroutines Flow series (2) Flow VS RxJava2
Kotlin Coroutines Flow series (3) exception handling
Kotlin Coroutines Flow series (4) thread operations