Premise of learning

  1. Java threads need to know roughly

  2. A coroutine is a task executed by a thread. The difference between a coroutine and a user thread is that it is backed by a powerful compiler. A coroutine has a coroutine-specific scheduler and a bunch of handy functions that allow the user to execute the coroutine on the desired thread within a certain range

What is a coroutine?

A: I think coroutines are tasks, and the coroutine scheduler and state machine are responsible for scheduling these tasks for the thread to execute

Coroutines are similar to tasks in games, and threads are similar to characters in games

Why coroutines? Or what is the advantage of coroutines?

A: The advantage of coroutines is that

  1. Fine scheduling, one second is still using the UI thread to perform some UI update operations, the next second it is found that the task needs to block, the blocking task is cut to other threads to execute, the UI thread continues to refresh the UI
  2. Writing asynchronous code like writing regular code makes it easier to write highly concurrent code
  3. Coroutines are also suitable for IO operations if the underlying coroutine implements non-blocking IO functions that are part of the coroutine
  4. Coroutines reduce the number of threads, reducing the number of context switches required

Coroutines basis

Hello coroutines

// There may be a problem with the thread GlobalScope started and this annotation needs to be added to indicate it
@DelicateCoroutinesApi
fun main(a) {
   // Create a global coroutine space whose declaration cycle will not be released until the end of app, so note that this function is more like new Thread
   GlobalScope.launch {
      The main thread will suspend the coroutine of the scope at this point, and then the main thread will exit the coroutine scope
      delay(1000)
      // Wait until the coroutine is suspended for 1 SEC to execute the following code
      println("${Thread.currentThread()}: 1.") / / Thread [DefaultDispatcher - worker - 1, 5, the main] : 1
   }
   // When the main thread encounters the above suspend function, it jumps directly to here and prints
   println("${Thread.currentThread()}: 2") // Thread[main,5,main]: 2
   // The main thread waits for 2 SEC
   Thread.sleep(2000)}Copy the code

