Learn Kotlin coroutines in five minutes
Coroutines are often described as lightweight threads, but a coroutine is not bound to any particular thread: it can suspend execution on one thread and resume on another.
Suspend functions: Suspending functions are marked with the suspend modifier. (Constructors, property getters and setters, delegates, and anonymous functions cannot be marked suspend.)
A suspending function can pause and be resumed at a later time. A suspending function can only be called from another suspending function or from a coroutine, so a regular function must first start a coroutine (through a coroutine scope or a coroutine builder) before it can call one. When a suspending function returns, its work is complete.
The Kotlin compiler converts each suspending function into a state machine, an optimized form of callbacks, so that the function can resume from each point where it suspended.
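As a minimal sketch of the rule that only a coroutine or another suspending function can call a suspending function, the example below assumes the kotlinx-coroutines-core library is on the classpath. The function name fetchGreeting is hypothetical and only serves the illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending function: delay() suspends the coroutine
// without blocking the thread it runs on.
suspend fun fetchGreeting(name: String): String {
    delay(100)
    return "hello $name"
}

fun main() {
    // A regular function cannot call fetchGreeting() directly;
    // runBlocking starts a coroutine that can.
    val greeting = runBlocking { fetchGreeting("coroutine") }
    println(greeting) // prints "hello coroutine"
}
```

Calling fetchGreeting("coroutine") outside a coroutine or another suspending function is rejected at compile time.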
Coroutine scope: Every coroutine runs in a specific scope. The scope manages the lifecycle of its coroutines, which prevents coroutine leaks: when a scope is cancelled, so are all the coroutines started in it. A coroutine started inside another coroutine or scope has that coroutine or scope as its parent, and its context is: new CoroutineContext = parent CoroutineContext + Job().
The library provides several ways to create coroutine scopes, such as:
- coroutineScope { … } creates a scope that inherits the context of the calling coroutine.
- runBlocking { … } runs a new coroutine and blocks the current thread until the coroutine completes.
- GlobalScope is a global, top-level coroutine scope that is not bound to any job and lives as long as the application. It suits top-level background work that spans the whole app lifecycle, and using it requires an explicit opt-in with the @OptIn(DelicateCoroutinesApi::class) annotation.
- supervisorScope { … } also inherits the context of the calling coroutine, but cancellation and exceptions propagate only downward, from parent to child: the failure or cancellation of a child does not affect coroutines outside that child.
- Create one directly: val scope = CoroutineScope(Job() + Dispatchers.Main)
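The claim that cancelling a scope cancels every coroutine started in it can be sketched as follows. This is an illustration under assumptions: the helper name cancelledAfterScopeCancel is hypothetical, and Dispatchers.Default is used instead of Dispatchers.Main because the main dispatcher is only available on platforms that provide one (for example Android).

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts two coroutines in one scope, cancels the scope,
// and returns how many of the coroutines ended up cancelled.
fun cancelledAfterScopeCancel(): Int = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val jobs = List(2) { scope.launch { delay(10_000) } }
    scope.cancel()   // cancels every coroutine started in the scope
    jobs.joinAll()   // wait until the cancellation has completed
    jobs.count { it.isCancelled }
}

fun main() {
    println(cancelledAfterScopeCancel()) // prints 2
}
```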
Two ways to start coroutines
- launch { … } starts a new coroutine without returning a result to the caller.
GlobalScope.launch {
    println("hello coroutine")
}
- async { … } together with await() returns the coroutine's result to the caller.
val result = GlobalScope.async {
    println("hello")
    1
}
// await() is a suspending function, so it must be called from a coroutine
print("The result is ${result.await()}")
How to cancel a coroutine and check its running state
Each coroutine builder returns a Job. The Job is the handle that uniquely identifies the coroutine and manages its lifecycle.
launch { … } returns a Job whose isActive, isCompleted and isCancelled properties report the running state of the coroutine.
The cancel() and cancelAndJoin() methods cancel the coroutine.
For a coroutine started with async { … }, the running state is checked in the same way.
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        for (i in 1..10) {
            delay(200L)
            println(i)
            println("isActive $isActive")
        }
    }
    delay(2000L)
    // job.cancel() would only request cancellation
    job.cancelAndJoin() // request cancellation and wait for the job to finish
    println("isComplete ${job.isCompleted}")
    println("isCancelled ${job.isCancelled}")
}

The same checks for a coroutine started with async (a separate program):

import kotlinx.coroutines.*

fun main() = runBlocking {
    val result = async {
        delay(2000L)
        1 + 1
    }
    println("isActive ${result.isActive}")
    println("isComplete ${result.isCompleted}")
    result.cancel("Cancel", null) // cancels the coroutine with a CancellationException
    try {
        val sum = result.await() // await() on a cancelled Deferred throws CancellationException
        print("sum = $sum")
    } catch (e: CancellationException) {
        println("await() threw $e")
    }
    println("isComplete ${result.isCompleted}")
    println("isCancelled ${result.isCancelled}")
}
CoroutineDispatcher: every dispatcher extends CoroutineDispatcher. A dispatcher determines which thread or threads a coroutine runs on.
- Dispatchers.Default is the default dispatcher. It is backed by a shared pool of background threads and suits CPU-intensive coroutines.
- Dispatchers.IO is suitable for IO-intensive blocking operations.
- Dispatchers.Unconfined starts the coroutine in the current call frame and runs it until the first suspension, after which the coroutine builder function returns. The coroutine later resumes on whichever thread the corresponding suspending function used, without being confined to any particular thread or pool. The Unconfined dispatcher should not normally be used in code.
- newSingleThreadContext and newFixedThreadPoolContext create private thread pools.
- The asCoroutineDispatcher extension function converts any Executor into a dispatcher.
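The last bullet can be sketched as below, assuming a JVM target. The helper name threadNameOnExecutor and the thread name "my-executor" are hypothetical, chosen only for this illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical helper: runs a block on a dispatcher backed by a private executor
// and returns the name of the thread that executed it.
fun threadNameOnExecutor(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "my-executor") }
    val dispatcher = executor.asCoroutineDispatcher()
    try {
        return runBlocking {
            withContext(dispatcher) { Thread.currentThread().name }
        }
    } finally {
        dispatcher.close() // also shuts down the underlying executor
    }
}

fun main() {
    // The name starts with "my-executor"; in coroutine debug mode
    // a coroutine id is appended to it.
    println(threadNameOnExecutor())
}
```

Closing the dispatcher when it is no longer needed matters here, because a private executor keeps its thread alive otherwise.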
Common places to pass a dispatcher:
withContext(…), async(…), CoroutineScope(…), launch(…), runBlocking(…).
A coroutine can switch dispatchers with withContext():
import kotlinx.coroutines.*

fun main() = runBlocking {
    println("0 " + Thread.currentThread().name)
    withContext(Dispatchers.Default) {
        println("1 " + Thread.currentThread().name)
    }
    withContext(Dispatchers.IO) {
        println("2 " + Thread.currentThread().name)
    }
    val dispatcher = newSingleThreadContext("my dispatcher")
    withContext(dispatcher) {
        println("3 " + Thread.currentThread().name)
    }
    dispatcher.close()
}
Print the result (worker thread numbers may vary between runs):
0 main
1 DefaultDispatcher-worker-1
2 DefaultDispatcher-worker-1
3 my dispatcher
Exception handling of coroutines
Normal exception handling: When a coroutine fails due to an exception, it propagates the exception and passes it to its parent, which then does the following:
- Cancels its own children;
- Cancels itself;
- Propagates the exception and passes it to its parent. The exception will reach the root of the hierarchy, and any coroutines currently started by the CoroutineScope will be cancelled.
SupervisorJob: With a SupervisorJob, the failure of one child coroutine does not affect the other children. A SupervisorJob does not cancel itself or its other children, and it does not propagate the exception to its parent; it lets each child coroutine handle its own exceptions.
// Scope handling the coroutines for one layer of my application
val scope = CoroutineScope(Job())
scope.launch {
    supervisorScope {
        launch {
            // Child 1
        }
        launch {
            // Child 2
        }
    }
}
A SupervisorJob works as described above only when it is created through supervisorScope or CoroutineScope(SupervisorJob()). Passing a SupervisorJob as an argument to a coroutine builder does not produce the effect you might expect.
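A minimal sketch of the supervisor behaviour: one child fails, and its sibling still runs to completion. The helper name runSiblings is hypothetical. The CoroutineExceptionHandler is an assumption added so that the failure is recorded instead of being passed to the thread's uncaught exception handler.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs two children under supervisorScope
// and returns a log of what happened to each of them.
suspend fun runSiblings(): List<String> {
    val log = mutableListOf<String>()
    // The handler only records the failure of the child it is attached to.
    val handler = CoroutineExceptionHandler { _, e -> log += "failed: ${e.message}" }
    supervisorScope {
        launch(handler) {
            throw IllegalStateException("child 1")
        }
        launch {
            delay(100)
            log += "child 2 done" // still runs although child 1 failed
        }
    }
    return log
}

fun main() = runBlocking {
    println(runSiblings()) // prints [failed: child 1, child 2 done]
}
```

With coroutineScope in place of supervisorScope, the failure of the first child would cancel the second one and be rethrown to the caller.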
With launch, an exception is thrown as soon as it occurs, so you can wrap the code that may throw in a try/catch, as in the following example:
scope.launch {
    try {
        codeThatCanThrowExceptions()
    } catch (e: Exception) {
        // Handle exception
    }
}
When async is used as a root coroutine (a direct child of a CoroutineScope instance or of a supervisorScope), exceptions are not thrown automatically; they are thrown only when .await() is called.
To catch exceptions thrown when async is the root coroutine, you can wrap the code calling .await() in a try/catch:
supervisorScope {
    val deferred = async {
        codeThatCanThrowExceptions()
    }
    try {
        deferred.await()
    } catch (e: Exception) {
        // Handle exceptions thrown in async
    }
}
Exceptions thrown in coroutines created by other coroutines are always propagated, for example:
val scope = CoroutineScope(Job())
scope.launch {
    async {
        // If async throws an exception, launch throws it immediately,
        // without .await() being called
    }
}
CoroutineExceptionHandler is an optional element of a CoroutineContext that lets you handle uncaught exceptions.
val handler = CoroutineExceptionHandler { context, exception ->
    println("Caught $exception")
}
val scope = CoroutineScope(Job())
scope.launch(handler) {
    launch {
        throw Exception("Failed coroutine")
    }
}
The CoroutineExceptionHandler must be installed in the right CoroutineContext, that of the scope or of a root coroutine; otherwise it will not catch the exception.
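A runnable sketch of a correctly installed handler, under the assumption that the failing coroutine is a child of the root coroutine that carries the handler. The helper name failWithHandler is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the message the handler received,
// or null if the handler was never invoked.
fun failWithHandler(): String? {
    var caught: String? = null
    val handler = CoroutineExceptionHandler { _, e -> caught = e.message }
    val scope = CoroutineScope(Job())
    runBlocking {
        // The handler sits on the root coroutine, so it receives the
        // exception that the inner launch propagates to its parent.
        scope.launch(handler) {
            launch { throw Exception("Failed coroutine") }
        }.join()
    }
    return caught
}

fun main() {
    println(failWithHandler()) // prints "Failed coroutine"
}
```

If the handler were passed to the inner launch instead, it would not be invoked, because the inner coroutine hands the exception to its parent rather than treating it as uncaught.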
Flow
A Flow is a stream of data that a producer emits to its consumers. The producer block runs each time a new collector starts collecting, and the lifecycle of the stream is handled automatically.
Common usage of flow:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    flow {
        for (i in 1..3) {
            delay(100)
            println("Send ${Thread.currentThread().name} => $i")
            emit(i)
        }
    }.flowOn(Dispatchers.IO) // the flow body above runs on Dispatchers.IO
        .collect { println("Receive ${Thread.currentThread().name} => $it") }
}
The running results are as follows (worker thread numbers may vary):
Send DefaultDispatcher-worker-1 => 1
Receive main => 1
Send DefaultDispatcher-worker-1 => 2
Receive main => 2
Send DefaultDispatcher-worker-1 => 3
Receive main => 3
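Flow intermediate operators, to be applied as needed:
- map() converts each value into another value or type.
- transform() can emit any value any number of times for each input.
- take() takes only the first few values of the stream.
- buffer() buffers emissions so that a slow collector does not hold back the emitter.
- conflate() conflates emissions: when the collector is slow, intermediate values are skipped instead of being processed one by one.
- combine() combines the most recent values of two flows.
- zip() merges two flows pairwise.
- flatMapConcat() and its variants flatMapMerge() and flatMapLatest() turn each value into a flow and flatten the results into a single flow.
- flowOn() switches the context in which the upstream flow emits.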
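A few of these operators can be sketched on small in-memory flows. The function names below are hypothetical and exist only for this illustration; flowOf and toList come from kotlinx.coroutines.flow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// take() keeps the first three values, map() squares each of them.
suspend fun squaresOfFirstThree(): List<Int> =
    flowOf(1, 2, 3, 4, 5).take(3).map { it * it }.toList()

// transform() may emit several values for one input.
suspend fun startAndEnd(): List<String> =
    flowOf(1, 2).transform { emit("start $it"); emit("end $it") }.toList()

// zip() pairs the values of two flows by position.
suspend fun zipped(): List<String> =
    flowOf(1, 2, 3).zip(flowOf("a", "b", "c")) { n, s -> "$n$s" }.toList()

fun main() = runBlocking {
    println(squaresOfFirstThree()) // prints [1, 4, 9]
    println(startAndEnd())         // prints [start 1, end 1, start 2, end 2]
    println(zipped())              // prints [1a, 2b, 3c]
}
```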
Exception handling in flows: Exceptions can be handled with a try/catch. onCompletion does not catch exceptions; it can only be used to check whether the flow completed with an exception.
The catch operator catches exceptions from upstream only.
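The difference between onCompletion and catch can be sketched as follows. The helper name collectWithCatch and the log messages are hypothetical, chosen only to make the order of events visible.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a failing flow and records what each operator sees.
suspend fun collectWithCatch(): List<String> {
    val log = mutableListOf<String>()
    flow {
        emit(1)
        throw IllegalStateException("boom")
    }
        // onCompletion only observes the failure; the exception continues downstream.
        .onCompletion { cause -> log += "completed, cause=${cause?.message}" }
        // catch handles the exception thrown by the upstream flow.
        .catch { e -> log += "caught ${e.message}" }
        .collect { log += "value $it" }
    return log
}

fun main() = runBlocking {
    println(collectWithCatch())
    // prints [value 1, completed, cause=boom, caught boom]
}
```

An exception thrown inside the collect block itself would not reach this catch operator, since it occurs downstream of it.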
The use and function of channel
A channel is a concurrency primitive for communication between coroutines. If the sender and the receiver have different lifecycles or run independently of each other, you can use a BroadcastChannel. A BroadcastChannel is not lifecycle-aware and must be closed manually after use.
A Channel passes a stream of values between coroutines, one value at a time.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close()
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
The results are as follows:
1
4
9
16
25
Done!
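Another way to implement the producer-consumer pattern:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val mChannel = CoroutineScope(Dispatchers.IO).produce<String> {
        for (i in 1..3) {
            send("data $i")
            println("send $i")
        }
        close() // optional: produce closes the channel when the block completes
    }
    for (a in mChannel) {
        println("receive $a")
    }
}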
More details on Channel to follow.