Keywords: Kotlin, coroutine, suspend, non-blocking
Coroutine suspension is a mystery at first, because we always think in terms of threads, where the only concept we know is blocking. What is a non-blocking suspension? You might laugh (or cry). I'm sorry I couldn't make this article any easier, but you have to work through it yourself!
1. Start with delay
When we first learned about threads, one of the most common ways to simulate a delay was Thread.sleep; in coroutines, the counterpart is delay. sleep puts the thread to sleep, and once the specified time has passed (or an interrupt arrives) the thread tries to resume execution. delay, by contrast, suspends the coroutine. This does not block the thread, so in terms of hardware efficiency it "delays nothing". In that sense, delay is a good way to put a coroutine to sleep.
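The difference between suspending and blocking can be observed on a single thread. The following is a minimal sketch, assuming kotlinx.coroutines is on the classpath; the helper name interleaving and the event labels are made up for illustration. Two coroutines share the one thread owned by runBlocking, and the function records the order in which they make progress.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs two coroutines on the single runBlocking thread
// and records the order of events. `pause` is the waiting strategy under test.
fun interleaving(pause: suspend (Long) -> Unit): List<String> = runBlocking {
    val events = mutableListOf<String>()
    launch {
        events += "A start"
        pause(100) // either suspends the coroutine or blocks the thread
        events += "A end"
    }
    launch {
        events += "B start"
        events += "B end"
    }
    events // runBlocking returns only after both children have finished
}
```

Calling interleaving { delay(it) } yields [A start, B start, B end, A end]: while A is suspended, the thread is free to run B. Calling interleaving { Thread.sleep(it) } yields [A start, A end, B start, B end], because the sleeping thread cannot run anything else.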
The source of delay is actually quite simple:
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}
The call cont.context.delay.scheduleResumeAfterDelay can be compared to setTimeout in JavaScript or Handler.postDelayed on Android. It essentially sets up a delayed callback that, when the time is up, calls one of cont's resume methods so that the coroutine continues executing.
The key to the rest is suspendCancellableCoroutine, our old friend. We used it earlier to turn callbacks into coroutines, and it turns out delay is built on it too. If you read more of the source, you will find that join, await, and others work in a similar way.
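The "delayed callback that resumes the continuation" idea can be sketched with an ordinary JDK timer. This is not how kotlinx.coroutines implements delay; it is a simplified stand-in, assuming kotlinx.coroutines is available. The names delaySketch, elapsedMillis, and the timer thread are hypothetical.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// A daemon timer thread that plays the role of setTimeout / Handler.postDelayed.
private val timer = Executors.newSingleThreadScheduledExecutor { task ->
    Thread(task, "sketch-timer").apply { isDaemon = true }
}

// Hypothetical stand-in for delay: schedule a callback, then suspend.
suspend fun delaySketch(timeMillis: Long) {
    if (timeMillis <= 0) return // nothing to wait for, so never suspend
    suspendCancellableCoroutine<Unit> { cont ->
        // When the time is up, the timer thread resumes the coroutine.
        val pending = timer.schedule(Runnable { cont.resume(Unit) }, timeMillis, TimeUnit.MILLISECONDS)
        // If the coroutine is cancelled first, drop the scheduled callback.
        cont.invokeOnCancellation { pending.cancel(false) }
    }
}

// Measures how long a call to delaySketch takes, in milliseconds.
fun elapsedMillis(timeMillis: Long): Long = runBlocking {
    val start = System.nanoTime()
    delaySketch(timeMillis)
    (System.nanoTime() - start) / 1_000_000
}
```

The explicit Runnable avoids an overload ambiguity between the Runnable and Callable variants of schedule.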
2. suspendCancellableCoroutine again
Since everyone is already very familiar with suspendCancellableCoroutine, let's go straight to another old friend:
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}
The Job.join() method first checks whether the Job it is called on has already completed. If so, it returns immediately and the code that follows continues without suspending. Otherwise, it takes the joinSuspend branch. Here we see that it only registers a completion callback, so what does the legendary suspendCancellableCoroutine actually do internally?
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        block(cancellable)
        cancellable.getResult() // The type here is Any?
    }
You cannot see the implementation of suspendCoroutineUninterceptedOrReturn in the source, because it has no source at all :P Its job is to hand you the Continuation instance, and that really is all. But this is still very abstract, and one thing looks suspicious: the return type of suspendCoroutineUninterceptedOrReturn is T, yet the lambda passed in returns Any?, which is the type of cancellable.getResult(). Why is that?
Recall that at the beginning of this coroutine series, when discussing await, I mentioned that the signature of a suspend function is roughly equivalent to:
fun await(continuation: Continuation<User>): Any {
    ...
}
On the one hand, suspend adds a Continuation parameter to the method. On the other hand, the original return type User becomes the type argument of the Continuation, and the real return type turns out to be Any. Because the logical return type User is non-nullable here, the real return type is written as Any; if the type argument were nullable, the real return type would be Any?. This matches the Any? returned by cancellable.getResult() mentioned above.
If you look up the source code for await, you’ll also see the call to getResult().
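The "Continuation in, Any? out" shape can be observed from Kotlin itself through the standard library's startCoroutineUninterceptedOrReturn, which runs a suspend block until its first suspension and returns the raw result. The sketch below uses only the standard library; the names immediate, deferred, and rawCall are made up for illustration.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn

// Completes without ever suspending.
val immediate: suspend () -> String = { "ready" }

// Really suspends: another thread resumes the continuation later.
val deferred: suspend () -> String = {
    suspendCoroutine<String> { cont ->
        thread {
            Thread.sleep(50)
            cont.resume("later")
        }
    }
}

// Hypothetical helper: passes a Continuation in and gets Any? back.
// Returns the raw return value plus a future fed by the completion callback.
fun rawCall(block: suspend () -> String): Pair<Any?, CompletableFuture<String>> {
    val callback = CompletableFuture<String>()
    val completion = Continuation<String>(EmptyCoroutineContext) { result ->
        result.onSuccess { callback.complete(it) }
            .onFailure { callback.completeExceptionally(it) }
    }
    return block.startCoroutineUninterceptedOrReturn(completion) to callback
}
```

For immediate, the raw return value is the string itself and the completion callback is never invoked. For deferred, the raw return value is the COROUTINE_SUSPENDED marker and the real result arrives later through the callback.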
A suspend function does not have to suspend. It suspends only when needed, that is, when it must wait for the target coroutine to finish before it can continue. If the target coroutine has already completed when join, await, or any other suspend function is called, there is no need to wait: just take the result and move on. The magic lies in what cancellable.getResult() returns:
internal fun getResult(): Any? {
    ...
    if (trySuspend()) return COROUTINE_SUSPENDED // (1) Trigger the suspend logic
    ...
    if (state is CompletedExceptionally) // (2) Throw the exception immediately
        throw recoverStackTrace(state.cause, this)
    return getSuccessfulResult(state) // (3) Return the normal result immediately
}
At (1), the target coroutine has not finished, so we have to wait for the result. At (2) and (3), the coroutine has already finished, so the exception or the normal result is available right away. (2) and (3) are easy to understand; the key is (1). It is about to suspend, so what is this thing it returns?
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
This is the implementation as of 1.3. The implementation before 1.3 was more interesting: it was a plain Any instance. What exactly it is does not matter. The point is that it is a singleton, and whenever the coroutine machinery sees it, it knows the function has suspended.
3. A closer look at the suspend operation
Now that suspension has come up, you may feel you still only half understand it, or do not know how suspension is actually carried out. To tell the truth, we have not yet shown what the suspend operation really is. That is not because we are stingy, but because showing it too early would be scary.
suspend fun hello() = suspendCoroutineUninterceptedOrReturn<Int> { continuation ->
    log(1)
    thread {
        Thread.sleep(1000)
        log(2)
        continuation.resume(1024)
    }
    log(3)
    COROUTINE_SUSPENDED
}
I wrote a suspend function that returns the legendary COROUTINE_SUSPENDED marker directly from inside suspendCoroutineUninterceptedOrReturn. Normally we would call this function from a coroutine, right? But I won't. I will write a piece of Java code to call it instead. What happens?
public class CallCoroutine {
    public static void main(String... args) {
        Object value = SuspendTestKt.hello(new Continuation<Integer>() {
            @NotNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }

            @Override
            public void resumeWith(@NotNull Object o) { // (1)
                if (o instanceof Integer) {
                    handleResult(o);
                } else {
                    Throwable throwable = (Throwable) o;
                    throwable.printStackTrace();
                }
            }
        });

        if (value == IntrinsicsKt.getCOROUTINE_SUSPENDED()) { // (2)
            LogKt.log("Suspended.");
        } else {
            handleResult(value);
        }
    }

    public static void handleResult(Object o) {
        LogKt.log("The result is " + o);
    }
}
This code looks strange and can be confusing in two places:
(1) In Kotlin, the parameter of resumeWith has type Result, so why is it Object here? Because Result is an inline class, the compiler replaces it with its single underlying member, which is why it shows up as Object (Any? in Kotlin).
(2) IntrinsicsKt.getCOROUTINE_SUSPENDED() is simply how Java reads Kotlin's COROUTINE_SUSPENDED.
The rest is not hard to understand, and the result is something like this:
07:52:55:288 [main] 1
07:52:55:293 [main] 3
07:52:55:296 [main] Suspended.
07:52:56:298 [Thread-0] 2
07:52:56:306 [Thread-0] The result is 1024
In fact, this Java code is very similar to the following Kotlin call:
suspend fun main() {
    log(hello())
}
The only difference is that in Kotlin it is not easy to get at the actual return value of hello when it suspends. The rest of the output is exactly the same.
12:44:08:290 [main] 1
12:44:08:292 [main] 3
12:44:09:296 [Thread-0] 2
12:44:09:296 [Thread-0] 1024
Chances are you are getting a little confused here, but that's OK. I am trying to uncover the logic behind coroutine suspension, and compared with simply using coroutines, this takes some time to understand and absorb.
4. Understanding coroutine state transitions in depth
The example above shows that Java code makes the inner workings of coroutines easier to see, so let's look at a more complex example:
suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String> { continuation ->
    thread {
        Thread.sleep(1000)
        continuation.resume("Return suspended.")
    }
    COROUTINE_SUSPENDED
}

suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String> {
    log(1)
    "Return immediately."
}
We first define two suspend functions: the first one really suspends, and the second returns its result directly, much like the two paths of join or await discussed earlier. Let's call them from Kotlin:
suspend fun main() {
    log(1)
    log(returnSuspended())
    log(2)
    delay(1000)
    log(3)
    log(returnImmediately())
    log(4)
}
The running results are as follows:
08:09:37:090 [main] 1
08:09:38:096 [Thread-0] Return suspended.
08:09:38:096 [Thread-0] 2
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 3
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 4
OK, now we want to reveal what this coroutine code really looks like. To do that, let's reproduce the same logic in Java:
Note that the following code is not logically rigorous and must not be used in production. It is only meant to help you understand coroutines.
public class ContinuationImpl implements Continuation<Object> {
    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            // No break statements: when a call does not suspend,
            // execution falls through to the next state.
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    result = SuspendFunctionsKt.returnSuspended(this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    result = DelayKt.delay(1000, this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    result = SuspendFunctionsKt.returnImmediately(this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3: {
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}
We define a Java class ContinuationImpl, which is the implementation of a Continuation.
In fact, you can find a class called ContinuationImpl in the Kotlin standard library as well, except that its resumeWith ends up calling invokeSuspend, and invokeSuspend is the body of our coroutine, which is usually a lambda expression. When we launch a coroutine, the lambda we pass in is compiled into a subclass of SuspendLambda, which in turn is a subclass of ContinuationImpl.
Besides this class, we also need a completion to receive the result. The class below mimics the standard library's RunSuspend. If you have read the previous articles, you know that suspend main is implemented on top of it:
public class RunSuspend implements Continuation<Unit> {
    private Object result;

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object result) {
        synchronized (this) {
            this.result = result;
            notifyAll(); // The coroutine has ended; tell the wait() below to stop blocking
        }
    }

    public void await() throws Throwable {
        synchronized (this) {
            while (true) {
                Object result = this.result;
                if (result == null) wait(); // Object.wait() blocks the current thread until notify or notifyAll is called
                else if (result instanceof Throwable) {
                    throw (Throwable) result;
                } else return;
            }
        }
    }
}
The key point of this code is the await() method, which contains an infinite loop. Don't be afraid; the loop is a paper tiger. If result is null, the current thread blocks immediately until the result arrives. It is used as follows:
...
public static void main(String... args) throws Throwable {
    RunSuspend runSuspend = new RunSuspend();
    ContinuationImpl table = new ContinuationImpl(runSuspend);
    table.resumeWith(Unit.INSTANCE);
    runSuspend.await();
}
...
This is exactly what Suspend Main really looks like.
We see that the resumeWith of the RunSuspend instance, passed in as the completion, is called at the end of ContinuationImpl's resumeWith. So once await() enters the blocked state, it stays blocked until ContinuationImpl has run through all of its states, at which point the program has finished and exits normally.
The result of this code is as follows:
08:36:51:305 [main] 1
08:36:52:315 [Thread-0] Return suspended.
08:36:52:315 [Thread-0] 2
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 3
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 4
As you can see, this plain Java code produces exactly the same output as the earlier Kotlin coroutine call. So what is this Java code based on? It is based on the bytecode generated when the Kotlin coroutine is compiled. Bytecode itself is rather abstract, which is why I wrote it out this way to make the execution of coroutines easier to follow. I believe you now have a better understanding of the nature of coroutines:
- A suspend function of a coroutine is essentially a callback, and the callback type is Continuation
- The execution of the coroutine body is a state machine. Each time a suspend function is encountered, a state transition takes place, just as label is incremented step by step in our example to drive the state flow
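Both points can also be reproduced in plain Kotlin without the suspend keyword at all. The sketch below is a simplified illustration, not what the compiler generates. The functions loadAsync and loadCached are hypothetical hand-written "suspend functions" in callback style, and SumMachine is a hand-written state machine that adds their results.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Really suspends: returns the marker and resumes the callback later.
fun loadAsync(cont: Continuation<Int>): Any? {
    thread {
        Thread.sleep(50)
        cont.resume(20)
    }
    return COROUTINE_SUSPENDED
}

// Never suspends: the result is returned directly and the callback is unused.
fun loadCached(cont: Continuation<Int>): Any? = 22

class SumMachine(private val completion: Continuation<Int>) : Continuation<Any?> {
    private var label = 0 // current state
    private var first = 0 // local variable that must survive a suspension

    override val context: CoroutineContext get() = completion.context

    override fun resumeWith(result: Result<Any?>) {
        val outcome: Result<Int> = try {
            var value: Any? = result.getOrThrow()
            if (label == 0) {
                label = 1 // move to the next state before calling
                value = loadAsync(this)
                if (value === COROUTINE_SUSPENDED) return
            }
            if (label == 1) {
                first = value as Int
                label = 2
                value = loadCached(this)
                if (value === COROUTINE_SUSPENDED) return
            }
            Result.success(first + (value as Int))
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(outcome)
    }
}

// Starts the machine and blocks the caller until it completes.
fun runSum(): Int {
    val done = CompletableFuture<Int>()
    val machine = SumMachine(Continuation(EmptyCoroutineContext) { result ->
        result.onSuccess { done.complete(it) }
            .onFailure { done.completeExceptionally(it) }
    })
    machine.resumeWith(Result.success(Unit))
    return done.get()
}
```

The first resumeWith call returns as soon as loadAsync reports suspension. The second call, made from the worker thread, re-enters at label 1, and because loadCached does not suspend, the machine runs straight through to completion. runSum() therefore returns 42.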
If you understand these two points clearly, I believe the other concepts of coroutines will pose no problem. If you want thread scheduling, just do the thread switch at resumeWith, as we discussed in the article on dispatchers; that is easy to understand. The official coroutine framework essentially does these things. Its source can look confusing, mainly because, beyond the core logic, the framework has to be cross-platform and optimized for performance. But however it is written, the source still boils down to this: a state machine plus callbacks.
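The remark about switching threads at resumeWith can be made concrete with the standard library's ContinuationInterceptor. The following is a minimal sketch, not the implementation used by kotlinx.coroutines dispatchers; ExecutorInterceptor, threadAfterResume, and the thread names are hypothetical.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Wraps every continuation so that resumeWith hops onto the executor first.
class ExecutorInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) } // the thread switch
            }
        }
}

// Resumes a coroutine from a foreign thread and reports where it continues.
fun threadAfterResume(): String {
    val executor = Executors.newSingleThreadExecutor { task ->
        Thread(task, "sketch-worker").apply { isDaemon = true }
    }
    val latch = CountDownLatch(1)
    var name = ""
    val body: suspend () -> String = {
        suspendCoroutine<Unit> { cont ->
            thread(name = "sketch-callback") { cont.resume(Unit) }
        }
        Thread.currentThread().name // runs after the resume
    }
    body.startCoroutine(Continuation(ExecutorInterceptor(executor)) { result ->
        name = result.getOrThrow()
        latch.countDown()
    })
    latch.await()
    executor.shutdown()
    return name
}
```

Although resume is called on the sketch-callback thread, the code after the suspension point runs on sketch-worker, so threadAfterResume() returns "sketch-worker".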
5. Summary
This may be hard to understand right now, but that's fine. Use coroutines for a while and then read this again; I believe it will click.
Kotlin's sequence builder is also based on coroutines. Its usage is very simple, almost the same as an ordinary Iterable, so the Sequence article will focus on how it is implemented internally.
Welcome to Kotlin Chinese community!
Chinese website: www.kotlincn.net/
Chinese official blog: www.kotliner.cn/
Official account: Kotlin
Zhihu column: Kotlin
CSDN: Kotlin Chinese community
Nuggets: Kotlin Chinese Community