Heat flow —- pipeline
What is a pipe?
It’s essentially a queue
A channel for transferring data from coroutine to coroutine, used to transfer variables
Kotlin’s channels are similar to JUC’s channels
Why pipes?
Send and receive variables in a way that does not block the current thread with the coroutine
We can think of Kotlin’s channel as BlockingQueue, but instead of blocking the queue put, a channel uses the suspend function send, and instead of blocking the queue take, the suspend function Receive
This gives a channel the advantage of not blocking the current thread
A channel is a data structure that allows one-way transmission of information, from the writing end of the pipe to the reading end of the pipe, all of which are serial and its order is unchanged
How do the pipes work?
Pipeline hello world
fun main(a): Unit = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1.. 5) {
channel.send(x)
}
}
repeat(5) {
log(channel.receive())
}
log("Done")}Copy the code
Send is suspended until the other party calls Recevie. Receive is suspended if the other party does not send any variables
The function of flow is to transfer variables between streams. What’s the difference? In fact, flow is cold, but channel is hot. Each variable sent can receive a variable immediately
How do pipes iterate and close?
Pipeline iteration
fun main(a): Unit = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1.. 5) {
channel.send(x * x)
}
}
for (y in channel) {
log(y)
}
log("Done")}Copy the code
The closing of the pipeline and the judgment of its closing state
@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1.. 5) {
if (channel.isClosedForSend) {
log("send: close send:${channel.isClosedForSend}, receive: ${channel.isClosedForReceive}")
break
}
channel.send(x)
}
}
repeat(5) {
if (it >= 3) {
channel.close()
}
if (channel.isClosedForReceive) {
log("receive: close send:${channel.isClosedForSend}, receive: ${channel.isClosedForReceive}")
return@repeat
}
log(channel.receive())
}
log("Done")}Copy the code
Producer and consumer models of pipelines
@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
val receiver = produce {
for (x in 1.. 5) {
send(x * x)
}
}
receiver.consumeEach { log(it) }
log("Done")}Copy the code
The underlying function produce will start a new coroutine
The result of one pipe can be given to another pipe
fun CoroutineScope.produceNumber(a) = produce<Int> {
repeat(10) {
send(it)
}
}
fun CoroutineScope.square(number: ReceiveChannel<Int>) = produce<Int> {
for (i in number) {
send(i * i)
}
}
fun main(a): Unit = runBlocking {
val number = produceNumber()
val receiver = square(number)
repeat(10) {
log(receiver.receive())
}
log("Done")
coroutineContext.cancelChildren()
}
Copy the code
Pipes and infinite primes
@ExperimentalCoroutinesApi
fun CoroutineScope.numbersProducer(start: Int) = produce {
var n = start
while (true) send(n++)
}
@ExperimentalCoroutinesApi
fun CoroutineScope.filterPrimes(numbers: ReceiveChannel<Int>, prime: Int) =
produce {
for (x in numbers) {
if(x % prime ! =0) {
send(x)
}
}
}
@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
var numbers = numbersProducer(2)
while (true) {
val prime = numbers.receive()
log("$prime")
numbers = filterPrimes(numbers, prime)
delay(1000)}}Copy the code
Channel buffer
We can set a buffer for the pipe, otherwise the pipe can only put one element at a time
fun main(a): Unit = runBlocking {
val channel: Channel<Int> = Channel<Int>(capacity = 2)
launch {
repeat(5) {
log("Sending $it")
channel.send(ThreadLocalRandom.current().nextInt(0.10))
}
}
launch {
repeat(5) {
log("Reading: ${channel.receive()}")}}}Copy the code
And then you’ll notice that every time you pass three elements into the pipe, and then you read, yeah remember that the pipe buffer is limited to two, but then you can pass three elements into the pipe, so be careful here
[main] Sending 0
[main] Sending 1
[main] Sending 2
[main] Reading: 3
[main] Reading: 4
[main] Reading: 6
[main] Sending 3
[main] Sending 4
[main] Reading: 8
[main] Reading: 7
Copy the code
Fan out: One more round
A message is distributed to multiple coroutines
@ExperimentalCoroutinesApi
private fun CoroutineScope.produceNumber(a) = produce {
var x = 1
while (true) {
send(x++)
delay(100)}}private fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
log("id = $id , received: $msg")}}@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
val product = produceNumber()
repeat(5) {
launchProcessor(it, product)
}
}
Copy the code
The console prints out different coroutines that are accepting
[main @coroutine#3] id = 0 , received: 1
[main @coroutine#3] id = 0 , received: 2
[main @coroutine#4] id = 1 , received: 3
[main @coroutine#5] id = 2 , received: 4
[main @coroutine#6] id = 3 , received: 5
[main @coroutine#7] id = 4 , received: 6
[main @coroutine#3] id = 0 , received: 7
[main @coroutine#4] id = 1 , received: 8
[main @coroutine#5] id = 2 , received: 9
[main @coroutine#6] id = 3 , received: 10
Copy the code
The radio channel
The previous fanning-out scenario is a multi-path scenario, but there is a problem. One coroutine sends 1-5 numbers to multiple coroutines. If it is the scenario above, only one coroutine can get the number 1, and the other coroutines cannot get the number 1, and the same is true between 2 and 5
The scenario is that the coroutine A sends 1 to 5 numbers to the five coroutines B, C, D, E, and F, and each thread needs to receive 1 to 5 numbers
@ObsoleteCoroutinesApi
fun main(a): Unit = runBlocking {
val channel = BroadcastChannel<Int>(Channel.BUFFERED)
launch {
List(5) {
delay(200)
channel.send(it)
}
channel.close()
}
List(5) {
launch {
val receiveChannel = channel.openSubscription()
for (i in receiveChannel) {
log("received: $i")
}
}
}.joinAll()
}
Copy the code
Fan in: multiple one
suspend fun sendString(channel: Channel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
fun main(a): Unit = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo".200L) }
launch { sendString(channel, "BAR".500L) }
repeat(6) {
log(channel.receive())
}
coroutineContext.cancelChildren()
}
Copy the code
Pipelines are fair: first in, first out
The sending and receiving channels follow the first-in, first-out principle
data class Ball(var hits: Int)
suspend fun player(name: String, channel: Channel<Ball>) {
for (ball in channel) {
ball.hits++
log("$name.${ball.hits}")
delay(100)
channel.send(ball)
}
}
fun main(a): Unit = runBlocking {
val channel = Channel<Ball>()
launch { player("ping", channel) }
launch { player("pong", channel) }
channel.send(Ball(0))
delay(1000)
// Terminate the loop
coroutineContext.cancelChildren()
}
Copy the code
Select multiplexing
Select multiplexing is commonly heard in many places, such as Linux select mechanism multiplexing, in select if we are wrong to use a thread to monitor multiple file handles (I remember limit is 1024?). , similar to a thread constantly polling each array for a value to determine if there is an event that needs to be processed
What is multiplexing?
The multiplexing of select in Kotlin looks like this:
Select the available and fastest function to execute from the onXXX events of multiple suspended functions
Look at the following code:
@ExperimentalCoroutinesApi
fun CoroutineScope.fizz(a) = produce {
while (true) {
delay(300)
send("Fizz")}}@ExperimentalCoroutinesApi
fun CoroutineScope.buzz(a) = produce {
while (true) {
delay(500)
send("Buzz")}}suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> {
fizz.onReceive { s -> log("fizz -> $s") }
buzz.onReceive { s -> log("buzz -> $s")}}}@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
val fizz = fizz()
val buzz = buzz()
selectFizzBuzz(fizz, buzz)
// fizz.cancel()
// buzz.cancel()
coroutineContext.cancelChildren()
}
Copy the code
[main] fizz -> Fizz
Here we have an obe
fun CoroutineScope.fizz(a) = produce {
while (true) {
delay(300)
send("Fizz")}}Copy the code
If we just use CoroutineScope, we can just use produce instead of having to figure out how to build a CoroutineScope CoroutineScope and then use produce, right
When I was writing the code, I wanted to use withContext, but it didn’t work, and I looked at it and realized that the thread was blocked at send
private suspend fun fizz(a) = withContext(Dispatchers.IO) {
log("fizz")
produce {
repeat(5) {
delay(300)
send("Fizz") // Block here}}}Copy the code
And then because it’s blocked at send and the withContext itself has to block and wait for its internal coroutine to complete, but send is blocked again, the internal code can’t complete, so it’s blocked indefinitely, The following code blocks on val fizz1 = fizz1() and cannot run further
@Test
fun test03(a) = runBlocking {
val fizz = fizz()
log("fizz: $fizz")}Copy the code
Something like this:
@Test
fun test03(a) = runBlocking {
coroutineScope {
// Fizz is blocked, infinitely blocked, and coroutineScope says,
If the internal coroutine does not end, the main thread is constrained (but not blocked),
// Do not allow the main thread to run out of scope, that is, cannot execute
// log("fizz1: $fizz1"
val fizz = fizz()
}
log("fizz1: $fizz1")}Copy the code
Remember to analyze the above code in detail, especially comments
abnormal
Cancelling coroutines throws a CancellationException at the point of suspension, which is normally invisible and requires special catching
Suspend point: The point where the suspend function + asynchronous operation is called the suspend point
suspend fun main(a) {
coroutineScope {
val job = launch {
try {
repeat(100000) {
delay(1000)
println("${Thread.currentThread()}: ping")}}catch (e: Exception) {
e.printStackTrace()
}
}
delay(3000)
job.cancel()
}
}
Copy the code
Thread[DefaultDispatcher-worker-1.5,main]: ping
Thread[DefaultDispatcher-worker-1.5,main]: ping
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling} @ 5d5390a2
Copy the code
Automatically propagates exceptions and exposes exceptions to users
There are two kinds of coroutine constructors: Launch exceptions (and actor) or expose exceptions (async and produce) to users. If the coroutine constructor is created in a root coroutine (that is, no coroutine child is it), the exception propagated automatically will be treated as an uncaught exception and will be thrown directly. Exposing exceptions to the user depends on the user ultimately consuming the exception (await or receive)
The root coroutine that automatically propagates the exception throws it directly
Look at the code
suspend fun main(a) {
coroutineScope {
val job = GlobalScope.launch {
throw IndexOutOfBoundsException() // An error is reported here
}
job.join()
val deferred = GlobalScope.async {
throw ArithmeticException()
}
try {
deferred.await() // An error is reported here
}
catch (e: Exception) {
log("I caught an exception here,$e")}}}Copy the code
If we no matter, the two will be an error, but the location of the error is different, launch the location of the error is throw IndexOutOfBoundsException () and async mode error is deferred. Await () this position
Exceptions that are not root coroutines are always propagated layer by layer to the root coroutine
Propagation of exceptions also involves interscope issues, as the GlobalScope we used earlier to create coroutines means that the exception is in a separate coroutineScope, which is not the case with the coroutine we created with coroutineScope
If a child coroutine encounters an exception, it throws an exception to its parent coroutine. The parent coroutine, if it has a parent coroutine, continues to pass it to its parent coroutine until it is passed to the root coroutine
So if an exception occurs on any of the child coroutines (with the exception of CancellationException), all the other coroutines will be stopped and no exceptions will occur on any of the other coroutines
The coroutines GlobalScope creates are independent of the parent coroutine of the coroutine in which it is created, which is the root coroutine, so this problem does not occur
It’s going to be a case where the whole coroutine tree is going to end up, and it’s going to be the container that we’re dealing with
It is designed to prevent the whole coroutine tree from failing due to the abnormal propagation of subroutines
fun main(a): Unit = runBlocking {
val scope = CoroutineScope(SupervisorJob())
val job1 = scope.launch {
delay(1000)
throw RuntimeException()
}
val job2 = scope.launch {
repeat(10) {
delay(150)
log("Coroutine 2 keeps running.")
}
}
joinAll(job1, job2)
}
Copy the code
If job1 fails, joB2 can be executed until the execution is complete
If we need to cancel all coroutines we can call scope.cancel()
supervisorScope
We could also use it like this
suspend fun main(a): Unit = supervisorScope {
launch {
repeat(10) {
delay(200)
log(Subcoroutine alive)
}
}
launch {
delay(1000)
throw AssertionError()
}
}
Copy the code
Can achieve the same effect
But there is a problem with this approach
If a scoped exception exists within the scope, as follows:
suspend fun main(a): Unit = supervisorScope {
launch {
repeat(10) {
delay(200)
log(Subcoroutine alive)
}
}
delay(1000)
throw AssertionError()
}
Copy the code
The coroutine is no longer allowed to run 10 rounds, and it is handled by an AssertionError that exits the entire container scope
CoroutineExceptionHandler: exception
CoroutineExceptionHandler only to capture the exception thrown directly coroutines (launch this not async)
It is only captured in the context of the CoroutineScope or in the root coroutine
private val handler = CoroutineExceptionHandler { _, throwable ->
log("1 handler $throwable")}@DelicateCoroutinesApi
fun main(a): Unit = runBlocking {
val job = GlobalScope.launch(handler) {
throw AssertionError() // Only this exception will be intercepted
}
val deferred = GlobalScope.async(handler) {
throw ArithmeticException() // This line of code inside async does not catch exceptions
}
joinAll(job, deferred)
}
Copy the code
fun main(a) {
val scope = CoroutineScope(handler)
scope.launch {
val job = launch {
throw AssertionError() // Only this exception will be intercepted
}
val deferred = async {
throw ArithmeticException() // This line of code inside async does not catch exceptions
}
joinAll(job, deferred)
}
TimeUnit.SECONDS.sleep(2)}Copy the code
But there’s a catch:
suspend fun main(a) {
val scope = CoroutineScope(Job())
val job = scope.launch {
launch(handler) {
throw IllegalArgumentException()
}
}
job.join()
}
Copy the code
The above code does not catch exceptions, as stated earlier “and only in the context of the CoroutineScope or the root coroutine.”
This is inconvenient, however, because you need to add exception handlers to each coroutine
useMETA-INF
The configuration file implements global exceptionsTo view
He doesn’t catch exceptions, he just sees what exceptions are there, and he logs the crash, right
val scope = CoroutineScope(handler)
GlobalScope.launch(handler)
So we can also do this:
class GlobalCoroutinesException : CoroutineExceptionHandler {
override val key: CoroutineContext.Key<*>
get() = CoroutineExceptionHandler
override fun handleException(context: CoroutineContext, exception: Throwable) {
log("Coroutine exception: $exception")}}@DelicateCoroutinesApi
fun main(a): Unit = runBlocking {
val job = GlobalScope.launch {
throw AssertionError()
}
val deferred = GlobalScope.async {
throw ArithmeticException()
}
joinAll(job, deferred)
}
Copy the code
In resouces directory to create services directory, then in the services directory to create file kotlinx. Coroutines. CoroutineExceptionHandler
Fill in the directory with GlobalCoroutiesException coroutines18 package name and type. The exception. GlobalCoroutiesException
I’m ready to run main
But there are still problems
Cancellation and exception
Cancelling a coroutine throws a CancellationException, but we cannot find it and need to catch it
And if the child coroutine’s job calls cancel it doesn’t cancel the parent coroutine
fun main(a): Unit = runBlocking {
val child = launch {
try {
delay(Long.MAX_VALUE)
} catch (e: Exception) {
log("I caught an exception$e")}finally {
log("Subcoroutine canceled.")
}
}
yield()
log("Ready to cancel subcoroutine:")
child.cancelAndJoin()
yield()
log("End of parent coroutine")}Copy the code
Exception aggregation: what happens when more than one child coroutine throws an exception?
private val handler = CoroutineExceptionHandler { _, throwable ->
log("log: $throwable ${throwable.suppressed.get(0)}")}@DelicateCoroutinesApi
fun main(a): Unit = runBlocking {
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
}
finally {
throw ArithmeticException()
}
}
launch {
try {
delay(1000)}finally {
throw ConcurrentModificationException()
}
}
}
job.join()
}
Copy the code
As the code shows, Kotlin will place multiple exceptions in the suppressed property of the exception