Preface

In the last article we looked at the concept and basic elements of coroutines in the Kotlin language. In practice, however, we rarely create and start a coroutine by working with continuations directly, because Kotlin provides framework-level support that makes development much more efficient.

Overview of the coroutine framework

Language-level support vs. framework-level support

Language-level support: the Kotlin standard library APIs and language semantics for coroutines, including coroutine contexts, interceptors, and suspend functions.

Framework-level support: more convenient building blocks for application-level development, including Job, dispatchers, scopes, Channel, Flow, and select.

Coroutine framework: kotlinx.coroutines

  • The official coroutine framework, a feature library built on top of the standard library's coroutine support
  • Address: github.com/Kotlin/kotl…

Adding the coroutine framework

// Coroutine core library
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0"
// Coroutine Android support library, which provides the Android UI dispatcher
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.0"

Startup modes of Kotlin coroutines

The coroutine framework kotlinx.coroutines creates and launches coroutines with the GlobalScope.launch method:

GlobalScope.launch(start = CoroutineStart.DEFAULT){

}

start = CoroutineStart.DEFAULT sets the startup mode of the coroutine. Kotlin coroutines have the following four startup modes:

  • DEFAULT: the coroutine body is scheduled for execution as soon as the coroutine is created. If the coroutine is cancelled before it is scheduled, it goes straight to the cancelled state. (Note that after the current thread starts a coroutine with launch, launch returns immediately and the current thread continues with the code that follows. In other words, the coroutine starts and the thread that launched it keeps running too.)

  • ATOMIC: scheduling starts immediately, and the coroutine cannot be cancelled before it reaches its first suspension point

  • LAZY: scheduling starts only when the coroutine is needed (start/join/await). This is similar to createCoroutine, which only creates a coroutine that is started later at the point where it is needed

  • UNDISPATCHED: the coroutine body executes immediately in the current thread until it reaches its first suspension point (what happens afterwards depends on the dispatcher). This means launch does not return to the caller until the coroutine reaches its first suspension point
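The difference between the modes is easiest to see in the order of events. The following is a minimal sketch, assuming it runs inside runBlocking on a single thread; the function name startModeDemo and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Records the order of events for a LAZY and an UNDISPATCHED launch.
fun startModeDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // LAZY: nothing runs until start(), join(), or await() is called.
    val lazyJob = launch(start = CoroutineStart.LAZY) { events += "lazy body" }
    events += "after lazy launch"
    lazyJob.join()

    // UNDISPATCHED: the body runs in the caller until its first suspension point.
    val eagerJob = launch(start = CoroutineStart.UNDISPATCHED) {
        events += "undispatched body"
        delay(10)
        events += "undispatched resumed"
    }
    events += "after undispatched launch"
    eagerJob.join()
    events
}

fun main() {
    startModeDemo().forEach(::println)
}
```

The LAZY body is recorded only after join() is called, while the UNDISPATCHED body is recorded before launch returns to the caller.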

Dispatchers for Kotlin coroutines

A coroutine dispatcher (scheduler) is essentially a coroutine interceptor wrapped for convenience. It decides where a coroutine runs: on a specific thread, on a thread pool, or unconfined. There are four main dispatchers (described here as they behave on the JVM platform):

  • Default: runs the coroutine on a background (non-main) thread pool. It is generally used for CPU-intensive work such as sorting arrays and parsing data
  • Main: runs the coroutine on the main thread, which is the UI thread on Android. Typically used for UI drawing or lightweight tasks
  • Unconfined: not confined to any thread; the coroutine starts executing directly in the calling thread
  • IO: runs the coroutine on a background (non-main) thread pool. It is generally used for I/O work such as network requests and reading or writing local files
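A quick way to see which thread a dispatcher uses is to print the thread name inside withContext. This is a minimal sketch for a plain JVM program; Dispatchers.Main is left out because it needs a UI framework such as Android, and the helper name threadNameOn is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Returns the name of the thread that a block runs on for the given dispatcher.
suspend fun threadNameOn(dispatcher: CoroutineDispatcher): String =
    withContext(dispatcher) { Thread.currentThread().name }

fun main() = runBlocking {
    println("caller:  ${Thread.currentThread().name}")
    println("Default: ${threadNameOn(Dispatchers.Default)}")
    println("IO:      ${threadNameOn(Dispatchers.IO)}")
}
```

On the JVM both Default and IO report worker threads from a shared pool rather than the calling thread.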

Channel, Flow, and Select in the coroutine framework

  • Channel: a hot data stream and a concurrency-safe communication mechanism. Hot means data is produced whether or not anyone is actively consuming it
  • Flow: the reactive API for coroutines. It is a cold data stream, meaning data is produced only when a consumer triggers it. This is similar to RxJava, where data arrives only once you subscribe
  • Select: waits on multiple suspending events at the same time

Basic usage of the coroutine framework

The coroutine framework's most commonly used ways to create a coroutine are:

  • CoroutineScope.launch()
  • CoroutineScope.async()

CoroutineScope is an interface. GlobalScope is a singleton that implements CoroutineScope; calling launch() or async() on it creates a coroutine, and the code block passed in is the coroutine body.

GlobalScope.launch

GlobalScope.launch {
    println("Coroutine current thread: ${Thread.currentThread().name}")
}

When you do not specify a dispatcher, the coroutine uses the default one, Dispatchers.Default, and the printed thread name is DefaultDispatcher-worker-1. This is not the main thread, so in Android development UI updates are not allowed here. Of course, we can also switch threads inside the coroutine:

GlobalScope.launch {
    println("Coroutine current thread: ${Thread.currentThread().name}")
    withContext(Dispatchers.Main) {
        // Switched to the main thread here
    }
}

GlobalScope.launch(Dispatchers.Main)

With Dispatchers.Main you can specify that the launched coroutine runs on the main thread:

GlobalScope.launch(Dispatchers.Main) {
    Log.e("Coroutine current thread:", Thread.currentThread().name)
    // In Android development, the UI can be updated here
}

@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

GlobalScope is a singleton, and its source code is shown above. We can see that GlobalScope is not tied to any other object or component, so if we do not manage the coroutines created through GlobalScope ourselves in an Android application, they are only destroyed when the app process is destroyed, which is clearly not safe. The official recommendation is to use the two KTX extension libraries for Kotlin coroutines: the lifecycle extension, which provides lifecycleScope, and the ViewModel extension, which provides viewModelScope.

lifecycleScope and viewModelScope

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.3.1"
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.3.1"

A coroutine scope obtained from lifecycleScope is lifecycle-aware: it is bound to a Lifecycle and is cancelled when that Lifecycle is destroyed. When an Activity/Fragment uses this scope, destroying the screen also cancels its coroutines, which avoids coroutine leaks. viewModelScope behaves similarly: the coroutines it creates are cancelled automatically when the ViewModel is cleared, avoiding memory problems caused by leaked coroutines.

Execution of coroutines

private fun testLifecycleScope() {
    Log.d(TAG, "testLifecycleScope start")
    lifecycleScope.launch {
        delay(2000)
        textView.text = "LifecycleScope"
        Log.d(TAG, Thread.currentThread().name)
        Log.e(TAG, "coroutine end")
    }
    Log.d(TAG, "testLifecycleScope end")
}

Execution result:
com.qisan.kotlinstu.MainActivity: testLifecycleScope start
com.qisan.kotlinstu.MainActivity: testLifecycleScope end
com.qisan.kotlinstu.MainActivity: main
com.qisan.kotlinstu.MainActivity: coroutine end

lifecycleScope.launch is asynchronous, yet the coroutine body can update the UI directly, which is quite different from working with a raw background thread.

CoroutineScope.async()

The async method is used to obtain a return value and to run suspend functions concurrently.

private suspend fun getContent1(): String {
    delay(1000)
    return "Kotlin"
}

private suspend fun getContent2(): String {
    delay(1000)
    return "coroutine"
}

We first implement the two suspend functions above and then call them inside lifecycleScope.launch to see how they run and how long they take:

private fun testLifecycleScope() {
    lifecycleScope.launch {
        val startTime = System.currentTimeMillis()
        val content_1 = getContent1()
        val content_2 = getContent2()
        Log.d(TAG, "$content_1 $content_2, program time: ${System.currentTimeMillis() - startTime}")
    }
}

Print result:
LifecycleScope: Kotlin coroutine, program time: 2007

The elapsed time shows that getContent1() and getContent2() are executed sequentially. Now suppose we start getContent2() before getContent1(), still want to print "Kotlin coroutine", and want the two calls to run at the same time. For that we need async:

private fun testLifecycleScope() {
    lifecycleScope.launch {
        val startTime = System.currentTimeMillis()
        val content_2 = lifecycleScope.async { getContent2() }
        val content_1 = lifecycleScope.async { getContent1() }
        Log.d(TAG, "${content_1.await()} ${content_2.await()}, program time: ${System.currentTimeMillis() - startTime}")
    }
}

Print result:
LifecycleScope: Kotlin coroutine, program time: 1002

Here the coroutine body takes only 1002 milliseconds, so getContent1() and getContent2() clearly ran at the same time. Note that async is used together with the await() suspend function. lifecycleScope.async { } returns a Deferred, an interface that extends Job:

Deferred

public interface Deferred<out T> : Job {
    public suspend fun await(): T

    public val onAwait: SelectClause1<T>

    @ExperimentalCoroutinesApi
    public fun getCompleted(): T

    @ExperimentalCoroutinesApi
    public fun getCompletionExceptionOrNull(): Throwable?
}

Because Deferred inherits the Job interface, job-related operations are also available on Deferred.
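Outside Android, the same idea can be sketched with runBlocking instead of lifecycleScope. The names loadFirst, loadSecond, and loadBoth, and the 500 ms delays, are assumptions for illustration only.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun loadFirst(): String { delay(500); return "Kotlin" }
suspend fun loadSecond(): String { delay(500); return "coroutine" }

// Starts both loads concurrently and returns the combined text plus elapsed milliseconds.
fun loadBoth(): Pair<String, Long> = runBlocking {
    var text = ""
    val elapsed = measureTimeMillis {
        val second = async { loadSecond() }   // starts running right away
        val first = async { loadFirst() }     // runs concurrently with `second`
        text = "${first.await()} ${second.await()}"
    }
    text to elapsed
}

fun main() {
    val (text, elapsed) = loadBoth()
    println("$text, took about $elapsed ms")
}
```

Because both Deferred values are created before either await() is called, the total time is close to one delay rather than the sum of the two.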

Cancellation of coroutines

Coroutine scopes obtained from lifecycleScope and viewModelScope are bound to the caller's lifecycle, so developers generally do not need to cancel their coroutines manually. When we manage a scope ourselves we can use MainScope(), which is a top-level function, that is, it has no receiver.

@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

MainScope() returns a ContextScope whose context is SupervisorJob() + Dispatchers.Main, so coroutines launched in it run on the main thread by default. For that reason it is often used in an Activity/Fragment in Android development: when the Activity is destroyed, call cancel to cancel the coroutines. Here we use MainScope() to create a coroutine scope and cancel all of its coroutines through the scope:

val mainScope = MainScope()
mainScope.launch {
    Log.d(TAG, "first coroutine")
}
mainScope.launch {
    delay(1000)
    Log.d(TAG, "second coroutine")
}
handler.postDelayed({ mainScope.cancel() }, 500)

Print result:
MainScope: first coroutine

The first coroutine has no delay, so it runs. The second coroutine needs 1000 ms before it prints, and by then it has been cancelled. So calling mainScope.cancel() on a coroutine scope cancels all child coroutines in that scope. Note that no new coroutines can be started in a scope that has been cancelled. Let's look at the source of cancel():

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job]
        ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

You can see that cancel() cancels the scope's Job, and cancellation is carried by an exception class, CancellationException. Our call above did not pass a cause, so a default CancellationException instance is created.
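The scope-wide behaviour can be checked in a plain JVM program. This sketch assumes a hand-built scope with SupervisorJob() + Dispatchers.Default as a stand-in for MainScope(), since Dispatchers.Main is not available without a UI framework; the function name scopeCancelDemo is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Cancels a scope and reports whether each of its coroutines ended up cancelled.
fun scopeCancelDemo(): List<Boolean> = runBlocking {
    // Stand-in for MainScope(): same SupervisorJob, but Dispatchers.Default.
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val first = scope.launch { delay(10_000) }
    val second = scope.launch { delay(10_000) }

    scope.cancel()            // cancels the scope's Job and every child
    joinAll(first, second)

    // A coroutine launched in an already cancelled scope never runs its body.
    val late = scope.launch { println("never printed") }
    late.join()

    listOf(first.isCancelled, second.isCancelled, late.isCancelled)
}

fun main() {
    println(scopeCancelDemo())
}
```

All three jobs report isCancelled == true, including the one launched after the scope was cancelled.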

What if we want to cancel a single coroutine? mainScope.launch { } returns a Job instance, and calling cancel() on that Job cancels just that coroutine. Let's look at the launch() implementation first:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch() returns a Job, so we can cancel through it:

val job1 = mainScope.launch {
    Log.d(TAG, "first coroutine")
}
val job2 = mainScope.launch {
    delay(1000)
    Log.d(TAG, "second coroutine")
}
job1.cancel()

Print result:
MainScope: second coroutine

The output shows that the first coroutine was cancelled while the second still ran, so cancelling one coroutine does not affect its siblings.

mainScope.launch is called in the Activity, and its default dispatcher is Dispatchers.Main. What happens if we switch to Dispatchers.Default?

val mainScope = MainScope()
val job1 = mainScope.launch(Dispatchers.Default) {
    Log.d(TAG, "first coroutine")
}
val job2 = mainScope.launch {
    delay(1000)
    Log.d(TAG, "second coroutine")
}
job1.cancel()

The first coroutine still prints even though job1 was cancelled. On Dispatchers.Default the coroutine may already be running on a worker thread by the time cancel() is called. Calling cancel() only moves the coroutine into the cancelling state; cancellation is cooperative, so a body that never suspends or checks its state simply keeps running to the end. So how can such a coroutine be cancelled? We can check whether the coroutine has been cancelled while running the logic inside the coroutine body.

Cooperative cancellation with the ensureActive() method

public fun Job.ensureActive(): Unit {
    if (!isActive) throw getCancellationException()
}

The ensureActive() method is very handy: it throws a CancellationException when the Job is no longer active and does nothing when it still is.

val mainScope = MainScope()
val job1 = mainScope.launch(Dispatchers.Default) {
    delay(50)
    ensureActive()
    Log.d(TAG, "first coroutine")
}
val job2 = mainScope.launch {
    delay(1000)
    Log.d(TAG, "second coroutine")
}
job1.cancel()

Now job1 can be cancelled. Of course, we can also use isActive directly inside the coroutine scope to check whether the coroutine has been cancelled.
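The check matters most in a loop that never suspends. The following is a minimal sketch of that situation; the function name cooperativeCancelDemo and the busy loop are assumptions for illustration.

```kotlin
import kotlinx.coroutines.*

// Runs a CPU-bound loop that stays cancellable because it calls ensureActive().
fun cooperativeCancelDemo(): Boolean = runBlocking {
    val job = launch(Dispatchers.Default) {
        var counter = 0L
        while (true) {
            ensureActive()   // throws CancellationException once the job is cancelled
            counter++        // stands in for a slice of real work
        }
    }
    delay(100)
    job.cancelAndJoin()      // returns only because the loop checks for cancellation
    job.isCancelled
}

fun main() {
    println(cooperativeCancelDemo())
}
```

Without the ensureActive() call, the loop would never notice the cancellation and cancelAndJoin() would not return.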

Cooperative cancellation with the yield() method

The yield() method can also be used to check whether the Job has been cancelled or completed; if it has, yield() exits the coroutine by throwing a CancellationException. Take a look at the code:

val job1 = mainScope.launch(Dispatchers.Default) {
    delay(50)
    yield()
    Log.d(TAG, "first coroutine")
}
val job2 = mainScope.launch {
    delay(1000)
    Log.d(TAG, "second coroutine")
}
job1.cancel()

Print result:
MainScope: second coroutine

The main purpose of yield() is not just to check the state of the Job. It also suspends the current task and releases the thread so that other tasks can run.

The join() method

public suspend fun join()

join() is a member function of Job and a suspend function, so it can only be called from a coroutine or another suspend function. It suspends the calling coroutine until the Job has finished, and then the code after it continues in order. join() is usually called from a different coroutine than the one being waited for.

mainScope.launch {
    val job2 = mainScope.launch {
        delay(1000)
        Log.d(TAG, "second coroutine")
    }
    job2.join()
    val job1 = mainScope.launch(Dispatchers.Default) {
        Log.d(TAG, "first coroutine")
    }
}

Print result:
MainScope: second coroutine
MainScope: first coroutine

Note that a Job is not complete until all of its children are complete, so join() waits for them as well. In addition, join() throws a CancellationException if the Job of the coroutine that calls join() has been cancelled or completed while it is suspended.
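The ordering effect of join() can also be sketched without Android logging. The function name joinDemo and the log strings below are made up for illustration, and runBlocking stands in for the outer coroutine.

```kotlin
import kotlinx.coroutines.*

// Shows that code after join() runs only once the joined job has completed.
fun joinDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val second = launch {
        delay(100)
        log += "second coroutine"
    }
    second.join()                          // suspends until `second` completes
    val first = launch { log += "first coroutine" }
    first.join()
    log
}

fun main() {
    joinDemo().forEach(::println)
}
```

Although the "second" coroutine is delayed, its entry is recorded first because the "first" coroutine is not even launched until join() returns.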

Channel

The concept of a Channel

  • A non-blocking communication primitive that passes messages between coroutines
  • A Channel behaves like a BlockingQueue whose operations are suspend functions. BlockingQueue has E take() and put(E e): take() blocks when the queue is empty until new data is added, and put(E e) adds e to the queue, blocking the calling thread when the queue is full until space is available. A Channel's receive and send suspend the coroutine in those situations instead of blocking the thread.

Types of Channel

  • RENDEZVOUS: no buffer; send suspends until receive is called
  • UNLIMITED: unlimited capacity; send stores the message in the Channel and returns immediately
  • CONFLATED: keeps only the most recent message; a receiver gets only the value of the last send
  • BUFFERED: the default capacity, which is 64 unless overridden through a JVM system property
  • Fixed capacity: a positive integer passed as the capacity; send suspends once the buffer is full
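The effect of the capacity argument can be compared by sending a few values before anyone receives. This is a minimal sketch; the helper name sendThenDrain is hypothetical, and it must not be called with Channel.RENDEZVOUS, because the first send would then suspend forever with no receiver.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Sends 1..3 with no receiver present, closes the channel, then drains what is left.
fun sendThenDrain(capacity: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity)
    for (i in 1..3) channel.send(i)   // must not suspend: nobody is receiving yet
    channel.close()
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(sendThenDrain(Channel.UNLIMITED))  // all three values were stored
    println(sendThenDrain(Channel.CONFLATED))  // only the last value survived
    println(sendThenDrain(3))                  // fixed capacity large enough for all three
}
```

With UNLIMITED or a fixed capacity of 3 every value is still there after closing, while CONFLATED keeps only the value from the last send.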

Using a Channel

The RENDEZVOUS mode

Straight to the code:

suspend fun testChannel() {
    val channel = Channel<Int>(Channel.RENDEZVOUS)
    val jobProducer = GlobalScope.launch {
        for (i in 1..2) {
            println("sending $i")
            channel.send(i)
            println("sent $i")
        }
        channel.close()
    }
    val jobConsumer = GlobalScope.launch {
        while (!channel.isClosedForReceive) {
            println("receiving")
            // If nothing has been sent to the channel yet, receive suspends
            val value = channel.receiveCatching().getOrNull()
            println("received $value")
        }
    }
    jobProducer.join()
    jobConsumer.join()
}

Print result:
sending 1
receiving
sent 1
sending 2
received 1
receiving
received 2
receiving
sent 2
received null

The send and receive lines appear out of order because the two coroutines run on different threads, but it is guaranteed that a send only completes after a receive has taken the value.

The UNLIMITED mode

With UNLIMITED, send stores the message in the Channel and returns immediately, whether or not anyone is receiving. This is similar to the RemoteView refresh action in Android development. Otherwise the usage is basically the same as the RENDEZVOUS example above, so it is not repeated here.

The CONFLATED mode

CONFLATED means that only the last message is kept in the Channel. If we change the Channel type to CONFLATED, jobConsumer only receives 2:

val channel = Channel<Int>(Channel.CONFLATED)
val jobProducer = GlobalScope.launch {
    for (i in 1..2) {
        println("sending $i")
        channel.send(i)
        println("sent $i")
    }
    channel.close()
}
val jobConsumer = GlobalScope.launch {
    while (!channel.isClosedForReceive) {
        println("receiving")
        val value = channel.receiveCatching().getOrNull()
        println("received $value")
    }
}
jobProducer.join()
jobConsumer.join()

Print result:
sending 1
sent 1
sending 2
sent 2
receiving
received 2

Both values are sent, but the only value received is 2.

BUFFERED and fixed capacity

Both give the Channel a buffer that holds a set number of messages; otherwise they behave like RENDEZVOUS.

Closing a Channel

  • Call close to close the Channel
  • After closing, calling send throws an exception and isClosedForSend returns true
  • After closing, receive can still take buffered data
  • Once the buffered data has been consumed, receive throws an exception and isClosedForReceive returns true
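The rules above can be traced step by step. This is a minimal sketch with a hypothetical function name closeDemo; isClosedForSend and isClosedForReceive are marked as opt-in APIs in kotlinx.coroutines, hence the @OptIn annotation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException

// Records what a closed channel reports before and after its buffer is drained.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun closeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(capacity = 2)
    channel.send(1)
    channel.close()

    log += "closedForSend=${channel.isClosedForSend}"
    log += "closedForReceive=${channel.isClosedForReceive}"   // one value is still buffered
    log += "received=${channel.receive()}"
    log += "closedForReceive=${channel.isClosedForReceive}"   // buffer is now empty

    try {
        channel.send(2)
    } catch (e: ClosedSendChannelException) {
        log += "send failed"
    }
    // receiveCatching reports the closed state instead of throwing.
    log += "receiveCatching.isClosed=${channel.receiveCatching().isClosed}"
    log
}

fun main() {
    closeDemo().forEach(::println)
}
```

receiveCatching() is used for the last step because a plain receive() on a closed, empty channel throws.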

Iterating over a Channel

A Channel is like a BlockingQueue that supports suspend functions, so, like a collection, it can be iterated with a for loop. The jobConsumer above can also receive messages by iteration, in two ways:

for loop iteration

for (i in channel){
    println("received $i")
}

Use Iterator to iterate

val iterator = channel.iterator()
while (iterator.hasNext()){
    val e = iterator.next()
    println("receive $e")
}

Here hasNext takes the place of the !channel.isClosedForReceive check:

  • When there is buffered data, hasNext returns true
  • When the Channel is not closed and the buffer is empty, hasNext suspends
  • When the Channel was closed normally and the buffer is empty, hasNext returns false

Coroutine builders for Channel

The coroutine framework provides CoroutineScope.produce and CoroutineScope.actor to build the producer and the consumer of messages.

  • CoroutineScope.produce: starts a producer coroutine and returns a ReceiveChannel, which other coroutines can use to receive the data
  • CoroutineScope.actor: starts a consumer coroutine and returns a SendChannel, which other producer coroutines can use to send messages
  • When a coroutine started by either builder finishes, the corresponding Channel is closed automatically
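The automatic close is what lets a consumer's for loop end on its own. The following is a minimal sketch; the extension function squares and the function produceDemo are hypothetical names, and produce is an opt-in API in kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// A producer coroutine; its channel is closed automatically when the block finishes.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(limit: Int): ReceiveChannel<Int> = produce {
    for (i in 1..limit) send(i * i)
}

// Collects everything the producer sends; the loop ends when the channel closes.
fun produceDemo(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    for (value in squares(3)) received += value
    received
}

fun main() {
    println(produceDemo())
}
```

No explicit close() call is needed: the loop in produceDemo terminates because the producer block returned.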

Using CoroutineScope.produce:

suspend fun producer() {
    // Start a producer coroutine that sends messages and returns a ReceiveChannel
    val receiveChannel = GlobalScope.produce<Int>(capacity = Channel.UNLIMITED) {
        for (i in 1..2) {
            println("sending $i")
            send(i)
            println("sent $i")
        }
    }
    val jobConsumer = GlobalScope.launch {
        for (i in receiveChannel) {
            println("received $i")
        }
    }
    jobConsumer.join()
}

Print result:
sending 1
sent 1
sending 2
sent 2
received 1
received 2

Using CoroutineScope.actor:

suspend fun consumer() {
    // Start a consumer coroutine that receives messages and returns a SendChannel
    val sendChannel = GlobalScope.actor<Int>(capacity = Channel.UNLIMITED) {
        for (i in this) {
            println("received $i")
        }
    }
    val jobProducer = GlobalScope.launch {
        for (i in 1..2) {
            println("sending $i")
            sendChannel.send(i)
            println("sent $i")
        }
    }
    jobProducer.join()
}

Print result:
sending 1
sent 1
sending 2
sent 2
received 1
received 2

BroadcastChannel

  • A message in a Channel can only be consumed by one consumer
  • A BroadcastChannel delivers each message to all subscribers
  • BroadcastChannel does not support the RENDEZVOUS mode, because it is one-to-many

Creating a BroadcastChannel:

// Option 1: convert an existing Channel
val channel = Channel<Int>(Channel.BUFFERED)
val broadcastChannel = channel.broadcast()

// Option 2: call the BroadcastChannel factory function
val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)

// Option 3: use the CoroutineScope.broadcast builder
val broadcastChannel = GlobalScope.broadcast<Int> { }

Here we create a BroadcastChannel through CoroutineScope.broadcast and let subscribers receive from it:

val broadcastChannel = GlobalScope.broadcast<Int> {
    for (i in 1..2) {
        send(i)
    }
}
val jobConsumer_1 = GlobalScope.launch {
    val receiveChannel_1 = broadcastChannel.openSubscription()
    for (i in receiveChannel_1) {
        println("receiveChannel_1 value:$i")
    }
}
val jobConsumer_2 = GlobalScope.launch {
    val receiveChannel_2 = broadcastChannel.openSubscription()
    for (i in receiveChannel_2) {
        println("receiveChannel_2 value:$i")
    }
}
jobConsumer_1.join()
jobConsumer_2.join()

Print result:
receiveChannel_1 value:1
receiveChannel_2 value:1
receiveChannel_1 value:2
receiveChannel_2 value:2

You can see that every subscribed consumer gets each message sent by the producer.

Select: multiplexing

The concept of Select

  • Select comes from I/O multiplexing, where multiplexing means carrying multiple signals over a single channel at the same time
  • Select in coroutines multiplexes suspend functions

Multiplexing multiple Channels

In multi-channel multiplexing, asynchronous tasks run behind several Channels and we want to handle whichever one sends a message first. Without select we could only read the Channels one by one in order, which is very inefficient. The advantage of select is that it reads and processes the message from whichever Channel delivers first. The following simulates multiplexing over several Channels:

val channelList = listOf(Channel<Int>(), Channel<Int>(), Channel<Int>())
GlobalScope.launch {
    delay(200)
    channelList[0].send(200)
}
GlobalScope.launch {
    delay(100)
    channelList[1].send(100)
}
GlobalScope.launch {
    delay(400)
    channelList[2].send(400)
}
val result = select<Int> {
    channelList.forEach { channel ->
        channel.onReceive { it }
    }
}
println(result)

Print result:
100

Going by list order, the Channel that sends 100 is the second one, but it delivers first, so select receives its message first.

Multiplexing await

Multiplexing await suits network requests. We can simulate requesting the same data from the cache and from the network, and use whichever returns first.

// Simulate reading data from the cache; the cache is fast, so delay 100 ms
fun CoroutineScope.getDataFromCache() = async {
    delay(100)
    "getDataFromCache"
}

// Simulate reading data from the network; delay 500 ms before returning
fun CoroutineScope.getDataFromNet() = async {
    delay(500)
    "getDataFromNet"
}

suspend fun main() {
    GlobalScope.launch {
        val dataFromCache = getDataFromCache()
        val dataFromNet = getDataFromNet()
        // Use select to wait for whichever result arrives first
        val result = select<String> {
            dataFromCache.onAwait { it }
            dataFromNet.onAwait { it }
        }
        println(result)
    }.join()
}

Print result:
getDataFromCache

If several clauses are ready at the same time, select picks the first one in declaration order. For that reason there is another API, selectUnbiased, which has no such bias: if several messages arrive at the same time, selectUnbiased picks one at random.
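The bias can be observed by handing select two results that are both already available. This is a minimal sketch; the function names biasedPick and unbiasedPick and the result strings are assumptions for illustration.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.selects.selectUnbiased

// Both deferred values are already complete, so both clauses are ready at once.
fun biasedPick(): String = runBlocking {
    val a = CompletableDeferred("first clause")
    val b = CompletableDeferred("second clause")
    select<String> {
        a.onAwait { it }   // select prefers the earliest ready clause
        b.onAwait { it }
    }
}

// Same setup, but selectUnbiased shuffles the clauses before checking them.
fun unbiasedPick(): String = runBlocking {
    val a = CompletableDeferred("first clause")
    val b = CompletableDeferred("second clause")
    selectUnbiased<String> {
        a.onAwait { it }
        b.onAwait { it }
    }
}

fun main() {
    println(biasedPick())
    println(unbiasedPick())  // may be either clause
}
```

biasedPick() always returns the first clause's value, while unbiasedPick() can return either one from run to run.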

Flow

The concept of Flow

A Flow is an asynchronous data stream that emits values sequentially and then completes. It is the reactive API of Kotlin coroutines, bringing reactive programming to coroutines.

The body of a Flow executes sequentially, much like the sequence generator Sequence. The biggest difference between the two is that a Flow does not block the main thread: Flow is built entirely on coroutines, and by using their suspend and resume mechanism the producer (the flow block) and the consumer (collect) stay in step without blocking, whereas a Sequence blocks the main thread.

How to create a Flow

  • flow builder: call flow<T> { } directly to create a cold stream from the given suspending block
  • flowOf(vararg elements: T): quickly creates a flow from a variable number of arguments
  • asFlow(): converts other data into a regular flow, for example a List into a Flow
  • channelFlow(): backed by a buffered channel and thread-safe; allows emitting from different CoroutineContexts

Creating a Flow

val mFlow = flow<Int> {
    (1..2).forEach {
        emit(it)
        delay(200)
    }
}

The emit function delivers elements to the consumer, and other suspend functions can be called inside the flow block.

A Flow can also specify which dispatcher it runs on:

// mFlow's builder block runs on the IO dispatcher
mFlow.flowOn(Dispatchers.IO)

Consuming a Flow

To consume mFlow, simply call collect, which is also a suspend function:

mFlow.flowOn(Dispatchers.IO).collect { println(it) }

Print result:
1
2

Written as a single call chain, this becomes:

flow<Int> {
    (1..2).forEach {
        emit(it)
        delay(200)
    }
}.flowOn(Dispatchers.IO).collect { println(it) }

Comparison with RxJava thread switching

RxJava switches threads with observeOn and subscribeOn, whereas Flow is simpler and only needs flowOn. flowOn corresponds to subscribeOn, and the dispatcher of the coroutine in which collect runs corresponds to the scheduler specified by observeOn.
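The split between the two sides can be checked by recording thread names. This is a minimal sketch on the JVM; the function name flowOnDemo is hypothetical, and runBlocking plays the role of the collector's coroutine.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Returns (thread that ran the flow block, thread that ran collect).
fun flowOnDemo(): Pair<String, String> = runBlocking {
    var emitterThread = ""
    var collectorThread = ""
    flow {
        emitterThread = Thread.currentThread().name   // upstream: moved by flowOn
        emit(1)
    }
        .flowOn(Dispatchers.IO)
        .collect { collectorThread = Thread.currentThread().name }  // caller's context
    emitterThread to collectorThread
}

fun main() {
    val (emitter, collector) = flowOnDemo()
    println("emit on $emitter, collect on $collector")
}
```

flowOn only affects the operators above it; the collect block keeps running in the context of the coroutine that called it.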

Cold stream

A cold stream is similar to lazy initialization: after a Flow is created, nothing is produced until it is consumed, and consuming it several times produces the data several times, so production and consumption always go hand in hand. This is exactly the opposite of a Channel, whose sender does not depend on the receiver.
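The "no consumption, no production" rule can be verified by counting how often the builder block runs. This is a minimal sketch; coldFlowDemo and builderRuns are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns (how many times the flow block ran, everything that was collected).
fun coldFlowDemo(): Pair<Int, List<Int>> = runBlocking {
    var builderRuns = 0
    val numbers = flow {
        builderRuns++      // executed once per collector, never at creation time
        emit(1)
        emit(2)
    }
    check(builderRuns == 0)                      // nothing has been produced yet
    val collected = numbers.toList() + numbers.toList()
    builderRuns to collected
}

fun main() {
    println(coldFlowDemo())
}
```

Creating the flow runs nothing, and each of the two toList() calls re-runs the block from the start.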

Exception handling

To catch an exception in a Flow, you can call the catch operator directly or use a traditional try ... catch block:

flow<Int> {
    (1..2).forEach {
        emit(it)
        throw RuntimeException()
    }
}.catch { t ->
    println("RuntimeException: $t")
}.collect { println(it) }

Print result:
1
RuntimeException: java.lang.RuntimeException

Running code when the flow completes

Look at the following code:

flow<Int> {
    (1..2).forEach {
        emit(it)
        throw RuntimeException()
    }
}.catch { t ->
    println("RuntimeException: $t")
}.onCompletion { t ->
    println("finally")
}.collect { println(it) }

onCompletion is similar to the finally block of try ... catch ... finally: it is always called when the flow completes, whether or not an exception occurred before it. Its parameter t is the exception that was not caught upstream, or null if there was none.
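The value passed to onCompletion depends on where it sits relative to catch. The following is a minimal sketch with a hypothetical function name completionDemo and made-up log strings.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Records the order in which collect, catch, and onCompletion are invoked.
fun completionDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        .catch { e -> log += "caught ${e.message}" }
        // catch already handled the failure, so the cause seen here is null.
        .onCompletion { cause -> log += "completed, cause=$cause" }
        .collect { log += "value $it" }
    log
}

fun main() {
    completionDemo().forEach(::println)
}
```

Placing onCompletion before catch instead would hand it the IllegalStateException as its cause.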

Terminal operators

collect is the most basic terminal operator. The other common terminal operators fall into two categories:

  • Collection conversions, including toList, toSet, and so on
  • Aggregations, including reduce and fold, which reduce a Flow to a single value, and operators that fetch a single element, such as single, singleOrNull, and first

transform

With the transform operator you can emit arbitrary values, calling emit any number of times:

(1..3).asFlow().transform<Int, Any> {
    emit(it)
    delay(200)
    emit("value $it")
}.collect { println(it) }

Print result:
1
value 1
2
value 2
3
value 3

take

The take operator takes only the first N emitted values, where N is the count you specify:

(1..3).asFlow().take(2).collect { println(it) }

reduce

The reduce operator accumulates values: it applies the operation to the first two elements, then combines the result with the next element, and so on until the last element yields the final value.

val sum = (1..5).asFlow().reduce { a, b -> a + b }
println(sum)

So the result is 15, the sum of 1 through 5.

fold

The fold operator is similar to the fold function on Kotlin collections, and it also requires an initial value.

val sum = (1..5).asFlow().fold(0) { a, b -> a + b }
println(sum)

zip

The zip operator merges two flows by pairing their elements in order.

val flowA = (1..3).asFlow()
val flowB = flowOf("A", "B", "C")
flowA.zip(flowB) { a, b -> "$a -> $b" }.collect { println(it) }

Print result:
1 -> A
2 -> B
3 -> C

combine

The combine operator also merges flows, but unlike zip it pairs the latest value emitted by flowA with the latest value emitted by flowB, rather than pairing values strictly in order.

val flowA = (1..3).asFlow().onEach { delay(100) }
val flowB = flowOf("A", "B", "C").onEach { delay(200) }
flowA.combine(flowB) { a, b -> "$a -> $b" }.collect { println(it) }

Print result:
1 -> A
2 -> A
3 -> B
3 -> C

flattenMerge

The flattenMerge operator flattens a flow of flows into a single flow, collecting the inner flows concurrently rather than one after another.

val flowA = (1..3).asFlow()
val flowB = flowOf("A", "B", "C")
flowOf(flowA, flowB).flattenMerge().collect { println(it) }

Print result:
A
1
2
3
B
C

If we want the inner flows to run sequentially, we simply switch to flattenConcat.

That wraps up the operators covered here; I won't go through all the others.

Separating consumption from triggering

Besides consuming the elements of a Flow in collect, we can also consume them in onEach. The advantage of onEach is that the consuming logic does not have to sit together with the terminal operator: collect can be called somewhere else, so consuming the Flow and triggering it are separated.

suspend fun main() {
    getFlow().collect()
}

fun getFlow() = flow<Int> {
    (1..3).forEach {
        emit(it)
        delay(200)
    }
}.onEach { println(it) }

Cancelling a Flow

Flow does not provide a cancel operation of its own. Does that mean a Flow cannot be cancelled? Of course not. Consuming a Flow depends on terminal operators such as collect, which must be called inside a coroutine, so cancelling a Flow comes down to the state of the coroutine in which the terminal operator runs.

val job = GlobalScope.launch {
    val mFlow = flow {
        (1..3).forEach {
            delay(500)
            emit(it)
        }
    }
    mFlow.collect { println(it) }
}
delay(1500)
job.cancelAndJoin()

As this shows, cancelling a Flow only requires cancelling the coroutine that collects it.

Creating a Flow with the channelFlow function

The biggest difference between channelFlow and the other ways of creating a Flow is that it can switch dispatchers while producing elements:

channelFlow {
  send(1)
  withContext(Dispatchers.IO) {
    send(2)
  }
}

Flow backpressure

Backpressure is a pain point in reactive programming. It occurs when the producer produces faster than the consumer can process. To deal with it, we can first consider adding a buffer so that no data is lost.

Adding a buffer to a Flow

flow<Int> {
    (1..100).forEach { emit(it) }
}.buffer()

Simply buffering a Flow is only a stopgap, because data can still pile up. The fundamental problem is the mismatch between the production and consumption rates. Apart from optimizing the consumer directly, we can also accept some trade-offs. The first is conflate: as with a Channel's CONFLATED mode, new data overwrites old data. Look at the following code:

flow<Int> {
    (1..100).forEach { emit(it) }
}.conflate().collect {
    delay(100)
    println("collect value $it")
}

Print result:
collect value 1
collect value 100

The elements are emitted without any delay, so they are all sent almost at once, but only two of them are received.

Another way: collectLatest

Unlike conflate, collectLatest does not simply overwrite old data with new data. Every value is processed, but if a new value arrives before the previous one has finished processing, the processing of the previous value is cancelled.

flow<Int> {
    // Only 5 values, to keep the output easy to read
    (1..5).forEach { emit(it) }
}.collectLatest {
    println("Collecting $it")
    delay(100)
    println("$it collected")
}

Print result:
Collecting 1
Collecting 2
Collecting 3
Collecting 4
Collecting 5
5 collected

"Collecting" is printed for every value from 1 to 5, but "collected" is printed only for 5. Each time a newer value arrived, the processing of the previous value was still suspended in delay and was cancelled. Besides collectLatest, mapLatest and flatMapLatest behave the same way.
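The same behaviour can be captured as data instead of console output. This is a minimal sketch with three values; the function name latestDemo and the log strings are made up for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.runBlocking

// Records which values start processing and which ones get to finish.
fun latestDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    (1..3).asFlow().collectLatest { value ->
        log += "start $value"
        delay(50)            // the block is cancelled here when a newer value arrives
        log += "done $value"
    }
    log
}

fun main() {
    latestDemo().forEach(::println)
}
```

Every value gets a "start" entry, but only the last one reaches its "done" entry.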

Transforming a Flow

Transforming the elements of a Flow

Use map to transform the elements of a Flow, in the same way as map on a List:

flow<Int> {
    (1..3).forEach { emit(it) }
}.map { it * 2 }.collect { print("$it, ") }

Running result: 2, 4, 6,

Nesting and concatenating Flows

flow {
    (1..3).forEach { emit(it) }
}.map {
    flow { (1..it).forEach { emit(it) } }
}

What we get is a Flow whose element type is itself a Flow: three inner flows that emit 1, 2, and 3 elements respectively. We then use flattenConcat to join these flows together:

flow {
    (1..3).forEach { emit(it) }
}.map {
    flow { (1..it).forEach { emit(it) } }
}.flattenConcat().collect { println(it) }

Running result:
1
1
2
1
2
3

Multiplexing with Flow

Multiplexing await with Flow

This time we use Flow instead of select to simulate fetching data from the local cache and from the network:

// Simulate reading data from the cache
fun CoroutineScope.getDataFromCache() = async {
    delay(100)
    "getDataFromCache"
}

// Simulate reading data from the network; delay 500 ms before returning
fun CoroutineScope.getDataFromNet() = async {
    delay(500)
    "getDataFromNet"
}

suspend fun main() {
    GlobalScope.launch {
        listOf(this::getDataFromCache, this::getDataFromNet)
            .map { function -> function() }
            .map { deferred -> flow { emit(deferred.await()) } }
            .merge()
            .collect { println(it) }
    }.join()
}

Result:
getDataFromCache
getDataFromNet

Unlike select, Flow collects the data from both sources.

Summary

That concludes this article on the basic usage of coroutines and some of the more advanced topics; it turned out longer than I expected. Coroutines still have many more details, and especially many difficult points, that deserve further study. This article is mainly a record of my own process of learning coroutines, and if reading it helps you, so much the better. Next I will keep studying Kotlin in depth and apply it in real projects, and I will share more of what I learn when I have time.