An overview of the

In the previous two articles recorded the basic use of Kotlin coroutines and in-depth analysis of the working principle of coroutines, through the in-depth Kotlin coroutines source code, found that there are three layers of coroutines in Kotlin packaging:

  • The first layer of packaging: Job returned by Launch & Async, Deferred inherits AbstractCoroutine, encapsulates the state of coroutine and provides interfaces such as Cancel.
  • Second packing: SuspendLambda subclasses generated by the compiler encapsulate the true execution logic of the coroutine, The inheritance relationship is SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, whose completion parameter is the first-layer wrapper instance;
  • The third wrapper, DispatchedContinuation, encapsulates the thread scheduling logic, and its continuation parameter is an example of the second wrapper.

All three layers of wrappers implement the Continuation interface, combining the layers of coroutines together in a proxy pattern, each responsible for a different function, as shown below:

Links to related articles:

  • Link to blog
  • Kotlin coroutine Series I – Basic use
  • Kotlin Coroutines Series 2 – In-depth understanding of how coroutines work
  • Kotlin coroutine series three – Coroutine cancellation and exception handling processes

This is the last article in the Kotlin coroutine series 2 – an in-depth understanding of how coroutines work, looking at the parent-child relationship of coroutines and the cancellation and exception handling logic of coroutines. If the content of the article is wrong, welcome to point out, common progress! Leave a “like” if you think it’s good

Father and son coroutines

Take the launch method as an example:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope. () - >Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if(combined ! == Dispatchers.Default && combined[ContinuationInterceptor] ==null)
        debug + Dispatchers.Default else debug
}
Copy the code

In newCoroutineContext, The new coroutine uses the previous coroutineContext of CoroutineScope in conjunction with the context passed in by launch, and then calls the coroutine. Start (start, coroutine, block) method:

public abstract class AbstractCoroutine<in T>(
    @JvmField
    protected val parentContext: CoroutineContext, active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R. () - >T) {
        initParentJob()
        // ...
    }

    internal fun initParentJob(a) {
        initParentJobInternal(parentContext[Job])
    }
}

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    internal fun initParentJobInternal(parent: Job?). {
        assert { parentHandle == null }
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        parent.start() // make sure the parent is started
        val handle = parent.attachChild(this)
        parentHandle = handle
        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC}}}Copy the code

Take a look at the GlobalScope code:

public object GlobalScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
Copy the code

CoroutineContext is an EmptyCoroutineContext, so parentContext[Job] is empty, So globalscope.launch {} and GlobalScope.async{} create coroutines that have no parent.

Next, focus on the parent. AttachChild method:

public final override fun attachChild(child: ChildJob): ChildHandle {
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
Copy the code

The invokeOnCompletion method, which was seen in the previous article and is not given in the source code, basically adds the handler node to a queue (state.list) of the parent coroutine.

Coroutines complete

Front said coroutines completed through AbstractCoroutine. ResumeWith implementation, it will call like this: makeCompletingOnce -> tryMakeCompleting -> tryMakeCompletingSlowPath -> tryWaitForChild:

private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?).: Boolean {
    val handle = child.childJob.invokeOnCompletion(
        invokeImmediately = false,
        handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
    )
    if(handle ! == NonDisposableHandle)return true // child is not complete and we've started waiting for it
    valnextChild = child.nextChild() ? :return false
    return tryWaitForChild(state, nextChild, proposedUpdate)
}
Copy the code

The tryWaitForChild method adds the ChildCompletion node to the state.list queue of the child coroutine, and calls ChildCompletion.invoke when the child coroutine is completed or cancelled:

override fun invoke(cause: Throwable?). {
    parent.continueCompleting(state, child, proposedUpdate)
}

private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?). {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    // figure out if we need to wait for next child
    val waitChild = lastChild.nextChild()
    // try wait for next child
    if(waitChild ! =null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
    // no more children to wait -- try update state
    val finalState = finalizeFinishingState(state, proposedUpdate)
    afterCompletion(finalState)
}
Copy the code

As a result, the parent coroutine waits for all its children to complete or cancel before completing itself.

Coroutines cancel

Let’s start with the coroutine’s cancel method:

public fun cancel(cause: CancellationException? = null)

// JobSupport
public override fun cancel(cause: CancellationException?).{ cancelInternal(cause ? : defaultCancellationException()) }internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null)= JobCancellationException(message ? : cancellationExceptionMessage(), cause,this)

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

