This article requires a basic understanding of coroutines. For the use of coroutines, please refer to the official tutorial: [play.kotlinlang.org/hands-on/In play.kotlinlang.org/hands-on/In… to Coroutines and Channels / 01 _introduction)

What is a coroutine?

The coroutine library is a library provided by the Kotlin language that is used as a framework for handling asynchronous and concurrent scenarios.

“A coroutine is an object of suspended computation. Similar in concept to threads. In a sense, blocks of coroutine code run concurrently with other code. However, coroutines are not bound by any particular thread. It might be suspended in one thread and woken up in another.

According to the official documentation, a coroutine is actually an object. To understand what coroutines are and how they differ from threads, see the following example.

The printTimeAndString method is a method that prints time and input characters. In the main function, a coroutine is launched via GlobalScope.launch, and the delay is 1 second. Print in coroutine start/end The external thread then prints print in Thread, sleeps for 2s, and finally prints end to end the main method.

fun main(a) {
    printTimeAndString("start")
    GlobalScope.launch {
      	printTimeAndString("print in coroutine start")
        delay(1000)
        printTimeAndString("print in coroutine end")
    }
    printTimeAndString("print in thread")
    Thread.sleep(2000)
    printTimeAndString("end")}fun printTimeAndString(text: String) {
    println("${Date()}: $text")}Copy the code

The print result is as follows:

Sat Dec 11 19:09:21 CST 2021: start
Sat Dec 11 19:09:21 CST 2021: print in thread
Sat Dec 11 19:09:21 CST 2021: print in coroutine start
Sat Dec 11 19:09:22 CST 2021: print in coroutine end
Sat Dec 11 19:09:23 CST 2021: end
Copy the code

The logical order of the code is:

  1. Print the start
  2. Print “print in thread”
  3. Create a coroutine
  4. Print “print in coroutine start”
  5. The thread is blocked
  6. Print “print in coroutine end”
  7. The thread blocks and prints end

It can be seen from the print result that print in thread is executed prior to coroutine. Delay 1s in coroutine does not cause thread blocking, and the logic in coroutine and thread are executed in parallel.

Modify the printTimeAndString method and print the current thread information, then execute:

fun printTimeAndString(text: String) {
    println("${Date()}: $text: ${Thread.currentThread()}")}Copy the code

Print result:

Sat Dec 11 19:10:49 CST 2021: start: Thread[main,5,main] Sat Dec 11 19:10:49 CST 2021: print in thread: Thread[main,5,main] Sat Dec 11 19:10:49 CST 2021: print in coroutine start: Thread[defaultDispatcher-worker-1,5,main] Sat Dec 11 19:10:50 CST 2021: Print in Coroutine end: Sat Dec 11 19:10:51 CST 2021: End: Thread[main,5,main]Copy the code

As you can easily see from this print, the code in the coroutine executes in another thread, and all methods outside the main method coroutine execute in the main thread. They have different threads, so the relationship between coroutines and threads is does the coroutine create a new thread to execute the code?

In fact, this is not the case. Coroutines can execute different coroutines by configuring context elements, or by constructing different top-level coroutines. Because the code above directly uses globalScope.launch without specifying any context, it defaults to the dispatchers.default, This element specifies that the coroutine is run in the thread pool.

If we let the coroutine specify execution in the main thread, we need to replace globalscope. launch with runBlocking. RunBlocking gets the current thread and creates a coroutine based on the current thread:

fun main(a) {
    printTimeAndString("start")
    runBlocking {
        printTimeAndString("print in coroutine start")
        delay(1000)
        printTimeAndString("print in coroutine end")
    }
    printTimeAndString("print in thread")
    Thread.sleep(2000)
    printTimeAndString("end")}Copy the code

The printed result is:

Sat Dec 11 19:12:02 CST 2021: start: Thread[main,5,main]
Sat Dec 11 19:12:02 CST 2021: print in coroutine start: Thread[main,5,main]
Sat Dec 11 19:12:03 CST 2021: print in coroutine end: Thread[main,5,main]
Sat Dec 11 19:12:03 CST 2021: print in thread: Thread[main,5,main]
Sat Dec 11 19:12:05 CST 2021: end: Thread[main,5,main]
Copy the code

Print in coroutine end print in coroutine end print in coroutine end print in coroutine end Second, the printing time has changed, with the entire code executing in 3 seconds, as opposed to 2 seconds using GlobalScope.launch. It may not seem obvious here, because they are so close in length that readers can check for themselves by adding delay and sleep; Finally, they both execute on the same main thread.

