Public number: byte array, keen to share Android system source code parsing, Jetpack source code parsing, popular open source library source code parsing and other essential knowledge interview
Recently, I have been learning about kotlin coroutines, and the best learning materials are naturally the official learning documents. After reading them, I have the idea of translating the official documents. Before and after spent close to a month time, a total of nine articles, here also share out, hope to help readers. Limited by personal knowledge, some translation is not too smooth, also hope that readers can put forward their opinions
Coroutines official documentation: Coroutines – Guide
Coroutines-cn-guide coroutines-cn-guide
Coroutine official document Chinese translator: leavesC
Deferred values provide a convenient way to pass single values between coroutines, while Channels provide a way to transmit streams of values
1. Channel Basics
A channel is very similar in concept to BlockingQueue, with one key difference: instead of a blocking PUT and a blocking take, a channel has a pending send and a pending receive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking {
//sampleStart
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1.. 5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
//sampleEnd
}
Copy the code
The output is:
1
4
9
16
25
Done!
Copy the code
Closing and iteration over Channels
Unlike a queue, a channel can be closed to indicate that an element has been sent. On the receiver side, it is convenient to use the regular for loop to receive elements from the channel
Conceptually, close is similar to sending a special Cloase tag to a channel. Once the close tag is received, the iteration stops, so all elements sent before the close are guaranteed to be received:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking {
//sampleStart
val channel = Channel<Int>()
launch {
for (x in 1.. 5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
//sampleEnd
}
Copy the code
3. Building channel Producers
Patterns in which coroutines generate sequences of elements are very common. This is part of the producer-consumer pattern that is often found in concurrent programming. You could abstract such a producer as a function that takes a channel as an argument, but this goes against the common sense that you must return a result from the function
There is a handy coroutine constructor called Product that makes it easy to perform this operation on the producer side; There is also an extension function consumerEach, which replaces the for loop on the consumer side:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceSquares(a): ReceiveChannel<Int> = produce {
for (x in 1.. 5) send(x * x)
}
fun main(a) = runBlocking {
//sampleStart
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
//sampleEnd
}
Copy the code
4. Pipelines
A pipe is a pattern, a stream of values that a coroutine is generating that may be infinitely many elements
fun CoroutineScope.produceNumbers(a) = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
Copy the code
There are one or more coroutines that value the stream of values, do some processing, and produce some other result. In the following example, each return value is also the square of the input parameter value (number)
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
Copy the code
Start and connect the entire pipeline:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking {
//sampleStart
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}
fun CoroutineScope.produceNumbers(a) = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
Copy the code
All functions that create coroutines are defined as extensions of CoroutineScope, so we can rely on structured concurrency to ensure that there are no global coroutines that are delayed in the application
Prime Numbers with Pipeline
Let’s take the pipe to the extreme with an example of using coroutine pipes to generate prime numbers. We start with an infinite sequence of numbers
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
Copy the code
The following pipes filter the incoming stream of numbers, removing all numbers that are divisible by a given prime:
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if(x % prime ! =0) send(x)
}
Copy the code
Now, we get a prime number from the current channel by starting a stream of numbers at 2 and start a new channel for each prime number found:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7)...Copy the code
The following example code prints the first ten primes and runs the entire pipe in the context of the main thread. Because all coroutines are started within the scope of the main runBlocking coroutine, we do not have to keep explicit references to all started coroutines. We use the cancelChildren extension function to cancel all subcoroutines after the first ten primes
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking {
//sampleStart
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if(x % prime ! =0) send(x)
}
Copy the code
Running result:
2
3
5
7
11
13
17
19
23
29
Copy the code
Note that you can use the iterator coroutine constructor from the standard library to build the same pipe. Replace product with iterator, send with yield, receive with next, ReceiveChannel with iterator, and remove the coroutine scope. And you don’t have to use runBlocking anymore. However, the advantage of a pipeline using the channel shown above is that it can actually utilize multiple cpus to execute the code if run in the context of dispatchers.default
But in any case, the alternative described above is also a very impractical way to find primes. In fact, pipes do involve some other pending calls (such as asynchronous calls to remote services), and these pipes cannot be built using sequence/iterator because they do not allow arbitrary suspending, whereas Products are completely asynchronous
6. Fan-out
Multiple coroutines can receive data from the same channel and assign tasks among them. Let’s start with a producer coroutine that periodically generates integers (10 per second) :
fun CoroutineScope.produceNumbers(a) = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) / / wait 0.1 s}}Copy the code
Then we can have multiple processor coroutines. In this case, they just print their ID and the number they received:
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")}}Copy the code
Now let’s boot up five processors and let them work for almost a second. Here’s what happens:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking<Unit> {
//sampleStart
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}
fun CoroutineScope.produceNumbers(a) = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) / / wait 0.1 s}}fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")}}Copy the code
Although the processor ID that receives each particular integer may be different, the result of the run will be output similar to the following:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
Copy the code
Note that canceling the Producer coroutine closes its channel and ultimately terminates the iteration on the channel that the Processor coroutine is executing
Also, notice how we use a for loop to explicitly iterate over the channel to execute fan-out in the launchProcessor code. Unlike consumeEach, this for loop pattern is completely safe to use across multiple coroutines. If one of the processor coroutines fails, the other processors still process the channel, and the processor that writes through consumeEach always consumes (cancels) the underlying channel on normal or abnormal completion
7. Fan-in
Multiple coroutines can be sent to the same channel. For example, there is a string channel and a suspend function that repeatedly sends the specified string to the channel with the specified delay:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
Copy the code
Now, let’s see what happens if we start two coroutines to send strings (in this case, we start them as subcoroutines of the main coroutine, in the context of the main thread) :
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking {
//sampleStart
val channel = Channel<String>()
launch { sendString(channel, "foo".200L) }
launch { sendString(channel, "BAR!".500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
Copy the code
Running result:
foo
foo
BAR!
foo
foo
BAR!
Copy the code
Buffered channels
None of the channels shown so far have buffers. Unbuffered channels transport elements when the send and receive operations are invoked by both the sender and the receiver. If send is called first, it hangs until Receive is called; If receive is called first, it hangs until Send is called
Both the Channel() factory function and the produce builder take the optional parameter capacity to specify the buffer size. Buffering is used to allow a sender to send multiple elements before suspending, similar to BlockingQueue with a specified capacity, which blocks only when the buffer is full
See the following code in effect:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking<Unit> {
//sampleStart
val channel = Channel<Int> (4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full}}// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
//sampleEnd
}
Copy the code
A buffer channel of capacity 4 is used, so it will be printed five times:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
Copy the code
The first four elements are added to the buffer, and the sender hangs when trying to send the fifth element
The Channels are fair.
The send and receive operations on channels are fair to the order in which they are called from multiple coroutines. They are provided in a first-in, first-out order, for example, the coroutine that calls Receive first gets the element first. In the following example, two coroutines “ping” and “Pong” receive the “ball” object from the shared “table” channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
//sampleStart
data class Ball(var hits: Int)
fun main(a) = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back}}//sampleEnd
Copy the code
The “ping” coroutine runs first, so it is the first to receive the ball. Even if the “ping” coroutine receives the ball immediately after sending it back to the table, the ball will still be received by “Pong” because it was already waiting for it:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
Copy the code
Note that sometimes, due to the nature of the executor used, a channel can have an execution effect that appears unfair. See this issue for more information
Ticker Channels
The timer channel is a special rendezvous channel that returns the Unit value at the end of each given delay time since the last consumption of the channel. Although it may seem useless, it is a useful building block for creating complex time-based produce pipes and for window-building and other time-based processing. The timer channel can be used for SELECT to perform the “on tick” operation
To create such a channel, use the factory method ticker. If you don’t need the channel to send any more elements, use Receivechannel. cancel on it
Now let’s see how it works in practice:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main(a) = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
Copy the code
Running result:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
Copy the code
Note that the Ticker is aware that the consumer may be paused, and by default, if a pause occurs, will delay the generation of the next element, trying to maintain a fixed rate of generated elements
Optionally, the mode argument to the ticker function can be specified as tickermode.fixed_delay to ensure a fixed delay between elements