Kt. academy/article/cc-…
This is a chapter from the book Kotlin Coroutines, which is available on LeanPub.
Some people cannot just accept that a car drives. They need to open the hood to understand how it works. I am one of those people, so I had to find out how coroutines work. If you are the same, you will enjoy this chapter; if not, you can skip it.
This chapter does not introduce any new tools; it is purely explanatory. It tries to explain, to a satisfactory level, how coroutines work. The key lessons are:
- Suspending functions are like state machines, with a possible state at the beginning of the function and after each suspending function call.
- Both the number identifying the state and the local data are kept in the continuation object.
- The continuation of a function decorates the continuation of its caller; as a result, all these continuations represent a call stack that is used when we resume.
If you are interested in the (simplified) internals, read on.
Continuation-passing style
There are a few ways in which suspending functions could have been implemented, but the Kotlin team decided on continuation-passing style. This means that continuations are passed from function to function as arguments.
By convention, the continuation is the last parameter in the function declaration.
suspend fun getUser(): User?
suspend fun setUser(user: User)
suspend fun checkAvailability(flight: Flight): Boolean

// Under the hood, these look like this:
fun getUser(continuation: Continuation<*>): Any?
fun setUser(user: User, continuation: Continuation<*>): Any
fun checkAvailability(
    flight: Flight,
    continuation: Continuation<*>
): Any
You might have noticed that the result type under the hood differs from the declared one: it has changed to Any or Any?. Why? A suspending function might be suspended, in which case it does not return a value of its declared type. Instead, it returns a special marker, COROUTINE_SUSPENDED. We will see this in practice shortly.
For now, just notice that getUser might return either User? or COROUTINE_SUSPENDED (which is of type Any), so its result type must be the closest common supertype of User? and Any, which is Any?.
Perhaps one day Kotlin will introduce union types, in which case we could write User? | COROUTINE_SUSPENDED here instead.
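To make the two possible return values concrete, here is a minimal hand-written sketch. Apart from Continuation and COROUTINE_SUSPENDED, everything in it is a hypothetical illustration: the User class, the getUserCps name, and the cached parameter are assumptions, not what the compiler actually generates.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

data class User(val name: String)

// Hypothetical hand-written counterpart of `suspend fun getUser(): User?`.
// `cached` is an illustrative parameter that decides whether we "suspend".
fun getUserCps(cached: User?, continuation: Continuation<User?>): Any? =
    if (cached != null) {
        cached // finished without suspending: a value of the declared type
    } else {
        // A real implementation would keep `continuation` and resume it later.
        COROUTINE_SUSPENDED // suspended: the special marker, which is just an Any
    }

// A continuation that ignores its result, only needed to make the calls compile.
val noOp = object : Continuation<User?> {
    override val context: CoroutineContext = EmptyCoroutineContext
    override fun resumeWith(result: Result<User?>) {}
}

// The caller has to check which of the two cases it received.
fun describe(returned: Any?): String =
    if (returned === COROUTINE_SUSPENDED) "suspended" else "returned $returned"
```

With these definitions, describe(getUserCps(User("Ann"), noOp)) gives "returned User(name=Ann)", while describe(getUserCps(null, noOp)) gives "suspended". This is why the only type that fits both cases is Any?.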
A very simple function
To dig a little deeper, let's start with a very simple function that prints something before and after a delay:
suspend fun myFunction() {
    println("Before")
    delay(1000) // suspending
    println("After")
}
You can already infer that the real method signature would be:
fun myFunction(continuation: Continuation<*>): Any
Next, the function needs its own continuation in order to remember its state. Let's call it MyFunctionContinuation (the actual continuation is an object expression with no name, but naming it makes the explanation easier). At the beginning of its body, myFunction wraps the continuation it received as a parameter with its own MyFunctionContinuation:
val continuation = MyFunctionContinuation(continuation)
This should be done only if the continuation is not wrapped already. If it is, this call is part of the resume process, and we should keep the continuation unchanged (this might be confusing now, but you will see why later):
val continuation =
    if (continuation is MyFunctionContinuation) continuation
    else MyFunctionContinuation(continuation)
It can be simplified as:
val continuation = continuation as? MyFunctionContinuation
    ?: MyFunctionContinuation(continuation)
Finally, let's look at the body of our function:
suspend fun myFunction() {
    println("Before")
    delay(1000) // suspending
    println("After")
}
The function can be started from two places:
- from the beginning, when it is called for the first time;
- from the point after the suspension, when it is resumed from the continuation.
To identify the current state, we use a field called label. It is 0 at the start, so the function begins from the beginning. Before each suspension point, it is set to the next state, so that after resuming we start right after the suspension point:
// A simplified picture of how myFunction looks under the hood
fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        println("After")
        return Unit
    }
    error("Impossible")
}
The last important point is also visible in the snippet above: when delay is suspended, it returns COROUTINE_SUSPENDED, and myFunction then returns COROUTINE_SUSPENDED as well. The same happens in the function that called it, in the function that called that one, and so on up to the top of the call stack. This is how a suspension ends all these functions and leaves the thread free to do something else.
Before we go further, let's analyze the code above. What would happen if the call to delay did not return COROUTINE_SUSPENDED? What if it just returned Unit (it will not, but hypothetically)?
If delay returned Unit, we would simply move on to the next state, and the function would behave like any other function.
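Both outcomes can be observed with the standard library alone. The sketch below is an illustration under assumptions: pause, runOnce, and the events list are hypothetical names, and pause uses the suspendCoroutineUninterceptedOrReturn intrinsic to either return at once or return COROUTINE_SUSPENDED and keep the continuation for later.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

val events = mutableListOf<String>()
var parked: Continuation<Unit>? = null

// Hypothetical suspending function that either finishes at once or really suspends.
suspend fun pause(reallySuspend: Boolean): Unit =
    suspendCoroutineUninterceptedOrReturn<Unit> { cont ->
        if (reallySuspend) {
            parked = cont // keep the continuation so it can be resumed later
            COROUTINE_SUSPENDED
        } else {
            Unit // no suspension: behaves like an ordinary call
        }
    }

suspend fun myFunction(reallySuspend: Boolean) {
    events += "Before"
    pause(reallySuspend)
    events += "After"
}

// Starts myFunction, records when startCoroutine returns, then resumes if something was parked.
fun runOnce(reallySuspend: Boolean): List<String> {
    events.clear()
    parked = null
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) { events += "Completed" }
    }
    suspend { myFunction(reallySuspend) }.startCoroutine(completion)
    events += "startCoroutine returned"
    parked?.resume(Unit)
    return events.toList()
}
```

runOnce(false) yields [Before, After, Completed, startCoroutine returned]: nothing suspended, so the whole function ran like an ordinary call. runOnce(true) yields [Before, startCoroutine returned, After, Completed]: the suspension unwound the call chain first, and "After" was printed only once the parked continuation was resumed.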
Now, let's talk about the continuation. It is actually implemented as an anonymous class that inherits from ContinuationImpl. Written as a named class, it looks roughly like this:
class MyFunctionContinuation(
    continuation: Continuation<*>
) : ContinuationImpl(continuation) {
    var result: Any? = null // not really needed here, because the result is always Unit
    var label = 0

    override fun invokeSuspend(`$result`: Any?): Any? {
        this.result = `$result`
        return myFunction(this)
    }
}
As you might have noticed, our continuation inherits from ContinuationImpl. This class and its superclasses are responsible for the whole resume process. The full picture is rather complex and will be simplified step by step; for now, a minimal continuation is enough for our simple function, so instead of inheriting from ContinuationImpl we will implement Continuation directly. In this simplified version, we assume that:
- the continuation only needs to remember where we suspended, which it does with label;
- the value passed to resume is not important (it is always Unit);
- the continuation is never resumed with an exception.
The snippet below is something you can run and analyze. Keep in mind that it contains a lot of simplifications. A real continuation has more logic to take care of (most of which is explained later), the real delay wraps the continuation in its own class, and coroutines are normally started with a coroutine builder:
fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        // delay(1000, continuation) stands for the compiled form of delay;
        // it is not declared in this snippet
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0

    override fun resumeWith(result: Result<Unit>) {
        if (result.isSuccess) {
            val res = myFunction(this)
            completion.resume(res as Unit)
        }
        // ... the rest will come later
    }
}
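For readers who want to step through this state machine without a timer, here is a self-contained variant. It is a sketch under assumptions: fakeDelay is a hypothetical stand-in for the compiled delay that simply parks the continuation, println is replaced by a trace list so the order of events can be inspected, and runStateMachine plays the role of whoever would resume the coroutine later.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

val trace = mutableListOf<String>()
var waiting: Continuation<Unit>? = null

// Hypothetical stand-in for the compiled delay: no timer, it only parks
// the continuation so that the demo can resume it by hand.
fun fakeDelay(continuation: Continuation<Unit>): Any {
    waiting = continuation
    return COROUTINE_SUSPENDED
}

class MyFunctionContinuation(val completion: Continuation<Unit>) : Continuation<Unit> {
    override val context: CoroutineContext get() = completion.context
    var label = 0

    override fun resumeWith(result: Result<Unit>) {
        // Re-enter the function; it jumps to the state stored in label.
        val res = myFunction(this)
        if (res !== COROUTINE_SUSPENDED) completion.resume(Unit)
    }
}

fun myFunction(continuation: Continuation<Unit>): Any {
    // Wrap only on the first call; on resume we receive our own continuation back.
    val cont = continuation as? MyFunctionContinuation ?: MyFunctionContinuation(continuation)
    if (cont.label == 0) {
        trace += "Before"
        cont.label = 1
        if (fakeDelay(cont) === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
    }
    if (cont.label == 1) {
        trace += "After"
        return Unit
    }
    error("Impossible")
}

fun runStateMachine(): List<String> {
    trace.clear()
    val top = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) { trace += "Completed" }
    }
    val first = myFunction(top)
    trace += if (first === COROUTINE_SUSPENDED) "Suspended" else "Finished"
    waiting?.resume(Unit) // what a timer would do once the delay has elapsed
    return trace.toList()
}
```

runStateMachine() returns [Before, Suspended, After, Completed]. The first call runs state 0 and returns the marker; the manual resume re-enters myFunction with label == 1, runs the rest, and only then resumes the caller's continuation.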
The code above is a simplified picture of what happens under the hood of the following code:
import kotlin.coroutines.*
import kotlinx.coroutines.delay

suspend fun myFunction() {
    println("Before")
    delay(1000)
    println("After")
}

fun main() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            // no-op
        }
    }
    suspend { myFunction() }.startCoroutine(EMPTY_CONTINUATION)
    Thread.sleep(2000) // wait for the delayed coroutine to finish
}
To see this for yourself, paste the code above into IntelliJ IDEA, open Tools > Kotlin > Show Kotlin Bytecode, and press the Decompile button. You will see the code decompiled to Java, which is roughly what it would look like if it had been written in Java.
A function with state
If a function has some state (such as local variables or parameters) that needs to be restored after a suspension, this state has to be kept in the function's continuation. Consider the following function:
suspend fun myFunction() {
    println("Before")
    var counter = 0 // var, because it is incremented after the suspension
    delay(1000) // suspending
    counter++
    println("Counter: $counter")
    println("After")
}
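Here, counter is needed in two states (where label is 0 and where it is 1), so it has to be stored in the continuation right before the suspension and restored from it when the function is resumed. Here is a simplified picture of how this function looks under the hood: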
fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    var counter = continuation.counter // restore the saved value

    if (continuation.label == 0) {
        println("Before")
        counter = 0
        continuation.counter = counter // save before suspending
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        counter = (counter as Int) + 1 // use after resuming
        println("Counter: $counter")
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext // ignore this property for now
        get() = completion.context

    var label = 0
    var counter: Int? = null // holds the function's local variable counter

    override fun resumeWith(result: Result<Unit>) {
        if (result.isSuccess) {
            val res = myFunction(this)
            completion.resume(res as Unit)
        }
        // ... more on that later
    }
}
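The same effect can be observed on a real suspending function, where the compiler generates the continuation fields for us. This is a small sketch under assumptions: park, countAcrossSuspension, and runCounter are hypothetical names, and park uses suspendCoroutine from the standard library to suspend until the demo resumes it by hand.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var parkedCounter: Continuation<Unit>? = null
val counterLog = mutableListOf<String>()

// Hypothetical suspension point: park the continuation instead of waiting for a timer.
suspend fun park(): Unit = suspendCoroutine<Unit> { cont -> parkedCounter = cont }

suspend fun countAcrossSuspension() {
    var counter = 0
    counter++ // counter == 1 before suspending
    park()    // the call chain returns here; the local must live in the continuation
    counter++ // restored after resuming: 1 -> 2
    counterLog += "Counter: $counter"
}

fun runCounter(): List<String> {
    counterLog.clear()
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) { counterLog += "Completed" }
    }
    suspend { countAcrossSuspension() }.startCoroutine(completion)
    counterLog += "Suspended" // startCoroutine has already returned at this point
    parkedCounter?.resume(Unit)
    return counterLog.toList()
}
```

runCounter() returns [Suspended, Counter: 2, Completed]. The value 1 written before the suspension is still there after the resume, even though the original stack frame no longer existed in between.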
A function resumed with a value
The situation is slightly different if we actually expect some data from the suspension. Let's analyze the function below:
suspend fun printUser(token: String) {
    println("Before")
    val userId = getUserId(token) // suspending
    println("Got userId: $userId")
    val userName = getUserName(userId) // suspending
    println(User(userId, userName))
    println("After")
}
There are two suspending functions here: getUserId and getUserName. We have also added a parameter, token, and our suspending functions now return values. All of this needs to be stored in the continuation:
- token, because it is needed to call printUser again when the continuation is resumed;
- userId, because it is needed in later states, after resuming;
- result, which does not exist as a variable in the original function but represents the value the suspending call was resumed with (the user ID after the first suspension and the user name after the second).
Here is a simplified picture of how this function looks under the hood:
fun printUser(
    token: String,
    continuation: Continuation<Nothing>
): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation as Continuation<Unit>, token)

    var result: Any? = continuation.result
    var userId: String? = continuation.userId
    val userName: String

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        result = getUserId(token, continuation)
        if (result == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        userId = result as String
        println("Got userId: $userId")
        continuation.label = 2
        continuation.userId = userId
        result = getUserName(userId, continuation)
        if (result == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 2) {
        userName = result as String
        println(User(userId as String, userName))
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>,
    val token: String
) : Continuation<String> {
    override val context: CoroutineContext // ignore this property for now
        get() = completion.context

    var label = 0
    var result: Any? = null
    var userId: String? = null

    override fun resumeWith(result: Result<String>) {
        if (result.isSuccess) {
            this.result = result.getOrNull()
            val res = printUser(token, this)
            // printUser may suspend again (at getUserName); the next resume finishes the job
            if (res == COROUTINE_SUSPENDED) return
            completion.resume(res as Unit)
        }
        // ... the rest will come later
    }
}
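To see that the value passed to resume becomes the result of the suspending call, here is a small sketch built on the standard library. The names awaitString, printUserSketch, and runPrintUser are hypothetical, and the two strings passed to resume stand in for what getUserId and getUserName would deliver.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var parkedRequest: Continuation<String>? = null
val userLog = mutableListOf<String>()

// Hypothetical suspending call: suspends until someone resumes it with a String.
suspend fun awaitString(): String = suspendCoroutine<String> { cont -> parkedRequest = cont }

suspend fun printUserSketch() {
    val userId = awaitString()   // first suspension point
    userLog += "Got userId: $userId"
    val userName = awaitString() // second suspension point
    userLog += "User($userId, $userName)"
}

fun runPrintUser(): List<String> {
    userLog.clear()
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) { userLog += "Completed" }
    }
    suspend { printUserSketch() }.startCoroutine(completion)
    parkedRequest!!.resume("id-42") // becomes the value of userId
    parkedRequest!!.resume("Ann")   // becomes the value of userName
    return userLog.toList()
}
```

runPrintUser() returns [Got userId: id-42, User(id-42, Ann), Completed]. Note that userId, obtained after the first resume, is still available after the second one: it was kept in the continuation in the meantime.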
Exceptions
A continuation can be resumed normally, or it can be resumed with an exception. The latter happens when an exception is thrown at a suspension point. To model this, the exception is stored in the result, and result.throwOnFailure() is called at the beginning of each state. This call rethrows the stored exception (if there is one); thanks to that, the exception can be caught, and the developer sees a meaningful stack trace. For this to work, result has to be able to hold both a success and a failure, so its actual type is the generic Result type.
In the demonstration below I use my own declaration of Result, because the one from the Kotlin library cannot be used here:
fun printUser(token: String, continuation: Continuation<Nothing>): Any {
    val continuation =
        if (continuation is MyFunctionContinuation) continuation
        else MyFunctionContinuation(continuation as Continuation<Unit>, token)

    var result: Result<Any>? = continuation.result
    var userId: String? = continuation.userId
    val userName: String

    if (continuation.label == 0) {
        result?.throwOnFailure()
        println("Before")
        continuation.label = 1
        val res = getUserId(token, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 1) {
        result!!.throwOnFailure()
        userId = result.getOrNull() as String
        println("Got userId: $userId")
        continuation.label = 2
        continuation.userId = userId
        val res = getUserName(userId, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 2) {
        result!!.throwOnFailure()
        userName = result.getOrNull() as String
        println(User(userId as String, userName))
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>,
    val token: String
) : Continuation<String> {
    override val context: CoroutineContext // ignore this property for now
        get() = completion.context

    var label = 0
    var result: Result<Any>? = null
    var userId: String? = null

    override fun resumeWith(result: Result<String>) {
        this.result = result
        val res = try {
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

fun main() {
    toStart() // toStart is not declared in this excerpt
}
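From the outside, this mechanism means that an exception passed to the continuation is thrown at the suspension point, where an ordinary try/catch can handle it. The sketch below illustrates this with the standard library only; fetchName, loadName, and runFailure are hypothetical names, and the IllegalStateException is just an example failure.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var parkedCall: Continuation<String>? = null
val errorLog = mutableListOf<String>()

// Hypothetical suspending call that waits to be resumed by the demo.
suspend fun fetchName(): String = suspendCoroutine<String> { cont -> parkedCall = cont }

suspend fun loadName() {
    try {
        val name = fetchName() // the exception surfaces here, at the suspension point
        errorLog += "Got $name"
    } catch (e: IllegalStateException) {
        errorLog += "Caught: ${e.message}"
    }
}

fun runFailure(): List<String> {
    errorLog.clear()
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            errorLog += "Completed, success=${result.isSuccess}"
        }
    }
    suspend { loadName() }.startCoroutine(completion)
    // Resume with a failure instead of a value.
    parkedCall!!.resumeWithException(IllegalStateException("no network"))
    return errorLog.toList()
}
```

runFailure() returns [Caught: no network, Completed, success=true]. The failure was delivered through the continuation, rethrown inside loadName, and handled there, so the caller's continuation was still resumed with a success.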
The call stack
When function a calls function b, the virtual machine needs to store the state of a somewhere, as well as the address at which execution should continue once b finishes. All this is stored in a structure called the call stack. The problem is that when we suspend, we free the thread, and as a result our call stack is cleared. So the call stack is of no use when we resume. Instead, the continuations serve as a call stack. Each continuation keeps the state in which we suspended (as a label), the function's local variables and parameters (as fields), and a reference to the continuation of the function that called this function. One continuation references another, which references another, and so on. As a result, our continuation is like a huge onion: it holds everything that is usually kept on the call stack. Take a look at the following example:
suspend fun a(): User? {
    val user = getUser()
    b()
    b()
    b()
    return user
}

suspend fun b() {
    for (i in 1..10) {
        c(i)
    }
}

suspend fun c(i: Int) {
    delay(i * 100L)
    println("Tick")
}
A simple continuation representation looks something like this:
CContinuation {
    label -> 1
    completion -> BContinuation {
        i -> 4
        label -> 1
        completion -> AContinuation {
            label -> 2
            user -> User@1234
        }
    }
}
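The nested structure above can be modelled as a linked list of frames. The following sketch is purely illustrative: Frame, callStack, and sampleStack are hypothetical names and not part of the coroutines library. It shows that following the completion references yields the same information a call stack would hold.

```kotlin
// Hypothetical model of a continuation: a label, the saved locals,
// and a reference to the caller's continuation.
class Frame(
    val function: String,
    val label: Int,
    val locals: Map<String, Any?>,
    val completion: Frame?
)

// Walk the completion links from the innermost frame outwards,
// much like reading a stack trace from top to bottom.
fun Frame.callStack(): List<String> =
    generateSequence(this) { it.completion }
        .map { "${it.function}(label=${it.label}, locals=${it.locals})" }
        .toList()

// Builds the chain from the representation above: c was called by b, b by a.
fun sampleStack(): List<String> {
    val a = Frame("a", label = 2, locals = mapOf("user" to "User@1234"), completion = null)
    val b = Frame("b", label = 1, locals = mapOf("i" to 4), completion = a)
    val c = Frame("c", label = 1, locals = emptyMap(), completion = b)
    return c.callStack()
}
```

sampleStack() returns three entries, innermost first: c(label=1, locals={}), b(label=1, locals={i=4}), and a(label=2, locals={user=User@1234}).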
When a continuation is resumed, it first calls its own function; once that function is done, it resumes the continuation of the function that called it. That continuation calls its function in turn, and the process repeats until the top of the stack is reached:
override fun resumeWith(result: Result<String>) {
    this.result = result
    val res = try {
        val r = requestUser(token, this)
        if (r == COROUTINE_SUSPENDED) return
        Success(r)
    } catch (e: Throwable) {
        Failure(e)
    }
    completion.resumeWith(res)
}
Graphically, the process looks like this:
Exceptions behave in a similar way: they propagate out of one function after another unless they are caught somewhere along the way.
So completion is the continuation of the function that called the current function. When the current function finishes, its caller is resumed with the result and moves to its next state (or rethrows the exception carried in the Result).
Real code
Real continuations and suspending functions are considerably more complicated than this, because they include optimizations and some additional mechanisms, such as:
- constructing a better exception stack trace;
- adding coroutine suspension interception (this feature is discussed later).
Here is a part of BaseContinuationImpl from Kotlin 1.5.30, which shows the actual resumeWith implementation (other methods and some comments are skipped):
internal abstract class BaseContinuationImpl(
    val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final, which is what allows the recursion
    // in resumeWith to be unrolled into a loop
    final override fun resumeWith(result: Result<Any?>) {
        // The loop replaces recursion to make the stack shorter on resume
        var current = this
        var param = result
        while (true) {
            // Reports the resume to the debugging infrastructure
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast
                // Run this continuation's own code to produce an outcome
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // If it suspended again, just return. The function we are
                        // waiting for holds this continuation (as its completion)
                        // and will resume it when it is done
                        if (outcome === COROUTINE_SUSPENDED)
                            return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                // This state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // Follow the call chain with a loop instead of recursion;
                    // completion belongs to the function that called this one
                    current = completion
                    param = outcome
                } else {
                    // The top-level completion is not a BaseContinuationImpl:
                    // invoke it with the final outcome and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    // ...
}
As you can see, it uses a loop instead of recursion. According to the comments in the code, this keeps the stack shorter when a chain of continuations is resumed.
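The idea of unrolling the recursion can be shown on a toy model. This is a heavily simplified sketch, not the library's code: Step, resumeChain, and buildChain are hypothetical names, and each step just transforms an Int before handing it to its caller.

```kotlin
// Hypothetical, heavily simplified continuation: each step transforms a value
// and then hands the outcome to the step that called it.
class Step(val completion: Step?, val transform: (Int) -> Int)

// Loop-based resume in the spirit of BaseContinuationImpl.resumeWith:
// walk the completion chain iteratively instead of calling resume recursively.
fun resumeChain(innermost: Step, initial: Int): Int {
    var current: Step? = innermost
    var value = initial
    while (current != null) {
        value = current.transform(value) // run this step's own code
        current = current.completion     // then move on to its caller
    }
    return value
}

// Builds a chain of `depth` steps, each adding 1 to the value it receives.
fun buildChain(depth: Int): Step {
    var step = Step(completion = null) { it + 1 }
    repeat(depth - 1) {
        val caller = step
        step = Step(completion = caller) { it + 1 }
    }
    return step
}
```

resumeChain(buildChain(100_000), 0) returns 100000 while using a constant number of stack frames. A recursive version would need one frame per step, so its stack depth would grow with the length of the chain.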
Summary
The actual implementation is more complicated, but I hope this chapter has given you some idea of what coroutines look like on the inside. The key points are:
- Suspending functions are like state machines, with a possible state at the beginning of the function and after each suspending function call.
- Both the label identifying the state and the local data are kept in the continuation object.
- The continuation of a function decorates the continuation of its caller; as a result, all these continuations represent a call stack that is used when we resume.
- The actual mechanism is a bit more complicated: the first bit of label is also changed, and this change is checked elsewhere. This mechanism is needed for suspending functions to support recursion. It has been skipped here for the sake of simplicity.
- The call stack has limited space. When it is all used up, a StackOverflowError occurs.