Heat flow

Take a look at the simple producer-consumer model:

fun main(){

    GlobalScope.launch {
        val channel= Channel<Int>()
        val producer=GlobalScope.launch {
            var i=0
            while (true){
                delay(1000)
                channel.send(i++)
            }
        }

        val consumer=GlobalScope.launch {
            while (true){
                val element=channel.receive()
                println(element)
            }
        }
        producer.join()
        consumer.join()
    }

    Thread.sleep(10000)

}

Copy the code

The producer sends a value every second, and the consumer keeps reading the value and printing a very simple example. But there is a lot of detail behind it. It must be well understood here, heat flow is not as simple as most of the following articles on the Internet say.

Let’s take a look at what the IDE does:

The send and receive functions of a channel are suspended.

As you can see from the above example, since the sender delays sending data for 1s, while the receiver is always reading data, so here

The sender is definitely slower than the receiver, which means that when the program starts running, the receiver will never receive data, so the Receivee function will stay there until a new element arrives.

Send is also a suspend function. Can send also be suspended?

Will SEND hang?

Let’s change the example a little bit:

GlobalScope.launch { val channel= Channel<Int>() val producer=GlobalScope.launch { var i=0 while (true){ delay(1000) i++  println("send $i") channel.send(i) } } val consumer=GlobalScope.launch { while (true){ delay(2000) val element=channel.receive() println("receive $element") } }Copy the code

Let’s make receive slower than Send and see if send is suspended.

As you can see from the graph, send is always executed after receiving.

What’s the problem? Take a look at our channel definition

The default value RENDEZVOUS means that the buffer of this channel will be 0, which means that I will not send data as long as you do not receive it, and I will send the data once you receive it

You can try it out and see what happens with different parameters

val channel= Channel<Int>(capacity = Channel.UNLIMITED)
Copy the code

Other ways to write receiver

These are probably a little bit more intuitive, but the only thing to notice about iterator is that it’s going to hang in hasNext and the second way is going to hang in

The closing of the channnel

Here’s an example:

GlobalScope.launch {
    val channel= Channel<Int>(3)
    val producer=GlobalScope.launch {
        List(3){
            channel.send(it)
            println("send $it")
        }
        channel.close()
        println("close channel  send: ${channel.isClosedForSend}  receive:${channel.isClosedForReceive}")
    }


    val consumer=GlobalScope.launch {
        for (element in channel){
            println("receive $element")
            delay(1000)
            println("close channel  send: ${channel.isClosedForSend}  receive:${channel.isClosedForReceive}")
        }
    }
    producer.join()
    consumer.join()
}
Copy the code

We create a channel with a cache size of three bytes, and then call the close method of the channel directly after the producer sends three bytes

Consumers receive data every 1s

The entire channel will not be closed until all the channels have been received.

This is enough for channel usage, but there are still some experimental apis that I won’t cover here

Just remember: The use of a channel must be actively called close at an appropriate time, otherwise the receiver will probably hang up all the time. Although there will be no memory leak, it will always occupy certain system resources, although the resources are small enough to be ignored. For close of a channel, We just need to remember to start dominating the close from the sender. Why?

The reason, of course, is that you only receive when you send, which is human logic after all, people wait for someone to send you a birthday present before you receive it

In addition, another characteristic of a channel is that the sender does not depend on the receiver, which is quite different from the cold flow that we will introduce next

Initial experience with Flow

fun main()= runBlocking{ val intFlow=flow{ (1.. 3).forEach { println("emit:"+Thread.currentThread().name) emit(it) delay(100) } } val coroutineDispatcher=Executors.newFixedThreadPool(3).asCoroutineDispatcher() GlobalScope.launch(coroutineDispatcher) { intFlow.flowOn(Dispatchers.IO).collect { println("collet:"+Thread.currentThread().name) println(it) } } Thread.sleep(300000) }Copy the code

Take a look at the execution result:

Here we should note:

FlowOn specifies the thread of execution on our production side. It can be seen from the code that the thread of launch is set as a customized dispatcher, so it can be clearly seen in the execution result that the thread of emit and Collet is different

If you read my previous article you will know that dispatchers. IO and dispatchers.default belong to the same thread, so here I have a custom dispatcher for demonstration purposes.

Those of you who are familiar with RXJava are probably familiar with subsscribeon and ObServeron for RXJava

We can change the code again:

Take a look at the execution result:

It can be seen that the emit executes twice when you collect2 times

To sum up: once a Flow is created, no consumption means no production, and multiple consumption means multiple production.

Cold flow is the flow of data that occurs during consumption, as opposed to our heat flow, where the sender is independent of the receiver

Exception handling

What happens if an exception occurs on the sending end?

Take a look at the execution result:

It can be seen that exceptions thrown by the sender are exposed at the receiver, which is risky for the receiver.

Take a look at the execution result:

So you can see that this onComplete can only be used to find exceptions, but it doesn’t have the ability to handle exceptions and you can clearly see from the output that it crashed

Her biggest role is to listen for the end of the flow

Another way to write it is:

Added a catch

Look at the results

It’s no big deal

It is also important to recall that flow does not provide cancellation because it is not required (think carefully why). If you must, cancel the coroutine to which the flow belongs

A couple of other ways to write flow

We sometimes want the production side and the consumer side of the code to be together so that it looks easy to process.

The following methods can be adopted:

fun createFlow()=flow<Int>{ (1.. 3).forEach { emit(it) delay(100) } }.onEach { println(it) } fun main()= runBlocking{ GlobalScope.launch() { createFlow().collect() } Thread.sleep(300000) }Copy the code

Sometimes when we’re producing data we might want to switch the coroutine context at some point

Back pressure

It’s a lofty concept, but it’s not that hard, except that you have to think about what happens when the producer is much faster than the consumer.

This isn’t going to happen very often on the client side, but let me give you an example

Chat software we all used, right? There must be a long link behind it for receiving messages. After receiving messages, we generally inform the corresponding UI interface to update the interface

Consider a scenario where an active user opens the app six months later and thousands of messages flood in, and if you update a UI every time you receive a message the interface will obviously freeze. Here is a typical application of back pressure.

You can look at it in code:

fun main()= runBlocking{

   flow {
       List(100){
           emit(it)
       }
   }.conflate().collect {
       println("collecting $it")
       delay(100)
       println("$it collected")
   }

    Thread.sleep(300000)
}
Copy the code

Conflate typically overlays old data with new data to solve the back pressure problem

There is also collectLatest, which differs from Conflate in that it overwrites old data with new data only when the old data has not been processed by the time new data arrives