
Overview

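You can think of a channel as a producer-consumer model, similar to a BlockingQueue: one side produces and sends data, and the other side receives and consumes it. The difference is that a channel's send and receive suspend the coroutine instead of blocking the thread.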

Channel basics

Building simple channels

Let’s build a simple channel

In this example, both product and consumer take a coroutine scope and a channel as parameters.

In product, a single coroutine sends an incrementing Int to the channel every 100 ms, four values in total.

In consumer, a single coroutine runs a while loop that keeps receiving the next value from the channel for as long as the sender has not closed it.

The code in the following sections builds on this

  1. code
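import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    channel1()
}

fun channel1() = runBlocking {
    val channel = Channel<Int>()
    product(this, channel)
    consumer(this, channel)
}

/** Send data */
fun product(scope: CoroutineScope, channel: Channel<Int>) {
    var count = 0
    scope.launch {
        while (count < 4) {
            channel.send(count++)
            delay(100)
        }
    }
}

/** Consume data */
fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
    scope.launch {
        // The channel is never closed here, so after the fourth value
        // receive() stays suspended and the program does not exit.
        while (!channel.isClosedForSend) {
            delay(100)
            println(channel.receive())
        }
    }
}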

  1. The log
0
1
2
3
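
The consumer loop above depends on isClosedForSend, and the producer in this example never closes the channel. As a minimal sketch, assuming the producer does call close() once it has sent everything, the consumer can use receiveCatching, which reports a closed channel as a result value rather than throwing. The helper name receiveUntilClosed is hypothetical and is not part of the original example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 0..3, closes the channel,
// and returns everything the consumer received.
fun receiveUntilClosed(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>()

    // Producer: assumption for this sketch, it closes the channel when done.
    launch {
        for (value in 0 until 4) {
            channel.send(value)
            delay(100)
        }
        channel.close()
    }

    // Consumer: leaves the loop as soon as the channel reports "closed".
    launch {
        while (true) {
            val result = channel.receiveCatching()
            if (result.isClosed) break
            received += result.getOrThrow()
        }
    }.join()

    received
}

fun main() {
    println(receiveUntilClosed()) // prints [0, 1, 2, 3]
}
```

With this shape the program exits on its own, because both coroutines finish once the channel is closed.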

Consume data with the consume functions

Consumers can use consumeEach to consume data. consumeEachIndexed also exists, but it is deprecated in recent kotlinx.coroutines releases.

fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
    scope.launch {
        channel.consumeEach {
            println(it)
        }
    }
}

Close and iterate the channel

A channel can be closed by calling channel.close(), so it can be shut down explicitly once it is no longer needed.

If the channel still holds data that the consumer has not consumed when close() is called, the consumer goes on to consume the remaining data.

  1. code
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Channel2 {

    fun channel1() = runBlocking {
        val channel = Channel<Int>()
        product(this, channel)
        consumer(this, channel)
    }

    /** Send data */
    fun product(scope: CoroutineScope, channel: Channel<Int>) {
        var count = 0
        scope.launch {
            while (count < 4) {
                channel.send(count++)
                delay(100) // 100 ms delay
            }
            // With close(), the program can complete normally;
            // without it, the program cannot complete.
            channel.close()
        }
    }

    /** Consume data */
    fun consumer(scope: CoroutineScope, channel: Channel<Int>) {
        scope.launch {
            channel.consumeEach {
                // 200 ms delay; the producer only waits 100 ms, so the consumer is the slower side
                delay(200)
                println(it)
            }
        }
    }
}
  1. The log

[Figure: log when the channel is not closed]

[Figure: log when the channel is closed]

In this example we must call close explicitly once the producer has finished sending. Otherwise consumeEach keeps waiting for more elements, the consumer coroutine never finishes, and the program cannot exit.
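
The section title mentions iteration, so here is a small sketch of iterating a channel with a for loop. It also illustrates the point made earlier: elements still sitting in the channel when close() is called are delivered before the loop ends. A buffered channel is used so that the elements can be queued before any receiver runs. The helper name drainAfterClose is hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: queues four elements, closes the channel,
// then iterates over what is left.
fun drainAfterClose(): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 4)

    // All four elements fit in the buffer, so send does not suspend here.
    repeat(4) { channel.send(it) }
    channel.close()

    // The for loop receives the remaining elements and ends
    // once the closed channel is empty.
    val received = mutableListOf<Int>()
    for (value in channel) {
        received += value
    }
    received
}

fun main() {
    println(drainAfterClose()) // prints [0, 1, 2, 3]
}
```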

Build a channel with the produce function

The produce function offers a convenient way to build a producer, as shown below. We define the producer as an extension function on CoroutineScope and then call it directly inside runBlocking.

A channel built with produce can be cancelled.

  1. code
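import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Extension on CoroutineScope that produces the numbers 1 to 4
fun CoroutineScope.produceNumbers() = produce<Int> {
    for (i in 1..4) {
        send(i)
    }
}

fun main() = runBlocking {
    produceNumbers().consumeEach {
        println(it)
    }
}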
  1. The log
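
The text says that a channel built with produce can be cancelled, but the example does not show it. The following is a sketch only: endlessNumbers and takeThreeThenCancel are hypothetical names, and the producer is deliberately endless so that cancellation is the only way to stop it. Depending on the library version, the compiler may flag produce as an experimental API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: an endless stream of integers starting at 1.
fun CoroutineScope.endlessNumbers(): ReceiveChannel<Int> = produce {
    var next = 1
    while (true) send(next++)
}

// Hypothetical helper: takes three values, then cancels the producer.
fun takeThreeThenCancel(): List<Int> = runBlocking {
    val numbers = endlessNumbers()
    val firstThree = mutableListOf<Int>()
    repeat(3) { firstThree += numbers.receive() }

    // cancel() stops the producer coroutine; without it runBlocking
    // would wait forever for the endless loop to finish.
    numbers.cancel()
    firstThree
}

fun main() {
    println(takeThreeThenCancel()) // prints [1, 2, 3]
}
```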

Buffered channel

  1. code
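import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class BufferChannel {
    private val channel = Channel<Int>(4)
    private val scope = CoroutineScope(Dispatchers.Default)
    private var count = 0

    fun product() {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }

    fun consumer() {
        scope.launch {
            channel.consumeEach {
                println("Consumption data:$it")
            }
        }
    }
}

fun main() = runBlocking {
    BufferChannel().apply {
        product()
        consumer()
    }
    println("End of execution")
}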
  1. The log
End of execution
Consumption data:0
Consumption data:1
Consumption data:2
Consumption data:3
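
The log above stops at 3 even though the producer sends ten values. A likely explanation, stated here as an assumption rather than something the article confirms: the coroutines run in a separate CoroutineScope(Dispatchers.Default), which runBlocking does not wait for, so main can return while they are still working. A rough sketch of one way to wait for them is to return the Job objects and join them. The class JoinedBufferChannel and the function consumeAll are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical variant of BufferChannel that exposes its jobs.
class JoinedBufferChannel {
    private val channel = Channel<Int>(4)
    private val scope = CoroutineScope(Dispatchers.Default)
    val consumed = mutableListOf<Int>()

    fun product(): Job = scope.launch {
        for (value in 0 until 10) channel.send(value)
        channel.close() // lets consumeEach finish
    }

    fun consumer(): Job = scope.launch {
        channel.consumeEach { consumed.add(it) }
    }
}

// Hypothetical helper: waits for both coroutines before returning.
fun consumeAll(): List<Int> = runBlocking {
    val demo = JoinedBufferChannel()
    joinAll(demo.product(), demo.consumer())
    demo.consumed.toList()
}

fun main() {
    println(consumeAll()) // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
}
```

Launching the coroutines in the runBlocking scope instead, as the next example does, has a similar effect, because runBlocking waits for its own children.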

Buffered channel

An unbuffered channel transfers an element only when the sender and the receiver meet (a rendezvous). If send is called first, it suspends until receive is called; if receive is called first, it suspends until send is called.

Both Channel and produce accept a capacity that sets the buffer size. While the buffer is not full, the sender can keep sending without waiting for the receiver; once it is full, send suspends until the receiver takes an element.

This is hard to demonstrate directly, so I will just show the code.

  1. code

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class BufferChannel {
    private val channel = Channel<Int>(5)
    private var count = 0

    fun product(scope: CoroutineScope) {
        scope.launch {
            while (count < 10) {
                channel.send(count++)
            }
        }
    }

    fun consumer(scope: CoroutineScope) {
        scope.launch {
            while (!channel.isClosedForSend) {
                println(channel.receive())
            }
        }
    }
}

fun main() = runBlocking {
    BufferChannel().let {
        it.product(this)
        it.consumer(this)
    }
    println("End of execution")
}
  1. The log
End of execution
0
1
2
3
4
5
6
7
8
9
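
One way to observe the capacity limit without relying on timing is trySend, which never suspends: it fails in exactly the situations where send would have to wait. The sketch below counts how many elements a channel accepts when nobody is receiving. The helper name acceptedWithoutReceiver is hypothetical, and the upper bound of 100 is only a safety guard for this demo.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many elements a channel of the given
// capacity accepts while there is no receiver.
fun acceptedWithoutReceiver(capacity: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    // trySend returns a failed result instead of suspending
    // when the element cannot be accepted right now.
    while (accepted < 100 && channel.trySend(accepted).isSuccess) {
        accepted++
    }
    return accepted
}

fun main() {
    // Rendezvous (unbuffered) channel: nothing is accepted without a receiver.
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // prints 0
    // Capacity 5: five elements fit in the buffer, the sixth would suspend.
    println(acceptedWithoutReceiver(5)) // prints 5
}
```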