internal fun cancelImpl(cause: Any?).: Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
        // completing and had recorded exception
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true}}}Copy the code

The notifyCancelling method is then called:

// Add handler to the list
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancelling(cause)
    // The invoke method of the ChildHandleNode added above is looped through
    notifyHandlers<JobCancellingNode<*>>(list, cause)
    // May cancel the parent coroutine
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?). = childJob.parentCancelled(job)
    // ...
}

// Subcoroutines cancel themselves with this method
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

// The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
// Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
// never returns `false` for instances of [CancellationException], otherwise such exception
// may leak to the [CoroutineExceptionHandler].
private fun cancelParent(cause: Throwable): Boolean {
    // isScopedCoroutine is true and returns without canceling the parent coroutine. The default is false. Subclasses can be overridden
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it. * This allow  parent to cancel its children (normally) without being cancelled itself, unless * child crashes and produce some other exception during its completion. */
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    // No parent -- ignore CE, report other exceptions.
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }

    / / call before ChildHandleNode childCancelled
    return parent.childCancelled(cause) || isCancellation
}

internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?). = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
    // ...
}

// JobSupport
// Child was cancelled with a cause. In this method parent decides whether it cancels itself
// It is overridden in supervisor implementations to completely ignore any child cancellation.
// Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
public open fun childCancelled(cause: Throwable): Boolean {
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

private class SupervisorCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    When the coroutine is running, it can only be propagated from the parent coroutine to the child coroutine when it calls cancel and passes exceptions.
    // The parent coroutine will not be cancelled
    override fun childCancelled(cause: Throwable): Boolean = false
}
Copy the code

Therefore, when a coroutine calls cancel, it cancels all of its children and does not cancel its parent by default.

AbstractCoroutine cancelation only modiates the state of the coroutine in the first layer wrapper, AbstractCoroutine, and does not affect the execution logic in the second layer wrapper, BaseContinuationImpl.

GlobalScope.launch {
    println("1")
    val job = launch {
        println("2")
        try {
            delay(500)}catch (e: CancellationException) {
            println("exception")
        }
        println("3")
        if (isActive) {
            println("4")
        }
        delay(500) / / throw CancellationException
        println("5")
    }
    delay(100)
    job.cancel()
}
// output
1
2
exception
3
Copy the code

Exception handling

Exception handling

Coroutine builders come in two forms: auto-launch exceptions (launch and actor) or expose exceptions to users (async and produce). When the building is used to create a root coroutines, namely the coroutines not another collaborators cheng coroutines, the former such builder will anomalies as uncaught exception, similar to Java Thread. UncaughtExceptionHandler, while the latter rely on users to final consumption is unusual, For example, by await or receive.

fun log(obj: Any) {
    println("${Thread.currentThread()}: $obj")}fun main(a) = runBlocking {
    val job = GlobalScope.launch { // launch root coroutine
        log("Throwing exception from launch")
        throw IndexOutOfBoundsException() / / we will print the console Thread. DefaultUncaughtExceptionHandler
    }
    job.join()
    log("Joined failed job")
    val deferred = GlobalScope.async { // async root coroutine
        log("Throwing exception from async")
        throw ArithmeticException() // Do not print anything, relying on the user to call wait
    }
    try {
        deferred.await()
        log("Unreached")}catch (e: ArithmeticException) {
        log("Caught ArithmeticException")}}// output
Thread[DefaultDispatcher-worker-1.5,main]: Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-1" java.lang.IndexOutOfBoundsException
    // ...
Thread[main,5,main]: Joined failed job
Thread[DefaultDispatcher-worker-1.5,main]: Throwing exception from async
Thread[main,5,main]: Caught ArithmeticException
Copy the code

Can use CoroutineExceptionHandler to catch exceptions launch:

