Kotlin Coroutines, Fully Explained
Most comprehensive Kotlin Coroutine: Coroutine/Channel/Flow and practical applications
Hardcore ten-thousand-word walkthrough: Kotlin coroutine principle analysis
Excellent design of Kotlin coroutine-Scheduler
Getting to know coroutines
What is a coroutine
Kotlin 1.3 made coroutines a stable language feature. On Android, a coroutine is a concurrency design pattern that you can use to simplify code that executes asynchronously.
Coroutines have the following characteristics:
- Asynchronous code written synchronously: you write asynchronous code the same way you write synchronous code.
- Lightweight: you can run many coroutines on a single thread, because coroutines support suspension, which does not block the thread running the coroutine. Suspending uses less memory than blocking and supports many concurrent operations.
- Fewer memory leaks: structured concurrency runs multiple operations within a scope.
- Built-in cancellation support: cancellation propagates automatically through the running coroutine hierarchy.
- Jetpack integration: many Jetpack libraries include extensions that provide full coroutine support. Some libraries also provide their own coroutine scope that you can use for structured concurrency.
Suspend and resume coroutines
Suspending and resuming a Kotlin coroutine essentially means suspending and resuming suspending functions.
suspend fun suspendFun() {}
Suspending function: an ordinary function marked with the suspend modifier. When a suspending function is called inside a coroutine, the call site is called a suspension point. If an asynchronous call happens at the suspension point, the current coroutine is suspended until the corresponding Continuation.resume() function is called.
The differences between a suspending function and a normal function are:
- A suspending function can only be called inside a coroutine or from another suspending function;
- A suspending function can call any function, whereas a normal function can only call normal functions.
Besides functions, the suspend modifier can also be applied to lambda expressions. The differences are discussed in detail in the source code analysis section.
Basic usage
Adding the Gradle dependencies
dependencies {
    // Kotlin Coroutines
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
    // Using `Dispatchers.Main` requires the following dependency
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2'
}
Starting coroutines
The Kotlin coroutine framework gives us two convenient ways to start coroutines:
- GlobalScope.launch
- GlobalScope.async
Both of the following print Hello World!:
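import kotlinx.coroutines.*

// Variant 1: GlobalScope.launch
fun main() {
    GlobalScope.launch { // start a coroutine with GlobalScope.launch
        delay(1000L) // non-blocking wait of 1 second (the default time unit is milliseconds)
        println("World!")
    }
    print("Hello ") // the main thread continues while the coroutine is delayed
    Thread.sleep(2000L) // block the main thread for 2 seconds to keep the JVM alive
}

// Variant 2: GlobalScope.async
fun main() {
    GlobalScope.async { // start a coroutine with GlobalScope.async
        delay(1000L)
        println("World!")
    }
    print("Hello ")
    Thread.sleep(2000L)
}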
In the example above the two functions appear to behave identically. The difference lies in their return values:
- GlobalScope.launch: returns a Job
- GlobalScope.async: returns a Deferred
Deferred is a subtype of Job, and you obtain the coroutine's result by calling its await function. The GlobalScope.async example above can be modified as follows:
GlobalScope.launch {
    val result = GlobalScope.async { // start a coroutine with GlobalScope.async
        delay(1000L)
        "World!"
    }
    println("Hello ${result.await()}")
}
Thread.sleep(2000L)
In the example above, async is nested inside the launch body because await is a suspending function, and a suspending function, unlike a normal function, must be called inside a coroutine body or another suspending function.
Inside a coroutine body ({}) you can omit GlobalScope and start coroutines directly with async and launch, so the example above can be modified as follows:
GlobalScope.launch {
    val result = async { // start a coroutine with async
        ...
    }
    ...
    // launch {}
}
...
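The difference between the two return types can be sketched in a self-contained program. This is a minimal illustration, not the article's own code: the helper name greet is hypothetical, and runBlocking and coroutineScope (both covered later in this article) are used instead of Thread.sleep so that the program waits for its coroutines deterministically.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts two child coroutines with async and combines their results.
suspend fun greet(): String = coroutineScope {
    val hello: Deferred<String> = async { delay(100L); "Hello" }
    val world: Deferred<String> = async { delay(100L); "World!" }
    // await() suspends until each Deferred has produced its value.
    "${hello.await()} ${world.await()}"
}

fun main() = runBlocking {
    val job: Job = launch { delay(50L) } // launch returns a Job, which carries no result
    job.join()                           // suspend until the job completes
    println(greet())                     // prints "Hello World!"
}
```

Because both async calls start before either await, the two delays overlap rather than run one after the other.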
Operating on coroutines
GlobalScope.launch and GlobalScope.async both return a Job object or a subtype of it. So what is a Job, and what functions does it offer?
A Job is a cancellable background task. It controls the execution of a coroutine and records its state during execution, so a Job instance generally also represents a coroutine.
A Job has the following states:
| State | isActive | isCompleted | isCancelled |
| --- | --- | --- | --- |
| New (optional initial state) | false | false | false |
| Active (default initial state) | true | false | false |
| Completing (transient state) | true | false | false |
| Cancelling (transient state) | false | false | true |
| Cancelled (final state) | false | true | true |
| Completed (final state) | false | true | false |
Generally, a Job starts automatically when it is created, and its state is _Active_ by default. However, if CoroutineStart.LAZY is specified, the Job is created in the _New_ state and can be activated by functions such as start() or join().
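The state flags can be observed directly on a lazily started Job. The following is a small sketch under the assumption that kotlinx.coroutines is on the classpath; the helper names flags and lazyJobStates are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper that formats the three state flags of a Job.
fun flags(job: Job): String =
    "isActive=${job.isActive} isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}"

// Records the flags of a lazily started job in the New, Active and Completed states.
fun lazyJobStates(): List<String> = runBlocking {
    val states = mutableListOf<String>()
    val job = launch(start = CoroutineStart.LAZY) { delay(100L) }
    states += flags(job) // New: created but not started
    job.start()
    states += flags(job) // Active: started, still running
    job.join()
    states += flags(job) // Completed: final state
    states
}

fun main() {
    lazyJobStates().forEach(::println)
}
```

The three recorded lines correspond to the New, Active and Completed rows of the table above.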
Job status flow chart:
                                      wait children
+-----+ start  +--------+ complete   +-------------+  finish  +-----------+
| New | -----> | Active | ---------> | Completing  | -------> | Completed |
+-----+        +--------+            +-------------+          +-----------+
                 |  cancel / fail       |
                 |     +----------------+
                 |     |
                 V     V
             +------------+                           finish  +-----------+
             | Cancelling | --------------------------------> | Cancelled |
             +------------+                                   +-----------+
Commonly used Job functions:
- cancel(CancellationException): cancels the coroutine that the Job represents, with a CancellationException as the cause.
- invokeOnCompletion(): registers a handler that is invoked synchronously, once, when the Job completes.
- join(): suspends the current coroutine until the Job completes, after which the current (outer) coroutine resumes.
- start(): starts the coroutine if the Job was created with CoroutineStart.LAZY.
- cancelAndJoin(): cancels the Job and suspends the current coroutine until the cancelled Job completes.
To cancel a running coroutine:
val job = launch {
    repeat(1000) { i ->
        println("job: I'm sleeping $i ...")
        delay(500L)
    }
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancel() // cancel the job
job.join() // wait for the job to finish
println("main: Now I can quit.")

// Output
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
The example above could also use the cancelAndJoin function, which combines the calls to cancel and join.
Note: a coroutine cannot be cancelled if it never reaches a suspension point during its execution.
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // busy loop with no suspension point
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // make sure the coroutine has started executing
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")

// Output
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.
Simply put, a coroutine that is already running cannot be cancelled if its body contains no suspension point.
The following describes the meaning and purpose of the parameters passed when a coroutine is started:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
Coroutine start modes
CoroutineStart: the coroutine start mode. The coroutine library provides four start modes:
- DEFAULT: the coroutine is scheduled immediately after it is created. If it is cancelled before it is dispatched, it goes straight to the cancelled state.
- ATOMIC: the coroutine is scheduled immediately after it is created, and it does not respond to cancellation before it reaches its first suspension point.
- LAZY: the coroutine is scheduled only when it is needed, for example through an explicit call to start(), join(), or await(). If it is cancelled before being scheduled, it goes straight to its exceptional end state.
- UNDISPATCHED: the coroutine executes immediately after it is created and runs until it reaches the first real suspension point.
The difference between immediate scheduling and immediate execution: immediate scheduling means the coroutine's dispatcher receives the dispatch request immediately, but exactly when, and on which thread, the coroutine runs depends on the dispatcher. In other words, there is usually a gap between scheduling and execution. From this we can conclude:
- DEFAULT schedules immediately, but the coroutine may be cancelled before it executes.
- UNDISPATCHED executes immediately, so the coroutine is guaranteed to execute.
- ATOMIC also schedules immediately, but it merges scheduling and execution into one step. As its name suggests, it makes scheduling and execution an atomic operation, so the coroutine is guaranteed to execute as well.
- Both UNDISPATCHED and ATOMIC guarantee that the coroutine executes. The former runs on the thread that created the coroutine until the first suspension point. The latter is dispatched to a thread chosen by the specified dispatcher.
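The conclusions above can be checked with a short experiment. This is a sketch under stated assumptions: the function name startModeDemo is hypothetical, everything runs on the single thread of runBlocking so the ordering is deterministic, and ATOMIC is left out because using it requires opting in to an API that kotlinx.coroutines marks as experimental.

```kotlin
import kotlinx.coroutines.*

// Starts one coroutine per mode, cancels each right away, and records which bodies ran.
fun startModeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // DEFAULT: only scheduled at this point; cancelled before the dispatcher runs it.
    val default = launch(start = CoroutineStart.DEFAULT) { log += "DEFAULT ran" }
    default.cancel()

    // UNDISPATCHED: the body runs immediately on the current thread up to delay().
    val undispatched = launch(start = CoroutineStart.UNDISPATCHED) {
        log += "UNDISPATCHED ran"
        delay(10L)                     // first real suspension point
        log += "UNDISPATCHED resumed"  // not reached: cancelled while suspended
    }
    undispatched.cancel()

    // LAZY: never started, so cancelling it ends it without running the body.
    val lazy = launch(start = CoroutineStart.LAZY) { log += "LAZY ran" }
    lazy.cancel()

    joinAll(default, undispatched, lazy)
    log
}

fun main() {
    println(startModeDemo()) // [UNDISPATCHED ran]
}
```

Only the UNDISPATCHED coroutine gets to run any of its body, because it is the only one that executes before the cancel call is reached.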
Coroutine context and dispatchers
CoroutineContext: the coroutine context, which controls the behavior of a coroutine. Both the Job described above and the dispatchers (schedulers) introduced next are CoroutineContext elements.
Coroutines provide four dispatchers by default:
- Dispatchers.Default: the default dispatcher, used by the coroutine builders when no dispatcher or other interceptor is specified. It suits background computation and is the dispatcher for CPU-intensive tasks.
- Dispatchers.IO: the IO dispatcher, suited to IO-related operations. It is the dispatcher for IO-intensive tasks.
- Dispatchers.Main: the UI dispatcher, which dispatches coroutines to the main thread.
- Dispatchers.Unconfined: a dispatcher that does not confine the coroutine to any particular thread. It starts the coroutine on the calling thread, but only runs it there until the first suspension point. After suspension, the coroutine resumes on whatever thread the invoked suspending function resumes it on. If coroutines using this dispatcher are nested, the inner coroutine is scheduled onto an event loop inside the coroutine framework when it starts, to avoid a stack overflow.
launch { // context of the parent, the main runBlocking coroutine
    println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // not confined: starts on the main thread
    println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // dispatched to the default dispatcher
    println("Default : I'm working in thread ${Thread.currentThread().name}")
}

// Output
Unconfined : I'm working in thread main @coroutine#3
Default : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4
main runBlocking : I'm working in thread main @coroutine#2
withContext
Besides setting the dispatcher when a coroutine is created with GlobalScope.launch {} or GlobalScope.async {}, you can switch dispatchers inside a coroutine with withContext.
Compared with async { ... }.await(), withContext has a lower memory overhead, so whenever async would be followed immediately by await, prefer withContext.
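The two forms can be placed side by side. This is a minimal sketch: sumTo, viaAsync and viaWithContext are hypothetical names, and sumTo merely stands in for real CPU-bound work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical CPU-bound work used only for illustration.
fun sumTo(n: Int): Int = (1..n).sum()

// Variant 1: async immediately followed by await.
suspend fun viaAsync(n: Int): Int = coroutineScope {
    async(Dispatchers.Default) { sumTo(n) }.await()
}

// Variant 2: withContext runs the block on the given dispatcher,
// suspends the caller, and returns the block's result directly.
suspend fun viaWithContext(n: Int): Int =
    withContext(Dispatchers.Default) { sumTo(n) }

fun main() = runBlocking {
    println(viaAsync(100))       // 5050
    println(viaWithContext(100)) // 5050
}
```

Both variants produce the same value. The second one avoids creating a Deferred just to await it on the next line.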
withTimeout
Kotlin coroutines provide the withTimeout function for cancelling on timeout. If the timeout elapses, the block is cancelled and a TimeoutCancellationException is thrown. If throwing an exception would affect other coroutines, you can use the withTimeoutOrNull function instead, which returns null on timeout without throwing.
runBlocking {
    val result = withContext(coroutineContext) {
        withTimeoutOrNull(500) { // times out after 500 ms, before delay(1000) finishes
            delay(1000)
            "hello"
        }
    }
    println(result)
}

// Output
null
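The two timeout functions differ only in how they report the timeout, which the following sketch shows. The function name timeoutDemo is hypothetical, and the durations are arbitrary values chosen so that the block always outlasts the timeout.

```kotlin
import kotlinx.coroutines.*

// Runs the same slow block under withTimeout and withTimeoutOrNull.
fun timeoutDemo(): Pair<String, String?> = runBlocking {
    // withTimeout throws TimeoutCancellationException when the time is up.
    val viaException = try {
        withTimeout(100L) {
            delay(1_000L)
            "done"
        }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }

    // withTimeoutOrNull returns null instead of throwing.
    val viaNull: String? = withTimeoutOrNull(100L) {
        delay(1_000L)
        "done"
    }

    viaException to viaNull
}

fun main() {
    println(timeoutDemo()) // (timed out, null)
}
```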
yield
To fix the earlier example whose busy loop could not be cancelled, you can use the yield function. It checks the state of the coroutine and, if the coroutine has been cancelled, responds by throwing a cancellation exception. It also tries to give up the thread so that other coroutines get a chance to run.
Adding yield to that example:
if (System.currentTimeMillis() >= nextPrintTime) {
    yield()
    println("job: I'm sleeping ${i++} ...")
    nextPrintTime += 500L
}

// Output
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
Coroutine scope
Coroutine scope: defines the parent-child relationships between coroutines and how cancellation and exceptions propagate.
There are three kinds of coroutine scope:
- Top-level scope: the scope of a coroutine that has no parent coroutine.
- Cooperative scope: when a coroutine starts a new coroutine, the new one is a child of the existing coroutine, and the child's scope is by default a cooperative scope. An uncaught exception thrown by the child is passed to the parent, and the parent is cancelled.
- Supervisor (master-slave) scope: the parent-child relationship is the same as in a cooperative scope, except that a coroutine in this scope does not pass an uncaught exception up to its parent.
The relationship between parent and child coroutines:
- When a parent coroutine is cancelled, all of its child coroutines are cancelled.
- The parent coroutine waits for its children to complete before it finally enters the completed state, regardless of whether its own body has already finished executing.
- A child coroutine inherits the context elements of its parent. If the child specifies an element with the same key, that element overrides the inherited one. The override only affects the child's own scope.
Declare a top-level scope: GlobalScope.launch {}, runBlocking {}
Declare a cooperative scope: coroutineScope {}
Declare a supervisor scope: supervisorScope {}
coroutineScope {} and supervisorScope {} are suspending functions, so they can only be called inside a coroutine scope or another suspending function.
The difference between them is that the SupervisorCoroutine behind supervisorScope {} overrides childCancelled() so that exceptions are not passed to the parent coroutine.
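The difference in exception propagation can be observed with two otherwise identical scopes. This is a sketch under stated assumptions: scopeDemo is a hypothetical name, and a CoroutineExceptionHandler is attached to the failing child of the supervisor scope so that its exception is recorded instead of reaching the thread's default handler.

```kotlin
import kotlinx.coroutines.*

// Runs a failing child next to a slow sibling in each kind of scope.
fun scopeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Cooperative scope: the failing child cancels its sibling, and the scope rethrows.
    try {
        coroutineScope {
            launch { delay(100L); log += "coroutineScope sibling finished" }
            launch { throw IllegalStateException("boom") }
        }
    } catch (e: IllegalStateException) {
        log += "coroutineScope failed: ${e.message}"
    }

    // Supervisor scope: the failure stays with the child that threw.
    val handler = CoroutineExceptionHandler { _, e -> log += "handler got: ${e.message}" }
    supervisorScope {
        launch { delay(100L); log += "supervisorScope sibling finished" }
        launch(handler) { throw IllegalStateException("boom") }
    }

    log
}

fun main() {
    scopeDemo().forEach(::println)
}
```

In the cooperative scope the sibling never finishes, while in the supervisor scope it completes normally after the other child has failed.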
Coroutine concurrency
From the introduction above we know that coroutines are really pieces of code executed on threads, so the concurrency tools used for threads, such as synchronized and CAS, also work in coroutines. The coroutine library itself provides two more tools for handling concurrency:
- Mutex: a mutual-exclusion lock;
- Semaphore: a counting semaphore.
Mutex
Mutex is similar to synchronized. When coroutines contend for the lock, the waiting coroutines are wrapped as LockWaiter objects and stored in a doubly linked list. Mutex also provides the withLock extension function to simplify usage:
runBlocking<Unit> {
    val mutex = Mutex()
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            mutex.withLock {
                counter++
            }
        }
    }
    Thread.sleep(500) // pause for a moment so that the coroutines can finish
    println("The final count is $counter")
}
Semaphore
Semaphore is used to limit the number of coroutines that can access a particular resource.
runBlocking<Unit> {
    val semaphore = Semaphore(1)
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            semaphore.withPermit {
                counter++
            }
        }
    }
    Thread.sleep(500) // pause for a moment so that the coroutines can finish
    println("The final count is $counter")
}
Note: with permits = 1, a Semaphore behaves just like a Mutex.
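The counter examples rely on Thread.sleep(500) to wait for the coroutines. A variant that waits deterministically could look like the sketch below. It assumes that launching the workers as children of a withContext block is acceptable, since withContext returns only after all of its children have completed. The function name countWithMutex is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Increments a shared counter from 10,000 coroutines running on several threads.
fun countWithMutex(): Int = runBlocking {
    val mutex = Mutex()
    var counter = 0
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch {
                mutex.withLock { counter++ } // only one coroutine at a time gets past the lock
            }
        }
    } // returns once every child coroutine has finished
    counter
}

fun main() {
    println("The final count is ${countWithMutex()}") // The final count is 10000
}
```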
Source code analysis
suspend
Let's look at the difference between a function marked with suspend and a lambda marked with suspend.
A suspend function:
suspend fun suspendFun() {
}
It is compiled into Java code as follows:
@Nullable
public final Object suspendFun(@NotNull Continuation $completion) {
return Unit.INSTANCE;
}
You can see that a suspending function takes a hidden Continuation parameter, a coroutine instance that comes from the enclosing coroutine body or from another suspending function. This is why a suspending function can only be called inside a coroutine body or another suspending function.
A suspend lambda expression:
Function1 var2 = (Function1)(new Function1((Continuation)null) {
    int label;

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        switch(this.label) {
        case 0:
            return Unit.INSTANCE;
        default:
        }
    }

    @NotNull
    public final Continuation create(@NotNull Continuation completion) {
        Function1 var2 = new <anonymous constructor>(completion);
        return var2;
    }

    public final Object invoke(Object var1) {
        return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
    }
});
A suspend lambda is actually compiled into a subclass of SuspendLambda. The inheritance chain is: SuspendLambda extends ContinuationImpl, which extends BaseContinuationImpl, which implements Continuation.
Decompiling the code shows that the code we write inside a coroutine body ultimately executes inside the invokeSuspend function. BaseContinuationImpl implements the resumeWith function of the Continuation interface and calls invokeSuspend inside it.
With the suspend keyword covered, let's look at how coroutines are created and run.
How are coroutines created
File: kotlin.coroutines.Continuation.kt.
The Continuation.kt file is essentially the core of the coroutine foundation. Understanding it amounts to understanding the basic principles of coroutines. It contains:
- The definition of the coroutine interface;
- Functions that resume or start a coroutine;
- Four functions for creating coroutines;
- A function that helps obtain the coroutine instance from inside a coroutine.
First comes the coroutine's interface declaration, which is very simple:
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the corresponding coroutine, passing a successful or failed [result]
     * as the return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
After the interface declaration, the Continuation.kt file provides two functions that call resumeWith:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
These two functions do essentially the same thing: both call resumeWith directly, one passing a success result and the other a failure. They are simply convenience wrappers around resumeWith.
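A typical use of these two wrappers is bridging a callback-style API into a suspending function. The sketch below uses only the standard library. The names fetch, fetchSuspending and describe are hypothetical and invented for this illustration, and fetch calls its callbacks synchronously so that the example needs no threads.

```kotlin
import kotlin.coroutines.*

// Hypothetical callback-style API, invented for this sketch.
fun fetch(ok: Boolean, onSuccess: (Int) -> Unit, onError: (Throwable) -> Unit) {
    if (ok) onSuccess(42) else onError(IllegalStateException("fetch failed"))
}

// Bridges the callbacks to a suspending function via resume / resumeWithException.
suspend fun fetchSuspending(ok: Boolean): Int = suspendCoroutine { cont ->
    fetch(
        ok,
        onSuccess = { value -> cont.resume(value) },           // resumeWith(Result.success(value))
        onError = { error -> cont.resumeWithException(error) } // resumeWith(Result.failure(error))
    )
}

// Hypothetical runner: starts the block and reports its outcome as text.
// It works here only because fetch invokes its callbacks synchronously.
fun describe(block: suspend () -> Int): String {
    var outcome = "still suspended"
    block.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            outcome = result.fold({ "success: $it" }, { "failure: ${it.message}" })
        }
    })
    return outcome
}

fun main() {
    println(describe { fetchSuspending(true) })  // success: 42
    println(describe { fetchSuspending(false) }) // failure: fetch failed
}
```

In the failure case the exception passed to resumeWithException is rethrown at the suspension point, travels out of the coroutine body, and arrives at the completion continuation as a failed Result.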
Next come the four functions for creating coroutines:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
The main difference between createCoroutine and startCoroutine is that a coroutine created with createCoroutine must still be started by calling resume, whereas startCoroutine already calls resume for you. Let's create a coroutine the first way:
val continuation = suspend {
    println("In Coroutine")
}.createCoroutine(object : Continuation<Unit> {
    override fun resumeWith(result: Result<Unit>) {
        println(result)
    }

    override val context = EmptyCoroutineContext
})
An anonymous Continuation object is passed to createCoroutine in order to create the coroutine. This looks odd: if we already have a coroutine instance, why pass it in to create one instead of using it directly? To see why, look at what createCoroutine does.
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
The first call is to the createCoroutineUnintercepted function, whose source can be found in kotlin.coroutines.intrinsics.IntrinsicsJvm.kt:
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}
The probeCoroutineCreated function returns its argument directly. Stepping through with a breakpoint confirms that its return value is the same object as the completion that was passed in, so it can be ignored here.
A breakpoint also shows that (this is BaseContinuationImpl) evaluates to true, which indirectly confirms the inheritance relationship between the suspend lambda and BaseContinuationImpl described above. What is finally returned is the result of create(Continuation). Here you can see that the Continuation passed in as an argument is wrapped by the suspend lambda and returned. In effect, the suspend lambda becomes a proxy for that Continuation.
The meaning of createCoroutineUnintercepted(completion) is now clear: the coroutine instance created by object : Continuation<Unit> {} is passed into the suspend lambda, which then operates on the coroutine as its proxy.
Next, the intercepted function is called. It is also declared in the IntrinsicsJvm.kt file:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
ContinuationImpl’s intercepted function:
public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }
The expression context[ContinuationInterceptor]?.interceptContinuation(this) brings in the concept of a coroutine interceptor, which is discussed in detail later. For now it is enough to know that a coroutine interceptor is also a proxy for the coroutine. So intercepted() can be understood as follows: if an interceptor has been added to the coroutine context, it returns the continuation produced by the interceptor. Otherwise it returns the suspend lambda instance itself. Both implement the Continuation interface.
The createCoroutine and startCoroutine functions therefore do not really create the coroutine, since a coroutine instance is passed to them. Their job is to add proxies to the coroutine.
createCoroutineUnintercepted(completion).intercepted()
The code above adds proxies to the coroutine, namely the suspend lambda and the coroutine interceptor. A call to resumeWith on the resulting instance first runs the resumeWith logic implemented in those two proxy layers and only then runs the resumeWith function of the coroutine itself to produce the final result.
Inside createCoroutine, a further proxy layer, SafeContinuation, is added on top of those two. SafeContinuation uses three states internally, together with CAS operations, to guarantee that the returned SafeContinuation instance can call resumeWith only once. Calling it again throws an error.
- UNDECIDED: the initial state
- COROUTINE_SUSPENDED: the suspended state
- RESUMED: the resumed state
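Both points, that createCoroutine does not start the body and that the returned SafeContinuation accepts only one resume, can be seen in a small standard-library-only sketch. The function name resumeTwice and the log messages are hypothetical.

```kotlin
import kotlin.coroutines.*

// Creates a coroutine, resumes it once to run the body, then tries to resume it again.
fun resumeTwice(): List<String> {
    val log = mutableListOf<String>()

    val coroutine: Continuation<Unit> = suspend {
        log += "body ran"
        7
    }.createCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            log += "completed with ${result.getOrNull()}"
        }
    })

    log += "created"       // the body has not run yet
    coroutine.resume(Unit) // the first resume starts the body

    try {
        coroutine.resume(Unit) // the second resume is rejected by SafeContinuation
    } catch (e: IllegalStateException) {
        log += "second resume rejected"
    }
    return log
}

fun main() {
    resumeTwice().forEach(::println)
}
```

The log shows "created" before "body ran", which confirms that nothing executes until resume is called.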
How is the coroutine suspended and how is it resumed
So why do coroutines go to all this trouble? To understand it, let's look at BaseContinuationImpl's implementation of the resumeWith function.
public final override fun resumeWith(result: Result<Any?>) {
    var current = this
    var param = result
    while (true) {
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!!
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
When resume(Unit) is called to start the coroutine, the proxy chain means that invokeSuspend() is executed inside BaseContinuationImpl's resumeWith() function. invokeSuspend() contains what we call the coroutine body.
Look at the invokeSuspend function in the following code:
public final Object invokeSuspend(@NotNull Object $result) {
    Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch(this.label) {
    case 0:
        ResultKt.throwOnFailure($result);
        return Boxing.boxInt(5);
    default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
}
You can see that the final result is returned directly here. The resumeWith function (which ContinuationImpl inherits from BaseContinuationImpl) then finally calls
completion.resumeWith(outcome)
to deliver the coroutine's final result.
ContinuationImpl does not seem to play any role in this process, so let's look at asynchronous code executing inside a coroutine:
suspend {
    suspendFunc()
}

suspend fun suspendFunc() = suspendCoroutine<Int> { continuation ->
    thread {
        continuation.resume(5)
    }
}

// Decompiled Java
public final Object invokeSuspend(@NotNull Object $result) {
    Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    Object var10000;
    switch(this.label) {
    case 0:
        ResultKt.throwOnFailure($result);
        this.label = 1;
        var10000 = DeepKotlin3Kt.suspendFunc(this);
        if (var10000 == var2) {
            return var2;
        }
        break;
    case 1:
        ResultKt.throwOnFailure($result);
        var10000 = $result;
        break;
    default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    return var10000;
}

public static final Object suspendFunc(@NotNull Continuation $completion) {
    boolean var1 = false;
    boolean var2 = false;
    boolean var3 = false;
    SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
    Continuation continuation = (Continuation)var4;
    int var6 = false;
    ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new DeepKotlin3Kt$suspendFunc02$2$1(continuation)), 31, (Object)null);
    Object var10000 = var4.getOrThrow();
    if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        DebugProbesKt.probeCoroutineSuspended($completion);
    }
    return var10000;
}
The resume function starts the coroutine. The first time invokeSuspend runs, this.label == 0, so the case 0 branch executes. this.label is set to 1, and because suspendFunc returns COROUTINE_SUSPENDED, the check if (var10000 == var2) is true, so invokeSuspend returns var2, that is, COROUTINE_SUSPENDED. Inside resumeWith, when invokeSuspend returns COROUTINE_SUSPENDED, the function simply returns. This is how a coroutine is suspended.
When the thread finishes its work, it calls resume, and invokeSuspend executes again. This time this.label == 1, so the case 1 branch executes and returns the result 5 directly. In resumeWith the early return is not taken, so the completion's resumeWith function is called to deliver the final result. This is how a coroutine is resumed.
Having followed the coroutine through its run, we can see that ContinuationImpl is the real implementer of the suspend and resume logic. Because that logic exists, we can call asynchronous code as if we were writing synchronous code:
suspend {
    println("Coroutine start")
    println("Coroutine: ${System.currentTimeMillis()}")
    val resultFun = suspendThreadFun()
    println("Coroutine: suspendThreadFun-$resultFun-${System.currentTimeMillis()}")
    val result = suspendNoThreadFun()
    println("Coroutine: suspendNoThreadFun-$result-${System.currentTimeMillis()}")
}.startCoroutine(object : Continuation<Unit> {
    override val context = EmptyCoroutineContext

    override fun resumeWith(result: Result<Unit>) {
        println("Coroutine End: $result")
    }
})

suspend fun suspendThreadFun() = suspendCoroutine<Int> { continuation ->
    thread {
        Thread.sleep(1000)
        continuation.resumeWith(Result.success(5))
    }
}

suspend fun suspendNoThreadFun() = suspendCoroutine<Int> { continuation ->
    continuation.resume(5)
}

// Output:
Coroutine start
Coroutine: 1627014868152
Coroutine: suspendThreadFun-5-1627014869182
Coroutine: suspendNoThreadFun-5-1627014869186
Coroutine End: Success(kotlin.Unit)
Create coroutine scope
When creating a coroutine with createCoroutine, you will notice that you can also pass it a receiver parameter. This parameter serves as the receiver of the coroutine body and is commonly referred to as the coroutine scope.
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
You can see that the suspend lambda type has changed as well. We know that () -> T is a Function0 lambda, while R.() -> T is an extension lambda on class R. Because an extension function receives its receiver as the first argument, R.() -> T is a Function1 lambda.
Of course, the suspend modifier adds a Continuation parameter, so the two end up as Function1 and Function2 respectively.
Because the body is an extension lambda, it can call the receiver's functions and properties through an implicit this. For example:
launchCoroutine(ProducerScope<Int>()) {
produce(1000)
}
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<T>) {
println("Coroutine End: $result")
}
})
}
class ProducerScope<T> {
fun produce(value: T) {
println(value)
}
}
GlobalScope.launch source analysis
Once you understand the coroutine creation logic above, analyzing GlobalScope.launch is straightforward. GlobalScope.launch eventually executes the CoroutineStart.invoke function:
AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
The code is basically consistent with the analysis.
Custom coroutine context
The coroutine context plays a very important role in coroutines: it works like a set of equipment slots on the coroutine. You can combine the context elements you need into the CoroutineContext and then use them elsewhere.
The CoroutineContext data structure has the following characteristics:
- Any element can be looked up with [], much like indexing a collection, by putting the target element's key inside the brackets.
- Elements can be combined in sequence with +, and += is available as well.
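These two operations, together with the rule that an element with the same key replaces the earlier one, can be tried out with the standard library alone. The element classes Tag and Level and the function contextDemo are hypothetical and defined only for this sketch.

```kotlin
import kotlin.coroutines.*

// Hypothetical context elements, defined only for this sketch.
class Tag(val value: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

class Level(val value: Int) : AbstractCoroutineContextElement(Level) {
    companion object Key : CoroutineContext.Key<Level>
}

// Builds a context with += and reads elements back with [].
fun contextDemo(): List<Any?> {
    var ctx: CoroutineContext = EmptyCoroutineContext
    ctx += Tag("first")
    ctx += Level(1)
    ctx += Tag("second") // same key as Tag("first"), so it replaces that element
    return listOf(
        ctx[Tag]?.value,       // "second"
        ctx[Level]?.value,     // 1
        ctx.minusKey(Tag)[Tag] // null: the element was removed from the copy
    )
}

fun main() {
    println(contextDemo()) // [second, 1, null]
}
```

Each companion object doubles as the element's key, which is why the class name itself can be used inside the brackets.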
Let's give a coroutine a name by defining a coroutine context element:
public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
public companion object Key : CoroutineContext.Key<CoroutineName>
override fun toString(): String = "CoroutineName($name)"
}
Apply to the example:
var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("c0-01")
suspend {
    println("Run Coroutine")
}.startCoroutine(object : Continuation<Unit> {
    override fun resumeWith(result: Result<Unit>) {
        println("${context[CoroutineName]?.name}")
    }

    override val context = coroutineContext
})

// Output:
Run Coroutine
c0-01
In fact, the coroutine library already provides a CoroutineName implementation.
Custom coroutine interceptor
An interceptor is defined by implementing the ContinuationInterceptor interface. Because an interceptor is itself a coroutine context element, you use it by adding it to the coroutine's context.
Declare a log interceptor:
class LogInterceptor : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = LogContinuation(continuation)
}
class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("before resumeWith: $result")
continuation.resumeWith(result)
println("after resumeWith")
}
}
The key function of an interceptor is interceptContinuation, which returns a new Continuation instance as needed.
The interceptor is triggered on every resume call during the coroutine's life cycle. There are two kinds of resume calls:
- One call when the coroutine starts: resuming begins execution of the coroutine body from its start up to the first suspension.
- One more call each time the coroutine is resumed after a suspension point that actually suspended asynchronously.
Thus the number of resume calls is n + 1, where n is the number of suspension points in the coroutine body at which asynchronous logic actually suspends.
Rewriting the example above:
suspend fun suspendFunc02() = suspendCoroutine<Int> { continuation ->
    thread {
        continuation.resumeWith(Result.success(5))
    }
}

suspend {
    suspendFunc02()
    suspendFunc02()
}.startCoroutine(object : Continuation<Int> {
    override val context: CoroutineContext = LogInterceptor()

    override fun resumeWith(result: Result<Int>) {
        ...
        result.onSuccess {
            println("Coroutine End: ${context[CoroutineName]?.name}, $result")
        }
    }
})

// Output:
before resumeWith: Success(kotlin.Unit)
after resumeWith
before resumeWith: Success(5)
after resumeWith
before resumeWith: Success(5)
after resumeWith