Technology more than, the article has material, concern public number nine heart said, a high quality good article every week, and nine hearts in Dachang road side by side.

preface

This article follows the previous article, this time we have a good talk about the principle of coroutines, through the study of the last article, I believe that you are very familiar with how to use coroutines.

The story starts with the last coroutine share, and since we don’t have much practice with coroutines, there’s a lot of debate about how to execute the following code:

GlobalScope.launch {
    val a = async {
        1+2
    }

    val b = async {
        1+3
    }

    val c = a + b
    Log.e(TAG,"result:$c")}Copy the code

Some people say that a and B will execute sequentially, some people say that A and B will execute in parallel, so what is the result of the execution? We will give them in the following article.

This article is the second in a three-part series:

Kotlin-coroutines as You Learn, Kotlin-Coroutines As You Learn, Kotlin-Coroutines as You Learn, Kotlin-Coroutines as You Learn

A brief introduction to the structure

First of all, we have to figure out what’s in coroutines, and if you use coroutines, you know that coroutines have CoroutineScope, CoroutineContext, and CoroutineDispatcher, and those are all apis that you can access while you’re using them.

I briefly organized the main base classes of coroutines:

The class structure of coroutines can be broken down into three parts: CoroutineScope, CoroutineContext, and Continuation.

1. Continuation

If you use coroutines, you know that you can suspend your coroutine from a time-consuming suspend operation, and when the task ends, it automatically cuts back.

The secret is continuations, which understand a program, which you can understand each time it wraps up the rest of the code at the start of a coroutine hang and executes the rest of the code after it finishes. A block of code for a coroutine may be cut into continuations, with one assigned to each place where it needs to hang.

A few conclusions. If you suspend a time-consuming operation on a coroutine, it will be suspended automatically, but the time-consuming operation must be done anyway. You just switch to another thread, and then you have to cut back. This is where continuations come in.

The flow of a Continuation looks like this:

Coroutines, whether launched with launch or Async, have a continuation that ends with a callback.

2. CoroutineScope

There’s not much to say about CoroutineScope, which holds CoroutineContext, which manages the life cycle of coroutines.

3. CoroutineContext

At first, I felt dizzy when I saw CoroutineContext, I didn’t understand why it was designed like this, but it got better after I read the articles of Bennyhuo.

As you can see from the structure of the coroutine class above, the CoroutineContext interface looks a bit like a List, and the Element interface, which inherits the CoroutineContext interface, defines its elements.

The Element interface is then divided into two categories, Job and ContinuationInterceptor:

  • Job: Literally, it stands for a task,ThreadIt also performs tasks, so we can understand that it defines some things about coroutines, like the state of coroutines, how coroutines and subcoroutines are managed, and so on.
  • ContinuationInterceptor: Also literally, it isContinuationThe interceptor passes interceptionContinuationDo what we want to do, such as thread switching.

Two, structure source code analysis

Above we introduced the concept of coroutines three major pieces, in this part, we analyze from source code.

1. Continuation

Methods decorated by suspend are treated by the compiler during compilation as CPS(Continuation style) conversions, and the suspend method is wrapped as a Continuation.

After all this talk of continuations, we haven’t seen the interface code yet, and since there isn’t much of it, I’ve posted all of it:

/** * Interface representing a continuation after a suspension point that returns a value of type `T`. */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /** * The context of the coroutine that corresponds to this continuation. */
    public val context: CoroutineContext

    /** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value  of the last suspension point. */
    public fun resumeWith(result: Result<T>)
}
Copy the code

We focus on the Continuation#resumeWith() method, which, from the comments, resumes the execution of the coroutine by returning the value at the start of the suspend suspension. The coroutine can obtain either a successful value or a failed Result from the parameter Result

. If there is no Result, So the generic of Result

is Unit. Resulut this class is also very simple, interested students can view the source code.

BaseContinuationImpl implements the Continuation interface. Let’s look at the Continuation#resumeWith method:

