Keywords: Kotlin, asynchronous programming, coroutines

In the previous article we covered the different ways of launching a coroutine and worked through an example of using launch. This article continues from there and looks at coroutines from the perspective of scheduling.

1. Coroutine context

A scheduler (dispatcher) is essentially one implementation of a coroutine context, so we start with the context itself.

The first parameter of launch is called the context, and its type is the interface CoroutineContext. The contexts we usually see are of type CombinedContext or EmptyCoroutineContext: the former represents a combination of contexts, the latter an empty one. Let's look at the methods of the CoroutineContext interface:


@SinceKotlin("1.3")
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        public val key: Key<*>
        ...
    }
}

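If you look closely, it resembles a List, except that it is indexed by Key:

CoroutineContext          List
get(Key)                  get(Int)
plus(CoroutineContext)    plus(List)
minusKey(Key)             removeAt(Int)

Here List.plus(List) refers to the extension function Collection<T>.plus(elements: Iterable<T>): List<T>.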

CoroutineContext is a collection whose items are the Element type seen in the source code. Each Element has a key, so it can act as a single item; Element is also a subinterface of CoroutineContext, so it can act as a collection as well.
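The collection-like behavior described above can be tried out with the standard library alone. The following is a minimal sketch, not code from kotlinx.coroutines: the element types Name and Tag and the helper describe are hypothetical names made up for illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element types, defined only for this illustration.
class Name(val value: String) : AbstractCoroutineContextElement(Name) {
    companion object Key : CoroutineContext.Key<Name>
}

class Tag(val value: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

// Walks the context with fold and collects a description of every element.
fun describe(context: CoroutineContext): List<String> =
    context.fold(emptyList<String>()) { acc, element ->
        when (element) {
            is Name -> acc + "Name(${element.value})"
            is Tag -> acc + "Tag(${element.value})"
            else -> acc + element.toString()
        }
    }

fun main() {
    val context = Name("a") + Tag("b")        // plus combines two elements
    println(context[Name]?.value)             // a
    println(context[Tag]?.value)              // b
    println(describe(context))                // [Name(a), Tag(b)]

    // Adding an element whose key is already present replaces the old one.
    println((context + Name("c"))[Name]?.value)   // c

    // minusKey removes by key; an empty context contains nothing.
    println(context.minusKey(Name)[Name])     // null
    println(EmptyCoroutineContext[Name])      // null
}
```

A single element such as Name("a") is itself a valid CoroutineContext, which is why it can appear on either side of +.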

At this point you can see that CoroutineContext is really a data structure. If you are familiar with the recursive definition of a list, CombinedContext and EmptyCoroutineContext are easy to understand. For example, Scala's List is defined like this:


sealed abstract class List[+A] extends ... {
    ...
    def head: A
    def tail: List[A]
    ...
}

In pattern matching, List(1,2,3,4) matches x::y, where x is 1 and y is List(2,3,4).

The definition of CombinedContext is very similar:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    ...
}

Except that it is the other way around: the collection comes first and the single element last. The coroutineContext we access inside a coroutine body is usually a CombinedContext, which represents a set of concrete context implementations. To find a particular implementation, we look it up by its Key. For example:


import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlin.coroutines.coroutineContext

suspend fun main() {
    GlobalScope.launch {
        println(coroutineContext[Job]) // "coroutine#1":StandaloneCoroutine{Active}@1ff62014
    }
    // null: suspend main is also a coroutine body, but a lower-level one, so it has no Job instance
    println(coroutineContext[Job])
}
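To make the "collection on the left, single element on the right" shape and the lookup by key more concrete, here is a deliberately simplified model. It is only a sketch: Ctx, Empty, Item, Combined and find are hypothetical names, string keys stand in for Key objects, and unlike the real plus it does not replace an existing element with the same key.

```kotlin
// A simplified, hypothetical model of the structure; not the real stdlib classes.
sealed class Ctx
object Empty : Ctx()
data class Item(val key: String, val value: Any) : Ctx()
data class Combined(val left: Ctx, val element: Item) : Ctx()

// Appends an item on the right, so the "collection" part always stays on the left.
operator fun Ctx.plus(item: Item): Ctx =
    if (this is Empty) item else Combined(this, item)

// Looks a key up starting from the rightmost element and walking to the left.
fun Ctx.find(wanted: String): Any? = when (this) {
    is Empty -> null
    is Item -> if (key == wanted) value else null
    is Combined -> if (element.key == wanted) element.value else left.find(wanted)
}

fun main() {
    val ctx = Empty + Item("job", 1) + Item("name", "Hello") + Item("dispatcher", "Main")
    println(ctx.find("name"))      // Hello
    println(ctx.find("missing"))   // null
}
```

In this model the element added last is the cheapest one to find, since the lookup starts from the right.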

The Job here is actually a reference to its companion object:


public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job> { ... }
    ...
}

So we can also mimic Thread.currentThread() and provide a way to get the current Job:

suspend inline fun Job.Key.currentJob() = coroutineContext[Job]

suspend fun coroutineJob() {
    GlobalScope.launch {
        log(Job.currentJob())
    }
    log(Job.currentJob())
}

We can add features to a coroutine by specifying the context. A good example is to add a name to a coroutine for debugging purposes:

GlobalScope.launch(CoroutineName("Hello")) { ... }

If more than one context needs to be added, just combine them with +:

GlobalScope.launch(Dispatchers.Main + CoroutineName("Hello")) { ... }

Dispatchers.Main is an implementation of the scheduler. Don't worry, we will get to it soon.

2. Coroutine interceptors

Having spent all this effort on the context, it is time for a rather special kind of context: the interceptor.


public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...
}


An interceptor is also one kind of context implementation. It can take control of how your coroutine executes, and to make sure it works correctly, the coroutine context collection always keeps it in the last position.

Intercepting a coroutine is simple, because a coroutine is essentially callbacks plus some "dark magic", and what gets intercepted is exactly that callback, the Continuation. OkHttp uses interceptors for caching, logging, and even mocking requests; coroutine interceptors work on the same principle. The scheduler is built on top of the interceptor; in other words, a scheduler is a kind of interceptor.

We can define our own interceptor, put it in the context of a coroutine, and see what happens.


class MyContinuationInterceptor: ContinuationInterceptor{
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}

class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
    override val context = continuation.context
    override fun resumeWith(result: Result<T>) {
        log("<MyContinuation> $result" )
        continuation.resumeWith(result)
    }
}


We just print a log line in the callback. Now let's put it to use:


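import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    GlobalScope.launch(MyContinuationInterceptor()) {
        log(1)
        val job = async {
            log(2)
            delay(1000)
            log(3)
            "Hello"
        }
        log(4)
        val result = job.await()
        log("5. $result")
    }.join()
    log(6)
}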

This is probably the most complicated example we have given so far, but don't be intimidated; it is still quite simple. We start a coroutine with launch, pass our own interceptor as its context, and then start another coroutine inside it with async. async and launch are the same kind of function: both are called coroutine builder functions. The difference is that the Job returned by async is actually a Deferred, which can carry a result that is obtained with await.

As you can guess, the value of result is Hello. So what does this program print?

[main] <MyContinuation> Success(kotlin.Unit) // ①
15:31:55:992 [main] 1
15:31:56:000 [main] <MyContinuation> Success(kotlin.Unit) // ②
15:31:56:000 [main] 2
15:31:56:031 [main] 4
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(kotlin.Unit) // ③
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] 3
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(Hello) // ④
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 5. Hello
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 6

The markers such as "// ①" are not part of the program output; they are annotations added to make the explanation below easier to follow.

You may be wondering: if a Continuation is a callback, and the only callback here is the one at await, why is the log printed four times?

Don't panic. Let's go through them in order.

First, every coroutine performs a Continuation.resumeWith when it starts, which gives the scheduler its chance to schedule; this is the key to a coroutine being able to move to another thread. This is what happens at ① and ②.

Second, delay is a suspension point. After 1000 ms the coroutine has to be scheduled again, which produces the log at ③.

Finally, the log at ④ is easy to understand: it is our return value.

You may still wonder: the interceptor does not switch threads, so why does the thread change from ③ onward? This comes from delay. On the JVM, delay adds a delayed task to a ScheduledExecutor, so a thread switch occurs. In JavaScript it is based on setTimeout, so when running on Node.js delay does not switch threads, since everything is single-threaded anyway.

If we handle thread switching ourselves in the interceptor, we have effectively implemented a simple scheduler of our own. Interested readers can try it for themselves.
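As one possible take on that exercise, the sketch below builds such an interceptor using only the standard library and java.util.concurrent, so it does not depend on kotlinx.coroutines. ExecutorInterceptor and threadNamesOn are hypothetical names chosen for this illustration, and the coroutine is started with the low-level startCoroutine rather than launch.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical interceptor: forwards every resumption to the given executor.
class ExecutorInterceptor(private val executor: ExecutorService) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                // This is the thread switch: resume on the executor's thread.
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts one coroutine and records the thread name before and after a suspension.
fun threadNamesOn(threadName: String): List<String> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, threadName) }
    val seen = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        seen.add(Thread.currentThread().name)
        // Resume from an unrelated thread; the interceptor moves us back.
        suspendCoroutine<Unit> { cont -> thread(name = "other") { cont.resume(Unit) } }
        seen.add(Thread.currentThread().name)
    }
    body.startCoroutine(Continuation(ExecutorInterceptor(executor)) { done.countDown() })
    done.await()
    executor.shutdown()
    return seen.toList()
}

fun main() {
    println(threadNamesOn("MyThread")) // [MyThread, MyThread]
}
```

Both the initial start and the resumption after the suspension pass through interceptContinuation, which is why both entries show the executor's thread even though resume was called from another thread.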

Think: Can there be more than one interceptor?

3. The scheduler

3.1 Overview

With this foundation in place, introducing the scheduler follows naturally.


public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    ...
}

It is a subclass of the coroutine context and also implements the interceptor interface. The dispatch method is invoked through the continuation that interceptContinuation returns, and this is how the coroutine gets scheduled. So if we want to implement our own scheduler, we can simply inherit from this class. Usually, though, we use the existing ones, which are defined in Dispatchers:

val Default: CoroutineDispatcher
val Main: MainCoroutineDispatcher
val Unconfined: CoroutineDispatcher

The definition of this class involves Kotlin multiplatform (MPP) support, so on the JVM you will also see val IO: CoroutineDispatcher; the JS and Native versions have only the three mentioned above.

              JVM                 JS                  Native
Default       Thread pool         Main thread loop    Main thread loop
Main          UI thread           Same as Default     Same as Default
Unconfined    Direct execution    Direct execution    Direct execution
IO            Thread pool         -                   -

  • IO is defined only on the JVM. It is built on the thread pool behind the Default scheduler, with its own queue and limits, so switching a coroutine from Default to IO does not trigger a thread switch.
  • Main is used for UI programs, which on the JVM include Swing, JavaFX, and Android; it schedules coroutines onto the respective UI thread.
  • JS itself is a single-threaded event loop, similar to UI programs on the JVM.

3.2 Writing UI-related programs

The vast majority of Kotlin users are Android developers, so there is a lot of demand for UI development. Let's take a very common scenario: clicking a button triggers some asynchronous work, and the callback then refreshes the UI:


getUserBtn.setOnClickListener { 
    getUser { user ->
        handler.post {
            userNameView.text = user.name
        }
    }
}


Let’s simply give the declaration of the getUser function:


typealias Callback = (User) -> Unit

fun getUser(callback: Callback) { ... }

Since getUser has to switch to another thread to do its work, the callback is usually invoked on that non-UI thread as well. To make sure the UI is refreshed correctly, we switch back to the UI thread with handler.post. This is the oldest way of writing it.

Then came RxJava, and things got interesting:


fun getUserObservable(): Observable<User> {
    return Observable.create<User> { emitter ->
        getUser {
            emitter.onNext(it)
        }
    }
}

So the button click event could be written like this:


getUserBtn.setOnClickListener {
    getUserObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { user ->
                userNameView.text = user.name
            }
}

In fact, RxJava is very good at thread switching; many people even use it just for the convenience of switching threads!

Now let's migrate this code to coroutines:


suspend fun getUserCoroutine() = suspendCoroutine<User> { continuation ->
    getUser {
        continuation.resume(it)
    }
}

When the button is clicked, we can:


getUserBtn.setOnClickListener {
    GlobalScope.launch(Dispatchers.Main) {
        userNameView.text = getUserCoroutine().name
    }
}

You can also use the View.onClick extension from anko-coroutines, so that we don't have to launch the coroutine ourselves. Anko's support for coroutines will be covered in a dedicated later article.

suspendCoroutine does not start a coroutine. It runs inside a coroutine and obtains the Continuation instance of the current coroutine, which is effectively the callback; we can then call its resume or resumeWithException to return a result or throw an exception.

If you call resume or resumeWithException more than once, you will get an IllegalStateException. Think about why.
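The behavior of suspendCoroutine described above can be observed without any UI or kotlinx.coroutines. The following is only a sketch under stated assumptions: this getUser is a hypothetical synchronous stand-in with an id parameter and an error callback (the article's getUser has neither), and runSimple is a made-up helper that only works when the block finishes without actually suspending.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

data class User(val name: String)

// Hypothetical callback API: invokes exactly one of the callbacks synchronously.
fun getUser(id: Int, onSuccess: (User) -> Unit, onError: (Throwable) -> Unit) {
    if (id > 0) onSuccess(User("user-$id")) else onError(IllegalArgumentException("bad id: $id"))
}

// Bridges the callback API: resume returns the value, resumeWithException throws.
suspend fun getUserCoroutine(id: Int): User = suspendCoroutine { continuation ->
    getUser(
        id,
        onSuccess = { continuation.resume(it) },
        onError = { continuation.resumeWithException(it) }
    )
}

// Resumes twice on purpose and reports whether the second call was rejected.
suspend fun secondResumeRejected(): Boolean = suspendCoroutine { continuation ->
    var rejected = false
    try {
        continuation.resume(true)   // placeholder value, overwritten below if accepted
    } catch (e: IllegalStateException) {
        rejected = true
    }
    try {
        continuation.resume(false)  // second call on the same continuation
    } catch (e: IllegalStateException) {
        rejected = true
    }
    check(rejected) { "expected the second resume to be rejected" }
}

// Made-up helper: runs a block that completes without really suspending.
fun <T> runSimple(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return (outcome ?: error("coroutine did not complete synchronously")).getOrThrow()
}

fun main() {
    println(runSimple { getUserCoroutine(1) })                       // User(name=user-1)
    println(runCatching { runSimple { getUserCoroutine(-1) } }
        .exceptionOrNull()?.message)                                  // bad id: -1
    println(runSimple { secondResumeRejected() })                    // true
}
```

The last call returns the value passed to the first resume; the second resume is the one that fails, because the continuation handed out by suspendCoroutine accepts only a single result.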

Compared with the RxJava version, this code is easy to understand, and you may also notice how similar the usage scenarios of coroutines and RxJava are. Here we use Dispatchers.Main to make sure that the coroutine started by launch is always scheduled onto the UI thread, so let's take a look at how Dispatchers.Main is actually implemented.

On the JVM, the implementation of Main is quite interesting:


internal object MainDispatcherLoader {
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = MainDispatcherFactory::class.java.let { clz ->
                ServiceLoader.load(clz, clz.classLoader).toList()
            }
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: MissingMainCoroutineDispatcher(null)
        } catch (e: Throwable) {
            MissingMainCoroutineDispatcher(e)
        }
    }
}

On Android, the coroutine framework registers AndroidDispatcherFactory, so Main ends up being an instance of HandlerDispatcher. If you are interested, have a look at the source of kotlinx-coroutines-android.

Note that in the previous implementations of RxJava and coroutines, we did not consider exceptions and cancellations. Exceptions and cancellations will be covered in more detail in a later article.

3.3 A scheduler bound to any thread

The purpose of a scheduler is to switch threads. Don't even think about making random calls in dispatch depending on your mood; you would only be hurting yourself (just kidding). As long as we provide the thread, creating a scheduler should be easy:


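import java.util.concurrent.Executors
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch

suspend fun main() {
    val myDispatcher = Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }
        .asCoroutineDispatcher()
    GlobalScope.launch(myDispatcher) {
        log(1)
    }.join()
    log(2)
}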

The output shows that the coroutine runs on our own thread:

16:10:57:130 [MyThread] 1
16:10:57:136 [MyThread] 2

Note, however, that since we created the thread pool ourselves, we need to close it at an appropriate time; otherwise the program will not exit after main finishes. We can either shut down the thread pool directly or call:

myDispatcher.close()

This ends its life cycle. Run the program again and it exits normally.

Of course, one could argue that the threads in the pool we created are not daemon threads, so naturally the JVM does not stop when the main thread ends. True, but whatever should be released still needs to be released in time. If you only use the scheduler briefly during the lifetime of your application, won't you leak threads by never closing its thread pool? That would be embarrassing.

The designers of Kotlin coroutines were so worried that people would overlook this that they deprecated two APIs and opened an issue saying they need to be redesigned. Who are these two poor guys?

The two deprecated APIs for creating schedulers based on thread pools:

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher

Both make it very convenient to create a scheduler bound to particular threads, but an overly simple API seems to make people forget its risks. Kotlin does not like to do things in such an unclear way, so build the thread pool yourself as we did in this section; that way you cannot blame anyone else if you forget to close it (lol).

Running coroutines across multiple threads, with threads constantly being switched, does not look all that lightweight. The following example is rather scary:

Executors.newFixedThreadPool(10)
        .asCoroutineDispatcher().use { dispatcher ->
            GlobalScope.launch(dispatcher) {
                log(1)
                val job = async {
                    log(2)
                    delay(1000)
                    log(3)
                    "Hello"
                }
                log(4)
                val result = job.await()
                log("5. $result")
            }.join()
            log(6)
        }

Besides the unavoidable thread switch at delay, the other continuation resumptions (Continuation.resume) also switch threads:

16:28:04:771 [pool-1-thread-1] 1
16:28:04:779 [pool-1-thread-1] 4
16:28:04:779 [pool-1-thread-2] 2
16:28:05:790 [pool-1-thread-3] 3
16:28:05:793 [pool-1-thread-4] 5. Hello
16:28:05:794 [pool-1-thread-4] 6
If the thread pool has only 1 thread, all of the output is printed on that single thread:

16:40:14:685 [pool-1-thread-1] 1
16:40:14:706 [pool-1-thread-1] 4
16:40:14:710 [pool-1-thread-1] 2
16:40:15:723 [pool-1-thread-1] 3
16:40:15:725 [pool-1-thread-1] 5. Hello
16:40:15:725 [pool-1-thread-1] 6
Comparing the two, with 10 threads there are at least 3 thread switches, whereas with 1 thread there is only one, when execution resumes after the 1000 ms delay. It is just two more thread switches, so how much difference can it make? On my 2015 MacBook Pro I ran each of the two cases 100 times in a loop; the average times were as follows:

Number of threads    10         1
Time (ms)            1006.00    1004.97

Note: to keep the comparison fair, I warmed up before running the 100 iterations to make sure all classes were loaded. The results are for reference only.

In other words, two extra thread switches cost about 1 ms more on average. Production code is of course more complex, so you can imagine the result if everything is scheduled on a thread pool like this.

In practice we usually only need to run our own business logic on a single thread and switch to the IO scheduler only for time-consuming IO, so the way UI programs are scheduled is a good model to follow. Defining your own scheduler from a thread pool is fine, but it is best to use only one thread, because besides the thread-switching overhead mentioned above, multithreading also brings thread-safety problems.

3.4 Thread safety issues

The concurrency models of JS and Native differ from the JVM's. The JVM exposes the thread API to users, which also makes coroutine scheduling more flexible for them. But freedom comes at a price: when writing coroutine code on the JVM, we must understand that thread-safety issues still exist between coroutines that may run on different threads.

A good practice, as mentioned in the previous section, is to keep your logic on a single thread, which both saves the overhead of thread switching and avoids thread-safety issues.

If you use concurrency tools such as locks in coroutine code, you add complexity instead. My advice is to avoid referencing variables from an outer scope in coroutine code, and to pass values as parameters instead of using global variables.

Here is an example of a mistake that is easy to spot:


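import java.util.concurrent.Executors
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch

suspend fun main() {
    var i = 0
    Executors.newFixedThreadPool(10)
            .asCoroutineDispatcher().use { dispatcher ->
                List(1000000) {
                    GlobalScope.launch(dispatcher) {
                        i++ // unsynchronized read-modify-write from several threads
                    }
                }.forEach {
                    it.join()
                }
            }
    log(i)
}

Output:

16:59:28:080 [main] 999593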
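One way to repair the lost updates, when a shared counter really cannot be avoided, is to make the increment atomic. The sketch below is an illustration under stated assumptions rather than a rewrite of the example above: to stay free of external dependencies it starts coroutines with the standard library's startCoroutine and a hypothetical PoolInterceptor instead of GlobalScope.launch with asCoroutineDispatcher, and countSafely is a made-up helper name.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical minimal scheduler: every resumption is submitted to the pool.
class PoolInterceptor(private val pool: ExecutorService) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                pool.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts the given number of coroutines on a pool; each increments a shared counter once.
fun countSafely(coroutines: Int, threads: Int): Int {
    val pool = Executors.newFixedThreadPool(threads)
    val counter = AtomicInteger(0)          // atomic replacement for "var i = 0"
    val done = CountDownLatch(coroutines)
    val context = PoolInterceptor(pool)
    repeat(coroutines) {
        val body: suspend () -> Unit = { counter.incrementAndGet() }
        // The completion callback runs after the body, so the latch counts finished coroutines.
        body.startCoroutine(Continuation(context) { done.countDown() })
    }
    done.await()
    pool.shutdown()
    return counter.get()
}

fun main() {
    println(countSafely(coroutines = 100_000, threads = 10)) // 100000
}
```

The atomic counter fixes the result but not the design; keeping the logic on one thread, as recommended above, avoids the shared state altogether.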

4. How is the suspend main function scheduled?

In the previous article we mentioned that suspend main starts a coroutine, and the coroutines in our examples are all its child coroutines. But where does this outermost coroutine come from?

Let's start with an example:


suspend fun main() {
    log(1)
    GlobalScope.launch {
        log(2)
    }.join()
    log(3)
}

It is equivalent to the following:


fun main() {
    runSuspend {
        log(1)
        GlobalScope.launch {
            log(2)
        }.join()
        log(3)
    }
}

So what exactly is this runSuspend? It is a function in the Kotlin standard library. Note that it is not in kotlinx.coroutines; it belongs to the lower-level API.


internal fun runSuspend(block: suspend () -> Unit) {
    val run = RunSuspend()
    block.startCoroutine(run)
    run.await()
}

Here RunSuspend is an implementation of Continuation:


private class RunSuspend : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    var result: Result<Unit>? = null

    override fun resumeWith(result: Result<Unit>) = synchronized(this) {
        this.result = result
        (this as Object).notifyAll()
    }

    fun await() = synchronized(this) {
        while (true) {
            when (val result = this.result) {
                null -> (this as Object).wait()
                else -> {
                    result.getOrThrow() // throw up failure
                    return
                }
            }
        }
    }
}

Its context is empty, so the coroutine started by suspend main has no scheduling behavior of its own.
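The consequence of an empty context can be seen in a small experiment: without an interceptor, the code after a suspension point simply continues on whichever thread called resume. This is a sketch using only the standard library; hopTo and threadsBeforeAndAfter are hypothetical helper names, and the coroutine is started directly with startCoroutine in the same way runSuspend does.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspends the caller and resumes it from a new thread with the given name.
suspend fun hopTo(threadName: String): Unit = suspendCoroutine { continuation ->
    thread(name = threadName) { continuation.resume(Unit) }
}

// Starts a coroutine with an empty context and records the thread names
// seen before and after the suspension point.
fun threadsBeforeAndAfter(resumer: String): Pair<String, String> {
    var before = ""
    var after = ""
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        before = Thread.currentThread().name
        hopTo(resumer)
        after = Thread.currentThread().name
    }
    // No interceptor in the context, so nothing moves the coroutine back.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { done.countDown() })
    done.await()
    return before to after
}

fun main() {
    val (before, after) = threadsBeforeAndAfter("resumer")
    println("before: $before, after: $after")
}
```

The first name is the thread that started the coroutine and the second is always the resuming thread. The same effect explains why log(2) printed [MyThread] in section 3.3: after join, suspend main continued on the thread that completed the job.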

As this example shows, all you really need to start a coroutine is a lambda expression. Back when Kotlin 1.1 was released, I wrote a series of tutorials based on the standard library API; it later turned out that the standard library API is probably not really meant for us to use directly. So just take a look at it for reference.

The code above is marked internal in the standard library, so we cannot use it directly. You can, however, copy the contents of RunSuspend.kt into your own project and use it from there. If the declaration var result: Result<Unit>? = null causes a compile error, changing it to private var result: Result<Unit>? = null is fine.

5. Summary

In this article we introduced the coroutine context, then the interceptor, and finally arrived at the scheduler. Topics such as exception handling, coroutine cancellation, and Anko's support for coroutines have not been covered yet. If there is a coroutine-related topic you would like to read about, please leave a comment.


Welcome to Kotlin Chinese community!

Chinese website: www.kotlincn.net/

Chinese official blog: www.kotliner.cn/

Official account: Kotlin

Zhihu column: Kotlin

CSDN: Kotlin Chinese community

Nuggets: Kotlin Chinese Community
