A simple coroutine example

This main method is directly treated as the program entry method, which does not carrysuspendModification.

RunBlocking to start a coroutine

RunBlocking is generally not used in development, but in testing frameworks, to connect the non-coroutine world to the coroutine world. Blocks of runBlocking are in the scope of coroutineScope. The main method above opens two coroutines, with runBlocking as the parent and launching as the child. The parent coroutine does not finish until all child coroutines have completed. A look at runBlocking source code:

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope. () - >T): T {
    .....
    val currentThread = Thread.currentThread()   // It must be the main thread
    val contextInterceptor = context[ContinuationInterceptor]  // Interceptor, null
    val eventLoop: EventLoop?        // Event polling handler
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        eventLoop = ThreadLocalEventLoop.eventLoop  
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        eventLoop = (contextInterceptor as? EventLoop)? .takeIf { it.shouldBeProcessedFromContext() } ? : ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) }val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) 
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)  //start=create+ resumeWith(Unit)
    return coroutine.joinBlocking()
}
Copy the code

RunBlocking blocks the main thread here when starting the coroutine. Start = create + resumeWith (Unit).,EventLoop refers to the handler process, which must have an event queue, insert the event, fetch the event, such as the event does not reach the processing time “block”.

Launch a child coroutine

In addition to launch, there are async+await ways to launch coroutines, where async returns a Deferred.

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope. () - >Unit): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)  // Start coroutine
    return coroutine
}
public interface Deferred<out T> : Job //Deferred is also a Job
Copy the code

Luanch returns a Job that starts the coroutine with (coroutine. Start), in this case as a child coroutine. Is it true that the child coroutine inherits the context of the parent coroutine? Really.

@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context     #1
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if(combined ! == Dispatchers.Default && combined[ContinuationInterceptor] ==null)
        debug + Dispatchers.Default else debug
}
Copy the code

The newCoroutineContext method is also scoped to the outermost coroutineScope, and then its context is coroutineContext in #1. Obviously, a subcoroutine can have its own context, and the two contexts can be combined, The dispatchers. Default is not added here, so the thread is still the main thread. CoroutineContext + Context contexts can be added using the overloaded plus operator.

//from CoroutineContext
public operator fun plus(context: CoroutineContext): CoroutineContext =
   ......
       }
Copy the code

CoroutineContext also supports CoroutineContext [Job], which uses the get convention and overloads the GET operator.

//from CoroutineContext
public operator fun <E : Element> get(key: Key<E>): E?
Copy the code

The key in the coroutineContext[Job] class is a static object with a final modification. Multiple objects in the coroutineContext class can share the key.

public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
Copy the code

Coroutine scheduler, coroutine interceptor, and coroutine interceptor are all coroutine contexts that implement coroutinecontext.Element.

3.1. Discover its ontology from the compiled product of the coroutine

Start two child coroutines in runBlocking, which share one thread. 👇

Use show Kotlin Bytecode for the transformed code 👇

public final class TestKt {
    public static final void main(a) {
        BuildersKt.runBlocking((CoroutineContext)(new CoroutineName("coroutine#1")), (Function2)(new Function2((Continuation)null) {
            int label;
            @Nullable
            public final Object invokeSuspend(@NotNull Object var1) {
                Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(this.label) {
                    case 0:
                        ResultKt.throwOnFailure(var1);
                        System.out.println("1" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                        ExecutorCoroutineDispatcher dispatcher = ThreadPoolDispatcherKt.newSingleThreadContext("my owner Thread ");
                        BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
                            int label;
                            @Nullable
                            public final Object invokeSuspend(@NotNull Object $result) {
                                Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                switch(this.label) {
                                    case 0:
                                        ResultKt.throwOnFailure($result);
                                        System.out.println("2" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                                        this.label = 1;
                                        if (DelayKt.delay(1000L.this) == var5) {
                                            return var5;
                                        }
                                        break;
                                    case 1:
                                        ResultKt.throwOnFailure($result);
                                        break;
                                    default:
                                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                                }
                                System.out.println("3" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                                return Unit.INSTANCE;
                            }

                            @NotNull
                            public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                                Intrinsics.checkNotNullParameter(completion, "completion");
                                Function2 var3 = new <anonymous constructor>(completion);
                                var3.L$0 = value;
                                return var3;
                            }

                            public final Object invoke(Object var1, Object var2) {
                                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); }}),2, (Object)null);
                        BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
                            int label;
                            @Nullable
                            public final Object invokeSuspend(@NotNull Object $result) {
                                Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                switch(this.label) {
                                    case 0:
                                        ResultKt.throwOnFailure($result);
                                        System.out.println("21" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                                        this.label = 1;
                                        if (DelayKt.delay(500L.this) == var5) {
                                            return var5;
                                        }
                                        break;
                                    case 1:
                                        ResultKt.throwOnFailure($result);
                                        break;
                                    default:
                                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                                }
                                System.out.println("31" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                                return Unit.INSTANCE;
                            }

                            @NotNull
                            public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                                Intrinsics.checkNotNullParameter(completion, "completion");
                                Function2 var3 = new <anonymous constructor>(completion);
                                var3.L$0 = value;
                                return var3;
                            }

                            public final Object invoke(Object var1, Object var2) {
                                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); }}),2, (Object)null);
                        System.out.println("4" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                        return Unit.INSTANCE;
                    default:
                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); }}@NotNull
            public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function2 var3 = new <anonymous constructor>(completion);
                var3.L$0 = value;
                return var3;
            }

            public final Object invoke(Object var1, Object var2) {
                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); }})); }public static void main(String[] var0) { main(); }}Copy the code

