A simple coroutine example
This main method serves directly as the program entry point; it does not carry the suspend modifier.
Starting a coroutine with runBlocking
runBlocking is generally not used in application code; it is used in test frameworks to bridge the non-coroutine world and the coroutine world. The block passed to runBlocking has a CoroutineScope as its receiver. The main method above starts two coroutines: the one created by runBlocking is the parent, and the one created by launch is its child. The parent coroutine does not complete until all of its child coroutines have completed. A look at the runBlocking source code:
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
.....
val currentThread = Thread.currentThread() // the calling thread; here it is the main thread
val contextInterceptor = context[ContinuationInterceptor] // the interceptor; null in this example
val eventLoop: EventLoop? // the event loop that polls and processes events
val newContext: CoroutineContext
if (contextInterceptor == null) {
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block) // start = create + resumeWith(Unit)
return coroutine.joinBlocking()
}
Here runBlocking blocks the main thread while it starts the coroutine (start = create + resumeWith(Unit)). The EventLoop is comparable to Android's Handler mechanism: it owns an event queue, events are inserted into and taken out of that queue, and when the next event is not yet due for processing, the loop "blocks" the thread until it is.
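The queue-and-block behaviour described above can be sketched with plain JDK classes. This is a minimal illustration, not the kotlinx.coroutines EventLoop: TimedEvent, SketchEventLoop, and eventOrder are hypothetical names introduced here, and the sketch assumes all events are scheduled from the same thread before the loop runs.

```kotlin
import java.util.PriorityQueue
import java.util.concurrent.locks.LockSupport

// Hypothetical types for illustration; they are not part of kotlinx.coroutines.
class TimedEvent(val dueNanos: Long, val action: () -> Unit)

class SketchEventLoop {
    // Events ordered by the time at which they become due.
    private val queue = PriorityQueue<TimedEvent>(compareBy<TimedEvent> { it.dueNanos })

    fun schedule(delayMillis: Long, action: () -> Unit) {
        queue.add(TimedEvent(System.nanoTime() + delayMillis * 1_000_000L, action))
    }

    // Processes events in due-time order on the calling thread.
    fun runUntilEmpty() {
        while (true) {
            val next = queue.peek() ?: return
            val waitNanos = next.dueNanos - System.nanoTime()
            if (waitNanos > 0) {
                // Not due yet: park ("block") this thread, then re-check.
                LockSupport.parkNanos(this, waitNanos)
                continue
            }
            queue.poll().action()
        }
    }
}

fun eventOrder(): List<String> {
    val log = mutableListOf<String>()
    val loop = SketchEventLoop()
    loop.schedule(60) { log += "second" }
    loop.schedule(20) { log += "first" }
    loop.runUntilEmpty()
    return log
}

fun main() {
    println(eventOrder())
}
```

The event scheduled with the shorter delay runs first even though it was inserted last, because the loop always parks until the earliest event is due.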
Starting a child coroutine with launch
Besides launch, a coroutine can also be started with async and await, where async returns a Deferred.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block) // Start coroutine
return coroutine
}
public interface Deferred<out T> : Job //Deferred is also a Job
launch returns a Job and starts the coroutine through coroutine.start; in this example the coroutine is started as a child. Does the child coroutine really inherit the context of its parent? It does.
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context // #1
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
The newCoroutineContext method is an extension on CoroutineScope, so the coroutineContext at #1 is the context of the enclosing (parent) scope. A child coroutine can also supply its own context, and the two are combined. In this example the parent context already contains an interceptor (the event loop of runBlocking), so Dispatchers.Default is not added and the child still runs on the main thread. The expression coroutineContext + context works because CoroutineContext overloads the plus operator.
//from CoroutineContext
public operator fun plus(context: CoroutineContext): CoroutineContext =
......
}
CoroutineContext also supports the indexing syntax coroutineContext[Job]; this relies on the get convention, that is, an overloaded get operator.
//from CoroutineContext
public operator fun <E : Element> get(key: Key<E>): E?
The Job in coroutineContext[Job] is the key: a companion object, which compiles to a static final singleton. Every instance of a given element type therefore shares the same key.
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
The coroutine dispatcher and the coroutine interceptor are both coroutine context elements that implement CoroutineContext.Element.
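To make the plus and get conventions concrete, here is a small sketch that uses only kotlin.coroutines from the standard library. Label, Owner, and describe are hypothetical names defined for this illustration; they mirror the way ContinuationInterceptor declares its companion object Key.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element types, defined only for this illustration.
class Label(val text: String) : AbstractCoroutineContextElement(Label) {
    // The companion object is the single key shared by every Label instance.
    companion object Key : CoroutineContext.Key<Label>
}

class Owner(val name: String) : AbstractCoroutineContextElement(Owner) {
    companion object Key : CoroutineContext.Key<Owner>
}

// context[Label] calls the overloaded get operator with the companion object as the key.
fun describe(context: CoroutineContext): String =
    "label=${context[Label]?.text}, owner=${context[Owner]?.name}"

fun main() {
    val parent: CoroutineContext = Label("parent") + Owner("main")
    // With plus, an element on the right replaces an element with the same key on the left.
    val child = parent + Label("child")
    println(describe(parent))
    println(describe(child))
    println(describe(EmptyCoroutineContext))
}
```

The child keeps the parent's Owner and overrides only the Label, which is the same combination rule that newCoroutineContext relies on at #1.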
3.1. Uncovering the true nature of a coroutine from its compiled output
Start two child coroutines inside runBlocking; the two children share one thread. 👇
Use Show Kotlin Bytecode and decompile to see the transformed code 👇
public final class TestKt {
public static final void main() {
BuildersKt.runBlocking((CoroutineContext)(new CoroutineName("coroutine#1")), (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
System.out.println("1" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
ExecutorCoroutineDispatcher dispatcher = ThreadPoolDispatcherKt.newSingleThreadContext("my owner Thread ");
BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
System.out.println("2" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
this.label = 1;
if (DelayKt.delay(1000L, this) == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println("3" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.L$0 = value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
System.out.println("21" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
this.label = 1;
if (DelayKt.delay(500L, this) == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println("31" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.L$0 = value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
System.out.println("4" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.L$0 = value;
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}));
}
public static void main(String[] var0) {
main();
}
}
The bytecode of a coroutine can still contain nested callbacks. Here it is only a parent coroutine nesting two child coroutines; with more levels of nesting, the compiled code would show callback hell as well. The essence of suspension is that the coroutine body is split into two parts at the suspension point. Here delay is the suspending function, and it splits the code into the part from the start up to delay and the part from after delay to the end. The former runs under case 0 of the switch; the latter runs after the coroutine resumes from the suspension point, entering case 1 and then the code that follows the switch.
Note that calling a suspending function does not necessarily suspend; that depends on whether the call returns IntrinsicsKt.getCOROUTINE_SUSPENDED(). If it does, invokeSuspend has to be called again later, and that call takes the label == 1 branch. Let's look at a single child coroutine:
BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) { // #2: the caller of this method can be found
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
System.out.println("2" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
this.label = 1;
if (DelayKt.delay(1000L, this) == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
System.out.println("3" + Thread.currentThread().name + "-" + coroutineContext[CoroutineName])
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.L$0 = value;
return var3;
}
public final Object invoke(Object var1, Object var2) { // #1: no call site was found for this method
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
How is invoke() at #1 triggered in the first place? I could not find the corresponding call site; if anyone finds it, please let me know. How is invokeSuspend at #2 triggered again? It is triggered from continuation.resumeWith(XXX).
As is well known, when a Kotlin suspend function is compiled, a Continuation is appended as its last parameter.
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit): Job
BuildersKt.launch$default($this$runBlocking,
(new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher),
(CoroutineStart)null, (Function2)(new Function2((Continuation)null) {}),2, (Object)null);
When launch is called from Java, the receiver of the extension function (the CoroutineScope) is passed as the first argument. The second argument is the coroutine context, the third is the coroutine start mode, and the fourth is an instance of the anonymous Function2 class, whose constructor takes a Continuation. The last two arguments, 2 and (Object)null, belong to the synthetic launch$default method: a bit mask recording which parameters fall back to their default values (here start), and an unused marker object.
When viewing the raw bytecode, you can see the following declaration:
final class com/docwei/kotlindemo/TestKt$main$1$1 extends kotlin/coroutines/jvm/internal/SuspendLambda // the coroutine body is compiled into a SuspendLambda class
implements kotlin/jvm/functions/Function2 { // it is also a Function2
INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
The class hierarchy of SuspendLambda is worth a look.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block) // #1
return coroutine
}
At #1, it looks like a new coroutine is created, and the block passed in is our coroutine body.
//from AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
//from CoroutineStart
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion) //#2 will go here
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
At #2, note the declaration of block: it is a suspend lambda, and startCoroutineCancellable starts it as a coroutine that supports cancellation.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) { // create + resumeWith = start
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
dispatcherFailure(completion, e)
}
}
Following the calls, you will find that R is in fact CoroutineScope, and completion is the coroutine object we passed in coroutine.start, which means the coroutine is itself a Continuation. (The coroutine object is both a Continuation and a CoroutineScope; the coroutine body is a SuspendLambda, which is also a Continuation.) The declaration of AbstractCoroutine confirms this:
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
block: suspend CoroutineScope.() -> Unit is compiled into a SuspendLambda class.
//from Intrinsicskt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion) // creates the SuspendLambda object
else { ... }
}
//from ContinuationImpl
public fun intercepted(): Continuation<Any?> =
intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
Remember that starting a coroutine is create + resumeWith(Unit):
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
inline fun resumeCancellableWith(result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this) // hand this continuation to the dispatcher
} else { ... }
}
The dispatcher we passed in is a custom one, so dispatching happens when the coroutine starts (in practice, a thread switch), and the body of the child coroutine then runs on that dispatcher's thread.
//from ExecutorCoroutineDispatcherImpl
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
cancelJobOnRejection(context, e)
Dispatchers.IO.dispatch(context, block)
}
}
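The idea of wrapping a resumption in a Runnable and handing it to an executor can be reproduced with the standard library's ContinuationInterceptor. This is a simplified sketch, not the kotlinx.coroutines dispatcher: SketchDispatcher and bodyThreadName are hypothetical names, and cancellation and rejection handling are left out.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical dispatcher: every resumption is wrapped in a Runnable and given to the executor.
class SketchDispatcher(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts a coroutine body under SketchDispatcher and reports which thread ran it.
fun bodyThreadName(): String {
    val executor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "sketch-worker").apply { isDaemon = true }
    }
    val done = CountDownLatch(1)
    var seen = ""
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = SketchDispatcher(executor)
        override fun resumeWith(result: Result<Unit>) { done.countDown() }
    }
    val body: suspend () -> Unit = { seen = Thread.currentThread().name }
    body.startCoroutine(completion) // the first resumption is dispatched to the executor
    done.await(5, TimeUnit.SECONDS)
    executor.shutdown()
    return seen
}

fun main() {
    println(bodyThreadName())
}
```

Although startCoroutine is called on the main thread, the body observes the worker thread's name, because the interceptor moved the first resumeWith onto the executor.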
A thread pool executes a task by calling its run method.
//from DispatchedTask
public final override fun run() {
try {
...
continuation.resume(getSuccessfulResult(state))
...
} catch (e: Throwable) {
......
} finally { ... }
}
//from BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param) // invokeSuspend is finally called
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
.......
}
}
So the first call to invokeSuspend of Function2 is triggered by resumeWith(Unit) when the coroutine starts.
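The "start = create + resumeWith(Unit)" formula can be tried directly with the standard library, without kotlinx.coroutines. In this sketch startManually is a hypothetical helper name; createCoroutine and resume are the kotlin.coroutines functions.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

fun startManually(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> String = {
        log += "body running"
        "done"
    }
    // The completion continuation receives the final result of the body.
    val completion = object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            log += "completed with ${result.getOrNull()}"
        }
    }
    // Step 1: create. Nothing from the body has run yet.
    val continuation: Continuation<Unit> = body.createCoroutine(completion)
    log += "created, not started"
    // Step 2: resume(Unit) is resumeWith(Result.success(Unit)); it runs the body for the first time.
    continuation.resume(Unit)
    return log
}

fun main() {
    startManually().forEach(::println)
}
```

Note that the final result is delivered to the completion continuation, which is a different object from the continuation that wraps the body.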
3.2. The custom single-thread pool: newSingleThreadContext(XXX)
val dispatcher = newSingleThreadContext("my owner Thread ")
public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
newFixedThreadPoolContext(1, name)
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
val threadNo = AtomicInteger()
val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
t.isDaemon = true // Daemon thread
t
}
return executor.asCoroutineDispatcher()
}
The thread created by newSingleThreadContext is a daemon thread, so that a coroutine that fails to be cancelled properly does not keep running after the app exits. The key point is the Executors.newScheduledThreadPool pool: a pool created this way makes timed tasks, such as delayed operations, easy to run.
3.3. Thread scheduling of coroutines
The main thread of the launch is the default scheduler. To dispatch, the coroutine body is simply wrapped into a Runnable and handed to the thread pool for execution. For timed work, if the current thread pool supports scheduled execution, that pool is used; if it does not, the default DefaultExecutor is used instead. DefaultExecutor is not a ScheduledExecutorService; its waiting is implemented with LockSupport.parkNanos(blocker, nanos).
Let's take a look at the run method of DefaultExecutor's daemon thread 👇:
// The Thread is also a daemon Thread.
override fun run() {
ThreadLocalEventLoop.setEventLoop(this)
registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!notifyStartup()) return
while (true) { // infinite loop
Thread.interrupted() // just reset interruption flag
var parkNanos = processNextEvent() // process the next event
if (parkNanos == Long.MAX_VALUE) {
// nothing to do, initialize shutdown timeout
val now = nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
shutdownNanos = Long.MAX_VALUE
if (parkNanos > 0) {
if (isShutdownRequested) return
parkNanos(this, parkNanos) // the next event is not due yet, so block the daemon thread
}
}
} finally {
_thread = null // this thread is dead
acknowledgeShutdownIfNeeded()
unregisterTimeLoopThread()
if (!isEmpty) thread // recreate thread if it is needed
}
}
@InlineOnly
internal inline fun parkNanos(blocker: Any, nanos: Long) {
timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos)
}
For comparison, Handler's Looper.loop blocks via epoll_wait.
public static void loop() {
for (;;) {
Message msg = queue.next(); // might block
...
msg.target.dispatchMessage(msg);
...
}
}
Message next() {
int nextPollTimeoutMillis = 0;
for (;;) {
nativePollOnce(ptr, nextPollTimeoutMillis); // ultimately calls epoll_wait in the native layer
synchronized (this) {
...
if (msg != null) {
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else { ... }
} else {
nextPollTimeoutMillis = -1;
}
...
nextPollTimeoutMillis = 0;
}
}
Three: delay(time)
If you think of a coroutine as a thread, then delay(1000) corresponds to the thread's sleep(1000). sleep blocks the thread, but does delay inside a coroutine block the current thread? It does not. Continuing with the example above, let both coroutines be dispatched on the same thread and have both call delay; you will see that the thread is not blocked.
Because delay does not block the thread the coroutine runs on, coroutines are called lightweight. But if delay does not block the current thread, how does it know to resume at the specified time? Does it use something like Android's Handler.postDelayed mechanism? Of course not.
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
The Continuation is wrapped in a CancellableContinuation, which supports cancellation, and cancellable.initCancellability() links the parent and child coroutines. Does wrapping it into a cancellable continuation here mean that it now goes through the state machine flow?
//from ExecutorCoroutineDispatcherImpl
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
if (future != null) { // the current thread pool is a ScheduledExecutorService, so it handles the delay itself
continuation.cancelFutureOnCancellation(future)
return
}
// otherwise fall back to the default DefaultExecutor
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
}
}
//from ScheduledExecutorService
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
The delay is implemented with the schedule method of ScheduledExecutorService, so it does not block the current thread. The thread pool behind the child coroutine's dispatcher supports scheduled tasks; this distinguishes it from the default DefaultExecutor path, which relies on LockSupport. The run method of the scheduled Runnable calls dispatcher.resumeUndispatched(Unit).
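The same idea, scheduling the resumption instead of sleeping, can be sketched with the standard library's suspendCoroutine and a ScheduledExecutorService. This is an illustration under simplifying assumptions, not the kotlinx.coroutines implementation: sketchDelay and runTwoDelays are hypothetical names, and cancellation is not handled.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical stand-in for delay: schedule the resumption rather than block a thread.
suspend fun sketchDelay(timer: ScheduledExecutorService, millis: Long): Unit =
    suspendCoroutine { cont ->
        timer.schedule(Runnable { cont.resume(Unit) }, millis, TimeUnit.MILLISECONDS)
    }

fun runTwoDelays(): List<String> {
    val timer = Executors.newScheduledThreadPool(1) // a single timer thread serves both coroutines
    val log = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(2)
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) { done.countDown() }
    }

    fun start(name: String, millis: Long) {
        val body: suspend () -> Unit = {
            log.add("$name start")
            sketchDelay(timer, millis) // suspends; the calling thread returns immediately
            log.add("$name end")
        }
        body.startCoroutine(completion)
    }

    try {
        start("slow", 300L)
        start("fast", 100L)
        done.await(5, TimeUnit.SECONDS)
    } finally {
        timer.shutdown()
    }
    return log.toList()
}

fun main() {
    runTwoDelays().forEach(::println)
}
```

Both coroutines start before either one finishes, and the shorter delay completes first, even though only one timer thread exists; a blocking sleep on that thread would have serialized them.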
private class ResumeUndispatchedRunnable(
private val dispatcher: CoroutineDispatcher,
private val continuation: CancellableContinuation<Unit>
) : Runnable {
override fun run() {
// the value passed here is Unit
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
}
//from CancellableContinuationImpl
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
// The coroutine's state machine handles the state transition
private fun resumeImpl(
proposedUpdate: Any?,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)? = null
) {
_state.loop { state ->
when (state) {
is NotCompleted -> {
val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
detachChildIfNonResuable()
dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
return // done
}
is CancelledContinuation -> {
......
}
The coroutine's state machine starts processing the state transition here; the state machine itself is skipped in this article.
private fun dispatchResume(mode: Int) {
if (tryResume()) return // completed before getResult invocation -- bail out
dispatch(mode)
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
...
} else {
resume(delegate, undispatched)
}
}
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatched: Boolean) {
// This resume is never cancellable. The result is always delivered to delegate continuation.
val state = takeState()
val exception = getExceptionalResult(state)
val result = if (exception != null) Result.failure(exception) else Result.success(getSuccessfulResult<T>(state))
when {
undispatched -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
else -> delegate.resumeWith(result)
}
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
withContinuationContext(continuation, countOrElement) {
continuation.resumeWith(result)
}
}
Once you see continuation.resumeWith(result), you know that invokeSuspend is called again and the code continues from where it left off.
//from BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param) // invokeSuspend is finally called
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
.......
}
}
Now you can see that if a coroutine contains one delay, resumeWith(Unit) is called twice: once to start the coroutine and once to resume it after the delay. These two calls to resumeWith(Unit) correspond to the two calls to the invokeSuspend method in the decompiled Java code. Bennyhuo says that if a coroutine has n suspension points, resume is called n + 2 times in total (the extra 2 being one call at start and one at completion), which would mean resumeWith is also called at the end. That does not appear to be the case here: if resumeWith were called once at start and once more at completion, invokeSuspend would run three times, the label == 1 branch would execute more than once, and its output would be printed more than once, but it is not.
Four: Follow-up
The state machine and the event handling mechanism of coroutines have not been covered here; they can be analyzed separately.