fun main(a) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")}val job = GlobalScope.launch(handler) { // Root coroutine, running in GlobalScope
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) { // Same root coroutine but async instead of launch
        throw ArithmeticException() // Nothing is printed, relying on the user to call deferred.await()
    }
    joinAll(job, deferred)
}

// output
CoroutineExceptionHandler got java.lang.AssertionError
Copy the code

The coroutine is internally cancelled using a CancellationException, which is ignored by all handlers. If a coroutine encounters an exception other than CancellationException, by default it will use that exception to cancel its parent coroutine.

fun main(a) = runBlocking {
    coroutineScope {
        log(0)
        launch {
            log(1)
            throw Exception()
        }
        log(2)
        delay(100)
        log(3)
        log(isActive)
    }
}

// output
Thread[main,5,main]: 0
Thread[main,5,main]: 2
Thread[main,5,main]: 1
Exception in thread "main" java.lang.Exception

fun main(a) = runBlocking {
    supervisorScope {
        log(0)
        launch {
            log(1)
            throw Exception()
        }
        log(2)
        delay(100)
        log(3)
        log(isActive)
    }
}

// output
Thread[main,5,main]: 0
Thread[main,5,main]: 2
Thread[main,5,main]: 1
Exception in thread "main" java.lang.Exception
Thread[main,5,main]: 3
Thread[main,5,main]: true

fun main(a) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")}val job = GlobalScope.launch(handler) {
        val inner = launch { // All coroutines in the stack will be cancelled
            launch {
                launch {
                    throw IOException() // Primitive exception
                }
                sleep(100)
                println(isActive)
            }
            sleep(100)
            println(isActive)
        }
        println("before join")
        try {
            inner.join()
            println("join")}catch (e: CancellationException) {
            println("Rethrowing CancellationException with original cause")
            // The following "after Join "will be printed if the exception is not overridden
            throw e // The cancellation exception is rethrown, but the original IOException is handled
        }
        println("after join")
    }
    job.join()
}

// output
before join
Rethrowing CancellationException with original cause
false
false
CoroutineExceptionHandler got java.io.IOException
Copy the code

The original exception is handled by the parent coroutine only after all the children of the parent coroutine have finished:

fun main(a) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception -> 
        println("CoroutineExceptionHandler got $exception")}val job = GlobalScope.launch(handler) {
        launch { // the first subcoroutine
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // The second subcoroutine
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()    
}

// output
Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
CoroutineExceptionHandler got java.lang.ArithmeticException
Copy the code

When multiple child coroutines of a coroutine fail due to an exception, the general rule is “take the first exception”, so the first exception will be handled. All other exceptions that occur after the first exception are bound to the first exception as suppressed exceptions.

fun main(a) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")}val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // When another sibling coroutine fails due to IOException, it will be cancelled
            } finally {
                throw ArithmeticException() // The second exception
            }
        }
        launch {
            delay(100)
            throw IOException() // The first exception
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}

// output
CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]
Copy the code

The source code interpretation

In BaseContinuationImpl. ResumeWith method will catch coroutines throw exceptions:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any? >) {
        while (true) {
            // ...
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                valoutcome: Result<Any? > =try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // When a subcoroutine throws an exception, it is caught here and given to outcome as a result
                        Result.failure(exception)
                    }
                if (completion is BaseContinuationImpl) {
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}
Copy the code

After capturing the exception, call AbstractCoroutine. ResumeWith, its process is: AbstractCoroutine.resumeWith -> JobSupport.makeCompletingOnce -> JobSupport.tryMakeCompleting -> JobSupport.tryMakeCompletingSlowPath:

public final override fun resumeWith(result: Result<T>) {
    // Get state from result
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?).: Any? {
    vallist = getOrPromoteCancellingList(state) ? :return COMPLETING_RETRY
    val finishing = state as? Finishing ? : Finishing(list,false.null)
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        if (finishing.isCompleting) return COMPLETING_ALREADY
        finishing.isCompleting = true
        if(finishing ! == state) {if(! _state.compareAndSet(state, finishing))return COMPLETING_RETRY
        }
        val wasCancelling = finishing.isCancelling
        (proposedUpdate as? CompletedExceptionally)? .let { finishing.addExceptionLocked(it.cause) }// If it just becomes cancelling --> must process cancelling notificationsnotifyRootCause = finishing.rootCause.takeIf { ! wasCancelling } }// process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)notifyRootCause? .let { notifyCancelling(list, it) }val child = firstChild(state) // now wait for children
    if(child ! =null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have not children left (all were already cancelled?)
    return finalizeFinishingState(finishing, proposedUpdate)
}
Copy the code

When a coroutine throws an exception, the notifyCancelling method is called, which has been resolved above.

Therefore, when an exception occurs on a coroutine, all of its children are cancelled, and its parent is cancelled by default.

Next look at the finalizeFinishingState method:

private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?).: Any? {
    // ...
    if(finalException ! =null) {
        val handled = cancelParent(finalException) || handleJobException(finalException)
        if (handled) (finalState as CompletedExceptionally).makeHandled()
    }
    // ...
}
Copy the code

If the coroutine throws an uncaught, uncancelled exception, the upper coroutine is cancelled step by step, and the root coroutine calls handleJobException to handle the exception:

// JobSupport
protected open fun handleJobException(exception: Throwable): Boolean = false

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true}}Copy the code

