Hot flow
Take a look at a simple producer-consumer model:
fun main() {
    GlobalScope.launch {
        val channel = Channel<Int>()
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(i++)
            }
        }
        val consumer = GlobalScope.launch {
            while (true) {
                val element = channel.receive()
                println(element)
            }
        }
        producer.join()
        consumer.join()
    }
    Thread.sleep(10000)
}
The producer sends a value every second, and the consumer keeps reading and printing values. It's a very simple example, but there is a lot of detail behind it that needs to be well understood: hot flows are not as simple as most articles on the Internet make them out to be.
Let's take a look at what the IDE tells us:
The send and receive functions of a channel are suspend functions.
As you can see from the example above, the sender delays 1s before each send while the receiver reads continuously, so the sender is definitely slower than the receiver. That means when the program starts running, the receiver has no data yet, and receive suspends there until a new element arrives.
send is also marked suspend. Can send be suspended too?
Will send suspend?
Let’s change the example a little bit:
GlobalScope.launch {
    val channel = Channel<Int>()
    val producer = GlobalScope.launch {
        var i = 0
        while (true) {
            delay(1000)
            i++
            println("send $i")
            channel.send(i)
        }
    }
    val consumer = GlobalScope.launch {
        while (true) {
            delay(2000)
            val element = channel.receive()
            println("receive $element")
        }
    }
}
Let's make receive slower than send and see whether send suspends.
As the output shows, each send only completes after the corresponding receive.
What's going on? Take a look at our channel's definition.
The default capacity, RENDEZVOUS, means the channel's buffer is 0: as long as no one receives, send will not complete, and the element is handed over the moment a receiver is ready.
You can try it out with different capacity parameters and see what happens:
val channel = Channel<Int>(capacity = Channel.UNLIMITED)
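As a small sketch of my own (not from the original example, assuming kotlinx.coroutines on the classpath), here is how the capacity parameter changes send's behavior:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // CONFLATED: send never suspends; a newer element overwrites the buffered one.
    val conflated = Channel<Int>(Channel.CONFLATED)
    repeat(3) { conflated.send(it) }           // none of these suspend
    println(conflated.receive())               // only the latest value (2) survives

    // UNLIMITED: send never suspends; the buffer simply grows.
    val unlimited = Channel<Int>(Channel.UNLIMITED)
    repeat(3) { unlimited.send(it) }
    repeat(3) { println(unlimited.receive()) } // 0, 1, 2 in order
}
```

With the default RENDEZVOUS capacity, by contrast, the first send above would suspend forever because no receiver is waiting yet.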
Other ways to write the receiver
These are probably a bit more intuitive. The only things to note: with the iterator style, the suspension happens inside hasNext(), and the for-in style ends when the channel is closed.
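A minimal sketch of both receiver styles (my own example, not code from the article):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    // Style 1: explicit iterator; the suspension point is inside hasNext().
    val a = Channel<Int>(3)
    launch { repeat(3) { a.send(it) }; a.close() }
    val iter = a.iterator()
    while (iter.hasNext()) {          // suspends until an element arrives or the channel closes
        println("iterator: ${iter.next()}")
    }

    // Style 2: for-in, sugar over the same iterator; the loop exits
    // normally once the channel has been closed and drained.
    val b = Channel<Int>(3)
    launch { repeat(3) { b.send(it) }; b.close() }
    for (e in b) println("for-in: $e")
}
```

Note that both styles rely on close() being called; without it, each loop would suspend forever waiting for the next element.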
Here’s an example:
GlobalScope.launch {
    val channel = Channel<Int>(3)
    val producer = GlobalScope.launch {
        List(3) {
            channel.send(it)
            println("send $it")
        }
        channel.close()
        println("close channel send: ${channel.isClosedForSend} receive: ${channel.isClosedForReceive}")
    }
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive $element")
            delay(1000)
            println("close channel send: ${channel.isClosedForSend} receive: ${channel.isClosedForReceive}")
        }
    }
    producer.join()
    consumer.join()
}
We create a channel with a buffer of three elements, and the producer calls close() directly after sending three elements.
The consumer receives one element every 1s.
Note that isClosedForReceive does not become true until all the buffered elements have been received.
This is enough for everyday channel usage; there are still some experimental APIs that I won't cover here.
Just remember: when using a channel, you must call close() at an appropriate time, otherwise the receiver may stay suspended forever. There is no memory leak, but it still occupies some system resources, small though they are. As for who should close a channel, just remember that close should be driven by the sender. Why?
The reason, of course, is that you only receive after someone sends. That is just human logic: you wait for someone to send you a birthday present before you receive it.
In addition, another characteristic of a channel is that the sender does not depend on the receiver, which is quite different from the cold flows we'll introduce next.
Initial experience with Flow
fun main() = runBlocking {
    val intFlow = flow {
        (1..3).forEach {
            println("emit: " + Thread.currentThread().name)
            emit(it)
            delay(100)
        }
    }
    val coroutineDispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    GlobalScope.launch(coroutineDispatcher) {
        intFlow.flowOn(Dispatchers.IO).collect {
            println("collect: " + Thread.currentThread().name)
            println(it)
        }
    }
    Thread.sleep(300000)
}
Take a look at the execution result:
Here we should note:
flowOn specifies the dispatcher for the production (upstream) side. As the code shows, launch uses a custom dispatcher, so the output clearly shows emit and collect running on different threads.
If you read my previous article you'll know that Dispatchers.IO and Dispatchers.Default share the same thread pool, which is why I use a custom dispatcher here for demonstration purposes.
Those of you familiar with RxJava will recognize the parallel with RxJava's subscribeOn and observeOn.
We can change the code again:
Take a look at the execution result:
As you can see, emit runs twice when you collect twice.
To sum up: once a Flow is created, no consumption means no production, and multiple consumption means multiple production.
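The cold behavior summed up above can be sketched in a few lines (my own illustration):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    var builds = 0
    val intFlow = flow {
        builds++                      // runs once per collection, not once per flow
        (1..3).forEach { emit(it) }
    }
    // Nothing has executed yet; the flow block is just a recipe.
    intFlow.collect { println("first: $it") }
    intFlow.collect { println("second: $it") }
    println("flow block ran $builds times")   // prints 2: one full run per collect
}
```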
A cold flow only produces data while it is being consumed, in contrast to the hot flow above, where the sender is independent of the receiver.
Exception handling
What happens if an exception occurs on the sending end?
Take a look at the execution result:
As you can see, an exception thrown on the producing side surfaces at the collector, which is risky for the receiver.
Take a look at the execution result:
So you can see that onCompletion can only observe an exception; it has no ability to handle it, and the output clearly shows the program still crashes.
Its main role is to listen for the completion of the flow.
Another way to write it adds a catch operator:
Look at the result: this time nothing crashes.
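Putting catch and onCompletion together, here is a small sketch of my own (not the article's code) showing where each one fits:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        emit(1)
        throw IllegalStateException("boom")
    }.catch { e ->
        println("caught: ${e.message}")   // the upstream exception stops here; no crash
        emit(-1)                          // catch may emit a fallback value
    }.onCompletion { cause ->
        // Runs on both normal completion and failure; cause is null here
        // because catch already swallowed the exception upstream.
        println("done, cause=$cause")
    }.collect { println("collect: $it") }
}
```

Note the order matters: catch only handles exceptions from operators above it, never from the collect block below.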
It is also worth recalling that Flow does not provide a cancel() of its own, because it doesn't need one (think carefully about why). If you must stop a flow, cancel the coroutine that is collecting it.
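A short sketch of that point, assuming a standard kotlinx.coroutines setup: cancelling the collecting coroutine is what stops an infinite flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val job = launch {
        flow {
            var i = 0
            while (true) {
                emit(i++)
                delay(100)        // delay is a cooperative cancellation point
            }
        }.collect { println("got $it") }
    }
    delay(350)
    job.cancel()                  // Flow has no cancel() of its own; cancel the coroutine instead
    println("collection stopped")
}
```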
A couple of other ways to write flow
We sometimes want the production-side and consumption-side code to live together, so the processing is easier to follow.
The following methods can be adopted:
fun createFlow() = flow<Int> {
    (1..3).forEach {
        emit(it)
        delay(100)
    }
}.onEach {
    println(it)
}

fun main() = runBlocking {
    GlobalScope.launch {
        createFlow().collect()
    }
    Thread.sleep(300000)
}
Sometimes, while producing data, we may want to switch the coroutine context partway through.
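The builder most likely meant here is channelFlow (my reading; the article doesn't name it): unlike flow {}, which forbids emitting from a different context, channelFlow allows it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    channelFlow {
        send(1)                       // sent from the builder's own context
        withContext(Dispatchers.IO) {
            send(2)                   // legal here; emit() inside a plain flow {}
                                      // from another context would throw
        }
    }.collect { println("received $it") }
}
```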
Back pressure
It sounds like a lofty concept, but it isn't that hard: you just have to think about what happens when the producer is much faster than the consumer.
This doesn't happen very often on the client side, but here's an example.
We've all used chat software; behind it there must be a long-lived connection for receiving messages, and after a message arrives we generally notify the corresponding UI to update.
Consider a scenario where an active user opens the app six months later and thousands of messages flood in. If you updated the UI for every single message, the interface would obviously freeze. This is a classic application of back pressure.
You can look at it in code:
fun main() = runBlocking {
    flow {
        List(100) {
            emit(it)
        }
    }.conflate().collect {
        println("collecting $it")
        delay(100)
        println("$it collected")
    }
    Thread.sleep(300000)
}
conflate solves back pressure by letting new data overwrite old: the value currently being processed is always finished, and any values that arrived in the meantime are skipped in favor of the latest one.
There is also collectLatest, which differs from conflate in that when a new value arrives while the old one is still being processed, the in-progress processing is cancelled and restarted with the new value.
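A side-by-side sketch of the two strategies (my own example; the exact lines printed depend on timing):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun fastFlow() = flow {
    repeat(5) {
        emit(it)
        delay(10)                 // producer is much faster than the consumers below
    }
}

fun main() = runBlocking {
    // conflate: the value being processed always finishes; values that arrived
    // in the meantime are dropped, and the collector jumps to the latest one.
    fastFlow().conflate().collect {
        delay(50)
        println("conflate kept $it")
    }

    // collectLatest: a newly arrived value cancels the block still processing
    // the old one, so typically only the final value's block runs to the end.
    fastFlow().collectLatest {
        delay(50)
        println("collectLatest finished $it")
    }
}
```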