The above changes show that the coroutine and the thread run code on the same thread, and the coroutine blocks the thread. By using GlobalScope.launch, the coroutine runs on the thread, just like normal code. The special ability of coroutines is that they can be specified to run on different threads by implementing different coroutines. That’s why it’s said on the Internet that coroutines are a framework for switching threads.

Advantages of coroutines

Threads are the smallest unit of CPU scheduling on which all application code runs. The implementation of processes and threads is provided on the operating system level. On The Android platform, the pthread_create function in the Linux API is called during the creation of threads, which directly indicates that threads in the Java layer correspond to threads in the Linux system level. This means that the creation and destruction of threads need to interact with the operating system to schedule resources for the operating system. System resources are wasted when a lot of creation and destruction is required. So threads are not very lightweight.

Unlike threads, coroutines are essentially code and objects. It needs to run on a thread. So, it’s very lightweight. It does not invoke operating system-level methods and resources. So, when you create 1000 coroutine objects, it’s just a memory problem. ** Code executes on threads, so coroutines need to run on threads as well. ** Except for the extreme case of creating a new thread for each coroutine, the difference between threads and coroutines is essentially the difference between creating a thousand objects and a thousand threads.

Components of coroutines

In the previous section, we launched a coroutine using globalScope. launch. Using the Launch method as an example, we looked at the composition of the coroutine code:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope. () - >Unit
): Job
Copy the code

This is an extension of the CoroutineScope method, which takes three arguments, a return value type, and the method itself, making up six components:

  • CoroutineScope CoroutineScope

  • Coroutine builder launch, extension function of coroutine scope, coroutine builder function

  • CoroutineContext CoroutineContext, through which the coroutine scheduler can be specified

  • The coroutine startup TAB, CoroutineStart, defines the startup options for coroutine builds. It is used as a start parameter for startup, asynchronous, and other coroutine builder functions

  • Suspend code block: suspend CoroutineScope.() -> Unit, suspend closure, coroutine code called in the specified context

  • Coroutine task, which can be used to remove coroutines.

CoroutineScope CoroutineScope

CoroutineScope is an interface, and internally only one CoroutineContext property is defined:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}
Copy the code

CoroutineScope is used to declare that classes that implement CoroutineScope have the ability to provide a coroutine context. ** This interface does not need to be implemented manually; the coroutine library provides a wrapped delegate implementation.

To summarize, CoroutineScope calls it a CoroutineScope, which, in addition to the translation of its name, also means that CoroutineScope objects can provide their own internal coroutine context, and with coroutine context, they can use coroutines. So, Within the scope of the CoroutineScope object is the scope of the coroutine.

CoroutineScope constructor

   public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
       ContextScope(if(context[Job] ! =null) context else context + Job()) Job() is created by default if the context does not contain the Job element
Copy the code

GlobalScope GlobalScope

GlobalScope implements the CoroutineScope interface and is a global CoroutineScope that can be used to create top-level coroutines. They live throughout the life of the application and are not cancelled prematurely. When creating a top-level coroutine, you can also start the coroutine using a CoroutineScope object with the appropriate scheduler; Certain situations (for example, must be in the process of the whole life cycle of the active top background processes) can be achieved by @ OptIn (DelicateCoroutinesApi: : class) to safe and legal use GlobalScope. In other cases it can be converted to a suspend function, for example:

    fun loadConfiguration(a) {
        GlobalScope.launch {
            val config = fetchConfigFromServer() // network request
            updateConfiguration(config)
        }
    }
Copy the code

The replacement scheme is as follows:

    suspend fun loadConfiguration(a) = coroutinesScope {
        launch { 
            val config = fetchConfigFromServer() // network request
            updateConfiguration(config)
        }
    }    
Copy the code

