Other Flow operators

8.1 Transform operators

transform

With the transform operator, emit can be called any number of times for each incoming value. This is the main difference between transform and map, which produces exactly one output per input:

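// The later snippets in this article assume the same two imports.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    (1..5).asFlow()
        .transform {
            // Two values are emitted for every upstream value.
            emit(it * 2)
            delay(100)
            emit(it * 4)
        }
        .collect { println(it) }
}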
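To make the difference from map concrete, here is a minimal sketch that rebuilds map-like and filter-like operators on top of transform. The helper names mapWithTransform, filterWithTransform and evenSquares are hypothetical and exist only for this illustration; they are not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical helper: behaves like map, because it emits exactly once per input.
fun <T, R> Flow<T>.mapWithTransform(f: suspend (T) -> R): Flow<R> =
    transform { value -> emit(f(value)) }

// Hypothetical helper: behaves like filter, because it emits zero or one time per input.
fun <T> Flow<T>.filterWithTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

// Keeps the even numbers in 1..5 and squares them.
suspend fun evenSquares(): List<Int> =
    (1..5).asFlow()
        .filterWithTransform { it % 2 == 0 }
        .mapWithTransform { it * it }
        .toList()

fun main() = runBlocking {
    println(evenSquares()) // [4, 16]
}
```

Because the number of emit calls per input is up to the lambda, transform can express one-to-one, one-to-none and one-to-many mappings with the same operator.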

transform can also emit values of a different type than the upstream values:

fun main() = runBlocking {

    (1..5).asFlow()
        // Explicit type arguments make the mixed Int/String output type unambiguous.
        .transform<Int, Any> {
            emit(it * 2)
            delay(100)
            emit("emit $it")
        }
        .collect { println(it) }
}

8.2 Size-limiting operators

take

The take operator passes through only the first n emitted values; once they have been consumed, the upstream flow is cancelled.

fun main() = runBlocking {

    (1..5).asFlow()
        .take(2) // only 1 and 2 reach the collector
        .collect { println(it) }
}
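The cancellation performed by take can be observed with a try/finally block inside the upstream flow. The following is a small sketch; the numbers function and its log parameter are hypothetical names introduced only for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical upstream flow that records what it gets to execute.
fun numbers(log: MutableList<String>): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        // Not reached when only two values are taken: the flow is cancelled during emit(2).
        log += "reached the code after the second emit"
        emit(3)
    } finally {
        // Runs even though the flow was cut short, so resources can be released here.
        log += "upstream finished"
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    val taken = numbers(log).take(2).toList()
    println(taken) // [1, 2]
    println(log)   // [upstream finished]
}
```

Cancellation surfaces inside the upstream flow as an exception, which is why the finally block still runs while the statements after the second emit do not.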

8.3 Terminal flow operators

At the end of Kotlin Coroutines Flow series (1) Basic use of Flow, I listed the terminal operators of Flow. This article covers the reduce and fold operators.

reduce

Similar to the reduce function on Kotlin collections, it accumulates the values of the flow into a single result.

For example, the sum over a square sequence:

fun main() = runBlocking {

    val sum = (1..5).asFlow()
        .map { it * it }
        .reduce { a, b -> a + b }

    println(sum)
}

For example, calculate the factorial:

fun main() = runBlocking {

    val factorial = (1..5).asFlow().reduce { a, b -> a * b }

    println(factorial)
}

fold

Similar to the fold function on Kotlin collections, fold also takes an initial value.

fun main() = runBlocking {

    val sum = (1..5).asFlow()
        .map { it * it }
        .fold(0) { a, b -> a + b }

    println(sum)
}

In the code above, with an initial value of 0, the result is the same as summing the squares with reduce.

And to compute the factorial:

fun main() = runBlocking {

    val factorial = (1..5).asFlow().fold(1) { a, b -> a * b }

    println(factorial)
}

With an initial value of 1, the result is the same as computing the factorial with reduce.
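The two operators do differ in edge cases. The sketch below illustrates two of them: on an empty flow fold returns its initial value while reduce throws NoSuchElementException, and the accumulator of fold may have a different type than the elements. The function names sumWithFold and sumWithReduceOrNull are hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// fold always has a result, because it starts from the initial value.
suspend fun sumWithFold(values: Flow<Int>): Int =
    values.fold(0) { acc, v -> acc + v }

// reduce has nothing to start from on an empty flow and throws; map that case to null here.
suspend fun sumWithReduceOrNull(values: Flow<Int>): Int? =
    try {
        values.reduce { acc, v -> acc + v }
    } catch (e: NoSuchElementException) {
        null
    }

fun main() = runBlocking {
    println(sumWithFold(emptyFlow()))         // 0
    println(sumWithReduceOrNull(emptyFlow())) // null

    // The accumulator type (String) differs from the element type (Int).
    println((1..5).asFlow().fold("") { acc, v -> acc + v }) // 12345
}
```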

8.4 Composing flows operators

zip

zip is an operator that combines two flows pairwise.

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

Execution Result:

1 and one
2 and two
3 and three
4 and four
5 and five

The zip operator merges each item from flowA with the corresponding item from flowB. Even if every item in flowB is held back by delay(), zip waits for the delayed item before producing the merged value.

import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(100) }

    val time = measureTimeMillis {
        flowA.zip(flowB) { a, b -> "$a and $b" }
            .collect { println(it) }
    }

    println("Cost $time ms")
}

Execution Result:

1 and one
2 and two
3 and three
4 and four
5 and five
Cost 561 ms

If the number of items in flowA is greater than that in flowB:

fun main() = runBlocking {

    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five")
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

The merged flow contains as many items as the shorter of the two flows, so the sixth item of flowA is dropped.

Execution Result:

1 and one
2 and two
3 and three
4 and four
5 and five

combine

combine also merges two flows, but it behaves differently from zip.

With combine, each time either flow emits a new item, that item is merged with the latest item from the other flow.

fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect { println(it) }
}

Execution Result:

1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five

flattenMerge

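flattenMerge does not pair up the items of several flows. It flattens a flow of flows into a single flow, collecting the inner flows concurrently. Its sequential counterpart, flattenConcat, collects the inner flows one after another and is used in the first example: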

fun main() = runBlocking {

    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three", "four", "five")

    flowOf(flowA, flowB)
        .flattenConcat() // flowB is collected only after flowA has completed
        .collect { println(it) }
}

Execution Result:

1
2
3
4
5
one
two
three
four
five

To see flowA and flowB being executed concurrently as a single flow, add a delay to each of them and use flattenMerge:

fun main() = runBlocking {

    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) }

    flowOf(flowA, flowB)
        .flattenMerge(2) // collect up to two inner flows concurrently
        .collect { println(it) }
}

Execution Result:

1
one
2
3
two
4
5
three
four
five

8.5 Flattening flows operators

flatMapConcat and flatMapMerge are similar to RxJava's concatMap and flatMap operators.

flatMapConcat

flatMapConcat is implemented by chaining the map and flattenConcat operators.

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()
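The equivalence suggested by that implementation can be checked with a short sketch. The stages function is a hypothetical inner flow used only for illustration, and the @OptIn annotation is an assumption to cover library versions that mark these operators as preview or experimental.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical inner flow: two stages per id.
fun stages(id: Int): Flow<String> = flowOf("$id: First", "$id: Second")

// The operator used directly.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun viaFlatMapConcat(): List<String> =
    (1..3).asFlow().flatMapConcat { stages(it) }.toList()

// The same pipeline spelled out as map followed by flattenConcat.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun viaMapThenFlatten(): List<String> =
    (1..3).asFlow().map { stages(it) }.flattenConcat().toList()

fun main() = runBlocking {
    println(viaFlatMapConcat())
    // [1: First, 1: Second, 2: First, 2: Second, 3: First, 3: Second]
    println(viaFlatMapConcat() == viaMapThenFlatten()) // true
}
```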

With flatMapConcat, the collector waits for the inner flow produced for one value to complete before the next upstream value is processed.

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapConcat {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${currTime() - start} ms from start")
        }
}

Execution Result:

1: First at 114 ms from start
1: Second at 619 ms from start
2: First at 719 ms from start
2: Second at 1224 ms from start
3: First at 1330 ms from start
3: Second at 1830 ms from start
4: First at 1932 ms from start
4: Second at 2433 ms from start
5: First at 2538 ms from start
5: Second at 3041 ms from start

flatMapMerge

flatMapMerge is implemented by chaining the map and flattenMerge operators.

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

flatMapMerge calls its transform block sequentially for each upstream value, but collects the resulting flows concurrently.

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapMerge {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${currTime() - start} ms from start")
        }
}

Execution Result:

1: First at 116 ms from start
2: First at 216 ms from start
3: First at 319 ms from start
4: First at 422 ms from start
5: First at 525 ms from start
1: Second at 618 ms from start
2: Second at 719 ms from start
3: Second at 822 ms from start
4: Second at 924 ms from start
5: Second at 1030 ms from start

The flatMapMerge operator has a parameter concurrency, which uses DEFAULT_CONCURRENCY by default. You can modify this parameter if you want a more intuitive view of the parallelism of flatMapMerge. For example, if you change it to 2, you will see a different result.
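A minimal sketch of that experiment follows. The merged function is a hypothetical wrapper around the pipeline above, and the @OptIn annotation is an assumption to cover library versions that mark the operator as preview or experimental. No timings are listed because they depend on the machine; the point to look for is that at most two inner flows are active at once, so the third "First" line should appear only after an earlier inner flow has emitted its "Second" line.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: the same pipeline as above with a configurable concurrency limit.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun merged(concurrency: Int): Flow<String> =
    (1..5).asFlow()
        .onEach { delay(100) }
        .flatMapMerge(concurrency) { id ->
            flow {
                emit("$id: First")
                delay(500)
                emit("$id: Second")
            }
        }

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    merged(concurrency = 2).collect {
        println("$it at ${System.currentTimeMillis() - start} ms from start")
    }
}
```

Whatever the limit, every inner flow is eventually collected, so all ten values are still emitted; only their interleaving and the total running time change.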

flatMapLatest

With flatMapLatest, when the upstream emits a new value, the inner flow created for the previous value is cancelled.

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    (1..5).asFlow()
        .onStart { start = currTime() }
        .onEach { delay(100) }
        .flatMapLatest {
            flow {
                emit("$it: First")
                delay(500)
                emit("$it: Second")
            }
        }
        .collect {
            println("$it at ${currTime() - start} ms from start")
        }
}

Execution Result:

1: First at 114 ms from start
2: First at 220 ms from start
3: First at 321 ms from start
4: First at 422 ms from start
5: First at 524 ms from start
5: Second at 1024 ms from start

Flow vs. Reactive Streams

Natural multi-platform support

Flow can also be used on multiple platforms due to the multi-platform support of the Kotlin language itself.

Interoperability

Flow still belongs to the reactive programming family. The Flow.asPublisher() and Publisher.asFlow() extension functions in the kotlinx-coroutines-reactive module make it easy to interoperate between Flow and Reactive Streams.
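As a rough sketch of that interoperability, the following round trip converts a Flow to a Reactive Streams Publisher and back. It assumes that the kotlinx-coroutines-reactive artifact and the Reactive Streams API are on the classpath; the roundTrip function is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Flow -> Publisher -> Flow, with a Flow operator applied after the round trip.
suspend fun roundTrip(): List<Int> {
    // Expose a Flow to code that expects a Reactive Streams Publisher.
    val publisher: Publisher<Int> = (1..3).asFlow().asPublisher()

    // Consume any Publisher as a Flow again.
    return publisher.asFlow()
        .map { it * 10 }
        .toList()
}

fun main() = runBlocking {
    println(roundTrip()) // [10, 20, 30]
}
```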

Related articles in the series:

Kotlin Coroutines Flow series (1) Basic use of Flow

Kotlin Coroutines Flow series (2) Flow VS RxJava2

Kotlin Coroutines Flow series (3) exception handling

Kotlin Coroutines Flow series (4) thread operations