Technology more than, the article has material, concern public number nine heart said, a high quality good article every week, and nine hearts in Dachang road side by side.
preface
This article follows the previous article, this time we have a good talk about the principle of coroutines, through the study of the last article, I believe that you are very familiar with how to use coroutines.
The story starts with the last coroutine share, and since we don’t have much practice with coroutines, there’s a lot of debate about how to execute the following code:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a + b
Log.e(TAG,"result:$c")}Copy the code
Some people say that a and B will execute sequentially, some people say that A and B will execute in parallel, so what is the result of the execution? We will give them in the following article.
This article is the second in a three-part series:
Kotlin-coroutines as You Learn, Kotlin-Coroutines As You Learn, Kotlin-Coroutines as You Learn, Kotlin-Coroutines as You Learn
A brief introduction to the structure
First of all, we have to figure out what’s in coroutines, and if you use coroutines, you know that coroutines have CoroutineScope, CoroutineContext, and CoroutineDispatcher, and those are all apis that you can access while you’re using them.
I briefly organized the main base classes of coroutines:
The class structure of coroutines can be broken down into three parts: CoroutineScope, CoroutineContext, and Continuation.
1. Continuation
If you use coroutines, you know that you can suspend your coroutine from a time-consuming suspend operation, and when the task ends, it automatically cuts back.
The secret is continuations, which understand a program, which you can understand each time it wraps up the rest of the code at the start of a coroutine hang and executes the rest of the code after it finishes. A block of code for a coroutine may be cut into continuations, with one assigned to each place where it needs to hang.
A few conclusions. If you suspend a time-consuming operation on a coroutine, it will be suspended automatically, but the time-consuming operation must be done anyway. You just switch to another thread, and then you have to cut back. This is where continuations come in.
The flow of a Continuation looks like this:
Coroutines, whether launched with launch or Async, have a continuation that ends with a callback.
2. CoroutineScope
There’s not much to say about CoroutineScope, which holds CoroutineContext, which manages the life cycle of coroutines.
3. CoroutineContext
At first, I felt dizzy when I saw CoroutineContext, I didn’t understand why it was designed like this, but it got better after I read the articles of Bennyhuo.
As you can see from the structure of the coroutine class above, the CoroutineContext interface looks a bit like a List, and the Element interface, which inherits the CoroutineContext interface, defines its elements.
The Element interface is then divided into two categories, Job and ContinuationInterceptor:
Job
: Literally, it stands for a task,Thread
It also performs tasks, so we can understand that it defines some things about coroutines, like the state of coroutines, how coroutines and subcoroutines are managed, and so on.ContinuationInterceptor
: Also literally, it isContinuation
The interceptor passes interceptionContinuation
Do what we want to do, such as thread switching.
Two, structure source code analysis
Above we introduced the concept of coroutines three major pieces, in this part, we analyze from source code.
1. Continuation
Methods decorated by suspend are treated by the compiler during compilation as CPS(Continuation style) conversions, and the suspend method is wrapped as a Continuation.
After all this talk of continuations, we haven’t seen the interface code yet, and since there isn’t much of it, I’ve posted all of it:
/** * Interface representing a continuation after a suspension point that returns a value of type `T`. */
@SinceKotlin("1.3")
public interface Continuation<in T> {
/** * The context of the coroutine that corresponds to this continuation. */
public val context: CoroutineContext
/** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */
public fun resumeWith(result: Result<T>)
}
Copy the code
We focus on the Continuation#resumeWith() method, which, from the comments, resumes the execution of the coroutine by returning the value at the start of the suspend suspension. The coroutine can obtain either a successful value or a failed Result from the parameter Result
. If there is no Result, So the generic of Result
is Unit. Resulut this class is also very simple, interested students can view the source code.
BaseContinuationImpl implements the Continuation interface. Let’s look at the Continuation#resumeWith method:
internal abstract class BaseContinuationImpl(
// Call Continuation after completion
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any? >) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
valoutcome: Result<Any? > =try {
Execute the code block in suspend
val outcome = invokeSuspend(param)
// 2. Return early if the code hangs
if (outcome === COROUTINE_SUSPENDED) return
// 3. Return the result
Result.success(outcome)
} catch (exception: Throwable) {
// 3. An error message is displayed
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 4. Recursion if there are sub-completions in completion
current = completion
param = outcome
} else {
// 5. Notification of results
completion.resumeWith(outcome)
return
}
}
}
}
}
Copy the code
The main process has been laid out in my comments, so let me explain the mechanics of continuations.
The constructor of each BaseContinuationImpl generated by the suspend method has a parameter called completion, which is also a Continuation and is called when the suspen method is completed. We’ll call it later
For simplicity’s sake, let’s just look at the launch process 3, 4, and 5. Usually a launch generates an outer Continuation and a corresponding resulting Continuation. We will call the resulting continuation complete. Continuation is called in the following order:
- Call the outer
Continuation
In theContinuation#resumeWith()
Methods. - The method will execute
launch
Wrap the code block and return a result. - Hand in the results of the above code block execution
completion
Which completes notification of coroutine termination.
The above process only exists in a launch and no other time-consuming suspend operations are performed, which we will discuss in the following article.
As you can see from comment 2, after a time-consuming suspend operation, the result returned is a COROUTINE_SUSPENDED, which will be directly returned later. How will our completion be restored when the time-consuming operation is over?
2. CoroutineContext and Element
In profiling, we say that CoroutineContext is structured like a collection and draws conclusions from its interfaces:
public interface CoroutineContext {
// Get method, obtained by key
public operator fun <E : Element> get(key: Key<E>): E?
// add operation
public fun <R> fold(initial: R, operation: (R.Element) - >R): R
// Operator +, the actual implementation calls the fold method
public operator fun plus(context: CoroutineContext): CoroutineContext
// Remove operation
public fun minusKey(key: Key< * >): CoroutineContext
// CoroutineContext Specifies the Key
public interface Key<E : Element>
// CoroutineContext defines the element
public interface Element : CoroutineContext {
// key
public val key: Key<*>
/ /...}}Copy the code
The Element Element can be obtained by Key in CoroutineContext, and the Element interface is inherited from the CoroutineContext interface.
In addition, CoroutineContext supports add and remove operations, and supports the + operator to accomplish additions. The + operator, also known as the Plus method, has a specific implementation, which can be interesting to see for yourself, mainly involving the addition of the ContinuationInterceptor.
1.1 the Job
The Job comment states the definition as follows:
A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
It can be concluded that:
- Background tasks
- Can be cancelled
- The life cycle ends when it completes
A Job has a state that is similar to a Thread. A Job has a state that is similar to a Thread. A Job has a state that is similar to a Thread.
Another important aspect of Job management is the management of sub-jobs. The main rules are as follows:
- The child
Job
When it’s all over, fatherJob
Will not end - The father
Job
At the time of cancellation, sonJob
Will be cancelled
Some of the above information can be obtained from the Job interface documentation. So, where does Job come from? If you look at the coroutinescop #launch method, you’ll conclude that the return type of this method is Job, and we create a Job each time we call this method.
1.2 ContinuationInterceptor
Continuation interceptor, as the name suggests, looks at the interface first:
interface ContinuationInterceptor : CoroutineContext.Element {
ContinuationInterceptor's Key in CoroutineContext
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* 拦截 continuation
*/
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
/ /...
}
Copy the code
This interface can extract just two pieces of information:
- The interceptor
Key
That is, whatever is behind youCoroutineContext
How many interceptors are there,Key
为ContinuationInterceptor
Can only have one interceptor. - As we all know,
Continuation
In the call itContinuation#resumeWith()
Method is executedsuspend
The code block of the modified function, if we intercept it in advance, can we do something else, like switch threads, which is also hereContinuationInterceptor
One of the functions of.
It should be noted that we specify the threads on which coroutines occur through Dispatchers, which implement the ContinuationInterceptor interface.
3. CoroutineScope
The CoroutineScope interface is simple:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
Copy the code
It requires that subsequent implementations provide CoroutineContext, but as we all know, CoroutineContext is an important part of coroutines, including both jobs and schedulers.
In the above code I have used several times the extension library for Coroutines in Android Jetpack which makes it easier to get CoroutineScope without having to manually cancel the component onDestroy and the source code is super simple. If you will use Lifecycle:
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
// ...
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}
Copy the code
It also allows you to call coroutines during specified life cycles, as you can see from the interface.
Process source code analysis
Let’s start with the following code:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1.2)}val b = async { getResult(3.5)}val c = a.await() + b.await()
Log.e(TAG, "result:$c")}suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
Copy the code
Although the code is very simple, but the source code is more complex, let’s talk about step by step.
The first step is to get the CoroutineScope
As I have stated above, we use the Coroutine extension library for Lifecycle, if we don’t use the extension library we use MainScope, and their CoroutineContext is the same:
public fun MainScope(a): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// ...
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// ...
return newScope
}
}
Copy the code
It’s notable that MainScope and LifecycleCoroutineScope both use the container container for the CoroutineContext, which is the + dispatchers.main container.
It’s important to note that the SupervisorJob and dispatchers. Main are designed to represent the Job and ContinuationInterceptor mentioned in CoroutineContext, respectively.
The second step starts the coroutine
Go directly to the CoroutineScope#launch() method:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope. () - >Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Copy the code
The above method has three parameters, the first two will not be described too much, the third parameter:
block: suspend CoroutineScope.() -> Unit)
Copy the code
This is a method that is a lambda argument and also indicates that it needs to be decorated by suspend. Moving on to the launch method, it does two main things:
- Combination of new
CoroutineContext
- Create another one
Continuation
Combine the new CoroutineContext
In the first line of code val newContext = newCoroutineContext(context) does the first thing, where newCoroutineContext(context) is an extension method:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if(combined ! == Dispatchers.Default && combined[ContinuationInterceptor] ==null)
debug + Dispatchers.Default else debug
}
Copy the code
The CoroutineScope uses its own collection of coroutineContext, using the + operator to add the coroutineContext we provided in the launch method.
Create a Continuation
Going back to the previous code, normally we don’t specify the start argument, so it will use the DEFAULT coroutinestart.default, and the coroutine will end up with a StandaloneCoroutine.
The StandaloneCoroutine implementation is AbstractCoroutine, and if you look at the class diagram above, you’ll see that it implements a bunch of interfaces like Continuations, Jobs, and CoroutineScope. It should be noted that this StandaloneCoroutine is actually the complete part of our current Suspend Contination.
And then it calls
coroutine.start(start, coroutine, block)
Copy the code
This indicates that the coroutine has started.
The third step start
Enter the AbstractCoroutine#start method:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R. () - >T) {
initParentJob()
start(block, receiver, this)}Copy the code
Skip through layers of nesting and finally arrive at:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// Wrap it with Coroutine
createCoroutineUnintercepted(receiver, completion)
// Do interception if necessary
.intercepted()
// Call resumeWith
.resumeCancellableWith(Result.success(Unit))}Copy the code
Although this is just a function, the main logic behind it is revealed:
- Create one that has not been intercepted
Continuation
. - intercept
Continuation
. - perform
Continuation#resumeWith
Methods.
Step 4 create againContinuation
I’m using both here because we already created AbstractContinuaion in launch, but it’s a complete, as can be seen from the line arguments of each function.
SuspendLambda, which extends from ContinuationImpl. If you ask me why I can’t find an implementation in the source code, I think it might be related to the suspend modifier, Handled by the compiler, but the call stack does look like this:
Look at the implementation of the SuspendLambda class:
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
/ /...
}
Copy the code
As you can see, its constructor takes a complete parameter.
Step 5 Intercept processing
Back to:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// Wrap it with Coroutine
createCoroutineUnintercepted(receiver, completion)
// Do interception if necessary
.intercepted()
// Call resumeWith
.resumeCancellableWith(Result.success(Unit))}Copy the code
The interception method in Continuation#intercepted() is an extension:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(a): Continuation<T> =
(this as? ContinuationImpl)? .intercepted() ?:this
Copy the code
CreateCoroutineUnintercepted (receiver, completion) returns a SuspendLambda, so it must be a ContinuationImpl and have a look at it intercept method implementation:
internal abstract class ContinuationImpl( completion: Continuation<Any? >? .private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion? .context)public override val context: CoroutineContext
get() = _context!!
public fun intercepted(a): Continuation<Any? > = intercepted ? : (context[ContinuationInterceptor]? .interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...
}
Copy the code
In the ContinuationImpl#intercepted() method, the data structure context is used directly to get the interceptor through context[ContinuationInterceptor].
CoroutineDispatcher interception implementation
ContinuationInterceptor intercepts CoroutineDispatcher class ContinuationInterceptor intercepts CoroutineDispatcher class ContinuationInterceptor interceptor intercepts CoroutineDispatcher class ContinuationInterceptor interceptor interceptor CoroutineDispatcher class ContinuationInterceptor interceptor CoroutineDispatcher class
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/ /...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 1. The intercepted Continuation is wrapped with a DispatchedContinuation layer
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
/ /...
}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// ...
override fun resumeWith(result: Result<T>) {
// ...
if (dispatcher.isDispatchNeeded(context)) {
// 2. The last parameter needs to provide Runnable. The parent class is already implemented
dispatcher.dispatch(context, this)}/ /...
}
// ...
}
// SchedulerTask is a Runnable
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// ...
public final override fun run(a) {
// ...
try {
/ /...
withCoroutineContext(context, delegate.countOrElement) {
// 3. Continuation is a continuation of DispatchedContinuationcontinuation.resume(...) }}/ /...}}Copy the code
To put it simply, you add a layer of interception to the resumeWith operation of the existing Continuation, like this:
After adding CoroutineDispatcher, the CoroutineDispatcher#dispatch() method is executed before the real continuity #resumeWith(), So let’s focus on the coroutine Dispatch #dispatch implementation.
Talk about a concrete implementation of CoroutineDispatcher
First of all we have to figure out where does the CoroutineDispatcher come from? It gets from context, where does context come from?
Note the constructors of SuspendLambda and ContinuationImpl. The arguments in SuspendLambda do not have CoroutineContext, so they can only come from CoroutineContext in Completion, The CoroutineContext of completion comes from the launch method, which is the CoroutineScope, and the default is SupervisorJob() + dispatchers.main. But only dispatchers. Main inherited from CoroutineDispatcher.
Dispatchers.Main is a MainCoroutineDispatcher, and the corresponding MainCoroutineDispatcher is a HandlerContext:
internal class HandlerContext private constructor(
private val handler: Handler,
private valname: String? .private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
/ /...
override fun dispatch(context: CoroutineContext, block: Runnable) {
// Use the main thread Handler to execute tasks
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// Use the Handler of the main thread to delay execution of the task, placing the completed continuation in the task
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
/ /..
}
Copy the code
The main thread is going to be assigned to the main thread Handler.
For those of you who are curious, what if it’s not the main thread? Thread pools that are not used by the main thread:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int.private val maxPoolSize: Int.private val idleWorkerKeepAliveNs: Long.private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
/ / time frames
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}
Copy the code
CoroutineScheduler is a thread pool. If you want to know the specific process, students can look at the code.
Reading this, you may have a little idea why CoroutineContext is designed as a data structure:
coroutineContext[ContinuationInterceptor]
The interceptor of the current coroutine can be fetched directly, and only one scheduler can be used for each coroutine.- The scheduler is in the other
coroutineContext
“, so you can do interception while executing coroutines.
Similarly, we can use coroutineContext[Job] to get the current coroutine.
Step 6 resumeWith
Back to:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// Wrap it with Coroutine
createCoroutineUnintercepted(receiver, completion)
// Do interception if necessary
.intercepted()
// Call resumeWith
.resumeCancellableWith(Result.success(Unit))}Copy the code
Continue#resumeCancellableWith() is an extension with the scheduling logic:
DispatchContinuation#resumeCancellableWith
CoroutineDispatcher#dispatch
Continuation#resumeWith
The Continuation here is SuspendLambda, which inherits the BaseContinuationImpl. Let’s see how it works:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any? >) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
valoutcome: Result<Any? > =try {
1. Execute the code block from suspend
val outcome = invokeSuspend(param)
// 2. If a block of code executes a suspend method, it returns early
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 3. If completion is also BaseContinuationImpl, the loop will be entered
current = completion
param = outcome
} else {
// 4. Run the completion resumeWith method
completion.resumeWith(outcome)
return
}
}
}
}
}
Copy the code
This side is divided into two parts:
- perform
suspend
Method and get the results - call
complete
(In the next step)
Executing the suspend method
The suspend method content is executed in the first place, and the suspend method may be scheduled inside the method, as in our instance method:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1.2)}val b = async { getResult(3.5)}val c = a.await() + b.await()
Log.e(TAG, "result:$c")}suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
Copy the code
Since we performed a delay operation on getResult, our launch method must have performed a time-dependent suspend method, so basecontinuationIMPLI #invokeSuspend will return a COROUTINE_SUSPENDED, As a result, as you can see, the method ends prematurely. (For clarification, I can’t find an implementation of the BaseContinuationImpl#invokeSuspend method. I guess it’s the compiler.)
I’m sure you’re just as curious as I am about how a time-consuming suspend can be used to restore complete.
Let’s take a look at delay(1000) on the main thread:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal class HandlerContext private constructor(
private val handler: Handler,
private valname: String? .private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/ /...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
/ /...
}
Copy the code
As you can see, package the restore task with a Runnable and hand it off to Handler’s Handler#postDelayed() method.
Step 7 complete resumeWith
There are generally two ways to handle a complete.
Complete is BaseContinuationImpl
The first case is what we call a nesting doll, a completed callback Continuation that has its own completed callback Continuation, and then the loop is right.
Call complete’s resumeWith
In the second case, the callback is completed with complete. Since complete is AbstractContinuation, let’s look at its resumeWith:
public abstract class AbstractCoroutine<in T>(
/** * The context of the parent coroutine. */
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public final override fun resumeWith(result: Result<T>) {
// 1. Obtain the technical status of the current coroutine
val state = makeCompletingOnce(result.toState())
// 2. If you are currently waiting for completion, there are subcoroutines that have not finished
if (state === COMPLETING_WAITING_CHILDREN) return
// 3. Execute the end restoration method. The default value is null
afterResume(state)
}
// This is the makeCompletingOnce method in the parent JobSupport class
// I'll copy it for you
internal fun makeCompletingOnce(proposedUpdate: Any?).: Any? {
loopOnState { state ->
// tryMakeCompleting is handled differently depending on whether there are child jobs
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state}}}}Copy the code
The coroutine is about to complete, so we need to evaluate the technical status of the coroutine before it runs. Some coroutines with subcoroutines will wait for the subcoroutine to end before terminating the current coroutine.
This is the process of a launch. The rough flow chart looks like this:
Let’s talk more about async.
Iv. About Async
Async and launch codes are very similar:
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope. () - >T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Copy the code
Finally, there are three steps:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// Wrap it with Coroutine
createCoroutineUnintercepted(receiver, completion)
// Do interception if necessary
.intercepted()
// Call resumeWith
.resumeCancellableWith(Result.success(Unit))}Copy the code
Instead of async returning a Deferred
, we need to call Deferred#await() to get the result, which is implemented in JobSupport:
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
/ /... The awaitInternal method comes from the parent class JobSupport
override suspend fun await(a): T = awaitInternal() as T
// ...
// This is implemented in JobSupport
internal suspend fun awaitInternal(a): Any? {
// loop the result
while (true) { // lock-free loop on state
val state = this.state
// 1
if (state !is Incomplete) {
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
// 2. Break unless you need to retry
if (startInternal(state) >= 0) break
}
// Wait for the hang method
return awaitSuspend() // slow-path}}Copy the code
Its specific process can be seen from my notes, not an introduction, interested students can view the source code.
1. Discussion at the beginning of this article
The initial code of this article is wrong, even the compiler can not pass, embarrassing ~
The correct code should be:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a.await() + bawait()
Log.e(TAG,"result:$c")}Copy the code
If the code is correct, there are two possible cases:
If you put it on the UI thread, it’s going to be serial, and somebody says, well, I used delay(1000) in A, and DELAY (2000) in B, and it took me 2000 milliseconds to get TO C. Isn’t that parallel? This is not the case. The delay operation uses the Handler#postDelay method. One is 1000 milliseconds late and the other is 2000 milliseconds late, but there is only one main thread, so it can only be serial.
If it’s a child thread, it’s usually parallel because we’re using a thread pool
conclusion
When writing this source code analysis, some details can not always be found, such as suspendLambda subclass can not be found, their study of Kotlin needs to be in-depth.
Therefore, some parts of this article are still worth discussing. If you have a better understanding, welcome to communicate with us below.
Some time ago, I had a slight illness, and I went to the hospital every week, so I was very sad, and the efficiency of writing the blog was also very low. I wrote this article for nearly a month, ah, it was very difficult ~, but I returned to normal from this week.
Article Reference:
Cracking Kotlin Coroutines (3) – Coroutine Scheduling