Here’s a quick introduction to the coroutineScope method:

    public suspend fun <R> coroutineScope(block: suspend CoroutineScope. () - >R): R {
        contract {
            callsInPlace(block, InvocationKind.EXACTLY_ONCE)
        }
        return suspendCoroutineUninterceptedOrReturn { uCont ->
            val coroutine = ScopeCoroutine(uCont.context, uCont, true)
            coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
Copy the code

The coroutineScope method creates a ScopeCoroutine, which is also the coroutineScope implementation class. This method creates a CoroutineScope internally and executes the block.

MainScope

public fun MainScope(a): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
Copy the code

The context is designed with the container container container for handling the container.

Mainly used for UI components:

    class Activity {
        private val mainScope = MainScope()
    
        fun destroy(a) {
            mainScope.cancel()
        }
    }
Copy the code

Coroutine builder (coroutine builder)

Concept: Define a scope for a new coroutine. The coroutine builder is an extension function of CoroutineScope and inherits the coroutine context of CorourineScope to automatically pass context elements and cancelability.

Coroutine builder functions have the following characteristics:

  1. CoroutineScope extension function
  2. Inherits the CoroutineScope context
  3. The return type is Job

Scoping function

Functions such as withContext and coroutineScope differ from Launch in that they are suspend methods and generally accept a suspend closure as an argument. Functions that create a coroutine inside their methods and may specify some context elements (such as withContext) and pass the block through are called Scoping functions. Their structure is generally as follows:

public suspend fun <R> coroutineScope(block: suspend CoroutineScope. () - >R): R
public suspend fun <T> withContext(context: CoroutineContext,block: suspend CoroutineScope. () - >T): T
Copy the code

Bridge the normal and hang methods

Such functions as runBlocking and others, unlike the above two cases, can switch regular code to pending code.

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope. () - >T): T 
Copy the code

RunBlocking is a special case that creates a new coroutine and blocks the current thread until the coroutine terminates. This function should not be used in coroutines, and is designed primarily for the main function and testing.

Coroutine context

CoroutineContext is an interface:

public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E? // Get the element by key
  
    public fun <R> fold(initial: R, operation: (R.Element) - >R): R

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // Make sure the interceptor is always last in the context (so it can be quickly retrieved when it appears)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    public fun minusKey(key: Key< * >): CoroutineContext
    public interface Key<E : Element>
    public interface Element : CoroutineContext {
      	/ /...}}Copy the code

CoroutineContext defines the operator overload methods get and plus:

  • The get method will get us throughcontext[key]Gets elements in the form of
  • The plus method allows us toDispatchers.Default + SuperviorJob()Concatenates context elements in the form of.

As you can see from the get and plus methods, a CoroutineContext is really a collection of elements.

Is a set of elements, so what is its data structure?

CoroutineContext data structure

Here is the plus method, which finally incorporates a CombinedContext:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {

    override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) { cur.element[key]? .let {return it }
            val next = cur.left
            if (next is CombinedContext) {
                cur = next
            } else {
                return next[key]
            }
        }
    }
    // ...
}
Copy the code

CombinedContext has two attributes left and Element, which is obviously a linked list node. So the internal data structure of CoroutineContext is a linked list.

Coroutine startup item CoroutineStart

CoroutineStart is an enumerated class:

public enum class CoroutineStart {
    DEFAULT,
    LAZY,
    ATOMIC,
    UNDISPATCHED;

    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}
Copy the code

In the coroutine builder, start arguments for launch, async, and other coroutine constructors.

  • DEFAULT: Schedules coroutine execution immediately based on context.
  • LAZY: Starts the coroutine when needed. If the coroutine Job is canceled before it starts execution, it will not start execution at all, but will end up with an exception.
  • ATOMIC: Arranges coroutine execution atomically (in a non-cancelable manner) according to context, similar to DEFAULT, but coroutines cannot be cancelled before execution begins.
  • UNDISPATCHED: Executes the coroutine immediately up to its first pause point in the current thread, similar to a coroutine started with dispatchers.unconfined. However, when the coroutine resumes from the suspended state, it dispatches according to the CoroutineDispatcher in its context.

suspend () -> Unit

suspend T.() -> Unit
Copy the code

In Kotlin 1.6, suspend () -> Unit can be used as a parent type:

/ / define
class MyClickAction : suspend () - >Unit {
    override suspend fun invoke(a) { TODO() }
}
fun launchOnClick(action: suspend() - >Unit) {}
/ / call
launchOnClick(MyClickAction())
Copy the code

Note that suspend () -> Unit has two limitations as a parent type:

  • You cannot mix suspended and non-suspended blocks as a parent class.

    // Compiler Error: Mixing suspend and non-suspend supertypes is not allowed
    class MyClickAction : suspend () - >Unit, String  
    Copy the code
  • You cannot inherit multiple pending types.

    Because each suspension has an Invoke function, the Invoke function gets messy when it inherits multiple suspended types.

Alternatively, ordinary () -> Unit can also be passed to Fun getSuspending(suspend () -> Unit) {}, and Kotlin 1.6.0 introduces a switch from regular Block types to suspended Block types. The compiler automatically converts a normal block to a suspend block.

