Prerequisites
- A rough understanding of Java threads
A coroutine is a task executed by a thread. Unlike a hand-rolled user-level thread, a coroutine is backed by the compiler, which turns suspend functions into state machines. Coroutines also come with their own schedulers (dispatchers) and a set of handy functions that let the user choose, within limits, which thread a coroutine runs on.
What is a coroutine?
A: I think of coroutines as tasks. The coroutine scheduler (dispatcher) and the compiler-generated state machine are responsible for handing these tasks to threads for execution.
Coroutines are like quests in a game, and threads are like the characters who carry them out.
Why coroutines? What are their advantages?
A: The advantages of coroutines are:
- Fine-grained scheduling: one moment a coroutine is on the UI thread performing UI updates; the next moment it needs to do blocking work, so that work is moved to another thread while the UI thread keeps refreshing the UI.
- Asynchronous code reads like regular sequential code, which makes highly concurrent code easier to write.
- Coroutines suit IO-bound work, provided the underlying IO functions are non-blocking (suspending) ones.
- Coroutines reduce the number of threads needed, which reduces the number of thread context switches.
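The first two points can be sketched as follows. This is only an illustration and not part of the original article: loadGreeting is a hypothetical function, Thread.sleep stands in for a real blocking call, and the thread that calls runBlocking plays the role of the UI thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: moves a blocking call off the calling thread.
suspend fun loadGreeting(): String = withContext(Dispatchers.IO) {
    Thread.sleep(200) // stand-in for blocking IO (assumption for the demo)
    "hello from ${Thread.currentThread().name}"
}

fun main() {
    runBlocking {
        println("before: ${Thread.currentThread().name}")
        // Reads like an ordinary call, but suspends the coroutine instead of blocking
        val greeting = loadGreeting()
        println(greeting)
        println("after: ${Thread.currentThread().name}")
    }
}
```

The caller never mentions threads or callbacks; the switch to the IO thread pool and back is hidden inside the suspend function.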
Coroutine basics
Hello coroutines
import kotlinx.coroutines.*

// Launching in GlobalScope is a delicate API; this annotation acknowledges that
@DelicateCoroutinesApi
fun main() {
    // Launch a coroutine in the global scope. Its lifecycle is bound to the whole application,
    // so this call is closer to `new Thread` than to a scoped launch
    GlobalScope.launch {
        // delay suspends this coroutine without blocking the worker thread it runs on
        delay(1000)
        // Runs once the coroutine resumes, about 1 second later
        println("${Thread.currentThread()}: 1.") // Thread[DefaultDispatcher-worker-1,5,main]: 1.
    }
    // launch returns immediately, so the main thread prints this first
    println("${Thread.currentThread()}: 2") // Thread[main,5,main]: 2
    // Block the main thread for 2 seconds so the JVM stays alive until the coroutine finishes
    Thread.sleep(2000)
}
Interpretation of the code above:
- delay is a suspend function. Like Thread.sleep(…) it makes the caller wait, but it suspends the coroutine instead of blocking the thread. Suspend functions can only be called from a coroutine or from another suspend function; calling one from ordinary code is a compile error.
- When a coroutine reaches a suspend function, the thread is not blocked. The coroutine is suspended: its execution position and local state are saved with it, and the thread is free to do other work. In the code above the coroutine runs on a DefaultDispatcher worker thread, not on the main thread, so the main thread is never involved in the wait. When the wait ends the coroutine is resumed, and a thread picks it up at the suspension point and continues executing the rest of the coroutine body.
GlobalScope was marked as a delicate API by the Kotlin team in kotlinx.coroutines 1.5, so be careful when using it. GlobalScope is a single object shared by all code in the process, and coroutines launched in it are not tied to any parent job. They can live as long as your app, even after you no longer need them, and because GlobalScope has no Job they cannot be cancelled through the scope, only through the Job that launch returns. Think about the consequences before you use it.
runBlocking: building a coroutine scope
import kotlinx.coroutines.*

@DelicateCoroutinesApi
fun main() {
    // Start a new coroutine in the background and continue
    GlobalScope.launch {
        // Inside this block, `this` is a CoroutineScope
        delay(1000) // Non-blocking wait for 1 second (the unit is milliseconds)
        println("${Thread.currentThread()}: 1.") // Printed after the delay
    }
    println("${Thread.currentThread()}: 2") // The coroutine is waiting; the main thread keeps executing
    // Another coroutine scope
    runBlocking {
        // delay suspends this coroutine, but runBlocking blocks the calling thread until its body
        // and all of its child coroutines complete, so the main thread is blocked here for 2 seconds
        delay(2000)
    }
}
runBlocking creates a coroutine scope and blocks the calling thread (here the main thread) until all child coroutines in the runBlocking scope have finished executing. Note that these are child coroutines, not child threads.
Note: because runBlocking blocks the thread that calls it, it is rarely used in production code. It is mainly meant for main functions and tests; inside suspending code, consider coroutineScope instead.
In the code further below, the coroutine started with GlobalScope.launch is not a child coroutine of runBlocking, so runBlocking does not wait for it.
In the source code you will find this:
@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
As you can see, GlobalScope has its own coroutine context (an empty one, with no Job) and runBlocking has its own. The two contexts are unrelated, so a coroutine started in GlobalScope has no parent Job and runBlocking cannot wait for it.
import kotlinx.coroutines.*

// runBlocking creates a coroutine scope and blocks the calling thread
@DelicateCoroutinesApi
fun main() = runBlocking {
    // Run the following code in a global coroutine
    GlobalScope.launch {
        // Suspend function: suspends the coroutine
        delay(1000)
        // After the coroutine resumes, a worker thread prints the following
        println("${Thread.currentThread()}: 1.")
    }
    // The main thread prints
    println("${Thread.currentThread()}: 2")
    // The main coroutine suspends and waits; the main thread stays inside runBlocking
    delay(2000)
}
If delay(2000) is commented out, runBlocking returns right after printing and the main thread exits before the GlobalScope coroutine prints anything.
If instead the code is written like this:
fun main() = runBlocking {
    launch {
        delay(1000)
        println("${Thread.currentThread()}: 1.")
    }
    println("${Thread.currentThread()}: 2")
}
the main thread waits for the child coroutine started by launch to finish.
If you want the main thread to wait for a coroutine started with GlobalScope.launch, keep the Job that launch returns and call job.join().
Wait for the coroutine to end
import kotlinx.coroutines.*

// job.join() is a suspend function, so the function that calls it must also be a suspend function
@DelicateCoroutinesApi
suspend fun main() {
    val job = GlobalScope.launch {
        delay(1000)
        println("${Thread.currentThread()}: 1.")
    }
    println("${Thread.currentThread()}: 2")
    job.join()
}
Structured concurrency
We saw that a coroutine started with GlobalScope.launch is global and is not supervised by any parent, so GlobalScope.launch should be used as little as possible.
fun main(): Unit = runBlocking {
    launch {
        delay(1000)
        println("${Thread.currentThread()}: 1.")
    }
    println("${Thread.currentThread()}: 2")
}
This way the runBlocking above controls its child coroutine: the thread does not leave the coroutine scope until the child coroutine has finished.
coroutineScope: building a scope
In addition to the runBlocking above, Kotlin also provides coroutineScope for building coroutine scopes.
Like runBlocking, coroutineScope does not complete until all of its child coroutines have finished executing.
import kotlinx.coroutines.*

suspend fun main() {
    println("1")
    coroutineScope {
        delay(1000)
        println("2")
    }
    println("3")
}
The difference between runBlocking and coroutineScope
runBlocking blocks the calling thread while it waits, whereas coroutineScope does not. coroutineScope is a suspend function: it suspends the caller and releases the thread for other work.
If the main thread is the UI thread and you use runBlocking on it, you are in trouble: the UI thread is blocked and your app freezes. I am not learning Kotlin for Android, though, so test this yourself.
Cancellation and timeout of coroutines
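import kotlinx.coroutines.*
import java.util.concurrent.TimeUnit

fun main() {
    val scope = CoroutineScope(Dispatchers.Unconfined)
    scope.launch {
        repeat(100000) {
            delay(1000)
            println("ping")
        }
    }
    // Let the coroutine ping for 6 seconds, then cancel everything launched in the scope
    TimeUnit.SECONDS.sleep(6)
    scope.cancel()
}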
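The example above covers cancellation by hand. For the timeout half of this section, a minimal sketch using withTimeoutOrNull from kotlinx.coroutines might look like this. pingWithTimeout and the concrete durations are hypothetical and chosen only for the demo.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: counts how many pings fit into the time budget.
suspend fun pingWithTimeout(timeoutMs: Long): Int {
    var pings = 0
    // withTimeoutOrNull cancels the block and returns null once the time is up
    val result = withTimeoutOrNull(timeoutMs) {
        repeat(100) {
            delay(100)
            pings++
        }
        "done" // only reached if all 100 pings finish within the budget
    }
    println("result = $result, pings = $pings")
    return pings
}

fun main() {
    runBlocking { pingWithTimeout(350) }
}
```

withTimeout behaves the same way but throws TimeoutCancellationException instead of returning null, which is useful when a timeout should be treated as an error.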
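Cancellation is cooperative: it does not take effect if the coroutine performs a CPU-intensive computation and never checks for cancellation
import kotlinx.coroutines.*
import java.time.Instant

fun main(): Unit = runBlocking {
    val start = Instant.now()
    val job = launch(Dispatchers.Default) {
        var i = 0
        var next = start
        // Busy loop that never checks for cancellation
        while (i < 5) {
            if (Instant.now() >= next) {
                println("job: I'm sleeping ${i++} ...")
                // Instant is immutable: plusMillis returns a new value that must be stored
                next = next.plusMillis(500)
            }
        }
    }
    delay(1300)
    // The loop ignores the cancellation, so all five lines are still printed
    job.cancelAndJoin()
    println("${Thread.currentThread()} is finished")
}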
Terminating a coroutine in the middle of a computation
You can use isActive to stop a coroutine that is busy computing:
import kotlinx.coroutines.*
import java.time.Instant

suspend fun main() = coroutineScope {
    val start = Instant.now()
    val job = launch(Dispatchers.Default) {
        var i = 0
        var next = start
        // isActive becomes false once the job is cancelled, which ends the loop
        while (isActive) {
            if (Instant.now() >= next) {
                println("job: I'm sleeping ${i++} ...")
                next = next.plusMillis(500)
            }
        }
    }
    delay(1300)
    job.cancelAndJoin()
    println("${Thread.currentThread()} is finished")
}
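As an alternative to polling isActive, the loop can call ensureActive(), which throws CancellationException once the job has been cancelled. The sketch below is only an illustration: spinUntilCancelled and the 200 ms wait are assumptions made for the demo.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical CPU-bound loop that checks for cancellation on every iteration.
suspend fun spinUntilCancelled(): Long = coroutineScope {
    var iterations = 0L
    val job = launch(Dispatchers.Default) {
        while (true) {
            ensureActive() // throws CancellationException once the job is cancelled
            iterations++
        }
    }
    delay(200)
    job.cancelAndJoin() // returns only after the loop has actually stopped
    iterations
}

fun main() {
    val n = runBlocking { spinUntilCancelled() }
    println("loop stopped after $n iterations")
}
```

The difference from isActive is mostly one of style: isActive lets the loop end normally, while ensureActive() unwinds through the exception path, so any finally blocks run in the same way as for a cancelled delay.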
Release resources in finally
import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val job = launch {
        try {
            repeat(1000) {
                println("${Thread.currentThread()}: job: I'm sleeping $it ...")
                delay(500)
            }
        } finally {
            println("${Thread.currentThread()}: I'm running finally")
        }
    }
    delay(1300)
    job.cancelAndJoin()
    println("${Thread.currentThread()} is finished")
}
Thread[main,5,main]: job: I'm sleeping 0 ...
Thread[main,5,main]: job: I'm sleeping 1 ...
Thread[main,5,main]: job: I'm sleeping 2 ...
Thread[main,5,main]: I'm running finally
Thread[main,5,main] is finished
NonCancellable: a code block that cannot be cancelled
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) {
                println("${Thread.currentThread()}: I'm sleeping $it ...")
                delay(500)
            }
        } finally {
            // Without NonCancellable, the delay below would throw immediately in a cancelled coroutine
            withContext(NonCancellable) {
                println("${Thread.currentThread()}: I'm running finally")
                delay(1000L)
                println("${Thread.currentThread()}: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300)
    println("${Thread.currentThread()}: I'm tired of waiting!")
    job.cancelAndJoin()
    println("${Thread.currentThread()}: Now I can quit.")
}
Composing suspend functions
Concurrency with async
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000)
    return 14
}

suspend fun main(): Unit = coroutineScope {
    // Sequential: the second call starts only after the first one returns
    val time1 = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println(one + two)
    }
    println("$time1 ms")
    // Concurrent: both coroutines start right away and their delays overlap
    val time02 = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("${one.await() + two.await()}")
    }
    println("$time02 ms")
}
In the second block the two subtasks run concurrently, so their delays overlap and the block takes about one second; run serially, as in the first block, it takes about two seconds.
async differs from launch in that async returns a Deferred, which carries the result of its block. On top of what Job offers, Deferred has one more function for receiving that result:
public suspend fun await(): T
Lazily started async
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

private suspend fun doSomethingUsefulOne(): Int {
    delay(1000)
    return 13
}

private suspend fun doSomethingUsefulTwo(): Int {
    delay(1000)
    return 14
}

fun main(): Unit = runBlocking {
    val time = measureTimeMillis {
        // Created lazily: the coroutines do not start yet
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // The coroutines actually start here
        one.start()
        two.start()
        // Wait for both coroutines to complete
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
Async-style functions
// doSomethingUsefulOne and doSomethingUsefulTwo are the suspend functions defined above
@DelicateCoroutinesApi
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

@DelicateCoroutinesApi
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

@DelicateCoroutinesApi
suspend fun main() {
    val time = measureTimeMillis {
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
In Kotlin this style of function is not recommended. If an error occurs between val one = somethingUsefulOneAsync() and one.await(), the exception propagates in the caller, which may handle it and go on to do something else, but somethingUsefulOneAsync keeps running in the background: the exception does not cancel it.
Since the approach above is not recommended, what should we use instead? Answer: structured concurrency with async.
Structured concurrency with async
suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

suspend fun main() {
    val elapsed = measureTimeMillis {
        println(concurrentSum())
    }
    println("$elapsed ms")
}
In Kotlin, the approach above is the recommended one.
The advantage is that as soon as an exception is thrown inside a coroutineScope, all coroutines in that scope are cancelled.
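The cancellation of sibling coroutines can be observed with a small sketch modelled on the failing-child example in the official coroutines guide. failedConcurrentSum and the log list are illustrative names, not part of the original article.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the first child waits "forever", the second one fails.
suspend fun failedConcurrentSum(log: MutableList<String>): Int = coroutineScope {
    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE) // emulates a very long computation
            42
        } finally {
            log += "first child was cancelled"
        }
    }
    val two = async<Int> {
        log += "second child throws"
        throw ArithmeticException()
    }
    one.await() + two.await()
}

fun main() = runBlocking<Unit> {
    val log = mutableListOf<String>()
    try {
        failedConcurrentSum(log)
    } catch (e: ArithmeticException) {
        log += "caller caught ArithmeticException"
    }
    log.forEach { println(it) }
}
```

The failure of the second child cancels the scope, which cancels the first child, and only then is the exception rethrown to the caller of coroutineScope.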
If an inner coroutine must not be interrupted by such a cancellation, consider the withContext(NonCancellable) block introduced earlier:
suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    launch {
        // This block keeps running even after the scope has been cancelled
        withContext(NonCancellable) {
            repeat(3) {
                delay(1000)
                println("delay 1 sec")
            }
        }
    }
    launch {
        println("Throw runtime exception termination coroutine")
        throw RuntimeException("Throw runtime exception termination coroutine")
    }
    one.await() + two.await()
}

suspend fun main() {
    val elapsed = measureTimeMillis {
        println(concurrentSum())
    }
    println("$elapsed ms")
}
The final output is:
Throw runtime exception termination coroutine
delay 1 sec
delay 1 sec
delay 1 sec
Exception in thread "main" java.lang.RuntimeException: Throw runtime exception termination coroutine
The result is that the scope does not terminate immediately: the exception is only rethrown after the non-cancellable block has finished its 3 seconds of delays.
Coroutine context and dispatchers
The context of a coroutine is a CoroutineContext, a set of elements that accompany the coroutine while it is scheduled and executed. Its main elements are the Job, the dispatcher (CoroutineDispatcher), the coroutine name (CoroutineName) and the exception handler (CoroutineExceptionHandler). Contexts can be combined with + and reduced with minusKey.
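The "addition" and "subtraction" of context elements can be tried out without starting any coroutine. The following is a small sketch; nameOf is a hypothetical helper added only to make the lookup explicit.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: looks up the CoroutineName element by its key.
fun nameOf(context: CoroutineContext): String? = context[CoroutineName]?.name

fun main() {
    // "Addition": combine several elements into one context
    val context: CoroutineContext = Dispatchers.Default + CoroutineName("demo") + Job()
    println(nameOf(context)) // demo
    // Dispatchers are stored under the ContinuationInterceptor key
    println(context[ContinuationInterceptor] === Dispatchers.Default) // true

    // An element on the right replaces an element with the same key on the left
    val renamed = context + CoroutineName("renamed")
    println(nameOf(renamed)) // renamed

    // "Subtraction": drop an element by its key
    val withoutName = context.minusKey(CoroutineName)
    println(nameOf(withoutName)) // null
}
```

Each element type has its own key, so a context behaves like a small immutable map from key to element.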
Dispatchers and threads
The main job of the dispatcher is to decide which thread or threads a coroutine runs on: it can confine the coroutine to a specific thread, dispatch it to a thread pool, or let it run unconfined.
All coroutine builders such as launch and async take an optional CoroutineContext parameter, which can be used, among other things, to specify the dispatcher for the new coroutine.
import kotlinx.coroutines.*

@ExperimentalCoroutinesApi
fun main(): Unit = runBlocking {
    // The number at the start of each comment is the order in which the line is printed
    launch { // Runs in the context of the parent, the runBlocking main coroutine
        // 3 main runBlocking : I'm working in thread main
        println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // Unconfined: starts in the calling thread, here main
        // 1 Unconfined : I'm working in thread main
        println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // Uses the default dispatcher, DefaultDispatcher
        // 2 Default : I'm working in thread DefaultDispatcher-worker-1
        println("Default : I'm working in thread ${Thread.currentThread().name}")
    }
    val context = newSingleThreadContext("MyOwnThread")
    launch(context) { // Gets its own dedicated thread; a thread is expensive, so close it when it is no longer needed
        // 4 newSingleThreadContext: I'm working in thread MyOwnThread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
    // Release the dedicated thread; the coroutine already submitted to it still runs
    context.close()
}
The printed output (the order is also noted in the comments):
Unconfined : I'm working in thread main
Default : I'm working in thread DefaultDispatcher-worker-1
main runBlocking : I'm working in thread main
newSingleThreadContext: I'm working in thread MyOwnThread
We analyze them in turn:
- launch { … }: the coroutine context (and with it the dispatcher) is inherited from the runBlocking context.
- Dispatchers.Unconfined: a special dispatcher that does not confine the coroutine to any particular thread; the coroutine runs on whatever thread happens to start or resume it.
- launch(Dispatchers.Default): uses the same shared thread pool as GlobalScope.launch.
- newSingleThreadContext: creates a dedicated thread for the coroutine. Threads are an expensive resource, so the context must be closed when it is no longer used, or stored in a top-level property and reused across the application.
Unconfined vs confined dispatcher
When the dispatcher is Dispatchers.Unconfined, the coroutine starts in the caller's thread and runs there only until the first suspension point. After the suspension it resumes in whichever thread the suspend function that was invoked decides to use.
fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined : After delay in thread ${Thread.currentThread().name}")
    }
    launch {
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}
It prints:
Unconfined : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main
See? With Unconfined the thread changed after the delay, while the control coroutine did not change and kept running on the main thread.
Why did it change? When the coroutine hits delay it leaves the main thread, and the rest of the coroutine is resumed by the thread that delay uses for its timer, kotlinx.coroutines.DefaultExecutor.
Other suspend functions demonstrate the same property:
fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {}
        println("Unconfined : After delay in thread ${Thread.currentThread().name}")
    }
    launch {
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {}
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}
Unconfined : I'm working in thread main
Unconfined : After delay in thread DefaultDispatcher-worker-1
main runBlocking: I'm working in thread main
main runBlocking: After delay in thread main
So does this example:
suspend fun doSomething() {
    withContext(Dispatchers.IO) {
        println("IO : 2 ${Thread.currentThread().name}")
    }
}

fun main(): Unit = runBlocking {
    launch(Dispatchers.Unconfined) {
        println("Unconfined : 1 ${Thread.currentThread().name}")
        // delay(1000)
        doSomething()
        println("Unconfined : 3 ${Thread.currentThread().name}")
    }
}
Unconfined : 1 main
IO : 2 DefaultDispatcher-worker-1
Unconfined : 3 DefaultDispatcher-worker-1
So we can use this behavior for testing: when you do not know which thread finished executing a suspend function, an Unconfined coroutine tells you, because it keeps running on that thread.
For example, if the suspend function resumes on the kotlinx.coroutines.DefaultExecutor thread, the code after it also runs on kotlinx.coroutines.DefaultExecutor.
If the suspend function is executed by the DefaultDispatcher-worker-1 thread, the code after it also runs on DefaultDispatcher-worker-1.
Debugging coroutines and threads
Coroutines can be suspended on one thread and resumed on another, so without dedicated tooling it is hard to keep track of what a coroutine is doing at any given moment.
Debugging with the IDEA plugin
Using the Coroutine debugger, you can:
- Check the status of each coroutine
- View the values of local and captured variables for running and suspended coroutines
- See the complete coroutine creation stack and the call stack inside the coroutine. The stack contains all stack frames with variables, even those that are lost during standard debugging
- Get a complete report containing the state and stack information of each coroutine. To get it, right-click in the Coroutines tab and then click Get Coroutines Dump
Debugging by printing logs
Print or log the information you are interested in and that's it.
To make log-based debugging easier, kotlinx.coroutines provides the -Dkotlinx.coroutines.debug JVM option.
suspend fun doSomething() {
    withContext(Dispatchers.IO) {
        println("IO : 2 ${Thread.currentThread().name}")
    }
}

fun main(): Unit = runBlocking {
    println("main: 0 ${Thread.currentThread().name}")
    launch(Dispatchers.Unconfined) {
        println("Unconfined : 1 ${Thread.currentThread().name}")
        doSomething()
        println("Unconfined : 3 ${Thread.currentThread().name}")
    }
}
Before use:
main: 0 main
Unconfined : 1 main
IO : 2 DefaultDispatcher-worker-1
Unconfined : 3 DefaultDispatcher-worker-1
After use:
main: 0 main @coroutine#1
Unconfined : 1 main @coroutine#2
IO : 2 DefaultDispatcher-worker-1 @coroutine#2
Unconfined : 3 DefaultDispatcher-worker-1 @coroutine#2
You'll notice the extra suffixes @coroutine#1 and @coroutine#2. These are coroutine identifiers: @coroutine#1 is the main coroutine created by runBlocking, and @coroutine#2 is the child coroutine.
The identifiers only appear in debug mode. Besides -Dkotlinx.coroutines.debug, adding -ea to the VM options also enables debug mode, and the coroutine name is printed as well.
A coroutine jumping between threads
import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

@ExperimentalCoroutinesApi
fun main() {
    newSingleThreadContext("ctx1").use { ctx1 ->
        newSingleThreadContext("ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}
[ctx1 @coroutine#1] Started in ctx1
[ctx2 @coroutine#1] Working in ctx2
[ctx1 @coroutine#1] Back to ctx1
This shows the same coroutine executing on different threads. use closes the threads created by newSingleThreadContext automatically, so they do not have to be closed by hand.
Naming coroutines
fun main(): Unit = runBlocking(CoroutineName("main")) {
    log("1")
    val v1 = async(CoroutineName("v1 coroutine")) {
        delay(500)
        log("2")
        255
    }
    val v2 = async(CoroutineName("v2 coroutine")) {
        delay(1000)
        log("3")
        7
    }
    log("4 result = ${v1.await() + v2.await()}")
}
Combining context elements
fun main(): Unit = runBlocking {
    launch(Dispatchers.Default + CoroutineName("test")) {
        log("1")
    }
}
The combined context selects the dispatcher and sets the coroutine name to test:
[DefaultDispatcher-worker-1 @test#2] 1
Context inheritance of coroutines
If a new coroutine is created inside the CoroutineScope of another coroutine, we call the new coroutine the child and the other one the parent. The child inherits the parent's context except for the Job: the child gets a new Job, which becomes a child of the parent's Job.
fun main(): Unit = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO + Job() + CoroutineName("test"))
    val job = scope.launch {
        log("1 ${coroutineContext[Job]}")
        launch {
            log("2 ${coroutineContext[Job]}")
        }
    }
    job.join()
}
[DefaultDispatcher-worker-1 @test#2] 1 "test#2":StandaloneCoroutine{Active}@3eb7ab0f
[DefaultDispatcher-worker-3 @test#3] 2 "test#3":StandaloneCoroutine{Active}@33bb030c
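The inheritance rule can also be checked programmatically rather than by reading log output. In the sketch below, inspectChild is a hypothetical helper that reports the inherited name and the relationship between the two Jobs.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical check: (inherited name, child has its own Job, child Job is registered under the parent Job)
fun inspectChild(): Triple<String?, Boolean, Boolean> = runBlocking(CoroutineName("parent")) {
    val parentJob = coroutineContext[Job]!!
    val result = async {
        // Inside async, coroutineContext is the child's context
        val childJob = coroutineContext[Job]!!
        Triple(
            coroutineContext[CoroutineName]?.name, // inherited from the parent
            childJob !== parentJob,                 // the child gets a new Job
            childJob in parentJob.children          // and that Job is a child of the parent's Job
        )
    }
    result.await()
}

fun main() {
    println(inspectChild()) // should print (parent, true, true)
}
```

The parent-child link between Jobs is what makes structured concurrency work: cancelling the parent Job cancels every Job listed in its children.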
Thread-local data
import kotlinx.coroutines.*

val threadLocal: ThreadLocal<String?> = ThreadLocal<String?>()

fun main(): Unit = runBlocking {
    threadLocal.set("main")
    log("1 ${threadLocal.get()}")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement("launch")) {
        log("2 ${threadLocal.get()}")
        yield()
        log("3 ${threadLocal.get()}")
    }
    job.join()
    log("4 ${threadLocal.get()}")
}
[main @coroutine#1] 1 main
[DefaultDispatcher-worker-1 @coroutine#2] 2 launch
[DefaultDispatcher-worker-1 @coroutine#2] 3 launch
[main @coroutine#1] 4 main
The ThreadLocal value is carried in the coroutine context as an extra element and is restored every time the coroutine switches context, no matter which thread the coroutine runs on.
A new problem: it is easy to forget to put the element into the context, and then the thread-local may hold an unexpected value when the coroutine reads it. To guard against this, Kotlin provides a method:
threadLocal.ensurePresent()
It checks whether the ThreadLocal is present in the current coroutine context and throws an IllegalStateException if it is not.
ThreadLocal also has a key limitation: when the thread-local is mutated directly (note: not through the coroutine context), the new value is not propagated to the caller of the coroutine, because the context element cannot track every access to the ThreadLocal object, and the updated value is lost at the next suspension.
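The lost update can be reproduced with a short sketch. demoThreadLocal and lostUpdate are hypothetical names, and delay(50) is just a convenient way to force a suspension.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

val demoThreadLocal = ThreadLocal<String?>() // hypothetical name for the demo

// Returns the value seen after a direct mutation followed by a suspension.
fun lostUpdate(): String? = runBlocking {
    withContext(Dispatchers.Default + demoThreadLocal.asContextElement("launch")) {
        demoThreadLocal.set("changed") // direct mutation, bypassing the coroutine context
        delay(50)                       // suspension: on resume the context element is re-applied
        demoThreadLocal.get()
    }
}

fun main() {
    println(lostUpdate()) // should print launch: the value "changed" is lost
}
```

If the value really has to change, one option is to switch to a new value with withContext(threadLocal.asContextElement(newValue)); another is to store a mutable holder object in the thread-local and synchronize access to its fields yourself.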