Coroutines and threads

Objective differences

  • The purpose of threading is to increase CPU utilization, to allow multiple tasks to run in parallel, to serve the machine.
  • The purpose of coroutines is to allow better collaboration between multiple tasks, mainly in code logic, and to serve developers (to improve resource utilization, but not the original purpose).

Scheduling differences

  • Thread scheduling is done by the system, usually preemptively, and assigned according to priority
  • The scheduling of coroutines is specified by the developer according to the program logic, allocating resources reasonably to different tasks at different times.

Coroutines in relation to threads

Instead of replacing threads, coroutines are abstracted from threads. Threads are CPU resources that are partitioned. Coroutines are organized code flows that require threads to run

The core competitiveness of coroutine

Simplify asynchronous concurrent tasks.

CoroutineContext CoroutineContext

  • Coroutines always run in contexts represented by types of CoroutineContext, which is a collection of various elements
  • Elements in sets are mapped by key (Map), but are not allowed to duplicate (Set)
  • Elements can be combined with a + sign
  • There are four types of elements that make up a CoroutineContext
    • Job: unique identifier of the coroutine, used to control the life cycle of the coroutine (new, active, completing, completed, cancelling, cancelled)
    • CoroutineDispatcher: Specifies the thread to run the coroutine (IO, Default, Main, Unconfined)
    • CoroutineName: Specifies the name of the coroutine. The default is coroutine
    • CoroutineExceptionHandler: specifies the exception handler coroutines, used to handle an uncaught exception

Their relationship is shown below:

CoroutineDispatcher role

  • The running thread used to specify the coroutine
  • Kotlin has built in four implementations of CoroutineDispatcher, namely the Default, IO, Main and Unconfined fields of Dispatchers

public actual object Dispatchers {

    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
    
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}

Copy the code

Dispatchers.Default

Default Obtains the corresponding thread pool based on the useCoroutinesScheduler attribute (Default: true)

  • DefaultScheduler: Kotlin’s own thread pool logic implemented internally
  • CommonPool: Thread pool logic for Executor implementations in the Java class library
internal actual fun createDefaultDispatcher(a): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    .....
}

open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int.private val maxPoolSize: Int.private val idleWorkerKeepAliveNs: Long.private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    ......
}
// Executors in Java libraries implement thread pool logic
internal object CommonPool : ExecutorCoroutineDispatcher() {}

Copy the code

What if you want to use a thread pool in the Java class library? Change useCoroutinesScheduler to false

internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
    when (value) {
        null.""."on" -> true
        "off" -> false
        else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")}}internal actual fun systemProp(
    propertyName: String
): String? =
    try {
       // Get system properties
        System.getProperty(propertyName)
    } catch (e: SecurityException) {
        null
    }

Copy the code

It can be seen from the source code, the use of system attributes to get the value, then we can change the value of useCoroutinesScheduler by modifying the system attributes, the specific modification method is

 val properties = Properties()
 properties["kotlinx.coroutines.scheduler"] = "off"
 System.setProperties(properties)
Copy the code

The main implementation DefaultScheduler in ExperimentalCoroutineDispatcher father classes

open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int.private val maxPoolSize: Int.private val idleWorkerKeepAliveNs: Long.private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
    
    override val executor: Executor
       get() = coroutineScheduler

    private var coroutineScheduler = createScheduler()
    
    // Create CoroutineScheduler instance
    private fun createScheduler(a) = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    override val executor: Executorget() = coroutineScheduler

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            // Dispatch method delegate to the Dispatch method of CoroutineScheduler
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            ....
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            // The dispatchYield method is delegated to CoroutineScheduler
            coroutineScheduler.dispatch(block, tailDispatch = true)}catch (e: RejectedExecutionException) {
            ...
        }
    
	internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            //dispatchWithContext delegates to CoroutineScheduler's dispatchWithContext method
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch(e: RejectedExecutionException) { .... }}override fun close(a): Unit = coroutineScheduler.close()
    // Implement request blocking
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    }
	// Implement concurrency limit
    public fun limited(parallelism: Int): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    }
    
    ....
}

Copy the code

The request limit is implemented by calling the LimitingDispatcher class, whose class implementation is

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int.private valname: String? .override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    // Synchronize blocking queues
    private val queue = ConcurrentLinkedQueue<Runnable>()
    / / cas count
    private val inFlightTasks = atomic(0)
    
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {

            if (inFlight <= parallelism) {
                // The LimitingDispatcher dispatcher method delegates to DefaultScheduler's dispatchWithContext method
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return}... }}}Copy the code

Dispatchers.IO

Let’s take a look at the definition of dispatchers. IO

    /** *This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — * typically execution continues in the same thread. */
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
    
    
    Internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
    
    ......
    
    }
    
Copy the code

The IO implementation in DefaultScheduler calls the blacking() method, which is ultimately implemented as the LimitingDispatcher class, Dispatchers.Default and IO are run in the same thread, i.e. share the same thread pool.

Both Default and IO share the CoroutineScheduler thread pool. Kotlin internally implements a set of thread pool and two scheduling policies, which are distinguished by Mode in the dispatch method

Type Mode
Default NON_BLOCKING
IO PROBABLY_BLOCKING
internal enum class TaskMode {

    // Perform CPU-intensive tasks
    NON_BLOCKING,