The bytecodes of coroutines can also have callback nesting, so it’s only the parent coroutine nesting two child coroutines. If there are multiple coroutines nesting, there will also be callback hell. The essence of coroutine suspension is that the code of the coroutine body is divided into two parts by the suspension point. Here delay is the suspension function, and the code is divided into the start to the suspension point delay, delay after delay to end, the former code follows the switch’s case 0 logic, and the latter code is after the suspension point resumes, Let’s go back to Case 1 and the following code.

Pay attention to the function call, and not necessarily really hangs, specific to see if the return value IntrinsicsKt. GetCOROUTINE_SUSPENDED (); The next time you need to call invokeSuspend again, it will go label==1. Let’s take a single subcoroutine:

 BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    int label;
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {#2This is to be able to find the Object var5 = IntrinsicsKt. GetCOROUTINE_SUSPENDED ();switch(this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                System.out.println("2" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
                this.label = 1;
                if (DelayKt.delay(1000L.this) == var5) {
                    return var5;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        System.out.println("3" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
        return Unit.INSTANCE;
    }

    @NotNull
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        var3.L$0 = value;
        return var3;
    }

    public final Object invoke(Object var1, Object var2) {#1The call here was not foundreturn ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); }}),2, (Object)null);
Copy the code

How is Invoke () #1 triggered first? Did not find the corresponding call place, if any friends found, please inform me. How is calling invokeSuspend #2 triggered again? Is triggered at continuation.resumeWith(XXX).

It is well known that Kotlin’s suspend method is compiled with a Continuation

in place of the last parameter.

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope. () - >Unit): Job 
Copy the code
BuildersKt.launch$default($this$runBlocking, 
(new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), 
(CoroutineStart)null, (Function2)(new Function2((Continuation)null) {}),2, (Object)null);
Copy the code

CoroutineScope’s launch extension function is passed in as the first argument when Java calls it, the second argument is the coroutine context, the third argument is the coroutine launch mode, and the next one is the anonymous class object in Function2. The receiver is a Continuation and the last two parameters are 2 and (Object)null.

When viewing the raw bytecode, you can see the following declaration:

final class com/docwei/kotlindemo/TestKt$main$1 $1extends kotlin/coroutines/jvm/internal/SuspendLambda// The coroutine body is compiledSuspendLambdaClass !!!!implements kotlin/jvm/functions/Function2 {             // also Fucntion2!!

INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default(Lkotlinx/coroutines/CoroutineScope; Lkotlin/coroutines/CoroutineContext; Lkotlinx/coroutines/CoroutineStart; Lkotlin/jvm/functions/Function2; ILjava/lang/Object;) Lkotlinx/coroutines/Job;Copy the code

SuspendLambda class can look at class inheritance

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope. () - >Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block) / / # 1
    return coroutine
}
Copy the code

#1 looks like we’re creating a new coroutine and passing in a block is our coroutine body.

//from AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R. () - >T) {
    start(block, receiver, this)}//from CoroutineStart
public operator fun <T> invoke(block: suspend() - >T, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(completion) //#2 will go here
        ATOMIC -> block.startCoroutine(completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        LAZY -> Unit // will start lazily
    }
Copy the code

#2 Look at the declaration of block here, lambda with suspend, to start a coroutine that supports cancellation.

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {   // create + resumeWith = start
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        dispatcherFailure(completion, e)
    }
}
Copy the code

And as you follow along, you’ll see that this R is, in fact, the CoroutineScope, completion is the coroutine that we have in coroutine. Start, Said it is also a Continuation (coroutines is actually a Continuation, is also a CoroutineScope | coroutines is a SuspendLambda body, it is also a Continuation), AbstractCoroutine AbstractCoroutine class declaration

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
Copy the code