There is another form:

suspend {
	// ...
}
Copy the code

Unlike the suspend keyword above, suspend is a function:

public inline fun <R> suspend(noinline block: suspend() - >R): suspend () -> R = block
Copy the code

The suspend method is useful for creating a suspend function when the suspend keyword cannot be used directly in a normal function or lambda, and the lambda has no arguments.

Coroutine task Job

Element, inherited from Coroutinecontext. Element, is a context Element. Job represents the object of the current coroutine task, giving coroutines the ability to structure concurrency, lifecycle, and cancellability.

A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

A background job. Conceptually, work is a cancelable thing with a life cycle culminating in completion.

public interface Job : CoroutineContext.Element {
    public val isActive: Boolean
    public val isCompleted: Boolean
    public val isCancelled: Boolean
    public fun start(a): Boolean
    public fun cancel(cause: CancellationException? = null)
    public suspend fun join(a)
}
Copy the code

Job stands for encapsulating the code logic that needs to be executed in a coroutine. The Thread class represents Thread object references in Java, and the Job class represents coroutine object references.

The state of the Job

Like Thread, a Job has several states to represent the state of a coroutine:

State IsActive (active or not) isCompleted isCancelled
New (Optional initial state) false false false
Active (default initial state) true false false
实 习 (实 习) true false false
Cancelling (transient state) false false true
Cancelled (perfect) false true true
Completed state false true false
  • New

    The startup option CoroutineStart is set to CoroutineStart. With LAZY, the state of the Job is New(created but not started). You can start the Job by calling the start or Join method.

  • Active

    The normal Job state is Active (created and started). However, if the launch option CoroutineStart is set to CoroutineStart.LAZY, the Job state is New(created but not started). The Job can be started by calling the start or Join methods.

    The state of the Job is Active while the coroutine is running until it is completed or until the coroutine fails or is cancelled.

  • Cancelling

    If an abnormal Job is thrown, the Job will enter the Cancelling state. You can also use the cancel method to cancel the Job at any time and immediately switch the Job to the Cancelling state.

    The state is Cancelling until the Job waits for all its children to cancel.

  • Cancelled

    The Job goes into Cancelled state when it cancels the subitems recursively and waits for all the subitems to cancel.

  • Completing

    As with Cancelling, this state is entered while waiting for all subitems to complete. Note that the state of completion is for the internal child, and the parent Job is still Active to an external observer.

  • Completed

    When all subentries are Completed, the Job changes to the Completed state.

Job status transfer:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+
Copy the code

There is a note about cancellations:

Where canceling the parent item causes an immediate recursive cancellation of all its children. The failure of a child with an exception other than CancellationException cancels its parent immediately, thus cancelling all its other children. This behavior can be customized using the SupervisorJob.

SupervisorJob

The SupervisorJob is the Job implementation class. It differs from Job in that its children can fail alone without causing the failure of the parent coroutine or any other child coroutine.

It is designed to be able to implement a custom rule failure handling mechanism:

  • Use [launch] [CoroutineScope launch] failed to create child Job can CoroutineExceptionHandler processing in context.
  • The failure of a child Job created with [async][coroutinescope.async] can be processed with Deferred. Await the generated delay value.

If the parent argument specifies a value, the Supervisor job becomes a child of the parent and is canceled when the parent fails or is canceled. All supervisor children are also cancelled. Calling the cancel method with an exception (other than CancellationException) also causes parent to cancel.

Cancellation mechanisms are a more complex topic to be shared separately.

Obtaining a Job Object

There are two ways to obtain a Job object:

  1. Get the Job object from the return value of the coroutine builder method.
  2. Created directly through constructors, which are often used in coroutine contexts.
val job = Job()

// constructor
@Suppress("FunctionName")
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
Copy the code

Conceptually, val job = job () is the same as creating an empty block of code using launch:

val job = launch {
		// no opt
}
Copy the code

ContinuationInterceptor coroutine interceptor

ContinuationInterceptor is used to intercept coroutines before they actually run, defining interfaces in the interceptor mode.

Coroutine scheduler

In analyzing the difference between threads and coroutines, we mentioned the term “coroutine scheduler.” When we configure a coroutine with a scheduler, the thread on which the coroutine resides changes. The coroutine scheduler apparently hides the secret of thread switching.

The coroutine scheduler is also a coroutine context element:

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  	// ...
  	public abstract fun dispatch(context: CoroutineContext, block: Runnable)
}
Copy the code