    // Perform IO intensive tasks
    PROBABLY_BLOCKING,
}
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false){...if (task.mode == TaskMode.NON_BLOCKING) {
            signalCpuWork() //Dispatchers.Default
     } else {
            signalBlockingWork() // Dispatchers.IO}}Copy the code
Type Handling strategy Suitable for the scene The characteristics of
Default CoroutineScheduler maximum number of corePoolSize threads can be created. CorePoolSize takes the value Max (2, number of CPU cores), i.e., it will try to equal the number of CPU cores Complex computing, video decoding, etc 1. Cpu-intensive tasks consume a lot of CPU resources. 2. Because threads themselves have stacks and other Spaces, and there are too many threads, the consumption caused by frequent thread switching will also affect the performance of the thread pool 4. For CPU-intensive tasks, the number of concurrent threads in the thread pool equals the number of CPU cores to maximize CPU execution efficiency
IO Max (corePoolSize, min(number of CPU cores * 128, 2^ 21-2)) Network requests and I/O operations 1. The CPU is idle during I/O intensive task execution, and the task does not consume large CPU resources. 2. Most threads are in the blocking state when they execute IO-intensive tasks. Threads in the blocking state do not occupy CPU execution time. The maximum number of threads parallelism is set to Max (64, number of CPU cores), and the rest of the tasks are queued to wait.

Dispatchers.Unconfined

Tasks are executed on the default startup thread. It is then up to the thread calling resume to decide which thread to resume the coroutine

internal object Unconfined : CoroutineDispatcher() {
    // If false, no dispatch is required
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // The Unconfined Dispatch method will only be invoked when the yield method is called
        // yield() indicates that the current coroutine gives up its thread to other coroutines
        val yieldContext = context[YieldContext]
        if(yieldContext ! =null) {
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")}}Copy the code

Each coroutine has a corresponding instance of a Continuation, where resumeWith is used for coroutine recovery and exists in a DispatchedContinuation

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ......
    
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
        
    ......
    
}
Copy the code

Focus on the implementation of resumeWith and the class delegate

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>// Continuation generated by coroutine suspend suspend method
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    .....
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)}else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }
    ....
}

Copy the code

Use isDispatchNeeded (dispatch, Unconfined=false, default, IO=true) to determine whether to perform different operations

  • True: calls the dispatch method of the CoroutineDispatcher
  • False: Call the executeUnconfined method
private inline fun DispatchedContinuation< * >.executeUnconfined(
    contState: Any? , mode:Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean{ assert { mode ! = MODE_UNINITIALIZED }val eventLoop = ThreadLocalEventLoop.eventLoop
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        eventLoop.dispatchUnconfined(this)
        true
    } else {
        runUnconfinedEventLoop(eventLoop, block = block)
        false}}Copy the code

Remove the eventLoop from the ThreadLocal and determine whether the Unconfined task is being executed

  1. If running, call the dispatchUnconfined method of EventLoop to place the Unconfined task in the EventLoop
  2. If not, execute directly
internal inline fun DispatchedTask< * >.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        while (true) {
            if(! eventLoop.processUnconfinedEvent())break}}catch (e: Throwable) {
        handleFatalException(e, null)}finally {
        eventLoop.decrementUseCount(unconfined = true)}}Copy the code
  1. Execute the block() code block, resumeWith() mentioned above
  2. Call the processUnconfinedEvent() method to implement the remaining Unconfined tasks and jump out of the loop until they are all executed

EventLoop is stored with threadLocal and therefore associated with the current thread, and is also a subclass of CoroutineDispatcher

internal abstract class EventLoop : CoroutineDispatcher() {...// Implement a dual-end queue to store Unconfined tasks
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
    // Remove the Unconfined task from the head of the queue
    public fun processUnconfinedEvent(a): Boolean {
        valqueue = unconfinedQueue ? :return false
        valtask = queue.removeFirstOrNull() ? :return false
        task.run()
        return true
    }
    // Put the Unconfined task at the end of the queue
    public fun dispatchUnconfined(task: DispatchedTask< * >) {
        valqueue = unconfinedQueue ? : ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it } queue.addLast(task) } ..... }Copy the code

Internally, the Unconfined task is stored in a dual-end queue

  1. EventLoop’s dispatchUnconfined method is used to put the Unconfined task at the end of the queue
  2. The rocessUnconfinedEvent method is used to remove the Unconfined task execution from the head of the queue

Dispatchers.Main

Kotlin’s implementation of Android on the JVM requires the introduction of the Kotlinx-Coroutines-Android library, which contains the Corresponding Implementation of Dispatchers.Main for Android,

   public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
   
     @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(a): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader ).iterator().asSequence().toList() } factories.maxBy { it.loadPriority }? .tryCreateDispatcher(factories) ? : MissingMainCoroutineDispatcher(null)}catch (e: Throwable) {
            // Service loader can throw an exception as well
            MissingMainCoroutineDispatcher(e)
        }
    }
    
    internal fun loadMainDispatcherFactory(a): List<MainDispatcherFactory> {
        val clz = MainDispatcherFactory::class.java
        if(! ANDROID_DETECTED) {return load(clz, clz.classLoader)
        }

        return try {
            val result = ArrayList<MainDispatcherFactory>(2)
            createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")? .apply { result.add(this) }
            createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")? .apply { result.add(this) }
            result
        } catch (e: Throwable) {
            // Fallback to the regular SL in case of any unexpected exception
            load(clz, clz.classLoader)
        }
    }
Copy the code

Get the AndroidDispatcherFactory by reflection and then create the Dispatcher based on the loaded priority

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

    override fun hintOnError(a): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}
internal class HandlerContext private constructor(
    private val handler: Handler,
    private valname: String? .private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
   
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)...override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }

    ......
}
Copy the code

CreateDispatcher calls the HandlerContext class by calling Looper.getMainLooper() to retrieve the handler, which is eventually run on the main thread

Dispatchers.Main is just a way to run tasks on the Main thread of Android via a Handler