internal abstract class BaseContinuationImpl(
    // Call Continuation after completion
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    
    public final override fun resumeWith(result: Result<Any? >) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                valoutcome: Result<Any? > =try {
                        Execute the code block in suspend
                        val outcome = invokeSuspend(param)
                        // 2. Return early if the code hangs
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 3. Return the result
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 3. An error message is displayed
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // 4. Recursion if there are sub-completions in completion
                    current = completion
                    param = outcome
                } else {
                    // 5. Notification of results
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}
Copy the code

The main process has been laid out in my comments, so let me explain the mechanics of continuations.

The constructor of each BaseContinuationImpl generated by the suspend method has a parameter called completion, which is also a Continuation and is called when the suspen method is completed. We’ll call it later

For simplicity’s sake, let’s just look at the launch process 3, 4, and 5. Usually a launch generates an outer Continuation and a corresponding resulting Continuation. We will call the resulting continuation complete. Continuation is called in the following order:

  1. Call the outerContinuationIn theContinuation#resumeWith()Methods.
  2. The method will executelaunchWrap the code block and return a result.
  3. Hand in the results of the above code block executioncompletionWhich completes notification of coroutine termination.

The above process only exists in a launch and no other time-consuming suspend operations are performed, which we will discuss in the following article.

As you can see from comment 2, after a time-consuming suspend operation, the result returned is a COROUTINE_SUSPENDED, which will be directly returned later. How will our completion be restored when the time-consuming operation is over?

2. CoroutineContext and Element

In profiling, we say that CoroutineContext is structured like a collection and draws conclusions from its interfaces:

public interface CoroutineContext {
    // Get method, obtained by key
    public operator fun <E : Element> get(key: Key<E>): E?
    // add operation
    public fun <R> fold(initial: R, operation: (R.Element) - >R): R
    // Operator +, the actual implementation calls the fold method
    public operator fun plus(context: CoroutineContext): CoroutineContext
    // Remove operation
    public fun minusKey(key: Key< * >): CoroutineContext
    
    // CoroutineContext Specifies the Key
    public interface Key<E : Element>

    // CoroutineContext defines the element
    public interface Element : CoroutineContext {
        // key
        public val key: Key<*>
        / /...}}Copy the code

The Element Element can be obtained by Key in CoroutineContext, and the Element interface is inherited from the CoroutineContext interface.

In addition, CoroutineContext supports add and remove operations, and supports the + operator to accomplish additions. The + operator, also known as the Plus method, has a specific implementation, which can be interesting to see for yourself, mainly involving the addition of the ContinuationInterceptor.

1.1 the Job

The Job comment states the definition as follows:

A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

It can be concluded that:

  1. Background tasks
  2. Can be cancelled
  3. The life cycle ends when it completes

A Job has a state that is similar to a Thread. A Job has a state that is similar to a Thread. A Job has a state that is similar to a Thread.

Another important aspect of Job management is the management of sub-jobs. The main rules are as follows:

  1. The childJobWhen it’s all over, fatherJobWill not end
  2. The fatherJobAt the time of cancellation, sonJobWill be cancelled

Some of the above information can be obtained from the Job interface documentation. So, where does Job come from? If you look at the coroutinescop #launch method, you’ll conclude that the return type of this method is Job, and we create a Job each time we call this method.

1.2 ContinuationInterceptor

Continuation interceptor, as the name suggests, looks at the interface first:

interface ContinuationInterceptor : CoroutineContext.Element {
    ContinuationInterceptor's Key in CoroutineContext
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    /**
     * 拦截 continuation
     */
    fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    / /...
}
Copy the code

This interface can extract just two pieces of information:

  1. The interceptorKeyThat is, whatever is behind youCoroutineContextHow many interceptors are there,KeyContinuationInterceptorCan only have one interceptor.
  2. As we all know,ContinuationIn the call itContinuation#resumeWith()Method is executedsuspendThe code block of the modified function, if we intercept it in advance, can we do something else, like switch threads, which is also hereContinuationInterceptorOne of the functions of.

It should be noted that we specify the threads on which coroutines occur through Dispatchers, which implement the ContinuationInterceptor interface.

3. CoroutineScope

The CoroutineScope interface is simple:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}
Copy the code

It requires that subsequent implementations provide CoroutineContext, but as we all know, CoroutineContext is an important part of coroutines, including both jobs and schedulers.

In the above code I have used several times the extension library for Coroutines in Android Jetpack which makes it easier to get CoroutineScope without having to manually cancel the component onDestroy and the source code is super simple. If you will use Lifecycle:

internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    // ...

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
}
Copy the code

It also allows you to call coroutines during specified life cycles, as you can see from the interface.

Process source code analysis

Let’s start with the following code:

lifecycleScope.launch(Dispatchers.Main) {
    val a = async { getResult(1.2)}val b = async { getResult(3.5)}val c = a.await() + b.await()
    Log.e(TAG, "result:$c")}suspend fun getResult(a: Int, b: Int): Int {
    return withContext(Dispatchers.IO) {
        delay(1000)
        return@withContext a + b
    }
}
Copy the code

Although the code is very simple, but the source code is more complex, let’s talk about step by step.

