Flow: An Intro for an RxJava user


RxJava is probably the most important library I use. Rx is usually another paradigm for writing code, and Kotlin is a new programming language that makes it easy to implement coroutine driven flows as its own Rx implementation. I may have introduced Coroutines in Hello Kotlin Coroutines, which is necessary to understand flow

Kotlin has a set of extensions to facilitate the use of collections. But it’s not responsive

listOf("Madara"."Kakashi"."Naruto"."Jiraya"."Itachi")
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
Copy the code

In this example, if you dig into the map function source code, you’ll see that there’s no magic here, it’s just a loop through the list, doing some transformation and then providing you with a new list. The same goes for filters. This mechanism, called eager Evaluation, operates across the entire list and provides a new list. But if we don’t need to create these temporary lists to save some memory, we can use Sequences instead

listOf("Madara"."Kakashi"."Naruto"."Jiraya"."Itachi")
	/ / use the Sequence
    .asSequence()
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
Copy the code

The difference here is that we call the asSequence method first and then use our operation. When we look at the map method again, we see some differences. It is just a modifier for the sequence and the return value type is also sequence. When using sequence map, you can only operate one item at a time. When the list is large, a sequence is much better than a normal collection. Sequence can do its work synchronously. Is there a way to use those conversion operators asynchronously? The answer is a flow

flow

If we try to get the list and use it as a flow and call collect {.. }, you will receive a compilation error. Because flow is built on coroutines, it has asynchronous capabilities by default, so you can use it when you use coroutines in your code

Collect {… } operator, which you can think of as SUBSCRIBE in Rxjava

A flow is also cold stream, which means that the flow will not be executed until you call an operator such as collect. If you call collect repeatedly, you get the same result each time

Thus, the Collections extension is only for small data, sequence saves you unnecessary work (not creating temporary lists), and with Flow, you can code with the power of coroutines. So, let’s learn how to build it, okay

Build a flow

Looking at the asFlow method, which is an extension function on Collections that converts it to flow, let’s take a look at the source code

public fun <T>可迭代<T>.asFlow(a): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}
Copy the code

If we were to write the previous example to add some logic to the data source, we would simply use flow{… } or flowof ()

Conversion operator

Flow has a list of operators for transformation, such as map, filter, groupBy, scan, and so on

In flow, which is supported by Coroutines, you can naturally use asynchronous code in your operators, assuming we want to do something time-consuming, represented here by a one-second delay. With RxJava, you can use flatMap

The point here is that Flow has a simpler design and is easier to learn than RxJava, which is known for its steep learning curve, and I’m using Flow to simplify it

The terminal operator

I’ve already mentioned that Collect () is the terminal operator that you get when you call it. In RxJava, you can initiate it by calling SUBSCRIBE () or, using blocking, by calling blockingGet

The terminal operator in flow is the suspension function that requires scoped operations, and the other operators are for example

  • ToList (), toSet -> returns all items in the collection

  • First () -> Returns only the first launch

  • Reduce (), fold() -> Get results using specific operations

Emission data

To transmit data, you need to use a suspend function

//fire a coroutine
someScope.launch {
  //fire flow with a terminal operator
  flowSampleData().collect { }
}
Copy the code

The curly braces above are reminiscent of callbacks, and you can use the launchIn function to process the results using onEach{… }

flowSampleData()
    .onEach {
     //handle emissions
    }
    .launchIn(someScope)
Copy the code

cancel

Each time we set up an RxJava subscription, we must cancel these subscriptions to avoid memory leaks or expired tasks running in the background. RxJava provides a reference to the subscription (disposable) to cancel the subscription, disposable().dispose(). If you use multiple objects in CompositeDisposable, call clear() or dispose()

Coroutines that use a specific scope for flow can do this without additional work

Error handling

One of the most useful features of RxJava is the way it handles errors, and you can use this onError() function to catch any errors in your workflow. Flow has a similar name called catch {… }, if no catch {… }, your code may throw an exception or an application crash. You can choose to use a regular try catch or use atch {… } is encoded declaratively

Let’s simulate an error

private fun flowOfAnimeCharacters(a) = flow {
    emit("Madara")
    emit("Kakashi")
    // Throw an exception
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")}Copy the code

use

runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
        .collect {
            println(it)
        }
}
Copy the code

If we run this code, it will throw an exception, and as we said, you have two options for handling errors, namely regular try-catch and catch {… }. This is the modified code in both cases

/ / use try-catch
runBlocking {
    try {
        flowOfAnimeCharacters()
            .map { stringToLength(it) }
            .filter { it > 4 }
            .collect {
                println(it)
            }
    } catch (e: Exception) {
        println(e.stackTrace)
    } finally {
        println("Beat it")}}Copy the code
/ / use the catch {}
runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
         // catch
        .catch { println(it) }
        .collect {
            println(it)
        }
}
Copy the code

One thing to watch out for with catch{} is the order in which the catch{} operators are placed, before the Terminal operator, so that you can catch the desired exception

restore

If an error interrupts the flow and we intend to use a full backup or default data recovery stream, use onErrorResumeNext() or onErrorReturn() in Rxjava and catch {… }, but we called emit() to generate the backups one by one, and we could even use emitAll() to introduce a whole new flow, such as “Minato” and “Hashirama” if an exception occurs halfway through.

runBlocking {
    flowOfAnimeCharacters()
        .catch {
            emitAll(flowOf("Minato"."Hashirama"))
        }
        .collect {
            println(it)
        }
}
Copy the code

So what I get is this

Madara
Kakashi
Minato
Hashirama
Copy the code

flowOn()

By default, the flow data source will run in the caller context, and if you want to change it, for example, to run flow on IO instead of Main, use flowOn() and change the context upstream, which is all the operators before calling flowOn. This is a good example of documentation

Here flowOn() acts as the two roles [subscribeOn() — observeOn()] in RxJava, and you can write the flow and then determine in which context you will operate

complete

Once the flow has finished emitting, you may need to do something, onCompletion {… } can solve this problem, and it determines whether the flow completes normally or abnormally

Known data sources are as follows

private fun flowOfAnimeCharacters(a) = flow {
    emit("Madara")
    emit("Kakashi")
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")}Copy the code

Catch {… } is the job of catching IllegalStateException() and starting the new process over again, which makes us leave “Madara”, “Kakashi” at the source, “Minato”, “Hashirama” behind. But onCompletion {… Will} display an error?

The answer is no, catch catches all the errors, and here’s a whole new thing, remember onCompletion {… } and catch {… } is just the mediator operator. The order is important

conclusion

You can use Flow Builders to build flows, the most basic of which is Flow {… }. To start the flow, call something like Collect {… }, and since terminal operator is a suspend function, you need to use the coroutine builder launch {… }, or if you want to operate in an elegant style, you can use a combination of launchIn() and onEach {… }. Use the catch {… } catch upstream errors and provide a rollback process as needed. onCompletion {.. } will be triggered after all launches are completed upstream or when an error occurs. By default, all of these methods apply to the calling program coroutine context. If you want to change the upstream context, use flowOn()

About me

I am Flywith24, and my blog content has been classified here. You can click Watch in the upper right corner to get updates of my articles in time, oh 😉

  • The Denver nuggets
  • Jane’s book
  • Github