Interpretation of the code above:

  1. Delay is the suspension function, and thread.sleep (…) in the Thread. Same, but suspend functions can only be executed by coroutines, not normal threads, otherwise an error will be reported

  2. Coroutines mission met hang function the thread does not block, but hang coroutines, the suspended function to the other thread execution (code above and not to other threads, so is the main thread in execution, but blocking function is not the main thread in the block, but with the aid of coroutines, where the code execution, stack, With coroutine mount, until the block ends, the coroutine resumes, and the thread returns to the place where the coroutine was suspended to continue executing the code of the coroutine scope.

GlobalScope has been pointed out by the Kotlin team as a problem in Kotlin 1.5, so be careful when using it. Is GlobalScope’s coroutine being used elsewhere, or is your coroutine being used by another person at the same time? GlobalScope’s coroutines live and die with your app, even if you don’t use them anymore, and GlobalScope’s coroutines can’t cancel, so be careful what happens if you use them

RunBlocking: Coroutine scope building

@DelicateCoroutinesApi
fun main(a) {
   // this: CoroutineScope is the CoroutineScope
   // Start a new coroutine in the background and continue
   GlobalScope.launch {
      delay(1000) // Non-blocking wait 1 second (default time in milliseconds)
      println("${Thread.currentThread()}: 1.") // Print output after delay
   }
   println("${Thread.currentThread()}: 2") // The coroutine is waiting, and the main thread is still executing
   // Another coroutine scope
   runBlocking {
      // Suspend the coroutine, but as a result of the runBlocking effect, the main thread must be released after all the child coroutines are executed, so the main thread is blocked at delay(2000).
      delay(2000)}}Copy the code
  1. runBlockingA coroutine scope is applied, which blocks the main thread untilrunBlockingAll subcoroutines in the scope have finished executing, remember the child thread

Note: runBlocking blocks the main function, usually no one dares to use it, just for testing purposes, generally consider using the coroutineScope

Using Globalscope. launch in the following code is no longer a subcoroutine of runBlocking, so it is not restricted by runBlocking

In the source code you will find this:

@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    /** * Returns [EmptyCoroutineContext]. */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
Copy the code

As you can see, GlobalScope has its own coroutine context and runBlocking has its own coroutine context. The two coroutine contexts are completely different, so runBlocking cannot block GlobalScope’s starting coroutine

@DelicateCoroutinesApi
// Apply a coroutine scope to block execution threads
fun main(a) = runBlocking {
   Execute the following code using a global coroutine
   GlobalScope.launch {
      // Suspend function, suspend coroutine
      delay(1000)
      // After the suspension is complete, the thread begins to print the following
      println("${Thread.currentThread()}: 1.")}// The main thread prints
   println("${Thread.currentThread()}: 2")
   // The main thread hangs and waits
   delay(2000)}Copy the code

If delay(2000) is commented out, the main thread exits

If so:

fun main(a) = runBlocking {
   launch {
      delay(1000)
      println("${Thread.currentThread()}: 1.")
   }
   println("${Thread.currentThread()}: 2")}Copy the code

The main thread waits for the launch function’s child thread to terminate

If you want globalscope.launch to be waited on by the main thread, you need to extract the Job that returns the value, then job.join().

Wait for the coroutine to end

@DelicateCoroutinesApi
// Since job.join() is a suspended function, the function that calls the suspended function is also a suspended function
suspend fun main(a) {
   val job = GlobalScope.launch {
      delay(1000)
      println("${Thread.currentThread()}: 1.")
   }
   println("${Thread.currentThread()}: 2")
   job.join()
}
Copy the code

Structured concurrency

In globalScope. launch, we found that a Thread is global and cannot be supervised, so we should use globalScope. launch as little as possible

fun main(a): Unit = runBlocking {
   launch {
      delay(1000)
      println("${Thread.currentThread()}: 1.")
   }
   println("${Thread.currentThread()}: 2")}Copy the code

This allows the runBlocking above to control the internal subcoroutine until the subcoroutine terminates and the thread exits the coroutine scope

CoroutineScope: Scope build

In addition to the runBlocking above, Kotlin also provides coroutineScope for building coroutine scopes

CoroutineScope, like runBlocking, guarantees that a coroutineScope will not be closed until its subcoroutines have finished executing

suspend fun main(a) {
   println("1")
   coroutineScope {
      delay(1000)
      println("2")
   }
   println("3")}Copy the code

runBlockingcoroutineScopeThe difference between

RunBlocking blocks the main thread in the coroutineScope, whereas coroutineScope does not. It is a suspended function that does not block the thread, but releases it for other purposes

If the main thread is the UI thread, you’re still using runBlocking, you’re screwed, the UI thread is blocked, your app is blocked. But I’m not learning Kotlin for Android so test yourself

Cancellation and timeout of coroutines

fun main(a) {
   val scope = CoroutineScope(Dispatchers.Unconfined)
   scope.launch {
      repeat(100000) {
         delay(1000)
         println("ping")
      }
   }
   TimeUnit.SECONDS.sleep(6)
   scope.cancel()
}
Copy the code

Cancellation does not take effect if the coroutine performs CPU intensive computations

fun main(a): Unit = runBlocking {
   val start = Instant.now()
   val job = launch(Dispatchers.Default) {
      repeat(5) {
         if (Instant.now() >= start) {
            println("job: I'm sleeping $it ...")
            start.plusMillis(500)
         }
      }
   }
   delay(1300)
   job.cancelAndJoin()
   println("${Thread.currentThread()} is finished")}Copy the code

The way to terminate a coroutine in a computation

You can use isActive to stop coroutines in a calculation:

suspend fun main(a) = coroutineScope {
   val start = Instant.now()
   val job = launch(Dispatchers.Default) {
      var i = 0
      var next = start
      while (isActive) {
         if (Instant.now() >= next) {
            println("job: I'm sleeping ${i++} ...")
            next = next.plusMillis(500)
         }
      }
   }
   delay(1300)
   job.cancelAndJoin()
   println("${Thread.currentThread()} is finished")}Copy the code

Release resources in finally

fun main(a): Unit = runBlocking {
   val job = launch {
      try {
         repeat(1000) {
            println("${Thread.currentThread()}: job: I'm sleeping $it ...")
            delay(500)}}finally {
         println("${Thread.currentThread()}: I'm running finally")
      }
   }
   delay(1300)
   job.cancelAndJoin()
   println("${Thread.currentThread()} is finished")}Copy the code
Thread[main,5,main]: job: I'm sleeping 0. Thread[main,5,main]: job: I'm sleeping 1. Thread[main,5,main]: job: I'm sleeping 2. Thread[main,5,main]: I'm running finally
Thread[main,5,main] is finished
Copy the code

A code block that cannot be cancelledNonCancellable

fun main(a) = runBlocking {
   val job = launch {
      try {
         repeat(1000) {
            println("${Thread.currentThread()}: I'm sleeping $it ...")
            delay(500)}}finally {
         withContext(NonCancellable) {
            println("${Thread.currentThread()}: I'm running finally")
            delay(1000L)
            println("${Thread.currentThread()}: And I've just delayed for 1 sec because I'm non-cancellable")
         }
      }
   }
   delay(1300)
   println("${Thread.currentThread()}: I'm tired of waiting!")
   job.cancelAndJoin()
   println("${Thread.currentThread()}: Now I can quit.")}Copy the code

Compose suspend functions

Async concurrent

suspend fun doSomethingUsefulOne(a): Int {
    delay(1000)
    return 13
}

suspend fun doSomethingUsefulTwo(a): Int {
    delay(1000)
    return 14
}

suspend fun main(a): Unit = coroutineScope {
    val time1 = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println(one + two)
    }
    println("$time1 ms")

    val time02 = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("${one.await() + two.await()}")
    }
    println("$time02 ms")}Copy the code

