This is day 18 of my participation in the article-update challenge. Official Flow documentation: www.kotlincn.net/docs/refere…
Flow
Basic use of flows
suspend fun flow1() {
    flow<Int> {
        (0..4).forEach {
            emit(it) // producer sends data
        }
    }.collect {
        println(it) // consumer processes data
    }
}
In this example, the code inside the flow { … } builder block can suspend.
A flow is cold, so the body of the flow builder does not run until collect is called.
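A minimal sketch of what "cold" means in practice, assuming kotlinx.coroutines is on the classpath. The function name coldFlowEvents and the event strings are made up for illustration; the point is only that the builder body runs after collect is called, not when the flow is created.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records events in order so the laziness of the flow is visible.
fun coldFlowEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers: Flow<Int> = flow {
        events += "producer started"   // runs only once collection begins
        emit(1)
        emit(2)
    }
    events += "flow created"           // the builder body has not run yet
    numbers.collect { events += "collected $it" }
    events
}

fun main() {
    coldFlowEvents().forEach(::println)
    // flow created
    // producer started
    // collected 1
    // collected 2
}
```

Collecting the same flow a second time would run the builder body again from the start.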
Flow operators
This article covers the following operators: flowOf, asFlow, map, transform, take, toList, toSet, first, reduce, buffer, collectLatest, zip, combine.
Flow builders
flowOf
flowOf takes a variable number of arguments and emits them one by one.
flowOf(1, 2, 5, 4).collect {
println(it)
}
asFlow
asFlow converts a collection into a flow that emits its elements.
suspend fun asFlowM() {
    listOf(1, 2, 9, 0, 8).asFlow().collect {
        println(it)
    }
}
Intermediate flow operators
An intermediate operator transforms the flow: it takes the values emitted by the producer, changes them in some way, and passes the results on to the consumer.
map
We can transform values inside map. In this example, each value sent by the producer is multiplied by 9 before it reaches the consumer.
suspend fun mapM() {
    (1..9).asFlow().map {
        it * 9
    }.collect {
        println(it)
    }
}
It is worth mentioning that we can call suspending (asynchronous) functions inside map, as in the following code:
flow<Int> {
    val userId = login()
    emit(userId)
}.map {
    val permission = getPermission(it)
    permission
}.collect {
    println("Print permission $it")
}
This code simulates a common situation: login is called asynchronously in flow to obtain the user ID, then getPermission is called asynchronously in map to obtain the user's permission, and the permission is finally delivered to the consumer.
This approach is useful when several interfaces have to be called one after another, because it avoids nesting the calls.
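The chained calls described above can be sketched as a runnable program. The bodies of login and getPermission below are hypothetical stand-ins (the article does not show them); the delays, the user ID 42, and the permission strings are assumptions made only so the example runs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a network call that returns a user ID.
suspend fun login(): Int {
    delay(100)      // simulated network latency
    return 42       // made-up user ID
}

// Hypothetical stand-in for a second call that depends on the first result.
suspend fun getPermission(userId: Int): String {
    delay(100)
    return if (userId == 42) "admin" else "guest"
}

fun loadPermission(): String = runBlocking {
    flow { emit(login()) }              // step 1: obtain the user ID
        .map { getPermission(it) }      // step 2: feed it into the next call
        .first()                        // terminal operator: take the single result
}

fun main() {
    println("Print permission ${loadPermission()}")   // Print permission admin
}
```

Each step stays at the same indentation level, which is the "no nesting" benefit the text refers to.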
The transform operator
transform is typically used to convert values from one type to another.
(1..3).asFlow() // a request flow
    // the type arguments <Int, String> mean that Int values are converted to String values
    .transform<Int, String> { request ->
        emit("transform Int to String $request")
    }
    .collect { response ->
        println(response)
    }
In this example the producer sends Int values, but collect needs String values, so we convert each Int to a String inside transform and pass it on with emit.
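Because transform forwards values by calling emit explicitly, it can also emit more than once per input, which map cannot do. The following is a small sketch of that behaviour; the function name requestLog and the message strings are illustrative assumptions.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun requestLog(): List<String> = runBlocking {
    (1..2).asFlow()
        .transform<Int, String> { request ->
            emit("Making request $request")   // first value for this input
            emit("Response $request")         // second value for the same input
        }
        .toList()
}

fun main() {
    requestLog().forEach(::println)
    // Making request 1
    // Response 1
    // Making request 2
    // Response 2
}
```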
The size-limiting operator take
The take operator limits how many values are consumed, as the code shows:
(1..9).asFlow().take(3).collect {
    println(it)
}
Here the producer sends the nine numbers 1..9, but because of take(3) only the first three emitted numbers reach the consumer.
Terminal flow operators
A terminal operator, as I understand it, is a method called by the consumer, such as collect.
toList
Collects the emitted values into a List.
suspend fun toListM(): List<Int> {
    return (1..9).asFlow().filter { it % 2 == 0 }.toList()
}
toSet
Same as toList, except that the values are collected into a Set.
first
Returns the first element.
suspend fun firstM(): Int {
    return (2..9).asFlow().filter { it % 2 == 1 }.first()
}
reduce
The lambda passed to reduce provides the calculation.
Inside the reduce lambda, the previously accumulated value and the current value are combined and the new accumulated value is returned. Once all values have been consumed, the final accumulated value is returned.
suspend fun reduceM(): Int {
    return (1..9).asFlow().reduce { accumulator, value ->
        println("$accumulator : $value")
        accumulator + value
    }
}
Flows are sequential
Flow context
Buffering
The official demo helps explain why buffering is needed.
According to the figure, the consumer has to wait while the producer is producing a value, and processes it only after it has been emitted. In the meantime, the producer has to wait until the value has been processed before it can emit the next one.
The two block each other. Is there a way for the producer to keep producing values while the consumer is still consuming?
buffer
buffer caches the producer's values so that the producer is not blocked by the consumer.
suspend fun bufferM() {
    val startMillis = System.currentTimeMillis()
    flow<Int> {
        (1..3).forEach {
            delay(300)
            emit(it)
        }
    }.buffer(4)
        .collect {
            delay(400)
            println(it)
            println("time has passed ${System.currentTimeMillis() - startMillis}")
        }
}
Log printed when the code runs:
1
time has passed 745
2
time has passed 1148
3
time has passed 1552
Without buffer, the total duration would be about (300 + 400) × 3 = 2100 ms.
With buffer, the total duration is 1552 ms, which is roughly 300 + 400 × 3 = 1500 ms plus some overhead.
So with buffer, the producer runs concurrently with the consumer and is not blocked by it.
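The timing difference can be measured directly. This is a sketch with shorter delays than the article's (100 ms to produce, 200 ms to consume, both chosen arbitrarily); slowProducer and timeCollection are hypothetical helper names. Exact timings vary from run to run, so no specific numbers are claimed.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Producer that needs 100 ms per value (assumed figure for the demo).
fun slowProducer(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Returns how long it takes to collect all values, with or without buffer().
fun timeCollection(useBuffer: Boolean): Long = runBlocking {
    val source = if (useBuffer) slowProducer().buffer() else slowProducer()
    measureTimeMillis {
        source.collect { delay(200) }   // consumer needs 200 ms per value
    }
}

fun main() {
    println("without buffer: ${timeCollection(false)} ms")  // roughly (100 + 200) * 3
    println("with buffer:    ${timeCollection(true)} ms")   // roughly 100 + 200 * 3
}
```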
Combining multiple flows
conflate
When the producer sends values faster than the consumer can process them, the consumer skips the intermediate values and receives only the latest one.
suspend fun conflate() {
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.conflate().collect {
        delay(300)
        println(it)
    }
}
Because of conflate, the code above produces the following output:
1
3
6
9
Without conflate, the output is:
1, 2, 3, 4, 5, 6, 7, 8, 9
The comparison shows that with conflate, the values the consumer could not process in time are dropped.
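A sketch that collects the conflated values into a list instead of printing them. Which intermediate values survive depends on timing, so the example only relies on three properties: the first value is delivered, the last value is delivered, and some values in between are skipped. The name conflatedValues is an assumption for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun conflatedValues(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flow<Int> {
        for (i in 1..9) {
            delay(100)      // producer: one value every 100 ms
            emit(i)
        }
    }.conflate().collect {
        delay(300)          // consumer: 300 ms per value, so it falls behind
        seen += it
    }
    seen
}

fun main() {
    val values = conflatedValues()
    println(values)         // starts with 1, ends with 9, fewer than 9 elements
}
```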
collectLatest
If the producer sends a new value before the consumer has finished processing the previous one, the consumer abandons the previous value and starts processing the latest one.
suspend fun collectLatestM() {
    flow<Int> {
        (1..9).forEach {
            delay(100)
            emit(it)
        }
    }.collectLatest {
        delay(800)
        println(it)
    }
}
In this example a new value arrives every 100 ms while processing takes 800 ms, so every value except the last is cancelled before println runs and only 9 is printed.
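The cancellation behaviour of collectLatest can be checked with a smaller variant. This sketch uses five values and shorter delays (50 ms and 200 ms, chosen arbitrarily); the name latestOnly is hypothetical. Only values whose processing ran to completion are recorded.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun latestOnly(): List<Int> = runBlocking {
    val finished = mutableListOf<Int>()
    flow<Int> {
        for (i in 1..5) {
            delay(50)           // a new value every 50 ms
            emit(i)
        }
    }.collectLatest {
        delay(200)              // cancelled whenever a newer value arrives
        finished += it          // reached only if processing was not cancelled
    }
    finished
}

fun main() {
    println(latestOnly())       // [5]
}
```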
zip
The zip operator merges two flows into one: the lambda passed to zip combines each pair of values emitted by the two flows, and the result is sent on to the consumer. If the two flows have different lengths, the result has the length of the shorter flow.
- Both flows have the same length, 3:
suspend fun zipM() {
    val flow1 = (1..3).asFlow()
    val flow2 = flowOf("Li Bai", "Du Fu", "An an Zhuo")
    flow1.zip(flow2) { a, b ->
        "$a: $b"
    }.collect {
        println(it)
    }
}
Output:
1: Li Bai
2: Du Fu
3: An an Zhuo
Now change the code above so that flow1 has length 5:
val flow1 = (1..5).asFlow()
View the output:
1: Li Bai
2: Du Fu
3: An an Zhuo
This confirms the initial conclusion: when two flows of different lengths are merged with zip, the number of values the consumer receives equals the length of the shorter flow.
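The same check can be written as a function that returns the zipped values, which makes the "shorter flow wins" rule easy to assert. The name zipped and the sample strings are assumptions for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun zipped(): List<String> = runBlocking {
    val numbers = (1..5).asFlow()           // length 5
    val letters = flowOf("a", "b", "c")     // length 3
    numbers.zip(letters) { n, s -> "$n: $s" }.toList()
}

fun main() {
    println(zipped())   // [1: a, 2: b, 3: c]
}
```

The values 4 and 5 never reach the consumer, because zip completes as soon as either flow completes.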
combine
The previous section showed a drawback of zip: if the two flows have different lengths, the tail of the longer flow is never output.
combine addresses this drawback of zip (it is hardly a real drawback, since the two operators simply serve different scenarios).
suspend fun combineM() {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("Li Bai", "Du Fu", "An an Zhuo")
    flowA.combine(flowB) { a, b ->
        "$a: $b"
    }.collect {
        println(it)
    }
}
Output log:
1: Li Bai
2: Li Bai
2: Du Fu
3: Du Fu
3: An an Zhuo
4: An an Zhuo
5: An an Zhuo
We have two flows: a number flow of length 5 and a string flow of length 3.
A simple analysis of the logic behind this output:
flowA emits 1, flowB emits "Li Bai", print: 1: Li Bai
flowA emits 2, flowB emits nothing, print: 2: Li Bai
flowA emits nothing, flowB emits "Du Fu", print: 2: Du Fu
flowA emits 3, flowB emits nothing, print: 3: Du Fu
flowA emits nothing, flowB emits "An an Zhuo", print: 3: An an Zhuo
flowA emits 4, flowB has completed, print: 4: An an Zhuo
flowA emits 5, flowB has completed, print: 5: An an Zhuo
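A sketch of combine with explicit delays, so that the two flows emit at different rates. The delays (100 ms and 250 ms) and the name combined are assumptions for illustration. The exact interleaving depends on timing, so the example relies only on one property: the last combined value pairs the final element of each flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun combined(): List<String> = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(100) }          // faster flow
    val letters = flowOf("a", "b", "c").onEach { delay(250) }    // slower flow
    // combine re-emits whenever either flow produces a new value,
    // pairing it with the most recent value of the other flow.
    numbers.combine(letters) { n, s -> "$n: $s" }.toList()
}

fun main() {
    val result = combined()
    result.forEach(::println)
    println("last = ${result.last()}")   // last = 3: c
}
```

Unlike zip, no value from the longer or faster flow is lost at the end: once one flow completes, its last value keeps being paired with new values from the other.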
Flattening flows
I will not cover the next three operators yet, because I have not thought of application scenarios for them.
flatMapConcat
flatMapMerge
flatMapLatest
Flow exceptions
Wrapping a flow in try/catch
Flow exceptions can be caught by wrapping the collection in try/catch, but this approach is not recommended.
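For reference, a minimal sketch of the try/catch approach. The function name collectWithTryCatch, the exception type, and the messages are illustrative assumptions. Note that a try/catch around collect catches exceptions thrown by the producer as well as by the collect block itself.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun collectWithTryCatch(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = flow<Int> {
        emit(1)
        emit(2)
        throw IllegalStateException("producer failed")   // simulated failure
    }
    try {
        numbers.collect { log += "value $it" }
    } catch (e: IllegalStateException) {
        log += "caught: ${e.message}"
    }
    log
}

fun main() {
    collectWithTryCatch().forEach(::println)
    // value 1
    // value 2
    // caught: producer failed
}
```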
Handling exceptions with the flow catch operator
Using the flow catch operator to handle exceptions is more elegant.
catch has a drawback, however: it only catches exceptions from the producer (upstream), not from the consumer.
suspend fun trycatch() {
    flow<Int> {
        (1..3).forEach { // upper bound assumed; it is lost in the source text
            if (it == 2) {
                // throw a NullPointerException on purpose
                throw NullPointerException()
            }
            emit(it)
        }
    }.catch { e ->
        e.printStackTrace()
    }.collect {
        println(it)
    }
}
Handling consumer exceptions
The previous section showed how catch handles producer exceptions. How should exceptions raised by the consumer be handled?
First, try throwing an exception in the consumer to see whether catch can capture it:
flow<Int> {
    for (i in 1..3) {
        emit(i)
    }
}.catch {
    emit(-1)
}.collect {
    if (it == 2) {
        // throw an exception in the consumer
        throw IllegalArgumentException("data is invalid")
    }
    println(it)
}
Output:
1
Exception in thread "main" java.lang.IllegalArgumentException:
    at HahaKt$consumerCatch$$inlined$1.emit(collect.kt:138)
The exception is not caught. To catch it, move the code that may throw into onEach, placed before catch:
suspend fun consumerCatch() {
    flow<Int> {
        for (i in 1..3) {
            emit(i)
        }
    }.onEach {
        if (it == 2) {
            throw IllegalArgumentException("invalid")
        }
    }.catch {
        emit(-1)
    }.collect {
        println(it)
    }
}
Output:
1
-1
Flow completion: onCompletion
onCompletion runs once the flow has finished, and can be used to send one more value at that point.
flowOf(1, 23, 5, 3, 4).onCompletion {
    println("flow operation completed")
    emit(12344)
}.collect {
    println(it)
}
Output:
1
23
5
3
4
flow operation completed
12344