This is day 18 of my participation in the writing challenge; see the event page for details. The official Flow documentation: www.kotlincn.net/docs/refere…

Flow

Basic usage of flows

suspend fun flow1() {
    flow<Int> {
        (0..4).forEach {
            emit(it) // the producer emits data
        }
    }.collect {
        println(it) // the consumer processes data
    }
}

Note that code inside the flow { … } builder block can suspend.

Flows are cold: the body of the flow builder does not run until collect is called.
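A minimal sketch of this cold behavior (my own example, assuming kotlinx-coroutines is on the classpath): building the flow does nothing observable; only collecting runs the builder body.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

var builderRan = false

val coldFlow = flow {
    builderRan = true // only executes once a collector starts
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    // building the flow above did not run its body
    println("before collect: builderRan = $builderRan") // false
    val values = coldFlow.toList() // collecting triggers the body
    println("after collect: builderRan = $builderRan, values = $values")
}
```

Collecting a second time would run the builder body again from the start, which is exactly what "cold" means.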

Flow operators

This article covers the following operators: flowOf, asFlow, map, transform, take, toList, toSet, first, reduce, buffer, collectLatest, zip, and combine.

Flow builder

flowOf

flowOf accepts a vararg and emits its arguments one by one.

flowOf(1, 2, 5, 4).collect {
    println(it)
}
asFlow

asFlow converts a collection (or sequence) into a flow.

suspend fun asFlowM() {
    listOf(1, 2, 9, 0, 8).asFlow().collect {
        println(it)
    }
}

Intermediate flow operators

Intermediate operators transform a flow: they take the data emitted by the producer, apply some change, and forward the result to the consumer.

map

We can perform transformations in map; in this example, each value emitted by the producer is multiplied by 9 before reaching the consumer.

suspend fun mapM() {
    (1..9).asFlow().map { it * 9 }.collect {
        println(it)
    }
}

It is worth mentioning that we can perform asynchronous operations inside map, as in the following code:

flow<Int> {
    val userId = login() // suspending call to fetch the user ID
    emit(userId)
}.map {
    getPermission(it) // suspending call to fetch the user's permission
}.collect {
    println("Print permission $it")
}

In this code we simulate a scenario: login is called asynchronously inside flow to obtain the user ID, then getPermission is called asynchronously inside map to obtain the user's permission, and the permission information is finally delivered to the consumer.

This approach is useful when your code needs to call several APIs in sequence, and it is very effective at avoiding nested callbacks.

The transform operator

Transform focuses on converting types

// a flow of requests; the type arguments <Int, String> in transform
// mean an Int input is converted to a String before being emitted
(1..3).asFlow()
    .transform<Int, String> { request ->
        emit("transform Int to String $request")
    }
    .collect { response ->
        println(response)
    }

The data emitted in this example is Int, but the consumer needs String, so inside transform we convert the Int to a String and re-emit it with emit.
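Beyond type conversion, transform is more flexible than map in one respect: it may emit zero, one, or many values per input. A small sketch of my own (not from the original) to illustrate:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    (1..2).asFlow()
        .transform { request ->
            // unlike map, transform can emit several values per input
            emit("request $request sent")
            emit("response for $request")
        }
        .collect { println(it) }
}
```

Each input number produces two downstream strings, something map's one-in-one-out shape cannot express.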

The size-limiting operator take

The take operator limits the number of values the consumer receives, as shown in the code:

(1..9).asFlow().take(3).collect {
    println(it)
}

Here the producer emits 1..9, nine numbers in total, but because of take(3) only the first three emitted numbers reach the consumer.

Terminal flow operators

A terminal operator, as I understand it, is the method called on the consumer side, such as collect.

toList

Collects the emitted data into a List.

suspend fun toList(): List<Int> {
    return (1..9).asFlow().filter { it % 2 == 0 }.toList()
}
toSet

Same as toList, but collects into a Set.
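A quick sketch of my own to make the difference concrete: because a Set holds no duplicates, repeated emissions collapse into one element.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun toSetM(): Set<Int> {
    // 1, 2, 1, 2 → {1, 2}: toSet drops duplicates, toList would keep all four
    return flowOf(1, 2, 1, 2).toSet()
}

fun main() = runBlocking {
    println(toSetM()) // [1, 2]
}
```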

first

Gets the first element

suspend fun firstM(): Int {
    return (2..9).asFlow().filter { it % 2 == 1 }.first()
}
reduce

The lambda passed to reduce provides the accumulation formula.

In the reduce lambda, the value currently being consumed is combined with the previously accumulated value, and the new accumulated value is returned. Once all values have been consumed, the final accumulated value is returned.

suspend fun reduceM(): Int {
    return (1..9).asFlow().reduce { accumulator, value ->
        println("$accumulator : $value")
        accumulator + value
    }
}

Flows are sequential
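This section is empty in the original, so here is a hedged sketch of my own, modeled on the usual demonstration: each emitted value travels through every operator in turn, and downstream operators only run for values that survive the earlier ones.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    (1..3).asFlow()
        .filter {
            println("filter $it")
            it % 2 == 1          // keep odd numbers only
        }
        .map {
            println("map $it")   // runs only for values that passed filter
            "value $it"
        }
        .collect { println("collect: $it") }
}
```

The log shows "filter 2" with no matching "map 2": value 2 was dropped by filter, so the rest of the pipeline never sees it.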

Flow context
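This section is also empty in the original; a minimal sketch of my own of the usual point: a flow's builder runs in the collector's context by default, and flowOn changes the dispatcher of everything upstream of it without affecting collection.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        // upstream work runs on Dispatchers.Default because of flowOn below
        println("emitting on ${Thread.currentThread().name}")
        emit(1)
    }
        .flowOn(Dispatchers.Default) // affects only what is upstream of it
        .collect {
            // collection stays in the caller's context (here: the main thread)
            println("collecting $it on ${Thread.currentThread().name}")
        }
}
```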

Buffering

The official docs include a diagram that shows why buffering is needed.

As the figure shows, the consumer must wait for the producer to emit a value before processing it, and during this time the producer must wait for the consumer to finish processing before it can emit the next value.

The two block each other. Is there a way for the producer to keep emitting while the consumer is still processing?

buffer

buffer caches the producer's emissions so that the producer is not blocked by the consumer.

suspend fun bufferM() {
    val startMillis = System.currentTimeMillis()
    flow<Int> {
        (1..3).forEach {
            delay(300)
            emit(it)
        }
    }.buffer(4)
        .collect {
            delay(400)
            println(it)
            println("time has passed ${System.currentTimeMillis() - startMillis}")
        }
}

Code execution print log:

1
time has passed 745
2
time has passed 1148
3
time has passed 1552

Without buffer, the total duration would be about 2100 ms (3 × (300 + 400)).

With buffer, the total is about 1552 ms ≈ 300 + 400 × 3: only the first 300 ms producer delay is paid up front, after which the producer's delays overlap with the consumer's processing.

So when using buffer, producers can send data concurrently without being blocked by consumers

Processing the latest value

conflate

When the producer emits faster than the consumer can process, conflate lets the consumer skip intermediate values and process only the most recent one.

suspend fun conflate() {
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.conflate().collect {
        delay(300)
        println(it)
    }
}

For example, because of conflate, the code above produces output like the following:

1
3
6
9

Without conflate, the output would be:

1
2
3
4
5
6
7
8
9

Comparing the two, it is clear that the conflate example skips the values that could not be processed in time.

collectLatest

If the producer emits a new value while the consumer is still processing the previous one, collectLatest cancels the in-progress processing and restarts with the latest value.

suspend fun collectLastM() {
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.collectLatest {
        delay(800)
        println(it)
    }
}

For example, this code prints only 9: each new value arrives after 100 ms and cancels the previous collectLatest block before its 800 ms delay can finish, so only the final value is printed.

zip

The zip operator merges two flows into one: the lambda passed to zip combines each pair of values emitted by the two flows, and the combined result is sent on to the consumer. If the two flows have different lengths, output stops at the end of the shorter flow:

// 1. Both flows have the same length, 3
suspend fun zipM() {
    val flow1 = (1..3).asFlow()
    val flow2 = flowOf("Li Bai", "Du Fu", "An An Zhuo")
    flow1.zip(flow2) { a, b -> "$a: $b" }.collect {
        println(it)
    }
}

Output:

1: Li Bai
2: Du Fu
3: An An Zhuo

Now change the code above so that flow1 has length 5:

val flow1 = (1..5).asFlow()

View the output:

1: Li Bai
2: Du Fu
3: An An Zhuo

This verifies the conclusion above: when two flows of different lengths are merged with zip, the number of values the consumer receives equals the length of the shorter flow.

combine

As the previous section showed, zip's limitation is that when the two flows have unequal lengths, the tail of the longer flow is never output.

combine addresses this limitation of zip (it is hardly a weakness; in a different scenario it is simply the desired behavior).

suspend fun combineM() {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("Li Bai", "Du Fu", "An An Zhuo")
    flowA.combine(flowB) { a, b -> "$a: $b" }.collect {
        println(it)
    }
}

Output log:

1: Li Bai
2: Li Bai
2: Du Fu
3: Du Fu
3: An An Zhuo
4: An An Zhuo
5: An An Zhuo

We have two flows: a number flow of length 5 and a string flow of length 3.

Simple logic analysis of the effect:

flowA emits 1, flowB emits "Li Bai" → prints 1: Li Bai
flowA emits 2, flowB has nothing new → prints 2: Li Bai
flowB emits "Du Fu", flowA has nothing new → prints 2: Du Fu
flowA emits 3, flowB has nothing new → prints 3: Du Fu
flowB emits "An An Zhuo", flowA has nothing new → prints 3: An An Zhuo
flowA emits 4, flowB is complete → prints 4: An An Zhuo
flowA emits 5, flowB is complete → prints 5: An An Zhuo

Flattening flows

I won't cover the next three operators yet, because I haven't found application scenarios for them:

flatMapConcat
flatMapMerge
flatMapLatest
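For readers who want a taste anyway, here is a minimal flatMapConcat sketch of my own (not from the original): each upstream value is expanded into a sub-flow, and the sub-flows are concatenated in emission order. Note that flatMapConcat is still a preview API in kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class) // flatMapConcat is a preview API
fun main() = runBlocking {
    (1..3).asFlow()
        .flatMapConcat { n ->
            // each number expands into a small sub-flow;
            // concat waits for one sub-flow to finish before starting the next
            flowOf("$n: first", "$n: second")
        }
        .collect { println(it) }
}
```

flatMapMerge collects the sub-flows concurrently instead, and flatMapLatest cancels the current sub-flow whenever a new upstream value arrives.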

Flow exceptions

Wrap the stream with try/catch

You can catch flow exceptions with try/catch, but this approach is not recommended.
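A sketch of that try/catch form (my own example, with a simulated producer failure), showing why it reads poorly: the error handling is imperative and sits outside the flow pipeline.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    try {
        flow {
            emit(1)
            throw IllegalStateException("producer failed") // simulated failure
        }.collect { println(it) }
    } catch (e: IllegalStateException) {
        // works, but mixes flow logic with imperative error handling
        println("caught: ${e.message}")
    }
}
```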

Use the flow catch operator to handle exceptions

Using the catch operator of flow to handle exceptions is more elegant

catch has a limitation, however: it can only catch upstream (producer) exceptions, not exceptions thrown in the consumer.

suspend fun trycatch() {
    flow<Int> {
        (1..3).forEach {
            if (it == 2) {
                // simulate a producer error
                throw NullPointerException()
            }
            emit(it)
        }
    }.catch { e ->
        e.printStackTrace()
    }.collect {
        println(it)
    }
}

How to handle consumer exceptions

In the previous section we caught producer exceptions with catch; so how do we handle exceptions thrown by the consumer?

Try throwing an exception in the consumer to see if it can be caught
flow<Int> {
    for (i in 1..3) {
        emit(i)
    }
}.catch {
    emit(-1)
}.collect {
    if (it == 2) {
        // throw from the consumer
        throw IllegalArgumentException("data is invalid")
    }
    println(it)
}

Output:

1
Exception in thread "main" java.lang.IllegalArgumentException: data is invalid
	at HahaKt$consumerCatch$$inlined$1.emit(Collect.kt:138)
Moving the throwing code into onEach lets catch handle the exception:
suspend fun consumerCatch() {
    flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }.onEach {
        if (it == 2) {
            throw IllegalArgumentException("data is invalid")
        }
    }.catch {
        emit(-1)
    }.collect {
        println(it)
    }
}

Output:

1
-1

Flow completion: onCompletion

Use onCompletion to emit a value once the flow completes.

flowOf(1, 23, 5, 3, 4).onCompletion {
    println("flow operation completed")
    emit(12344)
}.collect {
    println(it)
}

Output:

1
23
5
3
4
flow operation completed
12344