The first step is to get the CoroutineScope

As I have stated above, we use the Coroutine extension library for Lifecycle, if we don’t use the extension library we use MainScope, and their CoroutineContext is the same:

public fun MainScope(a): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            // ...
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            // ...
            return newScope
        }
    }
Copy the code

It’s notable that MainScope and LifecycleCoroutineScope both use the container container for the CoroutineContext, which is the + dispatchers.main container.

It’s important to note that the SupervisorJob and dispatchers. Main are designed to represent the Job and ContinuationInterceptor mentioned in CoroutineContext, respectively.

The second step starts the coroutine

Go directly to the CoroutineScope#launch() method:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope. () - >Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
Copy the code

The above method has three parameters, the first two will not be described too much, the third parameter:

block: suspend CoroutineScope.() -> Unit)
Copy the code

This is a method that is a lambda argument and also indicates that it needs to be decorated by suspend. Moving on to the launch method, it does two main things:

  1. Combination of newCoroutineContext
  2. Create another oneContinuation

Combine the new CoroutineContext

In the first line of code val newContext = newCoroutineContext(context) does the first thing, where newCoroutineContext(context) is an extension method:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if(combined ! == Dispatchers.Default && combined[ContinuationInterceptor] ==null)
        debug + Dispatchers.Default else debug
}
Copy the code

The CoroutineScope uses its own collection of coroutineContext, using the + operator to add the coroutineContext we provided in the launch method.

Create a Continuation

Going back to the previous code, normally we don’t specify the start argument, so it will use the DEFAULT coroutinestart.default, and the coroutine will end up with a StandaloneCoroutine.

The StandaloneCoroutine implementation is AbstractCoroutine, and if you look at the class diagram above, you’ll see that it implements a bunch of interfaces like Continuations, Jobs, and CoroutineScope. It should be noted that this StandaloneCoroutine is actually the complete part of our current Suspend Contination.

And then it calls

coroutine.start(start, coroutine, block)
Copy the code

This indicates that the coroutine has started.

The third step start

Enter the AbstractCoroutine#start method:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R. () - >T) {
    initParentJob()
    start(block, receiver, this)}Copy the code

Skip through layers of nesting and finally arrive at:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // Wrap it with Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // Do interception if necessary
            .intercepted()
            // Call resumeWith
            .resumeCancellableWith(Result.success(Unit))}Copy the code

Although this is just a function, the main logic behind it is revealed:

  1. Create one that has not been interceptedContinuation.
  2. interceptContinuation.
  3. performContinuation#resumeWithMethods.

Step 4 create againContinuation

I’m using both here because we already created AbstractContinuaion in launch, but it’s a complete, as can be seen from the line arguments of each function.

SuspendLambda, which extends from ContinuationImpl. If you ask me why I can’t find an implementation in the source code, I think it might be related to the suspend modifier, Handled by the compiler, but the call stack does look like this:

Look at the implementation of the SuspendLambda class:

internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)
    / /...
}
Copy the code

As you can see, its constructor takes a complete parameter.

Step 5 Intercept processing

Back to:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // Wrap it with Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // Do interception if necessary
            .intercepted()
            // Call resumeWith
            .resumeCancellableWith(Result.success(Unit))}Copy the code

The interception method in Continuation#intercepted() is an extension:

@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(a): Continuation<T> =
    (this as? ContinuationImpl)? .intercepted() ?:this
Copy the code

CreateCoroutineUnintercepted (receiver, completion) returns a SuspendLambda, so it must be a ContinuationImpl and have a look at it intercept method implementation:

internal abstract class ContinuationImpl( completion: Continuation<Any? >? .private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion? .context)public override val context: CoroutineContext
        get() = _context!!

    public fun intercepted(a): Continuation<Any? > = intercepted ? : (context[ContinuationInterceptor]? .interceptContinuation(this) ?: this)
                .also { intercepted = it }
    // ...
}
Copy the code

In the ContinuationImpl#intercepted() method, the data structure context is used directly to get the interceptor through context[ContinuationInterceptor].

CoroutineDispatcher interception implementation

ContinuationInterceptor intercepts CoroutineDispatcher class ContinuationInterceptor intercepts CoroutineDispatcher class ContinuationInterceptor interceptor intercepts CoroutineDispatcher class ContinuationInterceptor interceptor interceptor CoroutineDispatcher class ContinuationInterceptor interceptor CoroutineDispatcher class

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    / /...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    // 1. The intercepted Continuation is wrapped with a DispatchedContinuation layer
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    / /...
}

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    // ...
    override fun resumeWith(result: Result<T>) {
        // ...
        if (dispatcher.isDispatchNeeded(context)) {
            // 2. The last parameter needs to provide Runnable. The parent class is already implemented
            dispatcher.dispatch(context, this)}/ /...
    }
    // ...
}