As you can see, handleJobException is empty by default, and DeferredCoroutine does not override this method. StandaloneCoroutine does. Therefore, if the root coroutine is an async initiated coroutine, there will be no crash, while the launch coroutine will call handleCoroutineException to handle exceptions:

public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    try {
        / / defines CoroutineExceptionHandler is handled by itcontext[CoroutineExceptionHandler]? .let { it.handleException(context, exception)return}}catch (t: Throwable) {
        handleCoroutineExceptionImpl(context, handlerException(exception, t))
        return
    }
    // If a handler is not present in the context or an exception was thrown, fallback to the global handler
    handleCoroutineExceptionImpl(context, exception)
}

/ / according to the ServiceLoader, exception handling and AndroidExceptionPreHandler in the Android platform
private val handlers: List<CoroutineExceptionHandler> = ServiceLoader.load(
        CoroutineExceptionHandler::class.java,
        CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()

internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    // use additional extension handlers
    for (handler in handlers) {
        try {
            handler.handleException(context, exception)
        } catch (t: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            val currentThread = Thread.currentThread()
            currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
        }
    }
    // use thread's handler
    val currentThread = Thread.currentThread()
    currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}

// Thread
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
    If uncaughtExceptionHandler is null, the thread group is used
    // private ThreadGroup group;
    returnuncaughtExceptionHandler ! =null ? uncaughtExceptionHandler : group;
}

// ThreadGroup
public void uncaughtException(Thread t, Throwable e) {
    if(parent ! =nullParent.uncaughtexception (t, e); }else {
        / / use priority thread general DefaultUncaughtExceptionHandler, otherwise in the console print exception stack information
        Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler();
        if(ueh ! =null) {
            ueh.uncaughtException(t, e);
        } else if(! (e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); }}}Copy the code

conclusion

  • The parent coroutine waits for all its children to complete or cancel before completing itself.
  • When a coroutine calls cancel, all of its children are canceled, and its parent is not canceled by default. (The canceled coroutine throws a CancellationException at the suspension point and is ignored by the coroutine’s mechanism, and the current coroutine does not continue execution.)
  • When an uncaught exception occurs, the coroutine cancels all of its children and does not proceed to execute itself. It may cancel its parent coroutine (by default). The default cancels the upper coroutine step by step and finally cancels the root coroutine, which handles the exception. The parent coroutine is unaffected when the exception is a CancellationException or the container is handling the job or scope. The principle is to override the childCancelled method.
  • Launch coroutines and actor coroutines default way of dealing with the abnormal in the JVM is to print the stack information, in the Android will collapse (see Android exception mechanism), you can customize CoroutineExceptionHandler to handle the exception.
  • Async coroutines itself does not handle exceptions, the custom CoroutineExceptionHandler is invalid, but will await to restore the caller coroutines when an exception is thrown.

Reference:

  • www.kotlincn.net/docs/refere…
  • www.jianshu.com/p/2979732fb…