Kotlin coroutines

In essence, coroutines are lightweight threads

That’s Kotlin’s official definition of a coroutine.

But that doesn't really tell us anything. What is it? From an Android developer's point of view, think of it as a thread scheduling API: a Kotlin language feature, or a programming idea, that Kotlin uses to wrap the thread API for us. (Thread scheduling is the main usage scenario, but coroutines are actually much more than that.) At this point you might say: no, I use coroutines all the time and I never see any threads! Don't panic. Today we are going to take the mask off Kotlin coroutines on Android and see what they really are.

What is a coroutine

Before reading this section, make sure you understand Kotlin higher-order functions! Related recommendations:

  • The official documentation
  • High-order functions in Kotlin, built-in high-order functions, and inlining
  • Lambda expressions and Kotlin higher-order functions

Alternatively, you can think of a higher-order function argument, or lambda expression, as an anonymous inner class whose single method contains the block of code.

Before we begin, let’s recall how Kotlin coroutines are used:

GlobalScope.launch {
    doSomething()
}

It's that simple. But launch is in fact just an extension function on CoroutineScope (coroutine scopes are covered later), and it ultimately ends up calling createCoroutineUnintercepted(R, Continuation). The call chain is straightforward, so I won't expand on it here.

package kotlin.coroutines.intrinsics

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

But this method is a little "complicated", so let's create a coroutine in a simpler way, using the primitive API:

public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

This is an extension function on a suspend function type. If you are not familiar with suspend functions and extension functions, first read the two recommended articles on Kotlin higher-order functions, inlining, and lambda expressions, or just read the official documentation.

Let’s see how to implement a coroutine using this function:

import kotlin.coroutines.*

fun main(args: Array<String>) {
    suspend {
        println("A----------------${Thread.currentThread().name}")
    }.createCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            println("B----------------${Thread.currentThread().name}")
        }
    }).resume(Unit)
    println("C----------------${Thread.currentThread().name}")
}

out:
A----------------main
B----------------main
C----------------main

Very simply: create a suspend lambda, call createCoroutine on it, and pass in a Continuation implemented as an anonymous inner class. The Continuation interface itself is simple:

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

Keep this interface in mind; we'll talk about it in more detail later.

Finally, the coroutine is started by calling the resume method. However, judging by the three printouts A, B and C, this doesn't behave quite like coroutines on Android. That's not the point of this section, though; for now we focus only on how coroutines are created.
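To make the split between "create" and "start" concrete, here is a minimal sketch that uses only the standard library. The helper name traceCreateThenResume and the event strings are made up for this illustration; only createCoroutine, resume and Continuation come from kotlin.coroutines.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

// Hypothetical helper: records the order in which things happen.
fun traceCreateThenResume(): List<String> {
    val events = mutableListOf<String>()
    val body: suspend () -> Int = {
        events.add("A: body runs")
        42
    }
    // createCoroutine only builds the continuation chain; the body does not run yet.
    val coroutine: Continuation<Unit> = body.createCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext get() = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            events.add("B: completion received ${result.getOrNull()}")
        }
    })
    events.add("created, body has not run yet")
    coroutine.resume(Unit) // this call is what actually starts the body
    events.add("C: resume returned")
    return events
}
```

Calling traceCreateThenResume() from a main function and printing the list shows that nothing in the body runs until resume is called, and that with an empty context everything happens synchronously on the calling thread.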

The process of creating coroutines

Before we look at the creation of coroutines, let's talk about higher-order functions. If you know anything about them, you know that in bytecode a lambda is implemented as an anonymous inner class. Just think of it as an anonymous inner class! The same goes for a suspend lambda:

package kotlin.coroutines.jvm.internal

// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}

// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

Note the comment on SuspendLambda: "Suspension lambdas inherit from this class". That's right, our suspend { … } inherits from it. You can set a breakpoint in its toString method and run println("${suspend {}}") to see it being hit. Its parent is ContinuationImpl, whose comment needs no translation either. Both ultimately inherit from BaseContinuationImpl. It's not time to analyze that class yet, but remember: BaseContinuationImpl is very important! For now, note the inheritance chain: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation.

In the end, suspend { … } is just an anonymous inner class.
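One way to check this claim without a debugger is to walk the superclass chain of a suspend lambda at runtime. The sketch below assumes Kotlin/JVM, where the compiler generates a class for each suspend lambda; the helper names are hypothetical.

```kotlin
import kotlin.coroutines.Continuation

// Hypothetical helper: the class of obj followed by its superclasses, excluding java.lang.Object.
fun superclassChain(obj: Any): List<String> {
    val names = mutableListOf<String>()
    var cls: Class<*>? = obj.javaClass
    while (cls != null && cls != Any::class.java) {
        names.add(cls.name)
        cls = cls.superclass
    }
    return names
}

// The chain for an empty suspend lambda.
fun suspendLambdaChain(): List<String> {
    val block: suspend () -> Unit = {}
    return superclassChain(block)
}

// A suspend lambda object is itself a Continuation on the JVM.
fun suspendLambdaIsContinuation(): Boolean {
    val block: suspend () -> Unit = {}
    return (block as Any) is Continuation<*>
}
```

Printing suspendLambdaChain() from a main function should list a compiler-generated class name first, followed by the internal base classes discussed above. The generated name depends on the compiler and the file, so it is not shown here.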

All right, let's start analyzing the createCoroutine method. It is an extension function on the suspend lambda type. Since suspend { … } is a SuspendLambda, the method can be summarized as an extension function of SuspendLambda that takes a Continuation instance and returns a Continuation. If you don't understand extension functions, you can treat it as roughly equivalent to the following Java code:

class SuspendLambda<T> {
    Continuation createCoroutine(Continuation<T> completion) {
        return new SafeContinuation(completion);
    }
}

A SuspendLambda instance calls its own method to create a new Continuation and returns it. This SafeContinuation also implements Continuation.

So the whole logic is: ContinuationA calls one of its own methods, which takes a ContinuationB (the completion, created as an anonymous inner class), then creates a ContinuationC (the SafeContinuation) and returns it. Next we start the SafeContinuation with resume:

public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

It is also an extension function, and it calls the resumeWith method of the Continuation. Here the receiver is ContinuationC, so the implementation that runs is resumeWith in SafeContinuation:

internal actual class SafeContinuation<in T>
internal actual constructor(
    private val delegate: Continuation<T>,
    initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
    //...
    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }
    //...
}
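One observable consequence of this when expression is that a continuation returned by createCoroutine accepts resume only once: the first call moves the state from COROUTINE_SUSPENDED to RESUMED, and any later call lands in the else branch. A minimal sketch, with a hypothetical helper name and message strings:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

// Hypothetical helper: resumes the same coroutine twice and reports what the second call did.
fun resumeTwice(): String {
    val body: suspend () -> Unit = {}
    val coroutine = body.createCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext get() = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {}
    })
    coroutine.resume(Unit) // first call: the delegate runs
    return try {
        coroutine.resume(Unit) // second call: the state is no longer COROUTINE_SUSPENDED
        "second resume was accepted"
    } catch (e: IllegalStateException) {
        "rejected: ${e.message}"
    }
}
```

This is why the wrapper is called "safe": it guards the underlying continuation against being resumed more than once.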

Remember how SafeContinuation was created in the createCoroutine method? SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED). The COROUTINE_SUSPENDED passed there matches the second branch of the when expression, so SafeContinuation.resumeWith ends up calling delegate.resumeWith(result). The delegate is created by createCoroutineUnintercepted(completion).intercepted(); it is a proxy, and it is also a Continuation. Now look at createCoroutineUnintercepted. It is also an extension function on (suspend () -> T), which in effect makes it an extension function of SuspendLambda, on the same principle as createCoroutine:

@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

The implementation is shown above. Note that this function is declared with the expect keyword and is platform-dependent; the code above is the actual (platform-specific) implementation.

Continuing with the code: First probeCoroutineCreated is called:

internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
    return completion
}

The code is simple. Next, look at the condition (this is BaseContinuationImpl). Here this is the suspend {} lambda, which is, of course, a BaseContinuationImpl. So create(probeCompletion) is called, and it is declared in BaseContinuationImpl:

public open fun create(completion: Continuation<*>): Continuation<Unit> {
    throw UnsupportedOperationException("create(Continuation) has not been overridden")
}

public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
    throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}

Only a definition, no real implementation! So let's take

suspend {
     println("A----------------${Thread.currentThread().name}")
}

and decompile it to Java. The full output is too long to include here.

The decompiled class implements the create and invokeSuspend methods. Our println code is in invokeSuspend, which is clearly where the body of the suspend lambda gets executed. The create method simply creates a new var2 (a Continuation instance) and returns it. var2 has the same class as the lambda itself, so it is a SuspendLambda. Next, intercepted is called on var2; based on the inheritance chain above, it is implemented in ContinuationImpl:

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

Ignoring the context for a moment, we can simply say that we return this, which is var2. So, this is the delegate of SafeContinuation.
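When the context does contain a ContinuationInterceptor, intercepted() returns whatever interceptContinuation returns, and that object becomes the delegate instead. A minimal sketch of this, assuming a hypothetical RecordingInterceptor that only logs before passing the call on:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

// Hypothetical interceptor: wraps each continuation and records when it is resumed.
class RecordingInterceptor(private val log: MutableList<String>) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                log.add("intercepted")
                continuation.resumeWith(result) // hand over to the real suspend lambda
            }
        }
}

fun runWithInterceptor(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = { log.add("body") }
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = RecordingInterceptor(log)
        override fun resumeWith(result: Result<Unit>) {
            log.add("completion")
        }
    }
    body.createCoroutine(completion).resume(Unit)
    return log
}
```

The recorded order is "intercepted", "body", "completion": the wrapper sits between SafeContinuation and the suspend lambda, which is the hook that thread scheduling can later build on.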

If the process above is still unclear, I have described it in Java code; see Github-KotlinEqualsJava for details. I won't post the code here for space reasons.

At this point, the creation process is complete, and startup is in order.

The startup process of coroutines

The startup process, as mentioned above, goes through the extension function resume:

public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

It ends up calling SafeContinuation's resumeWith method, which we have already seen.

Continuing with the SafeContinuation.resumeWith code, we arrive at delegate.resumeWith(result). The delegate, which is var2, is itself a SuspendLambda, so its resumeWith is the one defined in its indirect parent, BaseContinuationImpl.resumeWith:

public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        probeCoroutineResumed(current)
        with(current) {
            // note 1
            val completion = completion!!
            val outcome: Result<Any?> =
                try {
                    // note 2
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            // note 3
            if (completion is BaseContinuationImpl) {
                current = completion
                param = outcome
            } else {
                // note 4
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

Let's start with note 1. completion comes in through the constructor; it is the Continuation, the anonymous inner class, that we passed in the suspend {}.createCoroutine(Continuation) call at the very beginning.

Moving on to note 2, where invokeSuspend is executed. Note that the current BaseContinuationImpl instance is the suspend {} lambda from the beginning. Earlier we mentioned decompiling it into Java code; this time, here is invokeSuspend:

int label;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
    Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch (this.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            StringBuilder var10000 = (new StringBuilder()).append("A----------------");
            Thread var10001 = Thread.currentThread();
            Intrinsics.checkNotNullExpressionValue(var10001, "Thread.currentThread()");
            String var2 = var10000.append(var10001.getName()).toString();
            boolean var3 = false;
            System.out.println(var2);
            return Unit.INSTANCE;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
}

The important part is the switch. label is declared but never assigned, so it keeps its default value of 0 and case 0 runs. As you can see, invokeSuspend is where the block of code inside the suspend lambda is executed.

Moving on to note 3. completion is the anonymous inner class we created, which implements Continuation directly and is not a BaseContinuationImpl, so execution falls through to note 4.

That calls the resumeWith we wrote, which runs println("B----------------${Thread.currentThread().name}").

At this point, the coroutine is done.

Implement a pseudo-coroutine process in Java

If the flow above has left you dizzy, read the source a few more times, or set a breakpoint and step through it. To get the big picture of how it works, here is a Java demonstration of the (pseudo) coroutine flow. First define a coroutine interface:

public interface CoroutineInterface {
    public void resumeWith();
}

We then define an abstract class to stand in for the suspend lambda:

public abstract class SuspendAnonymous implements CoroutineInterface {
    public CoroutineInterface mCoroutineInterface;

    public SuspendAnonymous(CoroutineInterface coroutineInterface) {
        this.mCoroutineInterface = coroutineInterface;
    }

    public abstract void invokeSuspend();

    public static CoroutineInterface creatCoroutune(SuspendAnonymous suspendAnonymous) {
        return new CoroutineInterface() {
            @Override
            public void resumeWith() {
                suspendAnonymous.resumeWith();
            }
        };
    }
}

Finally, run in the main function:

public static void main(String[] args) {
    // Create an anonymous inner class (stands in for the suspend lambda)
    SuspendAnonymous suspendAnonymous = new SuspendAnonymous(new CoroutineInterface() {
        // Create an anonymous inner class (stands in for the completion)
        @Override
        public void resumeWith() {
            System.out.println("B-------------");
        }
    }) {
        @Override
        public void resumeWith() {
            invokeSuspend();
            mCoroutineInterface.resumeWith();
        }

        @Override
        public void invokeSuspend() {
            System.out.println("A-------------");
        }
    };
    CoroutineInterface coroutineInterface = SuspendAnonymous.creatCoroutune(suspendAnonymous);
    coroutineInterface.resumeWith();
    System.out.println("C-------------");
}

out:
A-------------
B-------------
C-------------

This is roughly what the ultra-streamlined process means. The code details are available on Github

Seeing this, you are probably thinking: isn't that just an interface callback? With all this complexity, how does it actually perform asynchronous tasks? Don't worry, we'll get to the context and scheduler of coroutines. But before we do, let's summarize what a coroutine is:

Summary: what is a coroutine?

A coroutine does not exist apart from threads; it runs on a thread. It is not a novelty but a programming idea. Narrowly, it can be described as a set of wrapped APIs that Kotlin provides to solve a problem, and as the analysis above shows, at its core it is even just an interface callback! The problem it solves is concurrency: it makes concurrent tasks easier by letting you write concurrent code that reads like synchronous code.

Context and scheduler for coroutines

So far we have covered how a coroutine is created and started, and summarized it as a set of wrapped thread APIs. But nowhere in the create-and-start process did we see any thread scheduling: A, B and C were all printed on the same thread, and the coroutine even blocked the main flow. That looks nothing like thread switching or non-blocking calls. Don't panic; the coroutine context, covered next, is what solves this.

What is the coroutine context

The official definition of context is as follows:

Coroutines always run in contexts represented by CoroutineContext types that are defined in Kotlin’s standard library.

A coroutine context is a collection of various elements

As an Android developer, you must already be familiar with context. In Android, Context is an abstract class that provides access to application resources and methods for invoking application-level operations.

The same goes for the coroutine context: it not only provides some extra capabilities, but is also responsible for naming the coroutine, starting and cancelling it, and deciding which thread or threads it runs on.

If you’ve used coroutines in Android, you’ll be familiar with the following code:

GlobalScope.launch(Dispatchers.IO + Job() + CoroutineName("name")) {}

Dispatchers.IO, Job() and CoroutineName("name") are all concrete implementations of the context. You probably know that "+" is used to add contexts together, but how does that work? To keep unrelated code out of the way while we trace it, we first define four contexts of our own: FirstContext, SecondContext, ThirdContext, and FourthContext.

Only the FirstContext implementation is shown here; the rest can be viewed on Github.

The default is to use EmptyCoroutineContext as the context, which inherits directly from CoroutineContext. As the name suggests, nothing is offered.

A context is usually defined by extending AbstractCoroutineContextElement:

class FirstContext : AbstractCoroutineContextElement(Key) {
    // The Key companion object must be provided; it serves as the key of FirstContext
    companion object Key : CoroutineContext.Key<FirstContext>

    fun first() {
        println("the first context------------------")
    }
}

Note the companion object Key : CoroutineContext.Key<FirstContext> here. We are forced to provide this companion object, because the AbstractCoroutineContextElement constructor requires a key.

Run the following code:

fun main() {
    val coroutineContext = FirstContext() + SecondContext() + ThirdContext() + FourthContext()
    val createCoroutine = suspend {
        println("A------------")
        "B"
    }.createCoroutine(object : Continuation<String> {
        override val context: CoroutineContext
            get() = coroutineContext

        override fun resumeWith(result: Result<String>) {
            println("${result.getOrNull()}----------------")
            context[FirstContext]?.first()
            context[SecondContext]?.second()
            context[ThirdContext]?.third()
            context[FourthContext]?.fourth()
        }
    })
    createCoroutine.resume(Unit)
}

out:
A------------
B----------------
the first context------------------
the second context------------------
the third context------------------
the fourth context------------------

How are these four contexts added? How do you find them by context and call their corresponding methods? How are they stored?

The data structure of the coroutine context

Unlike Android, where the context is an abstract class, the context of a coroutine is an interface:

public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?

    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

The code is not complex and defines several methods and interfaces.

  • The plus method: this overloads the "+" operator, so our FirstContext() + SecondContext() is equivalent to FirstContext().plus(SecondContext()).
  • The Element interface, which itself implements CoroutineContext.
  • The Key interface, which has no members and is used only for lookup. An expression like context[FirstContext] can pull a context out again thanks to its key.
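The three points above can be sketched with two small elements. UserName and RequestId are hypothetical classes invented for this example; plus and get are the real CoroutineContext operators.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical elements, used only for this sketch.
class UserName(val value: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<UserName>
}

class RequestId(val value: Int) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<RequestId>
}

fun lookupDemo(): Triple<String?, Int?, Boolean> {
    // "+" is the plus operator: same as UserName("alice").plus(RequestId(7))
    val context: CoroutineContext = UserName("alice") + RequestId(7)
    // The class name used in brackets is really the companion object, which is the key.
    val name = context[UserName]?.value
    val id = context[RequestId]?.value
    // Looking up a key that is not present yields null.
    val onlyName: CoroutineContext = UserName("bob")
    return Triple(name, id, onlyName[RequestId] == null)
}
```

Here lookupDemo() returns ("alice", 7, true): both elements can be retrieved from the combined context by key, and a missing key gives null.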

Let's take a look at what this Key actually is. Decompile FirstContext to get the following code:

public final class FirstContext extends AbstractCoroutineContextElement {
    public static final FirstContext.Key Key = new FirstContext.Key((DefaultConstructorMarker)null);

    public FirstContext() {
        super((kotlin.coroutines.CoroutineContext.Key)Key);
    }

    public static final class Key implements kotlin.coroutines.CoroutineContext.Key {
        private Key() {
        }

        // $FF: synthetic method
        public Key(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

As you can see, Key is a static constant object. It belongs to the class rather than to any instance, so no matter how many instances of FirstContext you create, their key is always the same object. This is what allows it to mark the type of a context.
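The same observation can be checked from Kotlin without decompiling. TraceTag is a hypothetical element used only for this sketch.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical element, used only for this sketch.
class TraceTag(val label: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TraceTag>
}

fun keysAreShared(): Boolean {
    val a = TraceTag("a")
    val b = TraceTag("b")
    // Different instances, same key object: the key identifies the type, not the instance.
    return a !== b && a.key === b.key && a.key === TraceTag
}
```

keysAreShared() returns true, since the companion object is a singleton shared by every instance of the class.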

Next we analyze the data structure and implementation of the context using FirstContext() + SecondContext() + ThirdContext() + FourthContext(). The inheritance of these classes is: CoroutineContext <- Element <- AbstractCoroutineContextElement. The + operator actually calls the plus method, which takes a context and returns a new context. Its implementation was shown above; here it is again on its own:

public operator fun plus(context: CoroutineContext): CoroutineContext =
    // code ①
    if (context === EmptyCoroutineContext) this else
        // code ②
        context.fold(this) { acc, element ->
            // code ③
            val removed = acc.minusKey(element.key)
            // code ④
            if (removed === EmptyCoroutineContext) element else {
                // code ⑤
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed.minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }

Here’s a step-by-step look at the code:

  • Code ①: If context is EmptyCoroutineContext, return this, that is, add nothing. E.g. for A + EmptyCoroutineContext, plus returns this directly, which is A itself. This gives the first rule of addition: adding EmptyCoroutineContext adds nothing.

  • Code ②: If context is not EmptyCoroutineContext, call its fold method.

    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    There is an implementation in Element:

    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

    Since our custom contexts implement the Element interface, this fold call resolves to Element's fold. The method is simple: it takes an R (here, the context) and a higher-order function, and calls that function with R and this (the current context) as arguments. So for A + B, when execution reaches code ②, acc is the instance of A and element is the instance of B. If this is unfamiliar, brush up on higher-order functions and this. The actual flow is as follows:

    A+B:
    
    A.plus(B) {
     this = A
     B.fold(this)
    }
    
    B.fold(A){
        this = B
        anonymous(A,this)
    }
    
    anonymous(acc A,element B) {
        doSomething()
    }

    Anonymous corresponds to the anonymous function in our fold.

  • Code 3: Calls the minusKey method, which is also implemented in Element:

    public override fun minusKey(key: Key<*>): CoroutineContext =
                if (this.key == key) EmptyCoroutineContext else this

    Compare the two keys (the companion object enforced above) and return EmptyCoroutineContext if the Key is the same (eg: A+A), otherwise return this (the caller).

  • Code ④: If removed is EmptyCoroutineContext, the two contexts have the same key, so element is returned directly; that is, the B being added in code ②. This gives another rule: if you add a context of the same type, the new one simply replaces the old one.

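  • Code ⑤: ContinuationInterceptor gets special treatment. If removed contains no ContinuationInterceptor, the new element simply becomes the head: CombinedContext(removed, element). Otherwise the interceptor is taken out first and put back as the outermost node, so it always stays at the head of the list.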
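The two rules of addition from code ① and code ④ can be checked directly. Label is a hypothetical element invented for this sketch.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element, used only for this sketch.
class Label(val text: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<Label>
}

fun plusRules(): List<Boolean> {
    val old = Label("old")
    val replacement = Label("new")
    return listOf(
        // code ①: adding the empty context returns the receiver itself
        (old + EmptyCoroutineContext) === old,
        // code ④: same key, so the result is just the newly added element
        (old + replacement) === replacement,
        // looking the key up afterwards finds the new element
        (old + replacement)[Label]?.text == "new"
    )
}
```

All three checks in plusRules() hold. In both cases no CombinedContext is created at all, which is why a single context stays a plain element.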

Next comes CombinedContext, which is key to implementing context storage:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {

    override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {
                cur = next
            } else {
                return next[key]
            }
        }
    }

    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(left.fold(initial, operation), element)

    public override fun minusKey(key: Key<*>): CoroutineContext {
        element[key]?.let { return left }
        val newLeft = left.minusKey(key)
        return when {
            newLeft === left -> this
            newLeft === EmptyCoroutineContext -> element
            else -> CombinedContext(newLeft, element)
        }
    }
}

The CombinedContext class also implements the CoroutineContext interface, overriding the fold, get, and minusKey methods. They do essentially the same thing as their counterparts in Element; the key difference is the traversal. CombinedContext holds a left member, and both get and minusKey keep following left. That should make the structure clear: this is a linked list, and left plays the role of next in an ordinary linked list.

With that, the rest of the plus method is easy to understand: if there is only one context, it is stored as a plain element. If there are two or more, they are stored as the elements of CombinedContext nodes, which together form a linked list. The list is built by head insertion, except that a ContinuationInterceptor, if present, is always kept at the head of the list. Take

FirstContext() + EmptyCoroutineContext + ContinuationInterceptor() + SecondContext() + ThirdContext() + FourthContext()

as an example:

The process of adding is roughly as follows:

Where the box represents the CombinedContext.

This is the process of adding.
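The effect of this process can be observed with fold, which visits elements from the tail of the list to the head. The sketch below uses hypothetical Alpha and Beta elements and a do-nothing PassThroughInterceptor; it shows that an element added after the interceptor still ends up behind it.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical elements, used only for this sketch.
class Alpha : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<Alpha>
}

class Beta : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<Beta>
}

// A do-nothing interceptor: it returns every continuation unchanged.
class PassThroughInterceptor : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

// fold visits the tail first and the head last, so the last name is the head of the list.
fun elementOrder(context: CoroutineContext): List<String> =
    context.fold(emptyList<String>()) { names, element -> names + element.javaClass.simpleName }

fun interceptorStaysAtHead(): List<String> =
    elementOrder(Alpha() + PassThroughInterceptor() + Beta())
```

interceptorStaysAtHead() returns [Alpha, Beta, PassThroughInterceptor]: although Beta was added last, the interceptor was lifted out and re-attached as the outermost node.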

Now let’s look at the get method, which is context[FirstContext].

The normal context implementation of the get method is very simple, just like the implementation of Element:

public override operator fun <E : Element> get(key: Key<E>): E? =
            if (this.key == key) this as E else null

Returns the current object if it matches, or null otherwise. CombinedContext is more complicated:

override fun <E : Element> get(key: Key<E>): E? {
    var cur = this
    while (true) {
        cur.element[key]?.let { return it }
        val next = cur.left
        if (next is CombinedContext) {
            cur = next
        } else {
            return next[key]
        }
    }
}

It’s a lot more code, but the logic is simple, just iterating through each node of the list looking for a context for a type match.

Finally, let’s look at the following code:

fun main() {
    val coroutineContext =
        FirstContext() + EmptyCoroutineContext + SecondContext() + ThirdContext() + FourthContext()
    println(coroutineContext)
}

out:

[FirstContext@27bc2616, SecondContext@3941a79c, ThirdContext@506e1b77, FourthContext@4fca772d]

Why is the output order the same as the insert order? The secret is the toString method of CombinedContext.

public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(left.fold(initial, operation), element)
        
override fun toString(): String =
        "[" + fold("") { acc, element ->
            if (acc.isEmpty()) element.toString() else "$acc, $element"
        } + "]"

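The logic is clear: fold is called recursively until the last left is no longer a CombinedContext, at which point Element's fold is called directly. The operation is then applied on the way back out of the recursion, so the linked list is printed in reverse, from tail to head, which is exactly the insertion order.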
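To see the traversal and the recursion in isolation, here is a simplified model of the list. Ctx, Leaf and Combined are hypothetical stand-ins that store plain names; this is not the real CombinedContext, only its shape.

```kotlin
// A simplified, hypothetical model of the context list.
interface Ctx {
    fun <R> fold(initial: R, operation: (R, String) -> R): R
    fun find(name: String): String?
}

// Plays the role of a single Element.
class Leaf(val name: String) : Ctx {
    override fun <R> fold(initial: R, operation: (R, String) -> R): R = operation(initial, name)
    override fun find(name: String): String? = if (this.name == name) this.name else null
}

// Plays the role of CombinedContext: element is the head, left is the rest of the list.
class Combined(val left: Ctx, val element: Leaf) : Ctx {
    // Recurse into left first, then apply the operation to this node's element.
    override fun <R> fold(initial: R, operation: (R, String) -> R): R =
        operation(left.fold(initial, operation), element.name)

    // Walk from the head through left until a match is found.
    override fun find(name: String): String? {
        var cur = this
        while (true) {
            cur.element.find(name)?.let { return it }
            val next = cur.left
            if (next is Combined) cur = next else return next.find(name)
        }
    }
}

fun modelOrder(): String {
    // Built by head insertion: "Third" was added last and is the head.
    val list = Combined(Combined(Leaf("First"), Leaf("Second")), Leaf("Third"))
    return list.fold("") { acc, name -> if (acc.isEmpty()) name else "$acc, $name" }
}
```

modelOrder() returns "First, Second, Third". The head is visited last because each node folds its left side before handling its own element.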

To sum up:

  • Contexts are stored in a linked list, with new elements always inserted at the head.
  • The list gives ContinuationInterceptor special treatment: if a ContinuationInterceptor context is present, it is always at the head of the list.
  • The list cannot hold two contexts of the same type. If a context of the same type is added, the new instance replaces the old one.
  • The list matches types by Key, an interface with no members whose instances are static constants. It is used only as a key, like the key in a key-value pair.

How do coroutines implement thread scheduling

Reading this section requires you to understand the Handler process, preferably the process by which an Activity starts and flows through its lifecycle.

If you don’t know, you should know that Handler tasks are executed in a queue, and Activity lifecycle functions are triggered by handlers.

By now you should have a sense of context. Let’s take a look at how coroutines use context to implement thread scheduling. Inside Android’s onCreate method, run the following code:

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main_messenger)
    Log.e("Thread-before", "---------${Thread.currentThread().name}")
    GlobalScope.launch(Dispatchers.Main) {
        Log.e("Main", "---------${Thread.currentThread().name}")
    }
    GlobalScope.launch(Dispatchers.IO) {
        Log.e("IO", "---------${Thread.currentThread().name}")
    }
    Log.e("Thread-after", "---------${Thread.currentThread().name}")
}

out:
Thread-before: ---------main
Thread-after: ---------main
IO: ---------DefaultDispatcher-worker-1
Main: ---------main

How does Dispatchers.IO change the thread the code runs on? And since Dispatchers.Main also runs on the main thread, why is its output last? Dispatchers.IO runs on a different thread, so it is understandable that its position in the output is not fixed. But why is "Main: ---------main" always printed last? Doesn't that contradict the A, B, C output at the beginning?
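As a rough intuition before walking through the source, a thread switch can be modelled as a ContinuationInterceptor that posts resumeWith to an executor. The sketch below uses only the standard library; ExecutorInterceptor, bodyThreadName and the thread name "sketch-worker" are made up for illustration, and this is not how kotlinx.coroutines implements Dispatchers.IO.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

// Hypothetical interceptor: every resumption is handed to the executor instead of running inline.
class ExecutorInterceptor(private val executor: Executor) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Returns the name of the thread on which the coroutine body actually ran.
fun bodyThreadName(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "sketch-worker") }
    val done = CountDownLatch(1)
    var name = ""
    val body: suspend () -> String = { Thread.currentThread().name }
    body.createCoroutine(object : Continuation<String> {
        override val context: CoroutineContext = ExecutorInterceptor(executor)
        override fun resumeWith(result: Result<String>) {
            name = result.getOrThrow()
            done.countDown()
        }
    }).resume(Unit)
    done.await()        // wait for the worker thread to finish the body
    executor.shutdown()
    return name
}
```

bodyThreadName() returns "sketch-worker" even though resume was called on the calling thread: the call only enqueued the work. An interceptor that posts to a queue belonging to the current thread would likewise run the body later rather than inline.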

How does Dispatchers.IO switch threads?

Let's start by looking at what GlobalScope.launch is:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

It is an extension function of CoroutineScope (the coroutine scope, covered in a later section). It takes a context (EmptyCoroutineContext by default), a start mode of type CoroutineStart (CoroutineStart.DEFAULT by default), and a block, which is the code we want to run. It then builds a new context, creates a coroutine of type StandaloneCoroutine, and starts it with coroutine.start(start, coroutine, block):

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
}

The call start(block, receiver, this) resolves to the invoke operator of CoroutineStart:

public enum class CoroutineStart {
    DEFAULT,
    LAZY,
    ATOMIC,
    UNDISPATCHED;
    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }
    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }
    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}

CoroutineStart has two invoke overloads, and the one called here is the second, which takes three arguments. block is the body of the suspend lambda; receiver is the StandaloneCoroutine instance created in launch; completion is that same StandaloneCoroutine instance (StandaloneCoroutine implements both Continuation and CoroutineContext, so it can play both roles).

Let’s focus on the key part. The default start mode in launch is CoroutineStart.DEFAULT, so the branch taken is block.startCoroutineCancellable(receiver, completion). Unsurprisingly, startCoroutineCancellable is also an extension function:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

createCoroutineUnintercepted(receiver, completion).intercepted() should look familiar: it is the coroutine creation we walked through earlier. It returns a Continuation instance, and the next step is to call resumeCancellableWith on it:

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

This is yet another extension function. Here this is the Continuation returned by createCoroutineUnintercepted(...).intercepted(), and it is a DispatchedContinuation. Remember that the coroutine creation path ends by calling the intercepted method?

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

Note that by this point Dispatchers.IO has been added to the context, and Dispatchers.IO is itself a ContinuationInterceptor:

public val IO: CoroutineDispatcher = DefaultScheduler.IO

val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
)

These two declarations come from different files. The inheritance chain of LimitingDispatcher is: LimitingDispatcher <- ExecutorCoroutineDispatcher <- CoroutineDispatcher. CoroutineDispatcher is declared as follows:

CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor

It implements the ContinuationInterceptor interface, so it must provide the interceptContinuation method:

public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

This method is declared in ContinuationInterceptor. Here is its implementation in CoroutineDispatcher:

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

So context[ContinuationInterceptor] is the LimitingDispatcher, the one that Dispatchers.IO created above. The DispatchedContinuation constructor is as follows:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) 

It is easy to see that continuation is the instance created earlier by createCoroutineUnintercepted, and dispatcher is the LimitingDispatcher created through Dispatchers.IO.
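The wrapping step can be reproduced with nothing but the standard library. The sketch below is an assumption-laden miniature, not the kotlinx.coroutines code: `ExecutorInterceptor` stands in for CoroutineDispatcher, `PostingContinuation` for DispatchedContinuation, and `threadNameUnder` is a helper invented for the demonstration.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for CoroutineDispatcher: it intercepts every continuation.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        PostingContinuation(executor, continuation)
}

// Hypothetical stand-in for DispatchedContinuation: it keeps the dispatcher-like
// executor and the original continuation, and resuming posts a task to the executor.
class PostingContinuation<T>(
    private val executor: Executor,
    private val continuation: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = continuation.context

    override fun resumeWith(result: Result<T>) {
        executor.execute { continuation.resumeWith(result) }
    }
}

// Starts a block under the interceptor and returns the name of the thread it ran on.
fun threadNameUnder(interceptor: ExecutorInterceptor): String {
    val latch = CountDownLatch(1)
    var name = ""
    val block: suspend () -> String = { Thread.currentThread().name }
    block.startCoroutine(Continuation<String>(interceptor) { result ->
        name = result.getOrThrow()
        latch.countDown()
    })
    latch.await()
    return name
}

fun main() {
    val pool = Executors.newSingleThreadExecutor { task -> Thread(task, "sketch-worker") }
    println(threadNameUnder(ExecutorInterceptor(pool))) // sketch-worker
    pool.shutdown()
}
```

startCoroutine looks up the ContinuationInterceptor in the completion's context, so the first resume already goes through the executor and the block body runs on the worker thread.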

Now back to the main flow, intercepted creates a DispatchedContinuation instance via interceptContinuation and executes its resumeCancellableWith method:

inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

The key call is dispatcher.dispatch(context, this). The dispatcher here is the LimitingDispatcher, so let’s look at its dispatch method:

private val queue = ConcurrentLinkedQueue<Runnable>()

override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()

        
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
        
            queue.add(taskToSchedule)

            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            taskToSchedule = queue.poll() ?: return
        }
}
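The counting logic of this dispatch method can be tried out in isolation. The following is a simplified sketch built on a plain Executor: `LimitedExecutor` and `maxConcurrency` are hypothetical names, and the `afterTask` release step is an assumption about how a slot is handed back, since the article only shows the dispatch half.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical limiter: at most `parallelism` tasks run on `delegate` at once.
class LimitedExecutor(private val delegate: Executor, private val parallelism: Int) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlight = AtomicInteger(0)

    override fun execute(task: Runnable) {
        var taskToSchedule = task
        while (true) {
            // Try to claim a slot; if one is free, run right away.
            if (inFlight.incrementAndGet() <= parallelism) {
                submit(taskToSchedule)
                return
            }
            // No free slot: park the task, then give the claimed slot back.
            queue.add(taskToSchedule)
            if (inFlight.decrementAndGet() >= parallelism) return
            // A slot was freed in the meantime; retry with a parked task.
            taskToSchedule = queue.poll() ?: return
        }
    }

    private fun submit(task: Runnable) {
        delegate.execute {
            try {
                task.run()
            } finally {
                afterTask()
            }
        }
    }

    // Assumed release step (not shown in the article): reuse the slot or free it.
    private fun afterTask() {
        val next = queue.poll()
        if (next != null) {
            submit(next) // hand the slot over to a parked task
            return
        }
        inFlight.decrementAndGet()
        // Re-check: a task may have been parked between poll() and the decrement.
        val late = queue.poll() ?: return
        execute(late)
    }
}

// Runs `tasks` short jobs and reports the highest number observed running at once.
fun maxConcurrency(tasks: Int, parallelism: Int): Int {
    val pool = Executors.newFixedThreadPool(4)
    val limited = LimitedExecutor(pool, parallelism)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val done = CountDownLatch(tasks)
    repeat(tasks) {
        limited.execute {
            val now = running.incrementAndGet()
            peak.updateAndGet { maxOf(it, now) }
            Thread.sleep(10)
            running.decrementAndGet()
            done.countDown()
        }
    }
    done.await()
    pool.shutdown()
    return peak.get()
}

fun main() {
    // The pool has 4 threads, but the limiter keeps the peak at 2 or below.
    println(maxConcurrency(tasks = 8, parallelism = 2))
}
```

The increment-then-check pattern means the counter may briefly exceed the limit, but a task only runs after it has claimed a slot within the limit, which is what keeps the observed peak bounded.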

Note how dispatcher.dispatch(context, this) is called: the Runnable parameter receives this, the DispatchedContinuation itself. Its inheritance chain is DispatchedContinuation <- DispatchedTask <- SchedulerTask, and SchedulerTask is an alias for Task:

internal actual typealias SchedulerTask = Task

Task implements the Runnable interface:

internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    constructor() : this(0, NonBlockingContext)
    inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}

In the end our block, the DispatchedContinuation, is handed over as a Runnable: straight to the underlying scheduler while the parallelism limit allows it, otherwise into the concurrent queue to wait. To find out what eventually happens, look for its run method, which lives in DispatchedTask:

public final override fun run() {
    val taskContext = this.taskContext
    var fatalException: Throwable? = null
    try {
        val delegate = delegate as DispatchedContinuation<T>
        val continuation = delegate.continuation
        withContinuationContext(continuation, delegate.countOrElement) {
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            val exception = getExceptionalResult(state)
            /*
             * Check whether continuation was originally resumed with an exception.
             * If so, it dominates cancellation, otherwise the original exception
             * will be silently lost.
             */
            val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
            if (job != null && !job.isActive) {
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                continuation.resumeWithStackTrace(cause)
            } else {
                if (exception != null) {
                    continuation.resumeWithException(exception)
                } else {
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        fatalException = e
    } finally {
        val result = runCatching { taskContext.afterTask() }
        handleFatalException(fatalException, result.exceptionOrNull())
    }
}

It goes through the delegate once more and ultimately calls the resume method of the continuation. That method was described in the coroutine startup section above; it is where the body of the suspend lambda is executed.

That’s how it switches threads. With Dispatchers.Main, thread scheduling is a little different: it is ultimately implemented through HandlerContext:

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate
        ?: HandlerContext(handler, name, true).also { _immediate = it }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
}

As you can see, it is a Handler that moves the task onto the main thread. This explains the output order of the following code:

override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main_messenger)
        GlobalScope.launch(Dispatchers.Main) {
            Log.e("Main", "---------${Thread.currentThread().name}")
        }
        
        Log.e("Thread-after", "---------${Thread.currentThread().name}")
}

Thread-after is always printed before Main because GlobalScope.launch runs the coroutine block via handler.post, and our onCreate method is itself executed through a Handler. Both are run by the main-thread Handler created in ActivityThread, and the Handler message queue is processed sequentially and synchronously: until the current task finishes, the next one cannot start.

Since both run on the same thread, is there a way to make the code in the coroutine block run first? Yes. Notice the isDispatchNeeded method of HandlerContext, which decides whether a dispatch is needed. By default, HandlerContext is created by AndroidDispatcherFactory:

override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))

invokeImmediately is false here, which means isDispatchNeeded returns true and a dispatch is required, which is done through the Handler. But suppose we call it as follows:

GlobalScope.launch(Dispatchers.Main.immediate) {
    Log.e("Main", "---------${Thread.currentThread().name}")
}
Log.e("Thread-after", "---------${Thread.currentThread().name}")

out:
Main: ---------main
Thread-after: ---------main

Dispatchers.Main itself returns a HandlerContext, and the HandlerContext has an immediate variable:

override val immediate: HandlerContext = _immediate
    ?: HandlerContext(handler, name, true).also { _immediate = it }

It creates and returns a new instance, this time with invokeImmediately set to true. Since we are already on the main looper, isDispatchNeeded now returns false: the block skips the dispatch, does not go through the Handler, and runs right away in place.
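Both orderings can be replayed without Android. The sketch below replaces Looper and Handler with a plain task queue; `MiniMainDispatcher` and `logOrder` are hypothetical names, and the `onLoopThread` flag is a stand-in for the `Looper.myLooper() != handler.looper` check.

```kotlin
// Hypothetical main-thread dispatcher over a plain task queue (no Android classes).
class MiniMainDispatcher(private val invokeImmediately: Boolean) {
    private val queue = ArrayDeque<() -> Unit>()
    private var onLoopThread = false

    // Same shape as HandlerContext.isDispatchNeeded in the article.
    fun isDispatchNeeded(): Boolean = !invokeImmediately || !onLoopThread

    // Like Handler.post: always queue the task.
    fun post(task: () -> Unit) {
        queue.addLast(task)
    }

    // Queue the task only when dispatch is needed; otherwise run it in place.
    fun dispatch(task: () -> Unit) {
        if (isDispatchNeeded()) post(task) else task()
    }

    // Drain the queue, one task at a time.
    fun loop() {
        onLoopThread = true
        while (true) {
            val task = queue.removeFirstOrNull() ?: break
            task()
        }
        onLoopThread = false
    }
}

// Replays the onCreate example and returns the order of the log lines.
fun logOrder(immediate: Boolean): List<String> {
    val log = mutableListOf<String>()
    val dispatcher = MiniMainDispatcher(immediate)
    dispatcher.post {                          // onCreate is itself a queued task
        dispatcher.dispatch { log += "Main" }  // stands in for launch(Dispatchers.Main...)
        log += "Thread-after"
    }
    dispatcher.loop()
    return log
}

fun main() {
    println(logOrder(immediate = false)) // [Thread-after, Main]
    println(logOrder(immediate = true))  // [Main, Thread-after]
}
```

With dispatch required, the block can only run after the task that queued it has returned; with the immediate variant it runs inside that task.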

Kotlin coroutines have three schedulers in Android:

  • Dispatchers.Default: the default thread pool, used when no dispatcher is specified; suitable for CPU-intensive tasks;
  • Dispatchers.Main: executes on the main thread;
  • Dispatchers.IO: executes on threads of a shared thread pool inside Kotlin; suitable for IO tasks.

What exactly is a suspend function

So let’s think about the question, what is the suspension of a suspend function? How did they recover? From the above context, it can be summarized as follows:

When a running thread A reaches a coroutine whose body suspends, the coroutine effectively tells thread A: "leave me alone, I will manage by myself." Thread A then moves on to its next task, and the coroutine is suspended: its block of code is detached from the current thread. It is not terminated, though; it is handed to another thread to execute, and which one depends on how the dispatcher arranges things.

This is the essence of suspension. Nothing is paused in place; the coroutine is taken off the current thread and handed to another one. Of course, the thread that ends up running the coroutine can also be the current thread.

This may sound like creating a new thread. However, executing tasks on new threads is not the best thing about coroutines. The best thing about coroutines is that they can easily “cut back” as if using synchronous operations:

fun onCreate() {
    GlobalScope.launch(Dispatchers.Main) {
        doSomething()
        Log.e("back to the main thread", "---------${Thread.currentThread().name}")
    }
    Log.e("Thread-after", "---------${Thread.currentThread().name}")
}

suspend fun doSomething() = withContext(Dispatchers.IO) {
    delay(1000)
    Log.e("time-consuming operation", "---------${Thread.currentThread().name}")
}

out:
Thread-after: ---------main
time-consuming operation: ---------DefaultDispatcher-worker-1
back to the main thread: ---------main

The interesting part is how it switches away and back. Look at withContext(Dispatchers.IO) in doSomething: that is where execution leaves the main thread. suspend by itself provides no ability to switch threads; the switch is performed by withContext. What the suspension does is tell the current thread: I am switching away, carry on without me. At that point the coroutine is suspended until doSomething completes, after which the rest of the coroutine body is posted back as a Runnable and continues. This is also why suspend functions can only be called from a coroutine or from another suspend function: without a coroutine there is nothing that can bring execution back.

As you can see, suspend is primarily a marker. The real work is done by withContext and the coroutine machinery. The keyword is more like a warning to the caller that the function may be time-consuming, so it has to be called from a suspend function or a coroutine.
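The switch away and back can be imitated with the standard library and two single-thread executors. This is only a sketch of the idea, not how withContext is implemented: `runOn` is a hypothetical withContext-like helper, `ExecutorInterceptor` is a hypothetical dispatcher, and the thread names are made up.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical dispatcher: every resumption is posted to `executor`.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Hypothetical withContext-like helper: run `work` on `executor`, then resume the caller.
suspend fun <T> runOn(executor: Executor, work: () -> T): T =
    suspendCoroutine { continuation ->
        executor.execute { continuation.resumeWith(runCatching(work)) }
    }

// Records the thread name before, during and after the switch.
fun threadsVisited(): List<String> {
    val main = Executors.newSingleThreadExecutor { Thread(it, "sketch-main") }
    val io = Executors.newSingleThreadExecutor { Thread(it, "sketch-io") }
    val visited = mutableListOf<String>()
    val done = CountDownLatch(1)
    val block: suspend () -> Unit = {
        visited += Thread.currentThread().name                // on sketch-main
        visited += runOn(io) { Thread.currentThread().name }  // work runs on sketch-io
        visited += Thread.currentThread().name                // back on sketch-main
    }
    block.startCoroutine(Continuation<Unit>(ExecutorInterceptor(main)) { done.countDown() })
    done.await()
    main.shutdown()
    io.shutdown()
    return visited
}

fun main() {
    println(threadsVisited()) // [sketch-main, sketch-io, sketch-main]
}
```

The suspend function only parks the coroutine; the return to the original thread comes from the interceptor in the coroutine's context, which posts the resumption as a task.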

At the same time, coroutines bridge the blocking and non-blocking worlds. Take doSomething: from inside the coroutine it looks blocking, because the code after it in the coroutine body cannot run until it returns. But the Log call that follows launch runs on the main thread and is not held up at all. That is where the coroutine comes into play.

If you want to execute seemingly synchronous code in a coroutine asynchronously, you can use the async block:

GlobalScope.launch(Dispatchers.Main) {
    async { doSomething() }
    Log.e("back to the main thread", "---------${Thread.currentThread().name}")
}

out:
Thread-after: ---------main
back to the main thread: ---------main
time-consuming operation: ---------DefaultDispatcher-worker-1

Because async is also an extension function that starts a coroutine:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Of course, since there are asynchronous operations, there are also synchronous operations:

GlobalScope.launch(Dispatchers.Main) {
    val async = async { doSomething() }
    Log.e("back to the main thread 1", "---------${Thread.currentThread().name}")
    async.await()
    Log.e("back to the main thread 2", "---------${Thread.currentThread().name}")
}

out:
Thread-after: ---------main
back to the main thread 1: ---------main
time-consuming operation: ---------DefaultDispatcher-worker-1
back to the main thread 2: ---------main

That’s the magic of coroutines, they let you decide whether you want to block or not.

So suspension comes down to this: leave me alone and carry on with your work; I will come back when I am done. That is also where non-blocking comes from: the coroutine does its job elsewhere while the current thread gets on with its own, and the coroutine reports back when it finishes. This is why coroutines can make blocking-looking code non-blocking.
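The "leave me alone, I will be back" behaviour can be observed on a single thread with the standard library. In this minimal sketch the continuation is simply parked in a local variable; `suspendAndResume` and the log messages are invented for the illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Starts a coroutine, lets it suspend, and resumes it later from the caller.
fun suspendAndResume(): List<String> {
    val log = mutableListOf<String>()
    var parked: Continuation<String>? = null

    val block: suspend () -> Unit = {
        log += "coroutine: before suspend"
        // Suspend here: hand the continuation out and return control to the caller.
        val value = suspendCoroutine<String> { continuation -> parked = continuation }
        log += "coroutine: resumed with $value"
    }

    // Runs the block up to the suspension point, then returns.
    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    log += "caller: free to do other work"
    // Whoever holds the continuation decides when (and on which thread) to resume.
    parked?.resume("result")
    return log
}

fun main() {
    suspendAndResume().forEach(::println)
    // coroutine: before suspend
    // caller: free to do other work
    // coroutine: resumed with result
}
```

Nothing blocks at the suspension point: startCoroutine returns to the caller, and the rest of the block only runs once resume is called.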

Scope of coroutines

We have been using GlobalScope throughout the code above. What on earth is it? It is a standalone coroutine scope. And what is a scope?

What is the scope

A coroutine scope defines the range within which a coroutine runs.

Coroutines can be managed by scope.

In general, a scope describes a range. It provides some extra capabilities, but it also acts as a constraint. Coroutine scopes make the relationships between coroutines explicit, and they let you manage and control cancellation and exception handling for those coroutines.

In Android, there is first-class support for coroutine scope in all entities with a life cycle. See the official documentation for details: Use Kotlin coroutines with lifecycle aware components

The purpose of these scopes is to relate the life cycle of a coroutine to the components on which it depends. In this way, the status of coroutines can be adjusted timely to avoid memory leaks.

Kotlin’s definition of coroutine scope is simple:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

It is an interface with a single member to implement: a coroutine context. The scope is used to hold the coroutine context and pass it along the coroutine’s flow. It can also be used to fix the "range" of a context, limiting where that context applies.

You can even think of it as a wrapper that makes a context easier to call. Take the custom contexts from earlier:

context[FirstContext]?.first()
context[SecondContext]?.second()
context[ThirdContext]?.third()
context[FourthContext]?.fourth()

You can think of the scope as the following code:

class Scope(private val context: CoroutineContext) {
    fun first() {
        context[FirstContext]?.first()
    }

    fun second() {
        context[SecondContext]?.second()
    }

    fun third() {
        context[ThirdContext]?.third()
    }

    fun fourth() {
        context[FourthContext]?.fourth()
    }
}

So you don’t have to go through get again.
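Here is a compilable version of that idea using only the standard library. The element classes and `SketchScope` are hypothetical and reduced to two elements; the methods return strings purely so the effect is visible.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical context elements, each exposing one capability.
class FirstContext : AbstractCoroutineContextElement(FirstContext) {
    companion object Key : CoroutineContext.Key<FirstContext>

    fun first(): String = "first"
}

class SecondContext : AbstractCoroutineContextElement(SecondContext) {
    companion object Key : CoroutineContext.Key<SecondContext>

    fun second(): String = "second"
}

// Hypothetical scope: holds the context and hides the key lookups.
class SketchScope(private val context: CoroutineContext) {
    fun first(): String? = context[FirstContext]?.first()
    fun second(): String? = context[SecondContext]?.second()
}

fun main() {
    val full = SketchScope(FirstContext() + SecondContext())
    println(full.first())     // first
    println(full.second())    // second

    val partial = SketchScope(FirstContext())
    println(partial.second()) // null, this scope's context has no SecondContext
}
```

The second scope shows the "range" aspect: what a scope can do is bounded by the context it was given.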

Kotlin provides us with many scopes, which can be grouped into the following categories:

GlobalScope: a global scope that can be used to start top-level coroutines which live for the whole lifetime of the app. Using GlobalScope in applications is not recommended. It is defined as follows:

public object GlobalScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

It holds an empty context instance.

LifecycleScope: lifecycleScope is defined for each Lifecycle object. Coroutines started within this scope will be cancelled when Lifecycle is destroyed. Its definition is also simple:

public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope

public val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }

ViewModelScope: Defines a ViewModelScope for each ViewModel in the application. If the ViewModel is cleared, coroutines started in this range are automatically cancelled.

I won’t go into detail about their source code here. Those who are interested can check it out for themselves. If you read this article and understand the nature of coroutines, you should be able to easily understand scopes.

Conclusion

Coroutines: Coroutines are not detached from threads; they also run on threads. They are not a novelty but a programming idea. Narrowly speaking, they are Kotlin’s answer to the concurrency problem: a set of wrapped APIs that make concurrent tasks easier, letting you write concurrent code as if it were synchronous.

Context: Gives a coroutine additional capabilities: it is responsible for setting the coroutine’s name, for starting and cancelling the coroutine, and for deciding which thread or threads the coroutine runs on. It is itself a linked list built by head insertion, so new elements are always at the head; ContinuationInterceptor is treated specially and is always kept at the head. Two elements of the same type are not allowed: if an element of the same type is inserted, the old instance is replaced by the new one. Thread scheduling of coroutines is implemented by the dispatcher in the context: a thread pool for Dispatchers.IO and, for Dispatchers.Main, ultimately a Handler.

Scope: A coroutine scope defines the range within which a coroutine runs. It holds the coroutine context and passes it along the coroutine’s flow. It can also be used to fix the "range" of a context, limiting where that context applies.