We’ll walk you through the following example by analyzing the source code of the basic launch usage
This article mainly goes through the process of launch method on behalf of everyone
This is the root example, and this is the root example that follows
Globalscope. launch {print("launch ") delay(1000)}Copy the code
Enter launch to view source code:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job {//1- puts the current coroutine scoped context, Returns a new val newContext = newCoroutineContext(context) //2- Creates a new coroutine val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, Active = true) //3- Run coroutine. Start (start, coroutine, block) return coroutine}Copy the code
1.1. First, look at the newCoroutineContext method
For those not familiar with basic usage, check out this article: juejin.cn/post/695061…
// Add the current coroutine scoped context, Combined with the existing returns a new @ ExperimentalCoroutinesApi public actual fun CoroutineScope. NewCoroutineContext (context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined ! == Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }Copy the code
The second step is to create a StandaloneCoroutine because the DEFAULT is CoroutineStart.DEFAULT
—————— its passing generic is of type Unit, note that it will be used later
AbstractCoroutine private Open Class StandaloneCoroutine(parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }Copy the code
AbstractCoroutine AbstractCoroutine AbstractCoroutine AbstractCoroutine AbstractCoroutine AbstractCoroutine AbstractCoroutine AbstractCoroutine
public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { ...... Omit n multiple code}Copy the code
Coroutine. Start (coroutine, block)
Parameter Meanings: -start is the coroutine object created in step 2 above. 1. -block is of type an extension method of the CoroutineScope object, It can return any value (I think the key is that in our analysis it corresponds to the method block in the example).
Coroutine. Start (start, coroutine, block)
By pointing to because coroutine is a StandaloneCoroutine object its start method but it doesn’t have one so it’s in its parent class AbstractCoroutine:
public fun <R> start(start: CoroutineStart, receiver: R, block: Suspend R.() -> T) {initParentJob() // start(block, receiver, this)}Copy the code
Note that the parameters in the start method here, note the following and do not be misled by the name
2.1, initParentJob() method, /// because there is no parent coroutine in the · root example ·, returns
internal fun initParentJobInternal(parent: Job?) {assert {parentHandle == NULL} if (parent == null) {/// Because there is no parent coroutine in the current example, parentHandle = NonDisposableHandle return is returned } parent.start() // make sure the parent is started @Suppress("DEPRECATION") val handle = parent.attachChild(this) parentHandle = handle // now check our state _after_ registering (see tryFinalizeSimpleState order of actions) if (isCompleted) { handle.dispose() parentHandle = NonDisposableHandle // release it just in case, to aid GC } }Copy the code
2.2. Since there is no parent coroutine in the root example, look directly at the start(block, receiver, this) method next
2.2.1,Start (block, receiver, this) : start: CoroutineStart,… It’s Kotlin’sInvoke the usage of theIf you are not sure about the use of ‘invoke’, search for it
Public fun <R> start(: start: CoroutineStart,... receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }Copy the code
Parameter Meanings:
R generic in the block argument variable, in this root example, It represents the CoroutineScope 1. -start has a value of coroutinestart. DEFAULT which is the way the coroutine is always in effect 1. -receiver is the StandaloneCoroutine 1 This refers to the current AbstractCoroutine object, but its implementation subclass is StandaloneCoroutine
2.2.2,So we foundCoroutineStartClass, look inside the invoke method and find a method with three arguments as follows:
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
Copy the code
Parameter Meanings: -receiver is the StandaloneCoroutine above 1. -Completion refers to the current AbstractCoroutine object, which implements the Continuation, But the implementation subclass is StandaloneCoroutine
Because:
The when expression above passes the current CoroutineStart itself, which is an enum, and its value is: according to the coroutinescope.launch (…) in the 1.1 example above. Extension methods to learn that it is the DEFAULT CoroutineStart. DEFAULT, so the above code is heading down the block. The when startCoroutineCancellable (receiver, completion) method.
So: 2.2.3, which is an extension of the coroutine method (this method is an extension of the block, which is the body of the method we want to execute)
Then,Add a coroutine cancellation listener. R is a parameter variable that, in this root example, represents the CoroutineScope
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
Copy the code
Parameter Meanings: 1. -completion but there is a subclass of StandaloneCoroutine, which is a subclass of AbstractCoroutine. Its type completion: Continuation, “generic T”, is of type Unit
Three,CreateCoroutineUnintercepted method, this according to the realization of the virtual machine to generate different object, the screenshot below
Specific code is as follows: T generic on behalf of the Unit, can be seen as a method body, the coroutines hang method body extends a method called createCoroutineUnintercepted
@ SinceKotlin (" 1.3 ") public actual fun < T > (suspend () - > T). The createCoroutineUnintercepted (completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) //1. Return if (this is BaseContinuationImpl) create(probeCompletion) else // createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any? >).invoke(it) } }Copy the code
If StandaloneCoroutine does not implement BaseContinuationImpl, it is implemented by JVM. If StandaloneCoroutine does not implement BaseContinuationImpl Class files generated at compile time,
I verified the results myself by debugg breakpoint debugging, and can view some details from the kotlin bytecode compilation file of the current root example: So it’s going to go through the Create code, and the implementation of that, if you look at the decompiled Java file down here, is going to execute his create method
Globalscope. launch {print("launch ") delay(1000)}Copy the code
The above code is converted to Java as follows, and named “Java root example”. The following code is referred to as Java root example:
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); String var2 = "launch "; boolean var3 = false; System.out.print(var2); this.label = 1; if (DelayKt.delay(1000L, this) == var4) { return var4; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } //// executes here first, returning a Continuation type Function2, @notnull Public Final Continuation CREATE (@nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 3, (Object)null);Copy the code
3.2 which part of the coroutine method body is implemented by JVM, we can see it by decomcompiling jADX tool:
Graph one:
Figure 2:
Figure 2 shows that it is what we need to implement, decompilated: it executes
Fun initKotlin(){globalscope.launch {print("launch ") delay(1000)}} public final void initKotlin(){ BuildersKt__Builders_commonKt.launch$default(GlobalScope.INSTANCE, null, null, new MainActivity$initKotlin$1(null), 3, null); }Copy the code
Above position is: new MainActivity initKotlininitKotlininitKotlin1 (null), 3, null) corresponding to the Java bytecode into above related Function2 position, can see figure 3
Final Class MainActivity$initKotlin$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> { int label; MainActivity$initKotlin$1(Continuation<? super MainActivity$initKotlin$1> continuation) { super(2, continuation); } public final Continuation<Unit> create(Object obj, Continuation<? > continuation) { return new MainActivity$initKotlin$1<>(continuation); } public final Object invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) { return ((MainActivity$initKotlin$1) create(coroutineScope, continuation)).invokeSuspend(Unit.INSTANCE); } public final Object invokeSuspend(Object $result) { Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure($result); System. The out. Print (" launch usage "); Continuation continuation = this; this.label = 1; if (DelayKt.delay(1000, continuation) ! = coroutine_suspended) { break; } else { return coroutine_suspended; } case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; }}Copy the code
It implements the interface to Function2, so the scope of the root example can be considered: It implements SuspendLambda, which implements the BaseContinuationImpl interface. From this we know that 3.1, the process analysis step above, is correct
3.3. Create (probeCompletion) method
We can analyze the create method from 3.1 above. It goes invoke and finally calls the invokeSuspend method
3.4,Go back to step 3, method of createCoroutineUnintercepted intercepted in the ()
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
Copy the code
Four,Follow the process through the intercepted() method in our SuspendLambda root example
ContinuationImpl class, whose implementation is SuspendLambda, their code is as follows:
4.1,And SuspendLambda. Class:
internal abstract class SuspendLambda( public override val arity: Int, completion: Continuation<Any? >? ///StandaloneCoroutine ) : ContinuationImpl(completion), FunctionBase<Any? >, SuspendFunction { constructor(arity: Int) : this(arity, null) public override fun toString(): String = if (completion == null) Reflection.renderLambdaToString(this) // this is lambda else super.toString() // this is continuation }Copy the code
解 答 : SuspendLambda: Completion: Continuation
) its value is corresponding to the probeCompletion parameter in the create(Receiver, probeCompletion) method, and it is found to be the StandaloneCoroutine object after tracing the source.
4.2,ContinuationImpl class is as follows:
// State machines for named suspend functions extend from this class internal abstract class ContinuationImpl( completion: Continuation<Any? >? , private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any? >? : this(completion, completion? .context) public override val context: CoroutineContext get() = _context!! ** @transient private var intercepted: Continuation<Any? >? = null /// By default, ContinuationInterceptor is fetched from the coroutine context. In this case, it is the DefaultScheduler object created by newCoroutineContext. It inherits the CoroutineDispatcher, where the interceptContinuation is a function of the latter. The interceptContinuation takes this, This is the anonymous inner class (SuspendLambda) in the sample code, which is the Continuation that will eventually need to be distributed for execution. Public fun intercepted(): Continuation<Any? > = intercepted ? : (context[ContinuationInterceptor]? .interceptContinuation(this) ?: this) .also { intercepted = it } protected override fun releaseIntercepted() { val intercepted = intercepted if (intercepted ! = null && intercepted ! == this) { context[ContinuationInterceptor]!! .releaseInterceptedContinuation(intercepted) } this.intercepted = CompletedContinuation // just in case } }Copy the code
Explanation: By Default, values with key as ContinuationInterceptor are fetched from the coroutine context. In this case, the Dispatchers.Default object is created by newCoroutineContext. It is – “DefaultScheduler -” ExperimentalCoroutineDispatcher – “ExecutorCoroutineDispatcher -” CoroutineDispatcher such inheritance, Instead, he calls the interceptContinuation method, which implements the CoroutineDispatcher object only in the base class CoroutineDispatcher as follows: See the example in step 5 below for details
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
Copy the code
The interceptContinuation argument is this. This is the anonymous inner class SuspendLambda in the sample code. That is, the Continuation that will eventually need to be distributed and executed.
1. The (context [ContinuationInterceptor]? .InterceptContinuation (this)?: The interceptContinuation method is in the called CoroutineDispatcher object
4.3The BaseContinuationImpl class is as follows:
@sincekotlin ("1.3") Internal Abstract class BaseContinuationImpl(// This is' public val 'so that it is private on JVM and cannot be modified by untrusted code, yet // it has a public getter (since even untrusted code is allowed to inspect its call stack). public val completion: Continuation<Any? >? : Continuation<Any? >, CoroutineStackFrame, Serializable { // This implementation is final. This fact is used to unroll resumeWith recursion. public final override fun resumeWith(result: Result<Any? >) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any? > = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any? >): Any? protected open fun releaseIntercepted() { // does nothing here, overridden in ContinuationImpl } public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any? , completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any? ; Continuation) has not been overridden") } public override fun toString(): String = "Continuation at ${getStackTraceElement() ? : this::class.java.name}" // --- CoroutineStackFrame implementation public override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame public override fun getStackTraceElement(): StackTraceElement? = getStackTraceElementImpl() }Copy the code
Fifth, DispatchedContinuationFinding it completes the intercepted section of the following code
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
Copy the code
The method for a resumeCancellableWith is as follows:
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
Copy the code
ResumeCancellableWith (Result, onCancellation) The previous step 4.2 also gives an analysis.
ResumeCancellableWith (DispatchedContinuation
As follows:
It inherits DispatchedTask -> inherits SchedulerTask() -> Inherits Task -> Inherits Runable, which can be executed in a thread. It inherits a Continuation and assigns values to the passed Continuation argument, so a DispatchedContinuation can also be used as a parameter Continuation
Note: The @jVMField Val continuation argument here: Internal class DispatchedContinuation<in T>(@jvmfield val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { ..... @suppress ("NOTHING_TO_INLINE") Inline fun checkecancellableWith (result: result <T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) {val state = result.tostate (onCancellation) ///dispatcher corresponds to the object of dispatchers.default +debug that we analyzed in previous **4.2 steps ** / / / dispatcher. IsDispatchNeeded (context) object is Dispatchers. The Default isDispatchNeeded method, he didn't realize too much to write, So we use the isDispatchNeeded method in the base class CoroutineDispatcher, (the default is true if the dispatcher. IsDispatchNeeded (context)) {_state = state resumeMode = MODE_CANCELLABLE / / / the dispatcher here, // This is our SuspendLambda object in step 4.2 **. Dispatcher.dispatch (context, this)} else {executeUnconfined(state, MODE_CANCELLABLE) {if (! resumeCancelled(state)) { resumeUndispatchedWith(result) } } } } }Copy the code
Explanation:
So you’re using the dispenser, you’re distributing the task, and you might be wondering, why don’t you just do it, and then do it after you distribute it? Different tasks are handled in different ways, such as the need to update the UI in the main thread. The function of the dispatcher is to assign tasks to a thread pool (default, IO) or to a Handler (sending tasks to the main thread via Handler POST).
In step 2, a Default dispatchers.default is added to the CoroutineContext CoroutineContext
5.1The dispatcher.dispatch(Context, this) procedure of the DefaultScheduler object is also the execution module of the root example
Its inheritance process is analyzed in step 4.2 as follows, It is – “DefaultScheduler -” ExperimentalCoroutineDispatcher – “ExecutorCoroutineDispatcher -” CoroutineDispatcher such inheritance, Its implementation can be seen in the class ExperimentalCoroutineDispatcher dispatcher method is through the thread pool distributed processing
@InternalCoroutinesApi public open class ExperimentalCoroutineDispatcher( private val corePoolSize: Int, private val maxPoolSize: Int, private val idleWorkerKeepAliveNs: Long, private val schedulerName: String = "CoroutineScheduler" ) : ExecutorCoroutineDispatcher() { public constructor( corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE, schedulerName: String = DEFAULT_SCHEDULER_NAME ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, SchedulerName) @deprecated (message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN) public constructor( corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS) override val executor: Executor get() = coroutineScheduler // This is variable for test purposes, so that we can reinitialize from clean state private var coroutineScheduler = createScheduler() override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatch(context, block) } .... private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) ### .... Omit someCopy the code
You can see from the above code that we are using the thread pool for suspension function processing. It is the Dispatch method of the CoroutineScheduler object
Vi.* CoroutineScheduler object, thread pool distribution:
6.1 coroutine cheduler is a CoroutineScheduler, using shared threads, each thread is a worker. Coroutine tasks that need to be executed will be added to queues, which are divided into worker local queues and global queues. When the worker’s local queue is empty, it will take coroutine tasks from other workers to execute them.
Source code analysis here, the following process, you should also be able to guess about, business functions, distributors, coroutine scheduling classes have. Let’s take a look at the process as a whole
The DispatchedContinuation object, which indirectly inherits the Runable and directly inherits the Continuation type. So here are two things
1. At this point the business code (of the anonymous inner class Continuation type in the example) has been wrapped as a DispatchedContinuation type, which can be retrieved via the delegate variable.
Because this object is of type Runable, you can run the coroutine scheduling class in a thread. You can create a task with a DispatchedContinuation. You can create a thread with a DispatchedContinuation.
1. In the run function of the DispatchedContinuation object, resumeWith() ->invokeSuspend() of the anonymous inner class in the example is executed.
//6.1, parameters block our ** root example ** content object fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {///6.2, trackTask() // this is needed for virtual time support Val Task = createTask(block, Task context) // Try to submit the task to the local queue and act depending on the result // Here try to add tasks to the worker of the local queue val currentWorker = currentWorker () val notAdded = currentWorker. SubmitToLocalQueue (task, tailDispatch) if (notAdded ! If (! = null) {// if (! = null) { addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker ! = null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) 6.6. In this case, this function is executed to create a thread, SignalCpuWork ()} else {// Increment Blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark)}Copy the code
6.2. SignalCpuWork () method
Work fun signalCpuWork() {// Try to get an available worker. If you get an available worker, return it because the task has been queued. // Create a new worker and start if (tryCreateWorker()) return tryUnpark()}Copy the code
6.3. TryCreateWorker () to create a new worker task
6.3.1, tryCreateWorker ()
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
val created = createdWorkers(state)
val blocking = blockingTasks(state)
val cpuWorkers = (created - blocking).coerceAtLeast(0)
/*
* We check how many threads are there to handle non-blocking work,
* and create one more if we have not enough of them.
*/
if (cpuWorkers < corePoolSize) {
val newCpuWorkers = createNewWorker()
// If we've created the first cpu worker and corePoolSize > 1 then create
// one more (second) cpu worker, so that stealing between them is operational
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
if (newCpuWorkers > 0) return true
}
return false
}
Copy the code
6.3.2 createNewWorker() : Creates a new one that is smaller than the number of core threads
private fun createNewWorker(): Int { synchronized(workers) { // Make sure we're not trying to resurrect terminated scheduler if (isTerminated) return -1 val state = controlState.value val created = createdWorkers(state) val blocking = blockingTasks(state) val cpuWorkers = (created - blocking).coerceAtLeast(0) // Double check for overprovision if (cpuWorkers >= corePoolSize) return 0 if (created >= maxPoolSize) return 0 // start & register new worker, commit index only after successful creation val newIndex = createdWorkers + 1 require(newIndex > 0 && workers[newIndex] == null) /* * 1) Claim the slot (under a lock) by the newly created worker * 2) Make it observable by increment created workers count * 3) Only then start the worker, otherwise it may miss its own creation */ val worker = Worker(newIndex) workers[newIndex] = worker require(newIndex == incrementCreatedWorkers()) worker.start() return cpuWorkers + 1 } }Copy the code
The key point above is the worker.start() method, which obviously starts the work
Internal inner class Worker private constructor() : Thread() {}
internal inner class Worker private constructor() : Thread() { ... override fun run() = runWorker() private fun runWorker() { var rescanned = false while (! isTerminated && state ! TERMINATED) {/// remove the first task from the queue. Val Task = findTask(mayHaveLocalTasks) // Task Found. Execute and repeat if (task! = null) {rescanned = false minDelayUntilStealableTaskNs = 0 l / / key is here, ExecuteTask (task) continue} else {mayHaveLocalTasks = false} /* * No tasks were found: * 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline. * Then its deadline is stored in [minDelayUntilStealableTask] * * Then just park for that duration (ditto re-scanning). * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations, * excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve * it with "spinning via scans" mechanism. * NB: this short potential parking does not interfere with `tryUnpark` */ if (minDelayUntilStealableTaskNs ! = 0L) { if (! rescanned) { rescanned = true } else { rescanned = false tryReleaseCpu(WorkerState.PARKING) interrupted() LockSupport.parkNanos(minDelayUntilStealableTaskNs) minDelayUntilStealableTaskNs = 0L } continue } /* * 2) Or no tasks available, time to park and, potentially, shut down the thread. * Add itself to the stack of parked workers, re-scans all the queues * to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks. */ tryPark() } tryReleaseCpu(WorkerState.TERMINATED) } private fun executeTask(task: Task) { val taskMode = task.mode idleReset(taskMode) beforeTask(taskMode) runSafely(task) afterTask(taskMode) } Fun runSafely(task: task) {try {task.run()} catch (e: Throwable) { val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { unTrackTask() } } ... }Copy the code
Override fun run() = runWorker()- executeTask(task: task)- “runSafely(task: Task) Task) finally executes the task.run() method
Remember from 4.1 that its implementation is the DispatchedContinuation class and its run method comes from its DispatchedTask inheritance, so it finally executes the Run method of DispatchedTask:
6.5. Run method of DispatchedTask
public final override fun run() { assert { resumeMode ! = MODE_UNINITIALIZED } // should have been set before dispatching val taskContext = this.taskContext var fatalException: Throwable? = null try {---------- //1. This parameter is overwritten in the DispatchedContinuation class and assigned to this, Val delegate = delegate as DispatchedContinuation<T> ------------- //2. When you create the DispatchedContinuation, you pass in two parameters (step 8), Where continuation is the anonymous inner class val Continuation = delegate.continuation withContinuationContext(Continuation, delegate.countOrElement) { val context = continuation.context val state = takeState() // NOTE: Must take state in any case, even if cancelled val exception = getExceptionalResult(state) /* * Check whether continuation was originally resumed with an exception. * If so, it dominates cancellation, otherwise the original exception * will be silently lost. */ val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null if (job ! = null && ! job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) continuation.resumeWithStackTrace(cause) } else { if (exception ! = null) { continuation.resumeWithException(exception) } else { ----------------//3. Resume (getSuccessfulResult(state))}}}} catch (e: Throwable) { // This instead of runCatching to have nicer stacktrace and debug experience fatalException = e } finally { val result = runCatching { taskContext.afterTask() } handleFatalException(fatalException, result.exceptionOrNull()) } }Copy the code
Resume (getSuccessfulResult(state)) calls the resumeWith(result.success (value)) method for the continuation: And because we obtained the method body of the root example from the above analysis, its structure is:
SuspendLambda-》ContinuationImpl-》BaseContinuationImpl
Copy the code
So it finally calls its resumeWith: as follows
public final override fun resumeWith(result: Result<Any? >) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any? > = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }Copy the code
**invokeSuspend(result: result <Any? >): Any? ** flow, at this point the entire call flow analysis is complete
Reference: blog.csdn.net/xx326664162… Juejin. Cn/post / 695061… Blog.csdn.net/xx326664162…