Dispatch is the key method of dispatching defined in CoroutineDispatcher. The most common implementation of CoroutineDispatcher is a singleton dispatcher:

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
Copy the code

The function of Dispatchers is to group schedulers. In fact, CoroutineDispatcher has many implementation classes, such as EventLoop, CommonPool, etc., which are used in different groups.

Default

If the Dispatcher or any other ContinuationInterceptor is not specified in the context, all standard builders (e.g., launch, async, etc.) use CoroutineDispatcher by default.

It is supported by a shared thread pool on the JVM. By default, the maximum parallelism level used by this scheduler is equal to the number of CPU cores, but at least 2. Parallelism X guarantees that no more than X tasks can be executed in parallel in this scheduler.

    internal fun createDefaultDispatcher(a): CoroutineDispatcher =
    		if (useCoroutinesScheduler) DefaultScheduler else CommonPool
Copy the code

useCoroutinesScheduler

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
    when (value) {
        null, "", "on" -> true
        "off" -> false
        else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
    }
}
Copy the code

The Default thread pool is useCoroutinesScheduler (true by Default) :

  • DefaultScheduler: Kotlin’s own thread pool logic implemented internally
  • CommonPool: Thread pool logic for Executor implementations in the Java class library

DefaultScheduler

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
  	// ...
}
Copy the code

DefaultScheduler defines an IO property that is an implementation of dispatchers. IO.

The other DefaultScheduler dispatch method in the superclass ExperimentalCoroutineDispatcher:

@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int.private val maxPoolSize: Int.private val idleWorkerKeepAliveNs: Long.private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
		// ... 
    override val executor: Executor
        get() = coroutineScheduler

    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }
  
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    }
  
  	public fun limited(parallelism: Int): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    }

    private fun createScheduler(a) = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
Copy the code

Distribution logic in CoroutineScheduler:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int.@JvmField val maxPoolSize: Int.@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        val task = createTask(block, taskContext) 
        // Try to submit the task to the local queue and take action based on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if(notAdded ! =null) {
            if(! addToGlobalQueue(notAdded)) {// The global queue is closed in the last step of close/close -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")}}valskipUnpark = tailDispatch && currentWorker ! =null
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
  	// ... 
}
Copy the code

The general logic is to first convert a block to a Task (actually a runnable) and then commit it to a local queue for execution.

CommonPool

internal object CommonPool : ExecutorCoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try{ (pool ? : getOrCreatePoolSync()).execute(wrapTask(block)) }catch (e: RejectedExecutionException) {
            unTrackTask()
            // CommonPool only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.enqueue(block)
        }
    }
		// ...    
}
Copy the code

The implementation of CommonPool is an Executor in the Java class library that implements thread pool logic.

IO

public val IO: CoroutineDispatcher = DefaultScheduler.IO
Copy the code

The IO object of DefaultScheduler, which is directly used by IO groups, is a LimitingDispatcher object.

Its distribution is implemented as follows:

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int.private valname: String? .override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    private val queue = ConcurrentLinkedQueue<Runnable>()

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Submit a task in progress
            val inFlight = inFlightTasks.incrementAndGet()
            // Fast path, if the parallelism limit is not reached, the task is distributed and returned
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            // The task is added to the queue when the parallelism limit is reached
            queue.add(taskToSchedule)
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return} taskToSchedule = queue.poll() ? :return}}// ... 
}
Copy the code

You can see that dispatchers. Default and IO run in the same thread, i.e. share the same thread pool. But IO has a thread limit. In CoroutineScheduler:

  1. CoroutineScheduler a maximum of corePoolSize threads can be created
  2. CorePoolSize takes the value Max (2, number of CPU cores)

In Limit Dispatcher:

Max (corePoolSize, min(number of CPU cores * 128, 2^ 21-2))

Main

You need to introduce the Kotlinx-Coroutines-Android library, which has the Android counterpart dispatchers. Main implementation.

    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(a): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // Enable R8 optimization when compiling on Android
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")factories.maxByOrNull { it.loadPriority }? .tryCreateDispatcher(factories) ? : createMissingDispatcher() }catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
Copy the code