Block: suspend CoroutineScope.() -> Unit compiles to the SuspendLambda class

//from Intrinsicskt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)  // Create suspendLambda object
    else{... }}//from ContinuationImpl
public fun intercepted(a): Continuation<Any? > = intercepted ? : (context[ContinuationInterceptor]? .interceptContinuation(this) ?: this)
            .also { intercepted = it }
            
Copy the code

Remember the coroutine start= create + resumewith(Unit)

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) - >Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}
inline fun resumeCancellableWith(result: Result<T>,
    noinline onCancellation: ((cause: Throwable) - >Unit)? {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this) //
    } else{... }}Copy the code

The scheduler we pass in is a custom scheduler, so when the coroutine is started it will be scheduled (in fact, it will switch the thread), and the coroutine body of the child coroutine will be executed in this thread.

//from ExecutorCoroutineDispatcherImpl
override fun dispatch(context: CoroutineContext, block: Runnable) {
    try {
        executor.execute(wrapTask(block)) 
    } catch (e: RejectedExecutionException) {
        unTrackTask()
        cancelJobOnRejection(context, e)
        Dispatchers.IO.dispatch(context, block)
    }
}
Copy the code

Thread pools perform tasks, which must depend on the run method.

//from DispatchedTask
public final override fun run(a) {
    try{... continuation.resume(getSuccessfulResult(state)) } }catch (e: Throwable) {
      ......
    } finally{... }}//from BaseContinuationImpl
public final override fun resumeWith(result: Result<Any? >){...val outcome = invokeSuspend(param)  //invokeSuspend is finally called
        if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome) ....... }}Copy the code

So the first call to invokeSuspend of Function2 is triggered by resumeWith(Unit) when the coroutine starts.

3.2. Custom SingleThread pool newSingleThreadContext(XXX)

val dispatcher = newSingleThreadContext("my owner Thread ")

public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
    newFixedThreadPoolContext(1, name)
    
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
    require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
    val threadNo = AtomicInteger()
    val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
        val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
        t.isDaemon = true  // Daemon thread
        t
    }
    return executor.asCoroutineDispatcher()
}   
Copy the code

The thread created by newSingleThreadContext is a daemon thread, in case the coroutine fails to cancel properly and the app quits and terminates execution. And is the key point is the Executors. NewScheduledThreadPool thread pool, generated by this way is easy for you timing tasks, such as delay the operation.

3.3 Scheduling threads of coroutines

The main thread of the launch is the default schedulerTo start a schedule, you simply wrap the coroutine body into a runnable and throw it to the thread pool to execute. If the current thread pool supports timed execution, the current thread is used. If the current thread pool does not support timed execution, the current thread uses a default DefaultExecutor, which is not ScheduledExecutorService. It is implemented through locksupport. parkNanos(Blocker, Nanos).

Let’s take a look at DefaultExecutor’s daemon run method 👇 :