The above function synchronizes the delay of the two coroutine subtasks so that the delay ends after a maximum of one second, or two seconds if the execution is serial

Async differs from launch in that async can receive the return value of its internal scope

More than a

public suspend fun await(a): T
Copy the code

Function to accept the return value

Delay starting async

private suspend fun doSomethingUsefulOne(a): Int {
    delay(1000)
    return 13
}

private suspend fun doSomethingUsefulTwo(a): Int {
    delay(1000)
    return 14
}

fun main(a): Unit = runBlocking {
    val time = measureTimeMillis {
        // The coroutine runs late
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // The coroutine is officially launched here
        one.start()
        two.start()
        // Wait for two coroutine tasks to complete
        println("I answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")}Copy the code

Asynchronous style functions

@DelicateCoroutinesApi
fun somethingUsefulOneAsync(a) = GlobalScope.async {
    doSomethingUsefulOne()
}

@DelicateCoroutinesApi
fun somethingUsefulTwoAsync(a) = GlobalScope.async {
    doSomethingUsefulTwo()
}

@DelicateCoroutinesApi
suspend fun main(a) {
    val time = measureTimeMillis {
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")}Copy the code

In Kotlin this style of function is not recommended. If an error occurs during val one = somethingUsefulOneAsync() to {one.await()}, the exception is thrown by the main thread, which will then do something else, But somethingUsefulOneAsync will still execute stealthily, and the previous exception does not terminate the operation

The above method is not recommended, so how do we use it? Answer: Asynchronous structured concurrency

Asynchronous structured concurrency

suspend fun concurrentSum(a) : Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

suspend fun main(a)  {
    val measureTimeMillis = measureTimeMillis {
        println(concurrentSum())
    }
    println("$measureTimeMillis ms")}Copy the code

In Kotlin, the above approach is recommended

The advantage of this is that all coroutines in the coroutineScope scope will be cancelled as soon as an exception is thrown in that scope

If you have an internal coroutine that cannot be terminated by an exception, consider using the previous ### coroutine withContext(NonCancellable)

suspend fun concurrentSum(a): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    launch {
        withContext(NonCancellable) {
            repeat(3) {
                delay(1000)
                println("delay 1 sec")
            }
        }
    }
    launch {
        println("Throw runtime exception termination coroutine")
        throw RuntimeException("Throw runtime exception termination coroutine")
    }
    one.await() + two.await()
}

suspend fun main(a) {
    val measureTimeMillis = measureTimeMillis {
        println(concurrentSum())
    }
    println("$measureTimeMillis ms")}Copy the code

The final output is:

Throw runtime exception terminating coroutine delay1 sec
delay 1 sec
delay 1 sec
Exception inThread "main" Java. Lang. RuntimeException: throw a runtime exception to terminate coroutinesCopy the code

The result is that the scope is not terminated immediately, but is thrown after a 3-second delay

Coroutine context scheduler

The context of the coroutine is the CoroutineContext which is responsible for storing intermediate elements during the scheduling of the coroutine, And he is the main elements of the Job, the scheduler (coroutine dispatcher), the name of coroutines (CoroutineName) and abnormal coroutines (CoroutineExceptionHandler), it can do addition subtraction combination

Scheduler and thread

The main function of the scheduler is to schedule the coroutine to be executed by a thread. Depending on which thread the scheduler decides, it can restrict the execution of the coroutine to a specific thread, dispatch it to a thread pool, or execute it without any restrictions

All coroutine builders launch or async have a parameter called CoroutineContext. The main purpose is to assign the type of scheduler to the builder

@ExperimentalCoroutinesApi
fun main(a): Unit = runBlocking {
    // The preceding numbers are the order in which they are printed
    launch { // Run in the context of the parent coroutine, the runBlocking main coroutine
        // 3 main runBlocking : I'm working in thread main
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // unrestricted -- will work in the main thread
        // 1 Unconfined : I'm working in thread main
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // The default scheduler DefaultDispatcher will be retrieved
        // 2 Default : I'm working in thread DefaultDispatcher-worker-1
        println("Default               : I'm working in thread ${Thread.currentThread().name}")}val context = newSingleThreadContext("MyOwnThread")
    launch(context) { // Will make it acquire a new thread, which is a proprietary thread and needs to be closed if the thread resource is not used anymore because it is too expensive
        // 4 newSingleThreadContext: I'm working in thread MyOwnThread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")}// 
    context.close()
}
Copy the code

The print details are in the notes

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
main runBlocking      : I'm working in thread main
newSingleThreadContext: I'm working in thread MyOwnThread
Copy the code

We analyze them in turn:

  1. Launch {… }: Where the coroutine context (and the scheduler) inherits from the runBlocking context

  2. Dispatchers.Unconfined: a special dispatcher that does not specify which thread the coroutine must be executed on, but accepts whatever thread it is executed on

  3. Launch (dispatchers.default): Same thread pool as globalScope.launch

  4. NewSingleThreadContext: With this method, a dedicated thread is created for the execution of the coroutine, but threads are an expensive resource, so if they are no longer used, the thread in this method needs to be closed or designed as a top-level property for the entire system to use

Unrestricted scheduler vs restricted scheduler

When the coroutine scheduler is dispatchers. Unconfined, the dispatcher creates a coroutine inside the caller’s thread for execution, so which thread is responsible for the execution of the coroutine

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch {
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}
Copy the code

Eventually he will print:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main
Copy the code

See? Because the Unconfined thread changed after a delay, the control group did not change anything and the main thread was still running

Why did it change? Because coroutines in case of delay after the main thread is cleared, the rest function to be executed hang by a thread in the execution kotlinx. Coroutines. DefaultExecutor

There are many other ways to prove the above property:

fun main(a) = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {}
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch {
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {}
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")}}Copy the code
Unconfined      : I'm working in thread main
Unconfined      : After delay in thread DefaultDispatcher-worker-1
main runBlocking: I'm working in thread main
main runBlocking: After delay in thread main
Copy the code

So is this example

suspend fun doSomething(a) {
    withContext(Dispatchers.IO) {
        println("IO      :  2 ${Thread.currentThread().name}")}}fun main(a): Unit = runBlocking {
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : 1 ${Thread.currentThread().name}")
// delay(1000)
        doSomething()
        println("Unconfined      : 3 ${Thread.currentThread().name}")}}Copy the code
Unconfined      : 1 main
IO      :  2 DefaultDispatcher-worker-1
Unconfined      : 3 DefaultDispatcher-worker-1
Copy the code

So we can use this feature to test, you know, when you don’t know that the suspension function has finished and you don’t know what thread is executing, you can use this, right

Such as hang in front of the function of thread is kotlinx coroutines. Subsequent code or kotlinx. After DefaultExecutor coroutines. DefaultExecutor in execution

The suspension function is executed by the defaultDispatcher-worker-1 thread, and the code after the suspension function is executed by the defaultDispatcher-worker-1 thread

Debug coroutines and threads

Coroutines can be suspended on one thread and recovered on another, but the IDE is not smart enough to keep track of what the coroutine is doing at any given moment

Debug using the IDEA special plug-in

Using the Coroutine debugger, you can:

  1. Check the status of each Coroutine
  2. View the values of local and capture variables for running and pending Coroutines
  3. See the complete coroutine creation stack and the call stack inside the coroutine. The stack contains all stack frames with variables, even those that are lost during standard debugging
  4. Gets a complete report containing the state and stack information of each coroutine. To Get it, right-click on the Coroutines TAB and then click Get Coroutines Dump

Debug by printing logs

Print or log the information you want to print and that’s it

Kotlin for users to better use log debug, increased – Dkotlinx. Coroutines. The debug parameter

suspend fun doSomething(a) {
    withContext(Dispatchers.IO) {
        println("IO      :  2 ${Thread.currentThread().name}")}}fun main(a): Unit = runBlocking {
    println("main: 0 ${Thread.currentThread().name}")
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : 1 ${Thread.currentThread().name}")
        doSomething()
        println("Unconfined      : 3 ${Thread.currentThread().name}")}}Copy the code

Before use:

main: 0 main
Unconfined: 1.main
IO      :  2 DefaultDispatcher-worker- 1Unconfined      : 3 DefaultDispatcher-worker- 1Copy the code

After use:

main: 0 main @coroutine# 1Unconfined: 1.main @coroutine# 2IO      :  2 DefaultDispatcher-worker1 @coroutine# 2Unconfined      : 3 DefaultDispatcher-worker1 @coroutine# 2Copy the code

You’ll notice that there are extra characters like @coroutine#1 and @coroutine#2. These are the coroutine identifiers, where @coroutine#1 is the main coroutine given by runBlocking, and @coroutine#2 is the child coroutine

Identifier only in debug mode can appear, in addition to – Dkotlinx. Coroutines. Debug, add – ea to VM parameters can also be to enter debug mode, you can also print out the name of coroutines

A coroutine jumps between threads

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

@ExperimentalCoroutinesApi
fun main(a) {
    newSingleThreadContext("ctx1").use { ctx1 ->
        newSingleThreadContext("ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")}}}}Copy the code
