
Preface

The previous article, “Kotlin coroutine practical advanced (a, build the foundation)”, analyzed in detail the concept and principles of coroutines, the basic use of the coroutine framework, and how suspend functions suspend and resume. If you want a general understanding of coroutines, read that article first. For reasons of space, some important points were not covered there, so this article continues with the remaining key elements of Kotlin coroutines and how to use them. First, let’s review the content of the previous article:

  • 1. Coroutine: the concept and principles of coroutines: what a coroutine is, its role and characteristics, and an illustrated analysis of how it works.
  • 2. Coroutine builders: three ways to create coroutines.
  • 3. CoroutineScope: the coroutine scope, the context in which a coroutine runs; it both provides function support and adds restrictions. Covers the seven common scopes (including the coroutine scopes supported by Lifecycle) and the classification and behavior rules of scopes.
  • 4. Job & Deferred: handles to a coroutine, used to control and manage it; Deferred additionally has a return value.
  • 5. CoroutineDispatcher: the coroutine scheduler, which determines the thread on which a coroutine executes; covers the four dispatcher modes and withContext, which is mainly used to switch the coroutine context.
  • 6. CoroutineContext: the coroutine context, representing the environment in which a coroutine runs, including the dispatcher, the Job that represents the coroutine itself, the coroutine name and the coroutine ID, as well as the use of combined contexts.
  • 7. CoroutineStart: an enumeration class that defines four start modes for coroutine builders.
  • 8. suspend: suspending functions, a way to avoid blocking threads by replacing thread blocking with a simpler, more controllable operation: suspending and resuming coroutines.

Outline of this article

III. Coroutine cancellation

In everyday development we should avoid unnecessary work, so we need to control the life cycle of coroutines and cancel them when they are no longer needed.

1. Call the cancel method

Coroutines handle cancellation by throwing a special exception, CancellationException. When calling job.cancel, you can pass in a CancellationException instance to provide a specific message describing the reason:

public fun cancel(cause: CancellationException? = null)

The parameter is optional: if you do not pass one, the default defaultCancellationException() is used as the cause.

A child coroutine notifies its parent of its cancellation by throwing an exception, and the parent uses the cancellation cause to decide whether it needs to handle the exception. If the child was cancelled because of a CancellationException, the parent does not need to take any further action.

We can cancel all the coroutines started in a scope at once by cancelling the scope itself:

// Create the scope
val scope = CoroutineScope(Dispatchers.Main)
// Start a coroutine
val job = scope.launch {
    //TODO
}

// The scope is cancelled
scope.cancel()

Cancelling a scope cancels all of its child coroutines. Note: new coroutines cannot be started from a cancelled scope. You can use try...catch to catch the CancellationException.

If you only want to cancel one particular task in progress, call job.cancel() on that coroutine’s Job. This ensures that only the coroutine associated with that job is cancelled and that sibling coroutines are not affected.

// Create the scope
val scope = CoroutineScope(Dispatchers.Main)

// The coroutine job1 will be cancelled, and the other job2 will not be affected
val job1 = scope.launch {
    //TODO
}
val job2 = scope.launch {
    //TODO
}

// Cancel the single coroutine
job1.cancel()

Cancelled subcoroutines do not affect the remaining sibling coroutines.
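
The difference between cancelling a single job and cancelling the whole scope can be observed through the Job flags. The following is only a minimal sketch: the function name cancelDemo is hypothetical, and Dispatchers.Default is assumed instead of Dispatchers.Main so that it also runs on a plain JVM.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper (not from the article): returns the cancellation flags
// observed after cancelling one job and then the whole scope.
fun cancelDemo(): List<Boolean> {
    // Assumption: Dispatchers.Default so the sketch also runs outside Android
    val scope = CoroutineScope(Dispatchers.Default)
    val job1 = scope.launch { delay(10_000) }
    val job2 = scope.launch { delay(10_000) }

    job1.cancel() // cancels only job1
    val job1Cancelled = job1.isCancelled
    val siblingCancelled = job2.isCancelled

    scope.cancel() // cancels every coroutine still running in the scope
    val siblingCancelledWithScope = job2.isCancelled

    // A coroutine launched from an already cancelled scope is cancelled right away
    val lateJob = scope.launch { println("never printed") }

    return listOf(job1Cancelled, siblingCancelled, siblingCancelledWithScope, lateJob.isCancelled)
}
```

Calling cancelDemo() returns [true, false, true, true]: the sibling is untouched by job1.cancel(), but it is cancelled together with the scope, and so is anything launched afterwards.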

If you use the AndroidX KTX libraries, in most cases you do not need to create your own scopes, and therefore do not need to cancel them yourself. viewModelScope and lifecycleScope are both CoroutineScope objects that are cancelled at the appropriate time. When the ViewModel is cleared, the coroutines started in viewModelScope are cancelled too. lifecycleScope is bound to the life cycle of the current UI component and is cancelled when the UI is destroyed, so coroutines do not leak.

2. Check the status of coroutines

Calling cancel does not mean that the work a coroutine is doing stops immediately. If the coroutine is doing relatively heavy work, such as reading several files, that work does not stop right away.

For example, we use a coroutine to print data every 500 milliseconds, let the coroutine run for 1.2 seconds, and then cancel it:

    fun jobTest() = runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0

            while (i < 5) { // Print the first five messages
                if (System.currentTimeMillis() >= nextPrintTime) { // Print a message twice per second
                    print("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500
                }
            }
        }

        delay(1200) // Delay 1.2 s
        print("After waiting 1.2 seconds.")

        job.cancel()
        print("Coroutine cancelled")
    }

The printed data is as follows:

After job.cancel is called, the coroutine moves to the Cancelling state. Yet the messages numbered 3 and 4 are still printed. Only when its work is finished does the coroutine move to the Cancelled state.

Let’s take another look at the Job life cycle:

A Job can be in one of the following states: New, Active, Completing, Completed, Cancelling, and Cancelled. We cannot access these states directly, but we can read the Job properties isActive, isCancelled, and isCompleted. The life cycle of a Job is as follows (from the official website):

If the coroutine is Active, an error while it is running or a call to job.cancel() moves the Job to the Cancelling state (isActive = false, isCancelled = true). When all child coroutines have completed, the coroutine enters the Cancelled state, where isCompleted = true.

The task handled by the coroutine does not stop just when the cancel method is called; instead, we need to modify the code to periodically check whether the coroutine is active. Add a check on the coroutine state before processing the task:

    while (i < 5 && isActive) // Continue execution when the job is active

Then our task will only be executed if the coroutine is active. Items 3 and 4 will not be printed after the coroutine is cancelled.
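
Put together, the cooperative version of the loop could look like the sketch below. It records the message numbers in a list instead of printing them so that the effect is easy to inspect; the function name cooperativeLoopDemo and the use of cancelAndJoin() to wait for the loop to exit are assumptions of this sketch, not part of the original example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs the busy loop with an isActive check and returns
// the message numbers recorded before the cancellation took effect.
fun cooperativeLoopDemo(): List<Int> = runBlocking {
    val recorded = mutableListOf<Int>()
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5 && isActive) { // leave the loop as soon as the job is cancelled
            if (System.currentTimeMillis() >= nextPrintTime) {
                recorded.add(i++)
                nextPrintTime += 500
            }
        }
    }

    delay(1200)
    job.cancelAndJoin() // cancel, then wait until the loop has really exited
    recorded.toList()   // safe to read: the job has completed
}
```

With these timings the loop typically records 0, 1 and 2 and then stops, whereas the version without the isActive check keeps going until all five messages have been produced.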

3. Cancellation with join() & await()

There are two ways to wait for the result of a coroutine: the join() method of the Job returned by launch, and the await() method of the Deferred returned by async.

Job.join() suspends the coroutine until task processing is complete.

val job = launch {
    //TODO
}

job.cancel() // Cancel the coroutine
job.join() // Suspend the calling coroutine until the job completes
// job.cancelAndJoin() // Cancel the job, then suspend the calling coroutine until it completes

When join() is combined with job.cancel(), the order of the calls matters:

  • If you call job.cancel() and then job.join(), the calling coroutine remains suspended until the job has finished;
  • Calling job.cancel() after job.join() has no effect, because the job has already completed.

If you want the result of a coroutine, use Deferred. When the coroutine completes, the result is returned by deferred.await(). Deferred extends Job, so it can also be cancelled.

val deferred = async {
    delay(1000)
    print("asyncTest")
}

deferred.cancel()
deferred.await() // Throws a JobCancellationException

  • delay(): delays the coroutine for a given time without blocking the thread and resumes it after the specified time. You can think of it as scheduling a delayed task that tells the coroutine scheduling system how long to wait before running the code that follows.

Calling await() on a cancelled Deferred throws a JobCancellationException. await() suspends the caller until the result has been computed; a cancelled coroutine stops computing and never produces a result, so there is nothing for await() to return.

In addition, nothing happens if you call deferred.cancel() after deferred.await() because the coroutine has finished processing.
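
As a small illustration of the first case, the sketch below cancels a Deferred and then awaits it. JobCancellationException is internal to the library, so the sketch catches its public supertype CancellationException; the function name awaitAfterCancel and the returned strings are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: awaits a Deferred that has already been cancelled.
fun awaitAfterCancel(): String = runBlocking {
    val deferred = async {
        delay(1000)
        "asyncTest"
    }

    deferred.cancel()
    try {
        deferred.await() // no result will ever be produced
    } catch (e: CancellationException) {
        "cancelled" // await() rethrows the cancellation instead of returning a value
    }
}
```

awaitAfterCancel() returns "cancelled". Cancelling the child with a CancellationException does not cancel the surrounding runBlocking coroutine, which is why it can go on and return normally.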

4. Release resources in finally

Sometimes you want to perform a specific action after a coroutine is cancelled, such as closing resources that may be in use, logging the cancellation, or running some remaining cleanup code. How do you do that?

A CancellationException is thrown when the coroutine is cancelled. We can place the suspending work in a try…catch…finally block, catch the exception thrown after cancellation in the catch block, and perform the required cleanup in the finally block.

val job = GlobalScope.launch {
    try {
        //TODO
        delay(500L)
    } catch (e: CancellationException) {
        print("Coroutine cancellations throw exceptions:$e")
    } finally {
        print("Coroutine cleanup")
    }
}

job.cancel() // Cancel the coroutine

The printed data is as follows:

[DefaultDispatcher-worker-1] Coroutine cancellations throw exceptions:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@bb81f53
[DefaultDispatcher-worker-1] Coroutine cleanup

However, this does not work if the cleanup itself needs to suspend, because once the coroutine is in the Cancelling state it can no longer suspend.

5. NonCancellable

If you need to call a suspending function to clean up after a coroutine has been cancelled, pass the NonCancellable singleton to withContext so that the cleanup block runs in a context that cannot be cancelled. The code in the block can then suspend, and the coroutine stays in the Cancelling state until the cleanup is finished.

val job = launch {
    try {
        //TODO
    } catch (e: CancellationException) {
        print("Coroutine cancellations throw exceptions")
    } finally {
        withContext(NonCancellable) {
            delay(100) // Or some other suspending function
            print("Coroutine cleanup")
        }
    }
}

delay(500L)
job.cancel() // Cancel the coroutine

However, this approach needs to be used with caution. It is risky to do so because you may lose control over the execution of the coroutine.
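
To make the difference between the two kinds of cleanup concrete, the sketch below runs both in the same finally block and records what happens. The function name cleanupDemo and the recorded messages are hypothetical, and swallowing the CancellationException in the inner try is done only to keep the demonstration going.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: compares a plain suspending cleanup with one wrapped
// in withContext(NonCancellable) after the coroutine has been cancelled.
fun cleanupDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            delay(10_000) // the "work" that gets cancelled
        } finally {
            try {
                delay(100) // throws at once: the coroutine is already cancelling
                events += "plain cleanup finished"
            } catch (e: CancellationException) {
                events += "plain cleanup interrupted"
            }
            withContext(NonCancellable) {
                delay(100) // allowed to suspend here
                events += "non-cancellable cleanup finished"
            }
        }
    }

    delay(100)
    job.cancelAndJoin()
    events
}
```

cleanupDemo() returns ["plain cleanup interrupted", "non-cancellable cleanup finished"]: only the block wrapped in NonCancellable gets to suspend and run to the end.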

6. withTimeout

The withTimeout function specifies a time limit for running a coroutine. If the limit is exceeded, a TimeoutCancellationException is thrown and the coroutine stops running.

withTimeout(1300) { // A TimeoutCancellationException is thrown after 1300 ms
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500)
    }
}

The print result is as follows:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

If you don’t want to throw an exception, you can use withTimeoutOrNull() to return null on timeout instead of an exception.
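
A minimal sketch of the withTimeoutOrNull() variant, reusing the timings from the example above; the function name timeoutOrNullDemo and the "Done" result are illustrative only.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: same loop as above, but a timeout yields null
// instead of a TimeoutCancellationException.
fun timeoutOrNullDemo(): String? = runBlocking {
    withTimeoutOrNull(1300) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500)
        }
        "Done" // only returned if the block finishes within the time limit
    }
}
```

Because the block cannot finish within 1300 ms, timeoutOrNullDemo() returns null and no exception reaches the caller.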

IV. Exception handling

1. try…catch

Coroutines handle exceptions with ordinary Kotlin syntax: try…catch, or built-in helpers such as runCatching (which still uses try…catch internally). Any exception that is not caught will be thrown. However, different coroutine builders handle exceptions in different ways.

With launch, exceptions are thrown as soon as they occur, so you can wrap the code that may throw in a try…catch:

val scope = CoroutineScope(Dispatchers.Main)
scope.launch {
    try {
        print("Simulate throwing an array out of bounds exception.")
        throw IndexOutOfBoundsException() // launch throws the exception immediately
    } catch (e: Exception) {
        // Handle the exception
        print("This handles thrown exceptions.")
    }
}

The printed data is as follows:

[main] Simulate throwing an array out of bounds exception.
[main] This handles thrown exceptions.

When async is used as a root coroutine (a direct child of a CoroutineScope instance or of supervisorScope), the exception is not thrown automatically; it is thrown only when await() is called. To catch it, wrap the call to await() in a try…catch:

supervisorScope {
    val deferred = async {
        print("Simulation throws an arithmetic operation exception.")
        throw ArithmeticException()
    }

    try {
        deferred.await()
    } catch (e: Exception) {
        // Handle the exception
        print("This handles thrown exceptions.")
    }
}

The printed data is as follows:

[main] Simulation throws an arithmetic operation exception.
[main] This handles thrown exceptions.

Note: this only works for a root async. When async is started inside a coroutineScope builder or inside a coroutine created by another coroutine, wrapping await() in try/catch does not contain the exception!

In those cases the exception is propagated automatically through the Job hierarchy. Except when async is the root coroutine, an exception raised in an async that was created by another coroutine is always propagated to the parent, regardless of which builder created it. A catch block around await() cannot stop this: the exception is still propagated and passed on to the scope:

scope.launch {
    try {
        val deferred = async {
            print("Simulation throws a null pointer exception.")
            throw NullPointerException()
        }
        deferred.await()
    } catch (e: Exception) {
        // Catching here does not contain the exception thrown by async:
        // it has already been propagated to the parent and on to scope
        print("The exception is not contained here; it propagates upward.")
    }
}

Since the direct child of scope is launch, an exception raised in the async is thrown immediately. The reason is that async (whose CoroutineContext contains a Job) automatically propagates the exception to its parent (launch), which then throws it. The printed data is as follows:

2. Exception propagation within the scope

Exceptions in coroutines are propagated: a failure affects sibling coroutines as well as the parent coroutine.

When a coroutine fails because of an exception, it propagates the exception to its parent, which cancels its remaining children and then cancels itself. The parent in turn propagates the exception to its own parent. When the exception reaches the root of the hierarchy, all coroutines started in that coroutine scope are cancelled.

Normally this propagation is reasonable. But in an application that handles user interaction, an exception in a single child coroutine may cancel the whole scope; no new coroutines can then be started in it, and the entire UI component may stop responding.

SupervisorJob is designed to solve this problem, and we will look at it shortly. First, here is the conclusion about how exceptions propagate within a scope. When an exception occurs in a coroutine, how it is passed on depends on the current scope:

  • coroutineScope: cancellation is normally propagated through the whole coroutine hierarchy. If the parent coroutine is cancelled or throws an exception, its children are cancelled. If a child is cancelled, neither the parent nor the siblings are affected; but if a child throws an exception, its siblings are cancelled and the exception is passed to the parent, causing the entire coroutine scope to fail.

  • supervisorScope: cancellation is only propagated downward. The failure of one child does not affect the other children; an internal exception is not propagated upward and does not affect the parent or sibling coroutines.

3. SupervisorJob

SupervisorJob is similar to a regular Job, except that cancellation is only propagated downward, and the failure of one child coroutine does not affect the others.

  • SupervisorJob: the failure or cancellation of one of its children does not cause the SupervisorJob to fail and does not affect its other children. It does not cancel itself or its other children, and it does not propagate the exception to its parent; it lets the child handle the exception itself.
  • supervisorScope: a supervised scope, i.e. a scope created with a SupervisorJob. The failure of one child does not cause the scope to fail and does not affect its other children, so you can implement a custom policy for handling child failures. A failure of the scope itself (an exception thrown in the block, or cancellation) fails the scope and all of its child Jobs, but does not cancel the parent Job.

Here are two things you need to know to understand the following example:

1. Coroutines handle cancellation by throwing a special CancellationException (see the section on coroutine cancellation above).

2. An exception that is not caught will be thrown, no matter which kind of Job you use. If an exception is not handled and the CoroutineContext contains no CoroutineExceptionHandler (covered later), the exception reaches the default thread’s ExceptionHandler. On the JVM the exception is printed to the console; on Android the application crashes, regardless of which Dispatcher the exception occurred on.

Let’s use an example from suspend main to print data from the console:

suspend fun main() {
    try {
        coroutineScope {
            print("1")
            val job1 = launch { // The first child coroutine
                print("2")
                throw NullPointerException() // Throw a null pointer exception
            }
            val job2 = launch { // The second child coroutine
                delay(1000)
                print("3")
            }
            try { // This try...catch catches the CancellationException
                job2.join()
                println("4") // Reached only if the second child coroutine completes
            } catch (e: Exception) {
                print("5. $e") // Catch the cancellation exception of the second coroutine
            }
        }
    } catch (e: Exception) { // Catch the exception passed up to the parent coroutine
        print("6. $e")
    }

    Thread.sleep(3000) // Block the main thread for 3 seconds to keep the JVM alive until execution completes
}

The code above is a bit involved, but not hard to follow. We start two sibling child coroutines in a coroutineScope. job1 throws an uncaught exception, so job2 is cancelled and its join() throws a CancellationException; the exception from job1 is then passed up to the parent, and the NullPointerException is caught in the outermost try…catch.

So if you change coroutineScope to supervisorScope, and everything else is the same, what is the result?

We find that the exception thrown by job1 affects neither the parent scope nor job2, the other child coroutine in the scope. Note that a SupervisorJob only has this effect when it is the Job of the scope itself, as in supervisorScope or CoroutineScope(SupervisorJob()).

When should you use Job, and when SupervisorJob? If an error in one coroutine should not bring down its parent and siblings, use SupervisorJob or supervisorScope. If, for example, one failed network request should immediately cancel all the other requests, choose coroutineScope; if the remaining child coroutines should keep running when one of them fails, choose supervisorScope, which does not cancel its other children on a failure.
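
The choice can be tried out with a small experiment that runs the same two children under either builder and reports whether the sibling of the failing child got to finish. This is only a sketch: the function name siblingCompletes is hypothetical, and the empty CoroutineExceptionHandler is an assumption that merely keeps the failure from being reported to the thread’s default handler in the supervisorScope case.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns true if the second child completed although
// the first child failed.
fun siblingCompletes(useSupervisor: Boolean): Boolean = runBlocking {
    var siblingCompleted = false
    // Assumption: an empty handler, used only to silence the failure in this demo
    val handler = CoroutineExceptionHandler { _, _ -> }

    val children: suspend CoroutineScope.() -> Unit = {
        launch(handler) { throw IllegalStateException("first child failed") }
        launch {
            delay(100)
            siblingCompleted = true
        }
    }

    try {
        if (useSupervisor) supervisorScope(children) else coroutineScope(children)
    } catch (e: IllegalStateException) {
        // coroutineScope rethrows the child's failure to its caller
    }
    siblingCompleted
}
```

siblingCompletes(useSupervisor = true) returns true, while siblingCompletes(useSupervisor = false) returns false because the failure cancels the sibling before its delay ends.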

It is recommended that you avoid using the standard library API directly unless you are familiar with the mechanism of coroutines. If exceptions may occur, handle them as well as possible and do not complicate the problem.

4. CoroutineExceptionHandler

CoroutineExceptionHandler is an element of the coroutine context, so it has to be added to the context of a coroutine. It handles uncaught exceptions and is the place for custom logging or exception handling, similar to Thread.uncaughtExceptionHandler for threads.

For an exception to be handled by it, both of the following conditions must be met:

  • The exception is thrown by a coroutine that throws exceptions automatically (launch rather than async);
  • The CoroutineExceptionHandler is set in the context of a CoroutineScope or of a root coroutine (a direct child of a CoroutineScope or of supervisorScope).

Define an exception handler:

val handler = CoroutineExceptionHandler { context, exception ->
    print("Exceptions caught:$exception")
}

For example, in the following code, exceptions generated by launch are caught by the handler, but async’s are not:

runBlocking {
    val scope = CoroutineScope(Job())

    val job = scope.launch(handler) { // The parent coroutine sets the exception handler
        launch { // The child coroutine throws an exception
            throw NullPointerException()
        }

        async { // The handler has no effect on async; the exception is rethrown to whoever calls await()
            throw IllegalArgumentException()
        }
    }

    job.join() // Suspend until the job completes
}

CoroutineExceptionHandler is only invoked for exceptions that the user has not handled. A handler set in the context of the parent coroutine catches the exceptions thrown by launch. With async it has no effect, because the async builder always catches all exceptions and represents them in the resulting Deferred object. If an exception occurs inside async and is not caught, calling await() rethrows it, which can still crash the application. The printed data is as follows:

[DefaultDispatcher-worker-1] Exceptions caught:java.lang.NullPointerException

In the following code, a handler set on an inner coroutine does not catch the exception, whereas under supervisorScope the handler of the child coroutine does handle it:

runBlocking {
    val scope = CoroutineScope(Job())
    scope.launch {
        launch(handler) { // A handler on a child is pointless and prints nothing: the exception is passed upward, and the parent has no handler to catch it
            throw NullPointerException() // Throw a null pointer exception
        }
    }

    supervisorScope {
        launch(handler) { // SupervisorJob does not pass the exception upward; the child's own exception handler handles it
            throw IllegalArgumentException() // Throw an illegal argument exception
        }
    }
}

Setting an exception handler on an ordinary child coroutine has no effect: when the child throws an exception, the exception is still passed to the parent, and if the parent has no handler it goes unhandled. When the handler is set on a direct child of a supervisor job, however, the exception is not passed upward and is handled by the child’s own handler. The printed data is as follows:

[main] Exceptions caught:java.lang.IllegalArgumentException

SupervisorJob is the right choice when you do not want cancellation to be propagated after an exception occurs; otherwise, use Job.

Note:

1. CancellationException is used internally by coroutines for cancellation. It is ignored by all handlers, so it should only be used as a source of additional debugging information, which can be obtained in a catch block.

2. When several child coroutines of a coroutine fail with an exception, the general rule is “the first exception wins”, so the first exception is the one that gets handled. All further exceptions that occur after the first one are attached to it as suppressed exceptions.
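
The “first exception wins” rule can be observed with a sketch like the following, in which one child fails first and a second child throws while it is being cancelled. The function name firstExceptionWins, the report format and the choice of exception types are assumptions made for illustration.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException

// Hypothetical helper: returns a description of the exception seen by the
// handler together with the exceptions attached to it as suppressed.
fun firstExceptionWins(): String = runBlocking {
    var report = ""
    val handler = CoroutineExceptionHandler { _, e ->
        val suppressed = e.suppressed.map { it.javaClass.simpleName }
        report = "${e.javaClass.simpleName} suppressed=$suppressed"
    }

    val job = CoroutineScope(Job()).launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // cancelled when the sibling fails
            } finally {
                throw ArithmeticException() // second exception, thrown during cancellation
            }
        }
        launch {
            delay(100)
            throw IOException() // first exception
        }
        delay(Long.MAX_VALUE)
    }

    job.join() // the handler has run by the time the job completes
    report
}
```

firstExceptionWins() returns "IOException suppressed=[ArithmeticException]": the handler is called once, with the first exception, and the later one is available through its suppressed list.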

V. Channel

A Channel is a non-blocking communication primitive. It is essentially a concurrency-safe queue that connects coroutines and lets them communicate. Messages can be passed between two or more coroutines, and coroutines in different scopes can send and receive data through the same Channel object. It is similar to a BlockingQueue with suspending functions, and is known as a hot stream.

GlobalScope.launch {
    // 1. Create the Channel
    val channel = Channel<Int>()

    // 2. Send data to the Channel
    launch {
        for (i in 1..3) {
            delay(100)
            channel.send(i) // Send
        }
        channel.close() // Close the Channel
    }

    // 3. Receive data from the Channel
    launch {
        repeat(3) {
            val receive = channel.receive() // Receive
            print("Receive$receive")
        }
    }
}

These three steps implement data transfer between coroutines with a Channel: one coroutine sends a value every 100 milliseconds, and the other receives it. The output is as follows:

[main] Receive1
[main] Receive2
[main] Receive3

1. Create the Channel

There are two ways to create a Channel:

  • Creating a Channel object directly, as above;
  • The extension function produce: starts a producer coroutine and returns a ReceiveChannel. The Channel is closed automatically when the coroutine it started finishes.

GlobalScope.launch {
    // 1. produce creates a Channel
    val channel = produce<Int> {
        for (i in 1..3) {
            delay(100)
            send(i) // Send data
        }
    }

    // 2. Receive data
    launch {
        for (value in channel) { // The for loop prints each received value (until the Channel is closed)
            print("Receive$value")
        }
    }
}

The extension function produce directly combines creating a Channel and sending data in one step.

2. Send data

  • channel.send(): sends data.
  • channel.close(): closes the Channel once all the data has been sent.

When we have finished sending data, we can call channel.close() to close the Channel.

3. Receive data

  • channel.receive(): Receives data.

We usually call Channel#receive() to get data, but each call receives only one element, so this approach only works when we know how many elements to expect:

repeat(3) { // Receive data three times
    val receive = channel.receive() // Receive data
    print("Receive$receive")
}

If we do not know how many elements will arrive, we iterate over the Channel instead:

for (value in channel) { // The for loop prints each received value (until the Channel is closed)
    print("Receive$value")
}

Channel, so to speak, gives coroutines their soul: each coroutine is no longer isolated, and a Channel lets them collaborate more easily. However, since Flow came out, Channel is used much less often. Let’s look at the cold data stream, Flow.

Hot and cold data streams:

  • Hot stream: data is produced even when there are no observers; it sends data whether or not you subscribe. For example, a film showing in a cinema plays at its scheduled time whether or not you go to watch it.
  • Cold stream: no data is produced while there is no consumer; it sends data only once you trigger it. For example, a film available online does not play until you press play. The cold data stream of coroutines, and the counterpart of RxJava, is Flow.

VI. Flow

A Flow is an asynchronous data stream that emits values sequentially and completes either normally or with an exception. It is the reactive API of Kotlin coroutines, comparable to RxJava.

Each Flow executes sequentially internally, much like a Sequence. The difference is that a Flow does not block the main thread, while a Sequence does.

1. Basics

Creating a Flow object

Flow provides several quick ways to create a stream:

  • flow: the basic way to create a Flow; it builds a cold data stream from a given suspending block.
  • channelFlow: backed by a buffered channel and thread-safe; allows values to be sent from different CoroutineContexts.
  • .asFlow(): converts other data into an ordinary flow, typically a collection into a Flow.
  • flowOf(vararg elements: T): quickly creates a flow from a variable number of arguments, similar to listOf().

For example, you can create a Flow object with (1..3).asFlow() or flowOf(1, 2, 3).

Consuming data

lifecycleScope.launch {
    // 1. Create a Flow
    flow<Int> {
        for (i in 1..3) {
            delay(200)
            // 2. Emit data
            emit(i)
        }
    }.collect {
        // 3. Consume data from the flow
        print("collect:$it")
    }
}

The printed data is as follows:

[main] collect:1
[main] collect:2
[main] collect:3

As with RxJava, we call the emit method to emit data when creating the Flow object, and the collect method to consume it.

  • emit(value): emits a value from upstream to the collector. It is not thread-safe and should not be called concurrently. If you need thread safety, use channelFlow rather than flow.
  • collect(): receives the values issued by emit() with the given collector. It is a suspending function that executes on the thread of the scope it is called in.

The code block of a Flow only runs when collect() is called, just as an Observable created with RxJava only runs when subscribe() is called. If you are familiar with RxJava, you can think of collect() as subscribe() and emit() as onNext().

Comparison     Flow       RxJava
Data source    Flow<T>    Observable<T>
Subscribe      collect    subscribe
Emit           emit()     onNext()

Cold stream

A Flow is a cold data stream: the code in the flow builder does not run until the Flow is collected. Once a Flow has been created, no consumption means no production, and consuming it several times means producing several times. Production and consumption always go together.

A cold stream only produces when it is consumed, which is the exact opposite of a Channel: the sender of a Channel does not depend on the receiver.

val flow = flow {
    for (i in 1..3) {
        delay(200)
        emit(i) // Emit a value from the flow
    }
}

lifecycleScope.launch {
    flow.collect { print("$it") }
    flow.collect { print("$it") }
}

Consuming it once outputs 1, 2, 3, and consuming it again outputs 1, 2, 3 again. The same is true of an RxJava Observable, which starts producing again each time subscribe is called.
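
The pairing of production and consumption can be made visible by counting how often the builder block runs. A minimal sketch follows; the function name coldFlowDemo and the counter are hypothetical, and toList() is used as the terminal operator simply because it makes the result easy to return.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (producer runs before any collection,
// producer runs after two collections, all collected values).
fun coldFlowDemo(): Triple<Int, Int, List<Int>> = runBlocking {
    var producerRuns = 0
    val numbers = flow {
        producerRuns++ // counts how often the builder block is executed
        for (i in 1..3) emit(i)
    }

    val runsBeforeCollect = producerRuns // still 0: nothing has been collected yet
    val collected = numbers.toList() + numbers.toList() // two separate collections
    Triple(runsBeforeCollect, producerRuns, collected)
}
```

coldFlowDemo() returns (0, 2, [1, 2, 3, 1, 2, 3]): creating the Flow runs nothing, and each collection runs the builder block once from the start.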

2. Thread switching

RxJava is also an asynchronous framework based on the reactive programming model, and thread switching is one of its best features. It has two APIs for switching schedulers, subscribeOn and observeOn. A Flow can also set the dispatcher it runs on, and it is simpler: just use flowOn:

lifecycleScope.launch {
    // Create a Flow
      
    flow {
        for (i in 1.3.) {
            delay(200)
            emit(i)// Emit values from the stream
        }
    }.flowOn(Dispatchers.IO)// Put the above data emission operation into the coroutine in the IO thread
            .collect { value ->
                // Specific data consumption processing}}Copy the code

What you change with flowOn() is the thread within the Flow function when the data is emitted, while collecting data automatically cuts back to the thread that created the Flow.

In the scheduler API of Flow, it seems that only flowOn corresponds to subscribeOn, but in fact, the scheduler of the coroutine where Collect resides corresponds to the scheduler specified by observeOn.

| Comparison | Flow | RxJava |
| --- | --- | --- |
| Change the thread that emits data | flowOn | subscribeOn |
| Change the thread that consumes data | Automatically uses the dispatcher of the collecting coroutine | observeOn |
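The split between the emitting and the collecting context can be observed by recording thread names. This is a rough sketch: `emitAndCollectThreads` is a hypothetical helper, and `runBlocking` replaces `lifecycleScope`, so the collector runs on whatever thread calls `runBlocking` rather than on the Android main thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Returns (thread that emitted, thread that collected) for a single value.
suspend fun emitAndCollectThreads(): Pair<String, String> {
    var emitterThread = ""
    var collectorThread = ""
    flow {
        emit(Thread.currentThread().name)       // Runs upstream of flowOn
    }
        .flowOn(Dispatchers.IO)                 // Only affects the code above this line
        .collect { name ->
            emitterThread = name
            collectorThread = Thread.currentThread().name   // The caller's context
        }
    return emitterThread to collectorThread
}

fun main(): Unit = runBlocking {
    val (emitter, collector) = emitAndCollectThreads()
    println("emitted on:   $emitter")
    println("collected on: $collector")
}
```

The emitter reports a worker thread of the shared dispatcher pool, while the collector reports the thread of the calling coroutine, which is the behaviour the table above summarises.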

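Note: Calling emit() from inside withContext() in a flow builder in order to switch threads is not allowed; it fails at runtime. The flow builder is not thread-safe, so use flowOn to move the emission, or channelFlow if you really must emit from another context.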
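The restriction in the note can be reproduced with a small experiment. The sketch below assumes two hypothetical functions, `brokenFlow` and `safeFlow`; the first emits from inside `withContext` and is rejected with an `IllegalStateException`, the second does the same through `channelFlow` and `send`.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Not allowed: emit() is called from a different context than the collector's.
fun brokenFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// channelFlow hands values over through a channel, so sending from another context is fine.
fun safeFlow(): Flow<Int> = channelFlow {
    withContext(Dispatchers.Default) {
        send(1)
    }
}

fun main(): Unit = runBlocking {
    try {
        brokenFlow().toList()
    } catch (e: IllegalStateException) {
        println("rejected: ${e::class.simpleName}")   // rejected: IllegalStateException
    }
    println(safeFlow().toList())                       // [1]
}
```

For the common case of simply moving the producer to another dispatcher, `flowOn` remains the simpler choice; `channelFlow` is worth reaching for only when values really originate in several contexts.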

3. Exception handling

Flow exception handling is also straightforward. You can call the catch function directly:

lifecycleScope.launch {
    flow {
        emit(10) // Emit a value from the flow
        throw NullPointerException() // Throw a null pointer exception
    }.catch { e -> // Catch an exception thrown upstream
        print("caught error: $e")
    }.collect {
        print("The collection:$it")
    }
}

The printed data is as follows:

[main] The collection:10
[main] caught error: java.lang.NullPointerException

A NullPointerException is thrown inside the flow builder and is caught by the catch operator. If catch is not called, the exception surfaces as an uncaught exception when the flow is collected. Note that catch only catches exceptions that occur upstream of it.

A catch in Flow corresponds to an onError in RxJava:

| Comparison | Flow | RxJava |
| --- | --- | --- |
| Exception | catch | onError |

Note: Stream collection can also catch exceptions using try{}catch{} blocks.
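The "upstream only" rule is easiest to see side by side. In this sketch (function names are hypothetical) the first flow fails inside the builder, so `catch` handles it and even emits a fallback value; the second fails inside `collect`, which `catch` never sees, so a surrounding try/catch is needed.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Upstream failure: catch sees it and may emit a fallback value.
suspend fun upstreamFailure(): List<Int> =
    flow {
        emit(10)
        throw IllegalStateException("upstream failed")
    }
        .catch { emit(-1) }
        .toList()

// Downstream failure: catch sits above collect, so it never sees this exception.
suspend fun downstreamFailure(): String {
    var caughtBy = "nobody"
    try {
        flow { emit(10) }
            .catch { caughtBy = "catch operator" }
            .collect { value -> check(value < 0) { "bad value $value" } }
    } catch (e: IllegalStateException) {
        caughtBy = "try/catch around collect"
    }
    return caughtBy
}

fun main(): Unit = runBlocking {
    println(upstreamFailure())     // [10, -1]
    println(downstreamFailure())   // try/catch around collect
}
```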

4. Completion and cancellation

onCompletion

If we want to execute the logic when the flow completes, we can use onCompletion:

lifecycleScope.launch {
    flow {
        emit(10)
    }.onCompletion {
        print("Flow operation complete")
    }.collect {
        print("The collection:$it")
    }
}

The printed data is as follows:

[main] The collection:10
[main] Flow operation complete

| Comparison | Flow | RxJava |
| --- | --- | --- |
| Completion | onCompletion | onComplete |

Note: The stream can also use the try{}finally{} block to perform an action when the collection is complete.
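onCompletion also receives a nullable cause, which tells a normal completion apart from a failure. The following is a small sketch with a hypothetical `completionReport` helper; the `catch` after `onCompletion` is there only so that `collect()` returns normally in the failing case.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Returns a description of how the flow finished.
suspend fun completionReport(fail: Boolean): String {
    var report = ""
    flow {
        emit(10)
        if (fail) throw IllegalStateException("boom")
    }
        .onCompletion { cause ->
            // cause is null on normal completion, otherwise the upstream exception
            report = if (cause == null) "completed normally" else "completed with ${cause.message}"
        }
        .catch { /* swallow the failure so collect() returns normally */ }
        .collect { }
    return report
}

fun main(): Unit = runBlocking {
    println(completionReport(fail = false))   // completed normally
    println(completionReport(fail = true))    // completed with boom
}
```

Unlike `catch`, `onCompletion` does not handle the exception; it only observes it and lets it continue downstream.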

cancel

Flow does not provide a cancel operation of its own. A Flow is consumed through a terminal operator such as collect, which must be called inside a coroutine, so cancelling a Flow comes down to cancelling the coroutine in which the terminal operator runs.

lifecycleScope.launch {
    // 1. Create a child coroutine
    val job = launch {
        // 2. Create a flow
        val intFlow = flow {
            (1..5).forEach {
                delay(1000)
                // 3. Emit data
                emit(it)
            }
        }

        // 4. Collect data
        intFlow.collect {
            print(it)
        }
    }

    // 5. Cancel the coroutine after 3.5 seconds
    delay(3500)
    job.cancelAndJoin()
}

A value is emitted every 1000 milliseconds and the coroutine is cancelled after 3500 milliseconds, so the flow is cancelled after emitting 1, 2, 3. To cancel a Flow, just cancel the coroutine that collects it.

5. Back pressure

What is back pressure? It arises when a producer emits values faster than the consumer can process them: unconsumed values pile up and the pressure propagates back toward the producer. Any reactive programming model has to deal with it. Flow offers three ways to handle back pressure:

  • buffer: adds a buffer with a given capacity.
  • conflate: keeps only the latest value.
  • collectLatest: when a new value is emitted, cancels the processing of the previous one.

Add a buffer

buffer() accepts an optional capacity. With a buffer, the emitter does not have to wait for the collector to finish before emitting the next value; values are held temporarily, which improves throughput. A buffer only hides the root cause, though: if the consumer stays slower than the producer, values will keep backing up.

lifecycleScope.launch {
    val time = measureTimeMillis { // Measure the elapsed time
        flow {
            for (i in 1..3) {
                delay(100) // Pretend we are waiting asynchronously for 100 ms
                emit(i) // Emit the next value
            }
        }
            .buffer() // Buffer emissions so the emitter does not wait
            .collect { value ->
                delay(300) // Pretend processing takes 300 ms
                print(value)
            }
    }

    print("Collection time:$time ms")
}

Emitting a value takes 100 milliseconds and the collector needs 300 milliseconds to process one. Handling three values strictly in sequence would take about 1200 milliseconds (3 × (100 + 300)). With buffer(), emission and collection overlap, so the total drops to roughly 100 + 3 × 300 = 1000 milliseconds plus overhead. The printed data is as follows:

[main] 1
[main] 2
[main] 3
[main] Collection time:1110 ms
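The effect of the buffer can be checked by timing the same producer with and without it. This is a minimal sketch outside Android; `slowProducer` and `timeCollection` are hypothetical helpers, and the measured numbers will vary from run to run.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun slowProducer(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)   // Time needed to produce one value
        emit(i)
    }
}

// Collects with a 300 ms consumer and returns the elapsed time in milliseconds.
suspend fun timeCollection(source: Flow<Int>): Long = measureTimeMillis {
    source.collect { delay(300) }
}

fun main(): Unit = runBlocking {
    val sequential = timeCollection(slowProducer())          // About 3 * (100 + 300) ms
    val buffered = timeCollection(slowProducer().buffer())   // About 100 + 3 * 300 ms
    println("sequential: $sequential ms, buffered: $buffered ms")
}
```

Without the buffer the producer waits for each 300 ms processing step before producing again; with it, the producer runs in its own coroutine and only the first 100 ms of production is not overlapped.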

conflate

When flow represents a partial result of an operation or an operation status update, you may not need to process each value, but only the most recent value.

lifecycleScope.launch {
    val time = measureTimeMillis { // Measure the elapsed time
        flow {
            for (i in 1..3) {
                delay(100) // Pretend we are waiting asynchronously for 100 ms
                emit(i) // Emit the next value
            }
        }
            .conflate() // Conflate emissions instead of processing each one
            .collect { value ->
                delay(300) // Pretend processing takes 300 ms
                print(value)
            }
    }

    print("Collection time:$time ms")
}

While the first value is still being processed, the second and third values have already been produced. The second is therefore conflated (dropped) and only the most recent value, the third, is delivered to the collector. The printed data is as follows:

[main] 1
[main] 3
[main] Collection time:802 ms

collectLatest

Another approach is to cancel a slow collector and restart it every time a new value is emitted. collectLatest performs the same basic logic as collect, but cancels the code in its block when a new value arrives.

lifecycleScope.launch {
    val time = measureTimeMillis {
        flow {
            for (i in 1..3) {
                delay(100) // Pretend we are waiting asynchronously for 100 ms
                emit(i) // Emit the next value
            }
        }.collectLatest { value -> // Cancel and restart on the latest value
            print("Collected values:$value")
            delay(300) // Pretend processing takes 300 ms
            print("Complete:$value")
        }
    }

    print("Collection time:$time ms")
}

The body of collectLatest takes 300 milliseconds, but a new value is emitted every 100 milliseconds, so the block starts for every value but completes only for the last one. The printed data is as follows:

[main] Collected values:1
[main] Collected values:2
[main] Collected values:3
[main] Complete:3
[main] Collection time:698 ms

6. Operators

Kotlin's Flow provides a large number of operators for processing data. Here are some of the more common ones:

Basic operators

| Flow operator | Role |
| --- | --- |
| map | Transformation operator: converts each value into another form |
| take | Takes only the specified number of values from the start of the flow |
| filter | Filter operator: returns a flow containing only the original values that match the given rule |
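The three basic operators chain naturally. A short sketch with a hypothetical function name, using `asFlow()` to turn a range into a flow:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

suspend fun firstThreeEvenSquares(): List<Int> =
    (1..10).asFlow()
        .filter { it % 2 == 0 }   // Keep 2, 4, 6, 8, 10
        .map { it * it }          // Square each remaining value
        .take(3)                  // Stop after the first three results
        .toList()

fun main(): Unit = runBlocking {
    println(firstThreeEvenSquares())   // [4, 16, 36]
}
```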

Terminal operators

Terminal operators start the collection of a flow. collect is the most basic one.

| Terminal operator | Role |
| --- | --- |
| collect | The most basic way to collect data; triggers the flow to run |
| toCollection | Adds the collected values to the given collection |
| launchIn | Launches the collection of the flow in the specified scope |
| toList | Collects the given flow into a List |
| toSet | Collects the given flow into a Set |
| reduce | Accumulates values starting with the first element, applying the operation to the current accumulator value and each element |
| fold | Accumulates values starting with an initial value, applying the operation to the current accumulator value and each element |
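The difference between reduce and fold, and between toList and toSet, shows up in a few lines. The function names below are illustrative only:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.runBlocking

// reduce starts from the first element: 1 + 2 + 3 + 4 + 5
suspend fun sumWithReduce(): Int = (1..5).asFlow().reduce { acc, value -> acc + value }

// fold starts from the supplied initial value: 100 + 1 + 2 + 3 + 4 + 5
suspend fun sumWithFold(): Int = (1..5).asFlow().fold(100) { acc, value -> acc + value }

suspend fun collectedAsList(): List<Int> = flowOf(3, 1, 3).toList()   // Keeps duplicates
suspend fun collectedAsSet(): Set<Int> = flowOf(3, 1, 3).toSet()      // Drops duplicates

fun main(): Unit = runBlocking {
    println(sumWithReduce())     // 15
    println(sumWithFold())       // 115
    println(collectedAsList())   // [3, 1, 3]
    println(collectedAsSet())    // [3, 1]
}
```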

Functional operators

| Functional operator | Role |
| --- | --- |
| retry | Retry mechanism: re-executes the flow when an exception occurs |
| cancellable | Checks on each emission whether the collecting coroutine has been cancelled and, if so, throws an exception |
| debounce | Debouncing: only the latest value within the specified period is delivered and the others are filtered out. Suits search-suggestion scenarios |
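As an illustration of retry, the sketch below uses a hypothetical `FlakySource` that fails on its first two attempts. `retry(2)` re-collects the upstream flow up to two more times, and the predicate limits retries to `IOException`.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical flaky source: fails on the first two attempts, succeeds on the third.
class FlakySource {
    var attempts = 0
        private set

    fun load(): Flow<String> = flow {
        attempts++
        if (attempts < 3) throw IOException("attempt $attempts failed")
        emit("payload")
    }
}

fun main(): Unit = runBlocking {
    val source = FlakySource()
    val result = source.load()
        .retry(2) { cause -> cause is IOException }   // At most two retries, only for IOException
        .toList()
    println("$result after ${source.attempts} attempts")   // [payload] after 3 attempts
}
```

Because a Flow is cold, each retry runs the builder block again from the start, which is why the attempt counter reaches three.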

Callback operators

| Callback operator | Role |
| --- | --- |
| onStart | Called before the upstream flow starts. It can emit extra elements and can also do other work, such as sending analytics tracking events |
| onEach | Called before each element is emitted from upstream to downstream |
| onEmpty | Called when the flow completes without emitting any elements |
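The order in which the callbacks fire can be recorded in a simple log. A sketch with hypothetical function names:

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the collected values together with the order in which callbacks ran.
suspend fun withCallbacks(): Pair<List<Int>, List<String>> {
    val log = mutableListOf<String>()
    val values = flowOf(1, 2)
        .onStart { log += "start"; emit(0) }   // Runs first and may emit extra values
        .onEach { log += "each $it" }          // Sees every value, including the extra 0
        .toList()
    return values to log
}

suspend fun emptyFallback(): List<Int> =
    emptyFlow<Int>()
        .onEmpty { emit(-1) }                  // Runs only because nothing was emitted
        .toList()

fun main(): Unit = runBlocking {
    val (values, log) = withCallbacks()
    println(values)            // [0, 1, 2]
    println(log)               // [start, each 0, each 1, each 2]
    println(emptyFallback())   // [-1]
}
```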

Combination operators

| Combination operator | Role |
| --- | --- |
| zip | Combines two flows by pairing their values; as soon as one flow completes, the whole process ends |
| combine | Once both flows have emitted at least once, emits whenever either flow produces new data, pairing it with the most recent value already emitted by the other |
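A small comparison of the two operators, with hypothetical function names. The zip result is fully determined by position; for combine the intermediate pairs depend on how the two emissions interleave, so no expected output is given for it.

```kotlin
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs values by position and stops with the shorter flow.
suspend fun zipped(): List<String> =
    flowOf(1, 2, 3)
        .zip(flowOf("a", "b")) { number, letter -> "$number$letter" }
        .toList()

// combine re-emits whenever either side changes, using the latest value of the other.
suspend fun combined(): List<String> =
    flowOf(1, 2, 3)
        .combine(flowOf("a", "b")) { number, letter -> "$number$letter" }
        .toList()

fun main(): Unit = runBlocking {
    println(zipped())     // [1a, 2b]
    println(combined())   // Pairs depend on how the emissions interleave
}
```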

Flattening operators

Flattening is similar to flatMap in RxJava: each value you emit is converted into another data source, and those sources are flattened into a single flow.

| Flattening operator | Role |
| --- | --- |
| flatMapConcat | Processes the inner flows sequentially, expanding and concatenating them into a single flow |
| flatMapMerge | Collects all inner flows concurrently and merges their values into a single flow so that values are emitted as quickly as possible |
| flatMapLatest | As soon as a new flow is emitted, collection of the previous flow is cancelled |
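The sketch below contrasts flatMapConcat and flatMapLatest; function names are hypothetical. Depending on the kotlinx.coroutines version these operators are marked experimental or preview, so the opt-in annotations are an assumption that may be unnecessary in your setup.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Each inner flow is collected to the end before the next one starts.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun concatenated(): List<String> =
    flowOf(1, 2)
        .flatMapConcat { n -> flowOf("$n-a", "$n-b") }
        .toList()

// A new upstream value cancels the inner flow that is still running.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestOnly(): List<Int> =
    flowOf(1, 2, 3)
        .flatMapLatest { n ->
            flow<Int> {
                delay(50)   // Slow inner flow: cancelled when the next value arrives
                emit(n)
            }
        }
        .toList()

fun main(): Unit = runBlocking {
    println(concatenated())   // [1-a, 1-b, 2-a, 2-b]
    println(latestOnly())     // [3]
}
```

In the second function the upstream emits 1, 2, 3 without pausing, so the inner flows for 1 and 2 are cancelled during their delay and only the one for 3 gets to emit.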

There are other operators that I will not go through here; check the API documentation if you are interested. Pick them according to the scenario: for example, debounce for search, retry for network requests, and combine for merging data.

Learning Kotlin and coroutines is well worth the effort: they simplify code logic, make for more elegant code, and improve development efficiency.

Well, everyone, that’s all for this article. Thank you for reading. I am Suming; thank you for your support and recognition. Your likes are my biggest motivation to keep writing. See you in the next article!

My knowledge is limited and the article will inevitably contain mistakes; corrections and criticism are very welcome.

Kotlin coroutine learning trilogy:

  • Kotlin Coroutine Practical Advanced (1, Building the Foundation)
  • Kotlin Coroutine Practical Advanced (2, Advanced)
  • Kotlin Coroutine Practical Advanced (3, Principles)


I hope we can become friends on GitHub and Juejin, share knowledge, and encourage each other. Keep moving!