Heat flow —- pipeline

What is a pipe?

It’s essentially a queue

A channel for transferring data from coroutine to coroutine, used to transfer variables

Kotlin’s channels are similar to JUC’s channels

Why pipes?

Send and receive variables in a way that does not block the current thread with the coroutine

We can think of Kotlin’s channel as BlockingQueue, but instead of blocking the queue put, a channel uses the suspend function send, and instead of blocking the queue take, the suspend function Receive

This gives a channel the advantage of not blocking the current thread

A channel is a data structure that allows one-way transmission of information, from the writing end of the pipe to the reading end of the pipe, all of which are serial and its order is unchanged

How do the pipes work?

Pipeline hello world

fun main(a): Unit = runBlocking {
   val channel = Channel<Int>()
   launch {
      for (x in 1.. 5) {
         channel.send(x)
      }
   }
   repeat(5) {
      log(channel.receive())
   }
   log("Done")}Copy the code

Send is suspended until the other party calls Recevie. Receive is suspended if the other party does not send any variables

The function of flow is to transfer variables between streams. What’s the difference? In fact, flow is cold, but channel is hot. Each variable sent can receive a variable immediately

How do pipes iterate and close?

Pipeline iteration

fun main(a): Unit = runBlocking {
   val channel = Channel<Int>()
   launch {
      for (x in 1.. 5) {
         channel.send(x * x)
      }
   }
   for (y in channel) {
      log(y)
   }
   log("Done")}Copy the code

The closing of the pipeline and the judgment of its closing state

@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
   val channel = Channel<Int>()
   launch {
      for (x in 1.. 5) {
         if (channel.isClosedForSend) {
            log("send: close send:${channel.isClosedForSend}, receive: ${channel.isClosedForReceive}")
            break
         }
         channel.send(x)
      }
   }
   repeat(5) {
      if (it >= 3) {
         channel.close()
      }
      if (channel.isClosedForReceive) {
         log("receive: close send:${channel.isClosedForSend}, receive: ${channel.isClosedForReceive}")
         return@repeat
      }
      log(channel.receive())
   }
   log("Done")}Copy the code

Producer and consumer models of pipelines

@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
   val receiver = produce {
      for (x in 1.. 5) {
         send(x * x)
      }
   }
   receiver.consumeEach { log(it) }
   log("Done")}Copy the code

The underlying function produce will start a new coroutine

The result of one pipe can be given to another pipe

fun CoroutineScope.produceNumber(a) = produce<Int> {
   repeat(10) {
      send(it)
   }
}

fun CoroutineScope.square(number: ReceiveChannel<Int>) = produce<Int> {
   for (i in number) {
      send(i * i)
   }
}

fun main(a): Unit = runBlocking {
   val number = produceNumber()
   val receiver = square(number)
   repeat(10) {
      log(receiver.receive())
   }
   log("Done")
   coroutineContext.cancelChildren()
}
Copy the code

Pipes and infinite primes

@ExperimentalCoroutinesApi
fun CoroutineScope.numbersProducer(start: Int) = produce {
   var n = start
   while (true) send(n++)
}

@ExperimentalCoroutinesApi
fun CoroutineScope.filterPrimes(numbers: ReceiveChannel<Int>, prime: Int) =
   produce {
      for (x in numbers) {
         if(x % prime ! =0) {
            send(x)
         }
      }
   }

@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
   var numbers = numbersProducer(2)
   while (true) {
      val prime = numbers.receive()
      log("$prime")
      numbers = filterPrimes(numbers, prime)
      delay(1000)}}Copy the code

Channel buffer

We can set a buffer for the pipe, otherwise the pipe can only put one element at a time

fun main(a): Unit = runBlocking {
   val channel: Channel<Int> = Channel<Int>(capacity = 2)
   launch {
      repeat(5) {
         log("Sending $it")
         channel.send(ThreadLocalRandom.current().nextInt(0.10))
      }
   }
   launch {
      repeat(5) {
         log("Reading: ${channel.receive()}")}}}Copy the code

And then you’ll notice that every time you pass three elements into the pipe, and then you read, yeah remember that the pipe buffer is limited to two, but then you can pass three elements into the pipe, so be careful here

[main] Sending 0
[main] Sending 1
[main] Sending 2
[main] Reading: 3
[main] Reading: 4
[main] Reading: 6
[main] Sending 3
[main] Sending 4
[main] Reading: 8
[main] Reading: 7
Copy the code