Use the FAST_SERVICE_LOADER_ENABLED field to determine the implementation logic. FAST_SERVICE_LOADER_ENABLED defaults to true. The realization of the FastServiceLoader. LoadMainDispatcherFactory as follows:

    internal fun loadMainDispatcherFactory(a): List<MainDispatcherFactory> {
        val clz = MainDispatcherFactory::class.java
        if(! ANDROID_DETECTED) {return load(clz, clz.classLoader)
        }

        return try {
            val result = ArrayList<MainDispatcherFactory>(2)
            createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")? .apply { result.add(this) }
            createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")? .apply { result.add(this) }
            result
        } catch (e: Throwable) {
            // Fallback to the regular SL in case of any unexpected exception
            load(clz, clz.classLoader)
        }
    }
    
    internal val ANDROID_DETECTED = runCatching { Class.forName("android.os.Build") }.isSuccess
Copy the code

The official description of this method:

This method tries to load the MainDispatcherFactory in an Android-friendly way. If we are not on Android, this method is back to normal service load, otherwise, we attempt to AndroidDispatcherFactory and TestMainDispatcherFactory implementation Class class.forname lookup.

Therefore, dispatchers. Main is highly relevant to the Android platform. The underlying principle is that looper.getMainlooper () is called to get the handler, and the handler is finally implemented in the Main thread. Essentially, the task is run on the main thread of Android via Handler.

Unconfined

Unconfined means that the task is executed on the default boot thread. It is then up to the thread that calls resume to decide which thread to resume the coroutine:

public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

internal object Unconfined : CoroutineDispatcher() {
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // It can only be called by the "yield" function. See also code of "yield" function.
        val yieldContext = context[YieldContext]
        if(yieldContext ! =null) {
            // report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")}}Copy the code

The logic of this method has to do with the DispatchedContinuation.

What is a DispatchedContinuation? First, there is a Continuation at the top of the coroutine inheritance relationship, which can represent a coroutine. Furthermore, the Continuation defines the ability of the coroutine to recover. DispatchedContinuation is an implementation of a Continuation and its resumeWith:

    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)}else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }
Copy the code

Unconfined: execute execute executeUnconfined using isDispatchNeeded (dispatch, Unconfined=false, default, IO=true)

private inline fun DispatchedContinuation< * >.executeUnconfined(
    contState: Any? , mode:Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    val eventLoop = ThreadLocalEventLoop.eventLoop
    // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    return if (eventLoop.isUnconfinedLoopActive) {
        // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
        _state = contState
        resumeMode = mode
        eventLoop.dispatchUnconfined(this)
        true // queued into the active loop
    } else {
        // Was not active -- run event loop until all unconfined tasks are executed
        runUnconfinedEventLoop(eventLoop, block = block)
        false}}Copy the code

This method will extract the eventLoop (which is related to the current thread) from the Threadlocal and determine whether the Unconfined task is being executed

  1. If running, call the dispatchUnconfined method of EventLoop to place the Unconfined task in the EventLoop.

  2. If not, execute directly.

    internal inline fun DispatchedTask< * >.runUnconfinedEventLoop(
        eventLoop: EventLoop,
        block: () -> Unit
    ) {
        eventLoop.incrementUseCount(unconfined = true)
        try {
            block()
            while (true) {
                if(! eventLoop.processUnconfinedEvent())break}}catch (e: Throwable) {
            handleFatalException(e, null)}finally {
            eventLoop.decrementUseCount(unconfined = true)}}Copy the code

EventLoop is stored in Threadlocal and therefore associated with the current thread, and is a subclass of CoroutineDispatcher.

internal abstract class EventLoop : CoroutineDispatcher() {
		private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
		
		public fun dispatchUnconfined(task: DispatchedTask< * >) {
        valqueue = unconfinedQueue ? : ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it } queue.addLast(task) }public fun processUnconfinedEvent(a): Boolean {
        valqueue = unconfinedQueue ? :return false
        valtask = queue.removeFirstOrNull() ? :return false
        task.run()
        return true}}Copy the code

The Unconfined task is stored internally via a two-way queue:

  1. EventLoopthedispatchUnconfinedThe Unconfined task () method is used to put the Unconfined task at the end of the queue
  2. processUnconfinedEventThe Unconfined task () method is used to remove the Unconfined task execution from the head of the queue

In the end

The basic composition of coroutines is almost introduced, and you may feel that there are still some concepts, such as what is suspended and so on, which will be explained to you one by one in a subsequent article.

In addition, for the use of coroutines, please refer to the official tutorial: [play.kotlinlang.org/hands-on/In play.kotlinlang.org/hands-on/In… to Coroutines and Channels / 01 _introduction)

This article has a lot of content, if there are mistakes and suggestions please correct.