Coroutines and threads
Differences in purpose
- Threads exist to raise CPU utilization by letting multiple tasks run in parallel; they serve the machine.
- Coroutines exist to let multiple tasks cooperate more cleanly at the level of code logic; they serve the developer. They can also improve resource utilization, but that is not their original purpose.
Scheduling differences
- Threads are scheduled by the operating system, usually preemptively and according to priority.
- Coroutines are scheduled by the developer according to program logic, so resources go to different tasks at the points the code chooses.
Coroutines in relation to threads
Coroutines do not replace threads; they are an abstraction built on top of them. A thread is a slice of CPU resources, while a coroutine is an organized flow of code that needs a thread to run on.
The core strength of coroutines
They simplify asynchronous and concurrent tasks.
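As a minimal sketch of that claim, assuming kotlinx-coroutines-core is on the classpath: fetchPrice and fetchTax below are hypothetical stand-ins for slow remote calls, not part of any library. Two tasks run concurrently while the code still reads top to bottom.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for two slow remote calls.
suspend fun fetchPrice(): Int { delay(100); return 40 }
suspend fun fetchTax(): Int { delay(100); return 2 }

// Starts both calls concurrently, then combines the results.
// No callbacks and no explicit thread management are needed.
suspend fun fetchTotal(): Int = coroutineScope {
    val price = async { fetchPrice() }
    val tax = async { fetchTax() }
    price.await() + tax.await()
}

fun main() = runBlocking {
    println(fetchTotal()) // 42
}
```

Because both async blocks start before either await, the two delays overlap instead of adding up.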
CoroutineContext
- A coroutine always runs in a context represented by the CoroutineContext type, which is a collection of elements
- Elements are looked up by key (like a Map), and each key can appear only once (like a Set)
- Elements can be combined with the + operator
- Four kinds of elements make up a CoroutineContext
  - Job: uniquely identifies the coroutine and controls its life cycle (New, Active, Completing, Completed, Cancelling, Cancelled)
  - CoroutineDispatcher: specifies the thread(s) the coroutine runs on (IO, Default, Main, Unconfined)
  - CoroutineName: specifies the name of the coroutine; the default is "coroutine"
  - CoroutineExceptionHandler: specifies the handler for uncaught exceptions in the coroutine
[Figure: relationship between the four CoroutineContext elements]
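A small sketch of how these elements combine and how they are looked up, assuming kotlinx-coroutines-core is available. The helper buildContext and the name "worker" are made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Builds a context from the four element kinds (hypothetical helper).
fun buildContext(): CoroutineContext {
    val handler = CoroutineExceptionHandler { _, e -> println("Caught: $e") }
    return Job() + Dispatchers.Default + CoroutineName("worker") + handler
}

fun main() {
    val context = buildContext()

    // Map-like lookup: the companion object of each element type serves as its key.
    println(context[CoroutineName]?.name)                             // worker
    println(context[ContinuationInterceptor] === Dispatchers.Default) // true

    // Set-like uniqueness: adding an element with an existing key replaces the old one.
    val renamed = context + CoroutineName("renamed")
    println(renamed[CoroutineName]?.name)  // renamed
    println(renamed[Job] === context[Job]) // true
}
```

The dispatcher is found under the ContinuationInterceptor key because CoroutineDispatcher implements that interface.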
CoroutineDispatcher role
- Specifies the thread(s) on which the coroutine runs
- Kotlin ships four built-in CoroutineDispatcher implementations, exposed as the Default, IO, Main and Unconfined properties of Dispatchers
public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}

Dispatchers.Default
Dispatchers.Default picks its thread pool based on the useCoroutinesScheduler property (default: true):
- DefaultScheduler: the thread pool implemented inside Kotlin coroutines itself
- CommonPool: a thread pool built on the Executor implementations in the Java class library
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    // ...
}

open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
    // ...
}

// Thread pool logic built on the Executor implementations in the Java class library
internal object CommonPool : ExecutorCoroutineDispatcher() {}

What if you want to use the thread pool from the Java class library instead? Set useCoroutinesScheduler to false.
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
    when (value) {
        null, "", "on" -> true
        "off" -> false
        else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
    }
}

internal actual fun systemProp(
    propertyName: String
): String? =
    try {
        // Read the system property
        System.getProperty(propertyName)
    } catch (e: SecurityException) {
        null
    }

As the source shows, the value is read from a system property, so useCoroutinesScheduler can be changed by setting that property before Dispatchers.Default is first used:
// Set only this key; System.setProperties(...) would replace every existing system property
System.setProperty("kotlinx.coroutines.scheduler", "off")

Most of DefaultScheduler's implementation lives in its parent class, ExperimentalCoroutineDispatcher:
open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    override val executor: Executor
        get() = coroutineScheduler

    private var coroutineScheduler = createScheduler()

    // Creates the CoroutineScheduler instance
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            // dispatch delegates to CoroutineScheduler.dispatch
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // ...
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            // dispatchYield also delegates to CoroutineScheduler.dispatch, with tailDispatch = true
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            // ...
        }

    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            // dispatchWithContext delegates to CoroutineScheduler.dispatch, passing the task context
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
            // ...
        }
    }

    override fun close(): Unit = coroutineScheduler.close()

    // Returns a dispatcher view for tasks that may block
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    }

    // Returns a dispatcher view with a concurrency limit for non-blocking tasks
    public fun limited(parallelism: Int): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    }
    // ...
}

The parallelism limit is implemented by the LimitingDispatcher class:
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    // Thread-safe, non-blocking queue of tasks that are over the limit
    private val queue = ConcurrentLinkedQueue<Runnable>()

    // Atomic (CAS-based) count of in-flight tasks
    private val inFlightTasks = atomic(0)

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Claim an in-flight slot
            val inFlight = inFlightTasks.incrementAndGet()
            if (inFlight <= parallelism) {
                // Under the limit: delegate to dispatchWithContext of the underlying dispatcher
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            // ...
        }
    }
}

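The counter-plus-queue idea can be tried outside the library. The sketch below is a simplified illustration built only on java.util.concurrent, not the library's code: LimitingExecutor and maxObservedConcurrency are hypothetical names, and the handling of the elided "over the limit" path is an assumption about how such a limiter can be completed.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch: caps how many tasks run at once on a shared delegate executor.
class LimitingExecutor(private val delegate: Executor, private val parallelism: Int) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlight = AtomicInteger(0)

    override fun execute(command: Runnable) {
        var task = command
        while (true) {
            // Try to claim a slot; under the limit, hand the task to the delegate.
            if (inFlight.incrementAndGet() <= parallelism) {
                submit(task)
                return
            }
            // Over the limit: park the task and release the slot just claimed.
            queue.add(task)
            if (inFlight.decrementAndGet() >= parallelism) return
            // A slot was freed in the meantime, so retry with a queued task.
            task = queue.poll() ?: return
        }
    }

    private fun submit(task: Runnable) {
        delegate.execute {
            try { task.run() } finally { afterTask() }
        }
    }

    private fun afterTask() {
        // Reuse this slot for the next queued task if there is one.
        val next = queue.poll()
        if (next != null) { submit(next); return }
        inFlight.decrementAndGet()
        // Re-check: a task may have been queued between the poll and the decrement.
        queue.poll()?.let { execute(it) }
    }
}

// Runs `tasks` tasks through a limiter and returns the highest concurrency observed.
fun maxObservedConcurrency(parallelism: Int, tasks: Int): Int {
    val pool = Executors.newFixedThreadPool(8)
    val limited = LimitingExecutor(pool, parallelism)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val done = CountDownLatch(tasks)
    repeat(tasks) {
        limited.execute {
            val now = running.incrementAndGet()
            peak.updateAndGet { maxOf(it, now) }
            Thread.sleep(10)
            running.decrementAndGet()
            done.countDown()
        }
    }
    done.await()
    pool.shutdown()
    return peak.get()
}

fun main() {
    println(maxObservedConcurrency(parallelism = 2, tasks = 20) <= 2) // true
}
```

The pool has eight threads, yet no more than two tasks run at once, which is the same relationship a limited view has to the pool it shares.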
Dispatchers.IO
Let's take a look at the definition of Dispatchers.IO:
/**
 * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
 * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread —
 * typically execution continues in the same thread.
 */
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
    // ...
}

The IO dispatcher in DefaultScheduler is created by calling blocking(), which ultimately returns a LimitingDispatcher. Dispatchers.Default and Dispatchers.IO therefore share the same thread pool, CoroutineScheduler, and may even run on the same threads.
Kotlin implements this single thread pool with two scheduling policies, which are distinguished by the task mode in the dispatch method:
Type | Mode |
---|---|
Default | NON_BLOCKING |
IO | PROBABLY_BLOCKING |
internal enum class TaskMode {
    // CPU-intensive tasks
    NON_BLOCKING,
    // IO-intensive tasks
    PROBABLY_BLOCKING,
}

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    // ...
    if (task.mode == TaskMode.NON_BLOCKING) {
        signalCpuWork() // Dispatchers.Default
    } else {
        signalBlockingWork() // Dispatchers.IO
    }
}

Type | Thread limit | Suitable scenarios | Characteristics |
---|---|---|---|
Default | CoroutineScheduler creates at most corePoolSize threads. corePoolSize is max(2, number of CPU cores), so it tries to match the number of CPU cores | Complex computation, video decoding, etc. | 1. CPU-intensive tasks consume a lot of CPU resources. 2. Each thread needs its own stack and other memory, and with too many threads the cost of frequent context switching hurts the performance of the thread pool. 3. For CPU-intensive tasks, keeping the number of concurrent threads equal to the number of CPU cores maximizes CPU efficiency. |
IO | At most max(corePoolSize, min(number of CPU cores * 128, 2^21 - 2)) threads | Network requests and I/O operations | 1. The CPU is mostly idle during I/O-intensive tasks, which do not consume much CPU. 2. Most threads executing I/O-intensive tasks are blocked, and blocked threads do not use CPU time. 3. Parallelism is capped at max(64, number of CPU cores); tasks beyond that limit wait in a queue. |
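In application code, the table comes down to choosing a dispatcher by the kind of work. A minimal sketch, assuming kotlinx-coroutines-core is available; sumOfSquares and blockingRead are hypothetical examples, and Thread.sleep stands in for a blocking read:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// CPU-bound work: keep it on Dispatchers.Default.
suspend fun sumOfSquares(n: Int): Long = withContext(Dispatchers.Default) {
    (1..n).sumOf { it.toLong() * it }
}

// Blocking work: move it to Dispatchers.IO so it does not occupy a CPU-bound slot.
suspend fun blockingRead(): String = withContext(Dispatchers.IO) {
    Thread.sleep(50) // stand-in for a blocking read
    "payload from ${Thread.currentThread().name}"
}

fun main() = runBlocking {
    println(sumOfSquares(10)) // 385
    println(blockingRead())   // the thread name varies from run to run
}
```

Printing the thread name in both functions is a quick way to observe that the two dispatchers draw from the same pool of worker threads.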
Dispatchers.Unconfined
The coroutine starts executing in the thread that launched it. After a suspension, it resumes in whichever thread calls resume.
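This behaviour can be observed with a short experiment, sketched here under the assumption that kotlinx-coroutines-core is available; unconfinedThreads is a hypothetical helper that records the thread name on each side of a suspension point.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the thread names seen before and after the first suspension point.
fun unconfinedThreads(): Pair<String, String> = runBlocking {
    var before = ""
    var after = ""
    launch(Dispatchers.Unconfined) {
        before = Thread.currentThread().name // still the caller's thread
        delay(50)
        after = Thread.currentThread().name  // the thread that resumed the coroutine
    }.join()
    before to after
}

fun main() {
    val (before, after) = unconfinedThreads()
    println("before suspension: $before")
    println("after suspension:  $after")
}
```

The first name is the thread that called launch; the second depends on which thread completes the delay, so it is not asserted here.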
internal object Unconfined : CoroutineDispatcher() {
    // Returns false, so no dispatch is needed
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Unconfined's dispatch method is only invoked from yield()
        // yield() means the current coroutine gives up its thread to other coroutines
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    }
}

Every coroutine has a corresponding Continuation instance whose resumeWith method resumes the coroutine. The dispatcher wraps that Continuation in a DispatchedContinuation:
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    // ...
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    // ...
}

Focus on the implementation of resumeWith and on the class delegation (Continuation<T> by continuation):
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T> // the Continuation generated for the coroutine's suspend function
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    // ...
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }
    // ...
}

isDispatchNeeded (false for Unconfined, true for Default and IO) decides which path is taken:
- true: call the dispatch method of the CoroutineDispatcher
- false: call the executeUnconfined method
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED }
    val eventLoop = ThreadLocalEventLoop.eventLoop
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        eventLoop.dispatchUnconfined(this)
        true
    } else {
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

Fetch the eventLoop from the ThreadLocal and check whether an Unconfined task is already running on this thread:
- If one is running, call EventLoop's dispatchUnconfined method to put the task in the EventLoop's queue
- If not, execute it directly
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        while (true) {
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}

- Execute block(), which is the resumeWith() call mentioned above
- Call processUnconfinedEvent() in a loop to run the remaining Unconfined tasks, and leave the loop once all of them have run
The EventLoop is stored in a ThreadLocal and is therefore tied to the current thread; it is also a subclass of CoroutineDispatcher
internal abstract class EventLoop : CoroutineDispatcher() {
    // ...
    // Queue that stores the Unconfined tasks
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null

    // Take an Unconfined task from the head of the queue and run it
    public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    }

    // Put an Unconfined task at the tail of the queue
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }
    // ...
}

Internally, Unconfined tasks are stored in a queue:
- EventLoop's dispatchUnconfined method puts an Unconfined task at the tail of the queue
- The processUnconfinedEvent method takes an Unconfined task from the head of the queue and runs it
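The effect of this queueing is that a nested Unconfined task does not run inside its parent's call stack; it runs after the parent returns. The following is a hypothetical, stdlib-only sketch of that per-thread trampoline. MiniEventLoop, runUnconfined and trace are illustrative names, not the library's EventLoop.

```kotlin
// Hypothetical sketch of a per-thread trampoline, not the library's EventLoop.
class MiniEventLoop {
    private val queue = ArrayDeque<() -> Unit>()
    private var active = false

    fun runUnconfined(task: () -> Unit) {
        if (active) {
            // A task is already running on this thread: enqueue instead of nesting the call.
            queue.addLast(task)
            return
        }
        active = true
        try {
            task()
            // Drain the tasks queued while running, oldest first.
            while (true) {
                val next = queue.removeFirstOrNull() ?: break
                next()
            }
        } finally {
            active = false
        }
    }
}

// One loop per thread, mirroring the ThreadLocal storage described above.
private val loops = ThreadLocal.withInitial { MiniEventLoop() }

fun runUnconfined(task: () -> Unit) = loops.get().runUnconfined(task)

// Records the order in which an outer task and a nested task execute.
fun trace(): List<String> {
    val log = mutableListOf<String>()
    runUnconfined {
        log += "outer start"
        runUnconfined { log += "inner" }
        log += "outer end"
    }
    return log
}

fun main() {
    println(trace()) // [outer start, outer end, inner]
}
```

Queueing instead of nesting keeps the call stack flat even when each task schedules another one.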
Dispatchers.Main
On Android, Dispatchers.Main requires the kotlinx-coroutines-android library, which contains the Android implementation of Dispatchers.Main:
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

private fun loadMainDispatcher(): MainCoroutineDispatcher {
    return try {
        val factories = if (FAST_SERVICE_LOADER_ENABLED) {
            FastServiceLoader.loadMainDispatcherFactory()
        } else {
            // We are explicitly using the
            // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
            // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
            ServiceLoader.load(
                MainDispatcherFactory::class.java,
                MainDispatcherFactory::class.java.classLoader
            ).iterator().asSequence().toList()
        }
        factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
            ?: MissingMainCoroutineDispatcher(null)
    } catch (e: Throwable) {
        // Service loader can throw an exception as well
        MissingMainCoroutineDispatcher(e)
    }
}

internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
    val clz = MainDispatcherFactory::class.java
    if (!ANDROID_DETECTED) {
        return load(clz, clz.classLoader)
    }
    return try {
        val result = ArrayList<MainDispatcherFactory>(2)
        createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
        createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
        result
    } catch (e: Throwable) {
        // Fallback to the regular SL in case of any unexpected exception
        load(clz, clz.classLoader)
    }
}

AndroidDispatcherFactory is obtained by reflection, and the dispatcher is then created by the factory with the highest loadPriority:
internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
    // ...
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    // ...
}

createDispatcher builds a HandlerContext around a Handler obtained from Looper.getMainLooper(), so dispatched blocks ultimately run on the main thread.
In short, Dispatchers.Main simply uses a Handler to run tasks on Android's main thread.
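The same pattern can be tried on a plain JVM without Android. In the sketch below, which assumes kotlinx-coroutines-core is available, a single-threaded executor plays the role of the main Looper; SingleThreadDispatcher, runOnFakeMain and the thread name "fake-main" are hypothetical and only illustrate the handler.post(block) idea.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for HandlerContext: every block is posted to one dedicated thread.
class SingleThreadDispatcher(private val executor: ExecutorService) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        executor.execute(block) // plays the role of handler.post(block)
    }
}

// Runs a coroutine body on the dedicated thread and returns that thread's name.
fun runOnFakeMain(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    try {
        return runBlocking {
            withContext(SingleThreadDispatcher(executor)) { Thread.currentThread().name }
        }
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println(runOnFakeMain()) // fake-main
}
```

Overriding dispatch is the only requirement here; everything else about resuming the coroutine is inherited from CoroutineDispatcher.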