// SchedulerTask is a Runnable
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    // ...
    public final override fun run(a) {
        // ...
        try {
            / /...
            withCoroutineContext(context, delegate.countOrElement) {
                // 3. Continuation is a continuation of DispatchedContinuationcontinuation.resume(...) }}/ /...}}Copy the code

To put it simply, you add a layer of interception to the resumeWith operation of the existing Continuation, like this:

After adding CoroutineDispatcher, the CoroutineDispatcher#dispatch() method is executed before the real continuity #resumeWith(), So let’s focus on the coroutine Dispatch #dispatch implementation.

Talk about a concrete implementation of CoroutineDispatcher

First of all we have to figure out where does the CoroutineDispatcher come from? It gets from context, where does context come from?

Note the constructors of SuspendLambda and ContinuationImpl. The arguments in SuspendLambda do not have CoroutineContext, so they can only come from CoroutineContext in Completion, The CoroutineContext of completion comes from the launch method, which is the CoroutineScope, and the default is SupervisorJob() + dispatchers.main. But only dispatchers. Main inherited from CoroutineDispatcher.

Dispatchers.Main is a MainCoroutineDispatcher, and the corresponding MainCoroutineDispatcher is a HandlerContext:

internal class HandlerContext private constructor(
    private val handler: Handler,
    private valname: String? .private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    / /...

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Use the main thread Handler to execute tasks
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // Use the Handler of the main thread to delay execution of the task, placing the completed continuation in the task
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    / /..
}
Copy the code

The main thread is going to be assigned to the main thread Handler.

For those of you who are curious, what if it’s not the main thread? Thread pools that are not used by the main thread:

public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int.private val maxPoolSize: Int.private val idleWorkerKeepAliveNs: Long.private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    / / time frames
    override val executor: Executor
        get() = coroutineScheduler
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }
}
Copy the code

CoroutineScheduler is a thread pool. If you want to know the specific process, students can look at the code.

Reading this, you may have a little idea why CoroutineContext is designed as a data structure:

  1. coroutineContext[ContinuationInterceptor]The interceptor of the current coroutine can be fetched directly, and only one scheduler can be used for each coroutine.
  2. The scheduler is in the othercoroutineContext“, so you can do interception while executing coroutines.

Similarly, we can use coroutineContext[Job] to get the current coroutine.

Step 6 resumeWith

Back to:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // Wrap it with Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // Do interception if necessary
            .intercepted()
            // Call resumeWith
            .resumeCancellableWith(Result.success(Unit))}Copy the code

Continue#resumeCancellableWith() is an extension with the scheduling logic:

  1. DispatchContinuation#resumeCancellableWith
  2. CoroutineDispatcher#dispatch
  3. Continuation#resumeWith

The Continuation here is SuspendLambda, which inherits the BaseContinuationImpl. Let’s see how it works:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any? >) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                valoutcome: Result<Any? > =try {
                        1. Execute the code block from suspend
                        val outcome = invokeSuspend(param)
                        // 2. If a block of code executes a suspend method, it returns early
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // 3. If completion is also BaseContinuationImpl, the loop will be entered
                    current = completion
                    param = outcome
                } else {
                    // 4. Run the completion resumeWith method
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}
Copy the code

This side is divided into two parts:

  • performsuspendMethod and get the results
  • callcomplete(In the next step)

Executing the suspend method

The suspend method content is executed in the first place, and the suspend method may be scheduled inside the method, as in our instance method:

lifecycleScope.launch(Dispatchers.Main) {
    val a = async { getResult(1.2)}val b = async { getResult(3.5)}val c = a.await() + b.await()
    Log.e(TAG, "result:$c")}suspend fun getResult(a: Int, b: Int): Int {
    return withContext(Dispatchers.IO) {
        delay(1000)
        return@withContext a + b
    }
}
Copy the code

Since we performed a delay operation on getResult, our launch method must have performed a time-dependent suspend method, so basecontinuationIMPLI #invokeSuspend will return a COROUTINE_SUSPENDED, As a result, as you can see, the method ends prematurely. (For clarification, I can’t find an implementation of the BaseContinuationImpl#invokeSuspend method. I guess it’s the compiler.)

I’m sure you’re just as curious as I am about how a time-consuming suspend can be used to restore complete.