[ctx1 @coroutine#1] Started in ctx1
[ctx2 @coroutine#1] Working in ctx2
[ctx1 @coroutine#1] Back to ctx1
Copy the code

He shows that the same coroutine is executed between different threads, and uses use so that the thread applied by newSingleThreadContext doesn’t have to be closed manually

Name coroutines

fun main(a):Unit = runBlocking(CoroutineName("main")) {
    log("1")
    val v1 = async(CoroutineName("v1 coroutine")) {
        delay(500)
        log("2")
        255
    }
    val v2 = async(CoroutineName("v2 coroutine")) {
        delay(1000)
        log("3")
        7
    }
    log("4 result = ${v1.await() + v2.await()}")}Copy the code

Combine elements in context

fun main(a): Unit = runBlocking {
    launch(Dispatchers.Default + CoroutineName("test")) {
        log("1")}}Copy the code

Combine the context and change the coroutine name to test

[DefaultDispatcher-worker-1 @test#2] 1

Context inheritance of coroutines

If the new coroutine is created in another coroutine’s Coroutine, then we call the new coroutine child coroutine and the other coroutine parent coroutine. The context of the child coroutine will be inherited from the parent coroutine except for Job, which will be a child Job of the parent coroutine, but Job is new

fun main(a): Unit = runBlocking {
   val scope = CoroutineScope(Dispatchers.IO + Job() + CoroutineName("test"))
   val job = scope.launch {
      log("1 ${coroutineContext[Job]}")
      launch {
         log("2 ${coroutineContext[Job]}")
      }
   }
   job.join()
}
Copy the code
[DefaultDispatcher-worker-1 @test#2] 1 "test#2":StandaloneCoroutine{Active}@3eb7ab0f
[DefaultDispatcher-worker-3 @test#3] 2 "test#3":StandaloneCoroutine{Active}@33bb030c
Copy the code

Thread-local data

valthreadLocal: ThreadLocal<String? > = ThreadLocal<String? > ()fun main(a): Unit = runBlocking {
    threadLocal.set("main")
    log("1 ${threadLocal.get()}")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement("launch")) {
        log("2 ${threadLocal.get()}")
        yield()
        log("3 ${threadLocal.get()}")
    }
    job.join()
    log("4 ${threadLocal.get()}")}Copy the code
[main @coroutine#1] 1 main
[DefaultDispatcher-worker-1 @coroutine#2] 2 launch
[DefaultDispatcher-worker-1 @coroutine#2] 3 launch
[main @coroutine#1] 4 main
Copy the code

ThreadLocal is stored in an extra coroutine context, which is restored when the coroutine is context-switched, regardless of whether it is already a different coroutine

The new problem is that the coroutine can’t tell if ThreadLocal is still available and accessible to the coroutine, so Kotlin provides a method

threadLocal.ensurePresent()
Copy the code

Checks if ThreadLocal exists in the current coroutine context and throws an IllegalStateException if it does not

ThreadLocal also has a key limitation: when a thread-local variable changes (not coroutine changes, note), the new value is not propagated to the coroutine caller (because the context element cannot track all ThreadLocal object access), and the updated value is lost the next time it is suspended