Fan out: One more round

A message is distributed to multiple coroutines

@ExperimentalCoroutinesApi
private fun CoroutineScope.produceNumber(a) = produce {
	var x = 1
	while (true) {
		send(x++)
		delay(100)}}private fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
	for (msg in channel) {
		log("id = $id , received: $msg")}}@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
	val product = produceNumber()
	repeat(5) {
		launchProcessor(it, product)
	}
}
Copy the code

The console prints out different coroutines that are accepting

[main @coroutine#3] id = 0 , received: 1
[main @coroutine#3] id = 0 , received: 2
[main @coroutine#4] id = 1 , received: 3
[main @coroutine#5] id = 2 , received: 4
[main @coroutine#6] id = 3 , received: 5
[main @coroutine#7] id = 4 , received: 6
[main @coroutine#3] id = 0 , received: 7
[main @coroutine#4] id = 1 , received: 8
[main @coroutine#5] id = 2 , received: 9
[main @coroutine#6] id = 3 , received: 10
Copy the code

The radio channel

The previous fanning-out scenario is a multi-path scenario, but there is a problem. One coroutine sends 1-5 numbers to multiple coroutines. If it is the scenario above, only one coroutine can get the number 1, and the other coroutines cannot get the number 1, and the same is true between 2 and 5

The scenario is that the coroutine A sends 1 to 5 numbers to the five coroutines B, C, D, E, and F, and each thread needs to receive 1 to 5 numbers

@ObsoleteCoroutinesApi
fun main(a): Unit = runBlocking {
    val channel = BroadcastChannel<Int>(Channel.BUFFERED)
    launch {
        List(5) {
            delay(200)
            channel.send(it)
        }
        channel.close()
    }
    List(5) {
        launch {
            val receiveChannel = channel.openSubscription()
            for (i in receiveChannel) {
                log("received: $i")
            }
        }
    }.joinAll()
}
Copy the code

Fan in: multiple one

suspend fun sendString(channel: Channel<String>, s: String, time: Long) {
	while (true) {
		delay(time)
		channel.send(s)
	}
}

fun main(a): Unit = runBlocking {
	val channel = Channel<String>()
	launch { sendString(channel, "foo".200L) }
	launch { sendString(channel, "BAR".500L) }
	repeat(6) {
		log(channel.receive())
	}
	coroutineContext.cancelChildren()
}
Copy the code

Pipelines are fair: first in, first out

The sending and receiving channels follow the first-in, first-out principle

data class Ball(var hits: Int)

suspend fun player(name: String, channel: Channel<Ball>) {
	for (ball in channel) {
		ball.hits++
		log("$name.${ball.hits}")
		delay(100)
		channel.send(ball)
	}
}

fun main(a): Unit = runBlocking {
	val channel = Channel<Ball>()
	launch { player("ping", channel) }
	launch { player("pong", channel) }
	channel.send(Ball(0))
	delay(1000)
   // Terminate the loop
	coroutineContext.cancelChildren()
}
Copy the code

Select multiplexing

Select multiplexing is commonly heard in many places, such as Linux select mechanism multiplexing, in select if we are wrong to use a thread to monitor multiple file handles (I remember limit is 1024?). , similar to a thread constantly polling each array for a value to determine if there is an event that needs to be processed

What is multiplexing?

The multiplexing of select in Kotlin looks like this:

Select the available and fastest function to execute from the onXXX events of multiple suspended functions

Look at the following code:

@ExperimentalCoroutinesApi
fun CoroutineScope.fizz(a) = produce {
   while (true) {
      delay(300)
      send("Fizz")}}@ExperimentalCoroutinesApi
fun CoroutineScope.buzz(a) = produce {
   while (true) {
      delay(500)
      send("Buzz")}}suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
   select<Unit> {
      fizz.onReceive { s -> log("fizz -> $s") }
      buzz.onReceive { s -> log("buzz -> $s")}}}@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
   val fizz = fizz()
   val buzz = buzz()
   selectFizzBuzz(fizz, buzz)
   // fizz.cancel()
   // buzz.cancel()
   coroutineContext.cancelChildren()
}
Copy the code

[main] fizz -> Fizz

Here we have an obe

fun CoroutineScope.fizz(a) = produce {
   while (true) {
      delay(300)
      send("Fizz")}}Copy the code

If we just use CoroutineScope, we can just use produce instead of having to figure out how to build a CoroutineScope CoroutineScope and then use produce, right

When I was writing the code, I wanted to use withContext, but it didn’t work, and I looked at it and realized that the thread was blocked at send

private suspend fun fizz(a) = withContext(Dispatchers.IO) {
   log("fizz")
   produce {
      repeat(5) {
         delay(300)
         send("Fizz") // Block here}}}Copy the code

And then because it’s blocked at send and the withContext itself has to block and wait for its internal coroutine to complete, but send is blocked again, the internal code can’t complete, so it’s blocked indefinitely, The following code blocks on val fizz1 = fizz1() and cannot run further

@Test
fun test03(a) = runBlocking {
   val fizz = fizz()
   log("fizz: $fizz")}Copy the code

Something like this:

@Test
fun test03(a) = runBlocking {
   coroutineScope {
       // Fizz is blocked, infinitely blocked, and coroutineScope says,
       If the internal coroutine does not end, the main thread is constrained (but not blocked),
       // Do not allow the main thread to run out of scope, that is, cannot execute
       // log("fizz1: $fizz1"
       val fizz = fizz()
   }
   log("fizz1: $fizz1")}Copy the code

Remember to analyze the above code in detail, especially comments

abnormal

Cancelling coroutines throws a CancellationException at the point of suspension, which is normally invisible and requires special catching

Suspend point: The point where the suspend function + asynchronous operation is called the suspend point

suspend fun main(a) {
	coroutineScope {
		val job = launch {
			try {
				repeat(100000) {
					delay(1000)
					println("${Thread.currentThread()}: ping")}}catch (e: Exception) {
				e.printStackTrace()
			}
		}
		delay(3000)
		job.cancel()
	}
}
Copy the code
Thread[DefaultDispatcher-worker-1.5,main]: ping
Thread[DefaultDispatcher-worker-1.5,main]: ping
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling} @ 5d5390a2
Copy the code

Automatically propagates exceptions and exposes exceptions to users

There are two kinds of coroutine constructors: Launch exceptions (and actor) or expose exceptions (async and produce) to users. If the coroutine constructor is created in a root coroutine (that is, no coroutine child is it), the exception propagated automatically will be treated as an uncaught exception and will be thrown directly. Exposing exceptions to the user depends on the user ultimately consuming the exception (await or receive)

The root coroutine that automatically propagates the exception throws it directly

Look at the code

suspend fun main(a) {
	coroutineScope {
		val job = GlobalScope.launch {
			throw IndexOutOfBoundsException() // An error is reported here
		}
		job.join()
		val deferred = GlobalScope.async {
			throw ArithmeticException()
		}
		try {
			deferred.await() // An error is reported here
		}
		catch (e: Exception) {
			log("I caught an exception here,$e")}}}Copy the code

If we no matter, the two will be an error, but the location of the error is different, launch the location of the error is throw IndexOutOfBoundsException () and async mode error is deferred. Await () this position

Exceptions that are not root coroutines are always propagated layer by layer to the root coroutine

Propagation of exceptions also involves interscope issues, as the GlobalScope we used earlier to create coroutines means that the exception is in a separate coroutineScope, which is not the case with the coroutine we created with coroutineScope

If a child coroutine encounters an exception, it throws an exception to its parent coroutine. The parent coroutine, if it has a parent coroutine, continues to pass it to its parent coroutine until it is passed to the root coroutine

So if an exception occurs on any of the child coroutines (with the exception of CancellationException), all the other coroutines will be stopped and no exceptions will occur on any of the other coroutines

The coroutines GlobalScope creates are independent of the parent coroutine of the coroutine in which it is created, which is the root coroutine, so this problem does not occur

It’s going to be a case where the whole coroutine tree is going to end up, and it’s going to be the container that we’re dealing with

It is designed to prevent the whole coroutine tree from failing due to the abnormal propagation of subroutines

fun main(a): Unit = runBlocking {
	val scope = CoroutineScope(SupervisorJob())
	val job1 = scope.launch {
		delay(1000)
		throw RuntimeException()
	}
	val job2 = scope.launch {
		repeat(10) {
			delay(150)
			log("Coroutine 2 keeps running.")
		}
	}
	joinAll(job1, job2)
}
Copy the code

If job1 fails, joB2 can be executed until the execution is complete

If we need to cancel all coroutines we can call scope.cancel()

supervisorScope

We could also use it like this

suspend fun main(a): Unit = supervisorScope {
	launch {
		repeat(10) {
			delay(200)
			log(Subcoroutine alive)
		}
	}
	launch {
		delay(1000)
		throw AssertionError()
	}
}
Copy the code

Can achieve the same effect

But there is a problem with this approach

If a scoped exception exists within the scope, as follows:

suspend fun main(a): Unit = supervisorScope {
	launch {
		repeat(10) {
			delay(200)
			log(Subcoroutine alive)
		}
	}
	delay(1000)
	throw AssertionError()
}
Copy the code

The coroutine is no longer allowed to run 10 rounds, and it is handled by an AssertionError that exits the entire container scope

CoroutineExceptionHandler: exception

CoroutineExceptionHandler only to capture the exception thrown directly coroutines (launch this not async)

It is only captured in the context of the CoroutineScope or in the root coroutine

private val handler =  CoroutineExceptionHandler { _, throwable ->
	log("1 handler $throwable")}@DelicateCoroutinesApi
fun main(a): Unit = runBlocking {
	val job = GlobalScope.launch(handler) {
		throw AssertionError() // Only this exception will be intercepted
	}
	val deferred = GlobalScope.async(handler) {
		throw ArithmeticException() // This line of code inside async does not catch exceptions
	}
	joinAll(job, deferred)
}
Copy the code
fun main(a) {
	val scope = CoroutineScope(handler)
	scope.launch {
		val job = launch {
			throw AssertionError() // Only this exception will be intercepted
		}
		val deferred = async {
			throw ArithmeticException() // This line of code inside async does not catch exceptions
		}
		joinAll(job, deferred)
	}
	TimeUnit.SECONDS.sleep(2)}Copy the code

But there’s a catch:

suspend fun main(a) {
	val scope = CoroutineScope(Job())
	val job = scope.launch {
		launch(handler) {
			throw IllegalArgumentException()
		}
	}
	job.join()
}
Copy the code

The above code does not catch exceptions, as stated earlier “and only in the context of the CoroutineScope or the root coroutine.”

This is inconvenient, however, because you need to add exception handlers to each coroutine

useMETA-INFThe configuration file implements global exceptionsTo view

He doesn’t catch exceptions, he just sees what exceptions are there, and he logs the crash, right

val scope = CoroutineScope(handler)

GlobalScope.launch(handler)

So we can also do this:

class GlobalCoroutinesException : CoroutineExceptionHandler {
	override val key: CoroutineContext.Key<*>
		get() = CoroutineExceptionHandler
	
	override fun handleException(context: CoroutineContext, exception: Throwable) {
		log("Coroutine exception: $exception")}}@DelicateCoroutinesApi
fun main(a): Unit = runBlocking {
	val job = GlobalScope.launch {
		throw AssertionError()
	}
	val deferred = GlobalScope.async {
		throw ArithmeticException()
	}
	joinAll(job, deferred)
}
Copy the code

In resouces directory to create services directory, then in the services directory to create file kotlinx. Coroutines. CoroutineExceptionHandler

Fill in the directory with GlobalCoroutiesException coroutines18 package name and type. The exception. GlobalCoroutiesException

I’m ready to run main

But there are still problems

Cancellation and exception

Cancelling a coroutine throws a CancellationException, but we cannot find it and need to catch it

And if the child coroutine’s job calls cancel it doesn’t cancel the parent coroutine

fun main(a): Unit = runBlocking {
    val child = launch {
        try {
            delay(Long.MAX_VALUE)
        } catch (e: Exception) {
            log("I caught an exception$e")}finally {
            log("Subcoroutine canceled.")
        }
    }
    yield()
    log("Ready to cancel subcoroutine:")
    child.cancelAndJoin()
    yield()
    log("End of parent coroutine")}Copy the code

Exception aggregation: what happens when more than one child coroutine throws an exception?

private val handler = CoroutineExceptionHandler { _, throwable ->
    log("log: $throwable ${throwable.suppressed.get(0)}")}@DelicateCoroutinesApi
fun main(a): Unit = runBlocking {
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                    delay(Long.MAX_VALUE)
            }
            finally {
                    throw ArithmeticException()
            }
        }
        launch {
            try {
                delay(1000)}finally {
                throw ConcurrentModificationException()
            }
        }
    }
    job.join()
}
Copy the code

As the code shows, Kotlin will place multiple exceptions in the suppressed property of the exception