preface
Originally, this paper was going to write paging corresponding to Jetpack, but when sorting out the data, Kotlin found that he had not explained Flow, which is also a key point, so this paper will explain Flow in detail!
It is convenient to combine Flow and Paging for mixed explanation.
In that case! So what is Flow?
1. Know Flow
1.1 Introduction to Kotlin Flow
The official document gives a brief introduction:
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
Flow is something like an RxJava Observable. An Observable can be Cold or Hot.
Official is too official, RxJava is not familiar with the small partners, light by this concept is still a bit of a cloud.
Let’s go deeper into Flow through a series of small cases.
1.1 How to Represent Multiple Values?
Think about it: suspended functions can asynchronously return a single value, but how do you asynchronously return multiple calculated values?
emmm….
What can be done to return multiple values asynchronously?
- The collection?
- The sequence?
- Suspend function?
- And of course Flow, right?
So let’s see how this works, right?
1.1.1 collection
fun simpleList(a): List<Int> = listOf<Int> (1.2.3)
@Test
fun `test multiple values`(a) {
simpleList().forEach { value -> println(value) }
}
Copy the code
This does return multiple values, but not asynchronously!
1.1.2 sequence
fun simpleSequence(a): Sequence<Int> = sequence {
for (i in 1.3.) {
// thread.sleep (1000) // block the current Thread directly, not asynchronously!
// suspend (1000) // suspend (1000) Even if it was used here, it would not be used in the test below!
yield(i)
}
}
@Test
fun `test multiple values`(a) {
simpleSequence().forEach { value -> println(value) }
}
Copy the code
This method also returns multiple values, but not asynchronously!
1.1.3 Suspend functions
suspend fun simpleList2(a): List<Int> {
delay(1000)
return listOf<Int> (1.2.3)}@Test
fun `test multiple values2`(a) = runBlocking<Unit> {
simpleList2().forEach { value -> println(value) }
}
Copy the code
This method returns multiple values and is also asynchronous! Satisfy async return multiple values!
So what about Flow?
1.1.4 Flow way
fun simpleFlow(a) = flow<Int> {
for (i in 1.3.) {
delay(1000) // Pretend to be in something important
emit(i) // Emit, producing an element}}@Test
fun `test multiple values3`(a) = runBlocking<Unit> {
simpleFlow().collect { value -> println(value) }
}
Copy the code
Here we see the simpleFlow method, which uses the delay suspend function but does not have the suspend modifier, so it is not a suspend function! Can be used anywhere! (Non-coroutine module, non-suspended module)
Having looked at the examples above, let’s conclude:
1.1.5 Difference between Flow and other methods
- The flow type named flow builds the function
flow{... }
Code in a building block can be suspended- The function simpleFlow is no longer marked with the suspend modifier
- Streams emit values using the emit function
- The stream uses the collect function to collect values
Here’s another picture, just to make a better impression
Now that you have a general idea of Flow, what does it do?
1.2 application Flow
File download is a classic example of Flow in Android development
As is shown in
When the file is downloaded, the corresponding background download progress can send data through the emit in Flow and receive the corresponding data through collect. (Paging corresponding to Jetpack will be explained later)
1.3 cold flow
fun simpleFlow2(a) = flow<Int> {
println("Flow started")
for (i in 1.3.) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow is cold`(a) = runBlocking<Unit> {
val flow = simpleFlow2()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
Copy the code
The results
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
Copy the code
It can be seen from this run result:
Flow is a cold Flow similar to a sequence, and the code in the Flow builder does not run until the Flow is collected.
ChannelFlow is heat flow.
1.4 Continuity of flow
@Test
fun `test flow continuation`(a) = runBlocking<Unit> {(1.. 5).asFlow().filter {
it % 2= =0
}.map {
"string $it"
}.collect {
println("Collect $it")}}Copy the code
The results
Collect string 2
Collect string 4
Copy the code
From this running effect, we know:
- Each individual collection of a stream is performed sequentially, unless a special operator is used
- Each emitted value is processed by each transition operator upstream and downstream, and then passed to the end operator
1.5 Stream Builder
@Test
fun `test flow builder`(a) = runBlocking<Unit> {
flowOf("one"."two"."three")
.onEach { delay(1000) }
.collect { value ->
println(value)
}
(1.3.).asFlow().collect { value ->
println(value)
}
}
Copy the code
Running effect
one
two
three
1
2
3
Copy the code
From this running effect, we can know that the corresponding flow flow can be constructed through the corresponding flowOf and asFlow
1.6 Stream context
Here will be a few small cases for detailed explanation
1.6.1 case a
fun simpleFlow3(a) = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1.3.) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow context`(a) = runBlocking<Unit> {
simpleFlow3()
.collect { value -> println("Collected $value ${Thread.currentThread().name}")}}Copy the code
Running effect
Flow started Test worker @coroutine# 1
Collected 1 Test worker @coroutine# 1
Collected 2 Test worker @coroutine# 1
Collected 3 Test worker @coroutine# 1
Copy the code
From here you can see that the emit to collect context runs through all of them, all in the same context.
What if you want to use another context for emit?
1.6.2 second case
fun simpleFlow4(a) = flow<Int> {
withContext(Dispatchers.Default) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1.3.) {
delay(1000)
emit(i)
}
}
}
@Test
fun `test flow context`(a) = runBlocking<Unit> {
simpleFlow4()
.collect { value -> println("Collected $value ${Thread.currentThread().name}")}}Copy the code
Running effect
Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@3f3e9270, BlockingEventLoop@67bc91f8], ... slightlyCopy the code
Proffer: tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender, tender
1.6.3 case three
fun simpleFlow5(a) = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1.3.) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
@Test
fun `test flow context`(a) = runBlocking<Unit> {
simpleFlow5()
.collect { value -> println("Collected $value ${Thread.currentThread().name}")}}Copy the code
Running effect
Flow started DefaultDispatcher-worker-1 @coroutine# 2
Collected 1 Test worker @coroutine# 1
Collected 2 Test worker @coroutine# 1
Collected 3 Test worker @coroutine# 1
Copy the code
From these few small cases can be concluded
- The collection of a flow always takes place in the context of the invocation of the coroutine, and this property of the flow is held in context.
flow{... }
Code in the builder must store properties contextually and not allow them to occur from other contexts (emit)FlowOn operator
, this function is used forChange the context in which the flow is emitted
1.7 start the flow
1.7.1 case a
/ / the event source
fun events(a) = (1.3.)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`(a) = runBlocking<Unit> {
val job = events()
.onEach { event -> println("Event: $event ${Thread.currentThread().name}")}// .collect {}
.launchIn(CoroutineScope(Dispatchers.IO)) // Another context is used here to pass in Flow
//.launchin (this)// Here the current context is passed in as Flow
delay(200)
job.cancelAndJoin()
}
Copy the code
Here we see that collect has been cancelled and launchIn has been changed, and a new context has been passed in as the save context of Flow.
Job.join () or job.cancelandJoin () is called to wait for the Flow to complete its operation
Running effect
Event: 1 DefaultDispatcher-worker-1 @coroutine# 2
Event: 2 DefaultDispatcher-worker-1 @coroutine# 2
Copy the code
Because it was suspended for two seconds and cancelled, not all logs are printed and the context is: DefaultDispatcher
Let’s look at case two
1.7.2 second case
/ / the event source
fun events(a) = (1.3.)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`(a) = runBlocking<Unit> {
val job = events()
.onEach { event -> println("Event: $event ${Thread.currentThread().name}")}// .collect {}
LaunchIn (CoroutineScope(dispatchers.io)) // Another context is used here to pass in the Flow
.launchIn(this)// Flow is passed in using the current context
// delay(200)
// job.cancelAndJoin()
}
Copy the code
Running effect
Event: 1 Test worker @coroutine# 2
Event: 2 Test worker @coroutine# 2
Event: 3 Test worker @coroutine# 2
Copy the code
Because here the Flow is passed in the current context, there is no additional need to wait for execution to complete.
1.8 Stream cancellation
1.8.1 Passive Cancellation
fun simpleFlow6(a) = flow<Int> {
for (i in 1.3.) {
delay(1000)
emit(i)
println("Emitting $i")}}@Test
fun `test cancel flow`(a) = runBlocking<Unit> {
withTimeoutOrNull(2500) {
simpleFlow6().collect { value -> println(value) }
}
println("Done")}Copy the code
Running effect
1
Emitting 1
2
Emitting 2
Done
Copy the code
Here we see that withTimeoutOrNull is used to set the timeout method to cancel and stop execution in case of a timeout.
But this cancellations are all passive cancellations, what if you cancel actively?
1.8.2 Active cancellation
@Test
fun `test cancel flow check`(a) = runBlocking<Unit> {(1.. 5).asFlow().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]? .isActive}")}}Copy the code
Running effect
1
cancel check true
2
cancel check true
3
cancel check false
4
cancel check false
5
cancel check false
Copy the code
Here we see that when we cancel actively, the corresponding state has changed, but all are still executed!
This is because uncheck is not included here!
1.8.3 Canceling traffic detection
@Test
fun `test cancel flow check`(a) = runBlocking<Unit> {(1.. 5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]? .isActive}")}}Copy the code
Here we see the cancellable method used!
Run it again and see what happens
1
cancel check true
2
cancel check true
3
cancel check false
Copy the code
OK, here you can see that it has been successfully cancelled! On to the next topic!
1.9 back pressure
But before we do that, let’s look at the original case
fun simpleFlow8(a) = flow<Int> {
for (i in 1.3.) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")}}@Test
fun `test flow back pressure`(a) = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collect { value ->
delay(300) // Processing this element takes 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")}Copy the code
Let’s see how it works
Collected 1 Test worker @coroutine# 1
Emitting 1 Test worker @coroutine# 1
Collected 2 Test worker @coroutine# 1
Emitting 2 Test worker @coroutine# 1
Collected 3 Test worker @coroutine# 1
Emitting 3 Test worker @coroutine# 1
Collected in 1237 ms
Copy the code
Here we see that this is a very standard producer-consumer pattern, one to one. So try adding different keywords?
1.9.1 buffer (xx)
@Test
fun `test flow back pressure`(a) = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.buffer(50)
.collect { value ->
delay(300) // Processing this element takes 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")}Copy the code
Here we see.buffer(50) added!
Let’s see how it works
Emitting 1 Test worker @coroutine# 2
Emitting 2 Test worker @coroutine# 2
Emitting 3 Test worker @coroutine# 2
Collected 1 Test worker @coroutine# 1
Collected 2 Test worker @coroutine# 1
Collected 3 Test worker @coroutine# 1
Collected in 1108 ms
Copy the code
Here we see that the production messages are all stacked together and then sent in a centralized manner, which also takes a little less time than standard.
As is shown in
We can temporarily understand that Buffer (50) lengthens the corresponding transfer channel, so that the transfer channel can hold more elements.
Next, move on to the next operator!
1.9.2 conflate ()
@Test
fun `test flow back pressure`(a) = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.conflate()
.collect { value ->
delay(300) // Processing this element takes 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")}Copy the code
Running effect
Emitting 1 Test worker @coroutine# 2
Emitting 2 Test worker @coroutine# 2
Emitting 3 Test worker @coroutine# 2
Collected 1 Test worker @coroutine# 1
Collected 3 Test worker @coroutine# 1
Collected in 800 ms
Copy the code
Here we see that the consumer has not processed all of the elements corresponding to the producer. Move on to the next one!
1.9.3 collectLatest {}
@Test
fun `test flow back pressure`(a) = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collectLatest { value ->
delay(300) // Processing this element takes 300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")}Copy the code
This time, replace collect with collectLatest, which looks like just the last element
Emitting 1 Test worker @coroutine# 2
Emitting 2 Test worker @coroutine# 2
Emitting 3 Test worker @coroutine# 2
Collected 3 Test worker @coroutine# 5
Collected in 807 ms
Copy the code
If so, the operator will only process the last element! One last summary!
1.9.4 Back pressure summary
- Buffer (), the code that runs the emission element in the stream concurrently;
- Conflate (), merges emission items without processing each value;
- CollectLatest (), cancels and relaunches the last value
2. Operators
2.1 Transition flow operator
2.1.1 case a
suspend fun performRequest(request: Int): String {
delay(500)
return "response $request"
}
@Test
fun `test transform flow operator`(a) = runBlocking<Unit> {(1.3.).asFlow()
.map { request -> performRequest(request) }
.collect { value -> println(value) }
}
Copy the code
Running effect
response 1
response 2
response 3
Copy the code
Here we see the data flow as Map and then send each element in turn!
What if you want to send your own custom elements before and after you want to send them?
suspend fun performRequest(request: Int): String {
delay(500)
return "response $request"
}
@Test
fun `test transform flow operator`(a) = runBlocking<Unit> {(1.3.).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect { value -> println(value) }
}
Copy the code
Here we see the transform used and the emit send element used in the corresponding closure to see how this works
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Copy the code
From this effect, we can customize emission elements in this way!
2.1.2 second case
fun numbers(a) = flow<Int> {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)}finally {
println("Finally in numbers")}}@Test
fun `test limit length operator`(a) = runBlocking<Unit> {
//take(2), indicating that the original stream is cancelled when the count element is consumed
numbers().take(2).collect { value -> println(value) }
}
Copy the code
Let’s see how it works
1
2
Finally in numbers
Copy the code
You can see from this runtime that when the consumer processes element 2, the corresponding original stream is cancelled!
2.1.3 summary
Transition-flow operators:
- Operators can be used to transform streams, just as collections and sequences are;
- The transition operator applies to the upstream flow and returns to the downstream flow;
- These operators are also cold operators, just like streams. Such operators are not themselves suspend functions
- It runs fast and returns the definition of the new transformation
2.2 Terminal Operators
The end operator is a suspend function on a stream that the user ** starts the stream collection. **collect is the most basic end operator, but there are other more convenient end operators:
- To various sets, such as toList and toSet;
- Gets the first value and ensures that the stream emits a single value
- Reduce and fold are used to shrink streams to a single value
Having said that, let’s see how it works!
@Test
fun `test terminal operator`(a) = runBlocking<Unit> {
val sum = (1.. 5).asFlow()
.map {
println("it * it= ${it * it}")
it * it
}
// Start accumulating values from the first element and apply the operation to the current accumulator value and to each element
.reduce { a, b ->
println("a=$a,b=$b,a+b=${a + b}")
a + b
}
println(sum)
}
Copy the code
Running effect
it * it= 1
it * it= 4
a=1,b=4,a+b=5
it * it= 9
a=5,b=9,a+b=14
it * it= 16
a=14,b=16,a+b=30
it * it= 25
a=30,b=25,a+b=55
55
Copy the code
It’s all in the notes, though a little sparse! Next!
2.3 Combining Multiple Streams
Just like the Kotlin library’s sequence. zip extension function, streams have a ZIP operator for combining related values in two streams!
Don’t say a word, just turn it on!
2.3.1 case a
@Test
fun `test zip`(a) = runBlocking<Unit> {
val numbs = (1.3.).asFlow()
val strs = flowOf("One"."Two"."Three")
numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }
}
Copy the code
Running effect
1 -> One
2 -> Two
3 -> Three
Copy the code
Very simple, by zip the corresponding stream into one, and then output! So simple, let’s do something a little more complicated!
2.3.2 second case
@Test
fun `test zip2`(a) = runBlocking<Unit> {
val numbs = (1.3.).asFlow().onEach { delay(300)}val strs = flowOf("One"."Two"."Three").onEach { delay(400)}val startTime = System.currentTimeMillis()
numbs.zip(strs) { a, b -> "$a -> $b" }.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")}}Copy the code
Ha ha ha, slightly complicated, is the corresponding stream after the additional suspend wait, is the simulation of the corresponding time operation
Running effect
1 -> One at 462 ms from start
2 -> Two at 861 ms from start
3 -> Three at 1269 ms from start
Copy the code
Since this is suspended, not blocked, numBS and STRS do their own work without interfering with each other when they merge emitted elements.
So the base time for each launch is STRS, not numBS plus STRS. I believe readers can further understand the difference between a hang and a block.
2.4 show advection
Streams represent a sequence of values received asynchronously, so it is easy to run into situations where each value triggers a request for another sequence of values. However, due to the asynchronous nature of streams, different flattening modes are required, so there are a series of flattening operators:
- FlatMapConcat connection mode;
- FlatMapMerge merge mode;
- FlatMapLatest latest flatting mode
Let’s see how it works!
2.4.1 flatMapConcat Connection mode
fun requestFlow(i: Int) = flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")}@Test
fun `test flatMapConcat`(a) = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1.3.).asFlow()
.onEach { delay(100)}//.map { requestFlow(it) }
.flatMapConcat { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start")}}Copy the code
Running effect
1: First at 147 ms from start
1: Second at 653 ms from start
2: First at 755 ms from start
2: Second at 1256 ms from start
3: First at 1357 ms from start
3: Second at 1859 ms from start
Copy the code
Here we see that this connects the two streams, similar to the series mode
2.4.2 flatMapMerge Merge mode
fun requestFlow(i: Int) = flow<String> {
emit("$i: First")
delay(500)
emit("$i: Second")}@Test
fun `test flatMapMerge`(a) = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1.3.).asFlow()
.onEach { delay(100)}//.map { requestFlow(it) }
.flatMapMerge { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start")}}Copy the code
Running effect
1: First at 166 ms from start
2: First at 261 ms from start
3: First at 366 ms from start
1: Second at 668 ms from start
2: Second at 762 ms from start
3: Second at 871 ms from start
Copy the code
This mode is similar to the conflate mode in the combination + back pressure above, first the producer generates all the elements, and then notifies the consumer to consume!
2.4.3 flatMapLatest Latest flatting mode
@Test
fun `test flatMapLatest`(a) = runBlocking<Unit> {
//Flow<Flow<String>>
val startTime = System.currentTimeMillis()
(1.3.).asFlow()
.onEach { delay(100)}//.map { requestFlow(it) }
.flatMapLatest { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start")}}Copy the code
Running effect
1: First at 164 ms from start
2: First at 364 ms from start
3: First at 469 ms from start
3: Second at 971 ms from start
Copy the code
This pattern is similar to the combination +collectLatest pattern above. Very simple!
3. Exception handling
3.1 Exception handling of flows
When an exception is thrown by an emitter or code in an operator, there are several ways to handle exceptions:
- The try/catch block
- Catch the function
3.1.1 Case 1 (Try/Catch Block)
fun simpleFlow(a) = flow<Int> {
for (i in 1.3.) {
println("Emitting $i")
emit(i)
}
}
@Test
fun `test flow exception`(a) = runBlocking<Unit> {
try {
simpleFlow().collect { value ->
println(value)
// If the value is false, IllegalStateException and the result of calling lazyMessage are thrown.
check(value <= 1) { "Collected $value"}}}catch (e: Throwable) {
println("Caught $e")}}Copy the code
Running effect
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
Copy the code
That’s so easy. Next!
3.1.2 Case 2 (Catch Function)
@Test
fun `test flow exception2`(a) = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")}.catch { e: Throwable -> println("Caught $e") }
.flowOn(Dispatchers.IO)
.collect { println(it) }
}
Copy the code
Now we handle the corresponding exception information directly through the catch {} code block.
Running effect
1
Caught java.lang.ArithmeticException: Div 0
Copy the code
We see that it says, “All the logic is in the catch. Can you come up with that?”
@Test
fun `test flow exception2`(a) = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")}.catch { e: Throwable ->
println("Caught $e")
emit(10)
}.flowOn(Dispatchers.IO).collect { println(it) }
}
Copy the code
We see that the correspondence logic is out there, and that both the format and the readability of the code are much better than before
Let’s see how it works
Caught java.lang.ArithmeticException: Div 0
1
10
Copy the code
3.2 Completion of the flow
When the stream collection is complete (normal or abnormal), it may need to perform an action.
- Imperative finally block
- OnCompletion declarative processing
3.2.1 Case 1 (Finally Block)
fun simpleFlow2(a) = (1.3.).asFlow()
@Test
fun `test flow complete in finally`(a) = runBlocking<Unit> {
try {
simpleFlow2().collect { println(it) }
} finally {
println("Done")}}Copy the code
Very simple code block, this will not stick to run the effect of ha, do not want to eventually print Done!
3.2.2 Case 2 (onCompletion)
@Test
fun `test flow complete in onCompletion`(a) = runBlocking<Unit> {
simpleFlow2()
.onCompletion { println("Done") }
.collect { println(it) }
}
Copy the code
Here we remove the try… Finally, and an additional onCompletion code block, which implements the corresponding logic
Let’s see how it works
1
2
3
Done
Copy the code
Obviously the same as above!
conclusion
Well, that’s the end of this piece! I believe the friends who see this have a clear understanding of the asynchronous Flow! In the next article, we will cover the channel-multiplexe-concurrency security of Flow