Let’s take a look at delay(1000) on the main thread:

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}


internal class HandlerContext private constructor(
    private val handler: Handler,
    private valname: String? .private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    / /...

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    / /...
}
Copy the code

As you can see, package the restore task with a Runnable and hand it off to Handler’s Handler#postDelayed() method.

Step 7 complete resumeWith

There are generally two ways to handle a complete.

Complete is BaseContinuationImpl

The first case is what we call a nesting doll, a completed callback Continuation that has its own completed callback Continuation, and then the loop is right.

Call complete’s resumeWith

In the second case, the callback is completed with complete. Since complete is AbstractContinuation, let’s look at its resumeWith:

public abstract class AbstractCoroutine<in T>(
    /** * The context of the parent coroutine. */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    // ...
    public final override fun resumeWith(result: Result<T>) {
        // 1. Obtain the technical status of the current coroutine
        val state = makeCompletingOnce(result.toState())
        // 2. If you are currently waiting for completion, there are subcoroutines that have not finished
        if (state === COMPLETING_WAITING_CHILDREN) return
        // 3. Execute the end restoration method. The default value is null
        afterResume(state)
    }

    // This is the makeCompletingOnce method in the parent JobSupport class
    // I'll copy it for you
    internal fun makeCompletingOnce(proposedUpdate: Any?).: Any? {
        loopOnState { state ->
            // tryMakeCompleting is handled differently depending on whether there are child jobs
            val finalState = tryMakeCompleting(state, proposedUpdate)
            when {
                finalState === COMPLETING_ALREADY ->
                    throw IllegalStateException(
                        "Job $this is already complete or completing, " +
                                "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
                    )
                finalState === COMPLETING_RETRY -> return@loopOnState
                else -> return finalState // COMPLETING_WAITING_CHILDREN or final state}}}}Copy the code

The coroutine is about to complete, so we need to evaluate the technical status of the coroutine before it runs. Some coroutines with subcoroutines will wait for the subcoroutine to end before terminating the current coroutine.

This is the process of a launch. The rough flow chart looks like this:

Let’s talk more about async.

Iv. About Async

Async and launch codes are very similar:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope. () - >T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
Copy the code

Finally, there are three steps:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // Wrap it with Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // Do interception if necessary
            .intercepted()
            // Call resumeWith
            .resumeCancellableWith(Result.success(Unit))}Copy the code

Instead of async returning a Deferred

, we need to call Deferred#await() to get the result, which is implemented in JobSupport:

private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
    / /... The awaitInternal method comes from the parent class JobSupport
    override suspend fun await(a): T = awaitInternal() as T
    // ...

    // This is implemented in JobSupport
    internal suspend fun awaitInternal(a): Any? {
        // loop the result
        while (true) { // lock-free loop on state
            val state = this.state
            // 1
            if (state !is Incomplete) {
                if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                    recoverAndThrow(state.cause)
                }
                return state.unboxState()
            }
            // 2. Break unless you need to retry
            if (startInternal(state) >= 0) break 
        }
        // Wait for the hang method
        return awaitSuspend() // slow-path}}Copy the code

Its specific process can be seen from my notes, not an introduction, interested students can view the source code.

1. Discussion at the beginning of this article

The initial code of this article is wrong, even the compiler can not pass, embarrassing ~

The correct code should be:

GlobalScope.launch {
    val a = async {
        1+2
    }

    val b = async {
        1+3
    }

    val c = a.await() + bawait()
    Log.e(TAG,"result:$c")}Copy the code

If the code is correct, there are two possible cases:

If you put it on the UI thread, it’s going to be serial, and somebody says, well, I used delay(1000) in A, and DELAY (2000) in B, and it took me 2000 milliseconds to get TO C. Isn’t that parallel? This is not the case. The delay operation uses the Handler#postDelay method. One is 1000 milliseconds late and the other is 2000 milliseconds late, but there is only one main thread, so it can only be serial.

If it’s a child thread, it’s usually parallel because we’re using a thread pool

conclusion

When writing this source code analysis, some details can not always be found, such as suspendLambda subclass can not be found, their study of Kotlin needs to be in-depth.

Therefore, some parts of this article are still worth discussing. If you have a better understanding, welcome to communicate with us below.

Some time ago, I had a slight illness, and I went to the hospital every week, so I was very sad, and the efficiency of writing the blog was also very low. I wrote this article for nearly a month, ah, it was very difficult ~, but I returned to normal from this week.

Article Reference:

Cracking Kotlin Coroutines (3) – Coroutine Scheduling