// The Thread is also a daemon Thread.
override fun run(a) {
    ThreadLocalEventLoop.setEventLoop(this)
    registerTimeLoopThread()
    try {
        var shutdownNanos = Long.MAX_VALUE
        if(! notifyStartup())return
        while (true) {   / / death cycle
            Thread.interrupted() // just reset interruption flag 
            var parkNanos = processNextEvent()  // Remove the next event
            if (parkNanos == Long.MAX_VALUE) {
                // nothing to do, initialize shutdown timeout
                val now = nanoTime()
                if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                val tillShutdown = shutdownNanos - now
                if (tillShutdown <= 0) return // shut thread down
                parkNanos = parkNanos.coerceAtMost(tillShutdown)
            } else
                shutdownNanos = Long.MAX_VALUE
            if (parkNanos > 0) {
                if (isShutdownRequested) return
                parkNanos(this, parkNanos)    // If the processing time is not reached, block the daemon thread.}}}finally {
        _thread = null // this thread is dead
        acknowledgeShutdownIfNeeded()
        unregisterTimeLoopThread()
        if(! isEmpty) thread// recreate thread if it is needed}}@InlineOnly
internal inline fun parkNanos(blocker: Any, nanos: Long){ timeSource? .parkNanos(blocker, nanos) ? : LockSupport.parkNanos(blocker, nanos) }Copy the code

Handler looper. loop, blocking via epoll_wait.

public static void loop(a) {
        for (;;) {
             Message msg = queue.next(); // might block. msg.target.dispatchMessage(msg); . }}Message next(a) {
        int nextPollTimeoutMillis = 0;
        for (;;) {
            nativePollOnce(ptr, nextPollTimeoutMillis); // The bottom layer finally calls epoll_wait
            synchronized (this) {...if(msg ! =null) {
                    if (now < msg.when) {
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else{... }}else {
                    nextPollTimeoutMillis = -1; }... nextPollTimeoutMillis =0; }}Copy the code

Third, delay (time)

If you think of a coroutine as a thread, delay(1000) is the thread’s sleep(1000). Sleep blocks the thread, but does the delay inside the coroutine block the current thread? Don’t. Continuing with the above example, let them schedule on the same thread, and then both delay. You’ll notice that it’s not blocking the thread.

Delay does not block the thread of execution of the current coroutine, so it is called lightweight. If delay does not block the current thread, how does it know to resume at the specified time? Is it possible to use a similar android handler postdelay mechanism, of course not.

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T- > >)Unit
): T =suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        cancellable.initCancellability() 
        block(cancellable)
        cancellable.getResult()
    }
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
Copy the code

Continuation encapsulated into support to cancel CancellableContinuation cancellable. InitCancellability associated coroutines, father and son () here is encapsulated into support to cancel coroutines, go into the state machine process?

//from ExecutorCoroutineDispatcherImpl
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val future = (executor as? ScheduledExecutorService)? .scheduleBlock( ResumeUndispatchedRunnable(this, continuation),
        continuation.context,
        timeMillis
    )
    if(future ! =null) {  // If the current thread pool is ScheduledExecutorService, it is handled directly in the current thread pool
        continuation.cancelFutureOnCancellation(future)
        return
    }// Just use the default DefaultExecutor
    DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
    return try {
        schedule(block, timeMillis, TimeUnit.MILLISECONDS)
    } catch (e: RejectedExecutionException) {
        cancelJobOnRejection(context, e)
        null}}//ScheduledExecutorService
publicScheduledFuture<? > schedule(Runnable command, long delay, TimeUnit unit);Copy the code

The delay is implemented by the Scheule method of ScheduledExecutorService and will not block the current thread. The thread pool of the thread scheduled by the subcoroutine is a thread pool that supports the scheduled execution of tasks. This is distinguished from LockSupport which goes with DefaultExecutor by default. A runnable run method is the dispatcher. ResumeUndispatched (Unit).

private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run(a) {
       // This is a Unit
        with(continuation) { dispatcher.resumeUndispatched(Unit)}}}override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dc = delegate as? DispatchedContinuation
    resumeImpl(value, if(dc? .dispatcher ===this) MODE_UNDISPATCHED else resumeMode)
}
// The state machine of coroutines handles state reversals
private fun resumeImpl(
    proposedUpdate: Any? , resumeMode:Int,
    onCancellation: ((cause: Throwable) - >Unit)? = null
) {
    _state.loop { state ->
        when (state) {
            is NotCompleted -> {
                val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
                if(! _state.compareAndSet(state, update))return@loop // retry on cas failure
                detachChildIfNonResuable()
                dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
                return // done
            }
            is CancelledContinuation -> {
           ......
}
Copy the code

The state machine of the coroutine starts processing the state flow, skipping the state machine.

private fun dispatchResume(mode: Int) {
    if (tryResume()) return // completed before getResult invocation -- bail out
    dispatch(mode)
}
Copy the code

private fun dispatchResume(mode: Int) {
    if (tryResume()) return // completed before getResult invocation -- bail out
    dispatch(mode)
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int){... }else {
        resume(delegate, undispatched)
    }
}
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
    // This resume is never cancellable. The result is always delivered to delegate continuation.
    val state = takeState()
    val exception = getExceptionalResult(state)
    val result = if(exception ! =null) Result.failure(exception) else Result.success(getSuccessfulResult<T>(state))
    when {
        undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
        else -> delegate.resumeWith(result)
    }
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
    withContinuationContext(continuation, countOrElement) {
        continuation.resumeWith(result)
    }
}
Copy the code

When you see continuation.resumeWith(result), you must call invokeSuspend again and the code continues.

//from BaseContinuationImpl
public final override fun resumeWith(result: Result<Any? >){...val outcome = invokeSuspend(param)  //invokeSuspend is finally called
        if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome) ....... }}Copy the code

Now you can see that if there is a delay in a coroutine, resumeWith(Unit) will be called twice, once to start the coroutine and once to resume the delay. Two calls to resumeWith(Unit) correspond to two calls to the invokeSuspend method in Java bytecode. Bennyhuo says: If there are n hanging points in a coroutine, then a total of N +2 calls to resume (2 is called once at start and once at end) will also be called to resumeWith at the end. It doesn’t look like it’s going to. ResumeWith starts calling once, and when it’s done calling again, invokeSuspend is going to be called 3 times, and the logic for the case that label equals 1 is going to go through multiple times, which should print multiple times, but it’s not.

Four, subsequent

The current state machine is not touched, and so is the event handling mechanism of coroutines. They can be analyzed separately.