Before the advent of Kotlin coroutines, RxJava was probably the most popular asynchronous programming solution in Android development.

Most Android developers will be familiar with its chained processing of data streams.

So what do data flow operations look like with Kotlin coroutines?

This is the second article in my Kotlin coroutine series.

As a former RxJava loyalist, I will look at asynchronous data flow operations in Kotlin coroutines from the perspective of an RxJava user.

Kotlin version: 1.5.31

Coroutine version: 1.5.2

RxJava version: 3.0.10

Articles in the Kotlin coroutine series:

Scratch the Kotlin coroutine

Kotlin Flow Guide (1) Basic Usage

Kotlin Flow guide (2) ChannelFlow

Kotlin Flow Guide (3) SharedFlow and StateFlow



The Kotlin standard library provides Sequence and collection operators, which also support chained data operations.

However, these operations are synchronous and block the calling thread. Is there an asynchronous equivalent, similar to RxJava?

Kotlin coroutines’ answer to reactive programming is Kotlin Flow.

Put simply, a Flow is a type whose values can be collected by subscribing to it.

FlowCollector defines the ability to emit values.

Among the basic Kotlin coroutine builders:

  • launch starts a coroutine that returns no value and simply terminates when it finishes, similar to RxJava’s Completable.
  • async starts a coroutine whose result can be obtained with await, but it returns only a single value and cannot express data flow operations.

Kotlin Flow fills exactly this gap: it lets coroutines produce multiple values asynchronously and supports complex data flow operations.

In RxJava terms, Flow covers the roles of Observable and Flowable, and can stand in for Maybe as well.
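To make the contrast concrete, here is a minimal sketch assuming kotlinx-coroutines-core on the classpath (the article targets 1.5.2); singleValue and manyValues are illustrative names, not library APIs. async yields exactly one result through await, while a flow can emit several values over time.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// async: one coroutine, one result, obtained with await()
suspend fun singleValue(): Int = coroutineScope {
    val deferred = async {
        delay(10)
        42
    }
    deferred.await()
}

// flow: a cold stream that can emit any number of values over time
fun manyValues(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(10)
        emit(i)
    }
}

fun main() = runBlocking {
    println(singleValue())         // 42
    println(manyValues().toList()) // [1, 2, 3]
}
```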

create

Let’s start by looking at how a Flow is created.

There are many ways to create a Flow. The simplest is the top-level function flow.

Its block is a suspend lambda with FlowCollector as the receiver, so it can call emit directly to send data.

    val flow = flow<String> {
        for (i in 1..5) {
            emit("result $i")
        }
    }

Other creation functions include asFlow and flowOf.

The library also provides many extension functions that convert other types to Flow. They are not listed here, but you can browse the source if you are interested.
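A short sketch of the three builders side by side, assuming kotlinx-coroutines-core; the function names built, fixed and fromRange are made up for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// flow {}: the block has FlowCollector as its receiver, so emit is available
fun built(): Flow<String> = flow {
    for (i in 1..3) emit("result $i")
}

// flowOf: a fixed list of values
fun fixed(): Flow<String> = flowOf("a", "b")

// asFlow: conversion from ranges, collections, sequences, arrays, ...
fun fromRange(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    println(built().toList())     // [result 1, result 2, result 3]
    println(fixed().toList())     // [a, b]
    println(fromRange().toList()) // [1, 2, 3]
}
```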

  • Because the flow block is itself a suspend lambda, it can call suspend functions.

  • emit inside the flow block is not thread-safe, so the flow block must not change the coroutine context: it cannot call functions such as withContext, which would otherwise cause the downstream collect to be dispatched to other threads.

  • To change the dispatcher used by a flow, the only option is the flowOn operator.
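The rule above can be checked with a small sketch, assuming kotlinx-coroutines-core; violating and shifted are illustrative names. Emitting from inside withContext is detected by the flow builder at collection time and fails with an IllegalStateException, while flowOn moves the producer to another dispatcher legally.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Wrong: emit is called from a different context than the collector's
fun violating(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// Right: flowOn runs the flow {} block on another dispatcher
fun shifted(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    val error = runCatching { violating().collect { } }.exceptionOrNull()
    println(error)              // an IllegalStateException ("Flow invariant is violated ...")
    println(shifted().toList()) // e.g. [DefaultDispatcher-worker-1]
}
```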

Terminal operator

A flow created by flow is a cold flow: the data is only produced after a terminal operator is called.

  • Each production of the data stream is paired with a consumption of it.
  • Subscribing multiple times makes the data source produce the data multiple times.
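A minimal sketch of this cold behavior, assuming kotlinx-coroutines-core; coldDemo is an illustrative name. Nothing runs when the flow is created or when an intermediate operator is attached, and each collection runs the producer again.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns [runs before collecting, first value, second value, total runs]
suspend fun coldDemo(): List<Int> {
    var productions = 0
    val cold = flow {
        productions++          // runs once per collection, not at creation
        emit(productions)
    }.map { it }               // an intermediate operator does not start it either
    val before = productions   // 0: nothing has been produced yet
    val first = cold.toList().single()
    val second = cold.toList().single() // the producer runs again for this collector
    return listOf(before, first, second, productions)
}

fun main() = runBlocking {
    println(coldDemo()) // [0, 1, 2, 2]
}
```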

In RxJava, you call subscribe to consume a data stream; in Flow, you call collect.

However, the collect function declared in the Flow interface is annotated with @InternalCoroutinesApi, which indicates that it is not meant to be called directly from outside.

Fortunately, there is a Flow extension function with the same name that is available for external calls.

collect is a suspend function, so it can only be called from a Kotlin coroutine (or another suspend function).

But flow and the other operators are ordinary functions, so a flow can be created outside a coroutine.

In other words, creating a data stream can be separated from consuming it.

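    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun testFlow() = runBlocking {
        val scope = CoroutineScope(Dispatchers.IO)
        // Creating the data stream: flow {} is an ordinary function
        val flow = flow {
            (0..2).forEach {
                println("emit $it")
                emit(it)
            }
        }
        // Consuming the data stream: collect must run in a coroutine
        scope.launch {
            flow.collect {
                println("last collect result: $it")
            }
        }.join()
    }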

Other terminal operators are also provided:

  • Conversion to collection types, such as toList and toSet.
  • Getting specific elements of the stream, such as first and last.
  • Accumulators, such as reduce and fold.

These terminal operators are all suspend functions, so consuming a data stream is restricted to Kotlin coroutines (or suspend functions).

launchIn is a convenient extension function that starts collecting the flow in a given coroutine scope while ignoring the emitted values; it is usually combined with other operators such as onEach.
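The terminal operators listed above can be tried together in a small sketch, assuming kotlinx-coroutines-core; source and terminals are illustrative names. The launchIn call shows the usual pairing with onEach, since launchIn itself does nothing with the values and only returns the Job of the collecting coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun source(): Flow<Int> = (1..4).asFlow()

// Each call below is a suspend terminal operator that collects the flow
suspend fun terminals(): List<Any> = listOf(
    source().toList(),                        // [1, 2, 3, 4]
    source().toSet(),                         // [1, 2, 3, 4]
    source().first(),                         // 1
    source().reduce { acc, v -> acc + v },    // 10
    source().fold(100) { acc, v -> acc + v }, // 110
)

fun main() = runBlocking {
    println(terminals())
    // launchIn ignores the values itself, so the work goes into onEach
    val job = source()
        .onEach { println("onEach $it") }
        .launchIn(this)
    job.join()
}
```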

Intermediate operator

Flow provides a rich set of intermediate operators that open up many possibilities for working with data flows.

These intermediate operators intercept (consume) the upstream flow, process it, and return a new flow to the downstream. This is what allows different intermediate operators (consumers) to be chained together.

Intermediate operators do not by themselves trigger the emission of the data stream.

Compared with RxJava, Flow has relatively few intermediate operators, but they are sufficient for daily use and in some respects even more powerful. A few rarely used operators may need to be written yourself.

transform

Let’s start with an old friend: the data flow transformation operators.

  • transform

Its function-type parameter has a FlowCollector as the receiver and receives each upstream value as its argument.

The transform operator intercepts, collects and forwards upstream data.

After intercepting an upstream value, the block can send data downstream through its FlowCollector receiver.

  • transform can call emit any number of times for each upstream value.

As the most basic implementation, the other transformation operators are built on top of transform.
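A minimal sketch of both points, assuming kotlinx-coroutines-core: expand emits zero or two values per upstream value, and myMap is a hypothetical map-like operator written on top of transform (the library’s own map follows the same idea in the kotlinx.coroutines sources).

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// transform may emit zero, one or several values for each upstream value
fun expand(): Flow<String> = (1..3).asFlow().transform { value ->
    if (value % 2 == 1) {         // even values emit nothing
        emit("value $value")
        emit("square ${value * value}")
    }
}

// Hypothetical map-like operator built on transform
fun <T, R> Flow<T>.myMap(f: suspend (T) -> R): Flow<R> = transform { value ->
    emit(f(value))
}

fun main() = runBlocking {
    println(expand().toList())                             // [value 1, square 1, value 3, square 9]
    println((1..3).asFlow().myMap { it * 2 + 1 }.toList()) // [3, 5, 7]
}
```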

  • map

    Applies the transformation function to each upstream value and passes the result downstream.

        flow {
            (0..2).forEach {
                println("emit $it")
                emit(it)
            }
        }.map {
            println("running map $it")
            it * 2 + 1
        }.collect {
            println("last collect result $it")
        }

        // Output:
        // emit 0
        // running map 0
        // last collect result 1
        // emit 1
        // running map 1
        // last collect result 3
        // emit 2
        // running map 2
        // last collect result 5

In RxJava, flatMap is used when each value of an upstream stream needs to be converted into another stream.

Flow has no operator with the same name, but it provides similar ones.

  • flatMapConcat

    Converts each upstream value into another flow.

    It collects the new flow completely, passing each of its values downstream, and only then takes the next value from upstream, so order is preserved.

        import kotlinx.coroutines.*
        import kotlinx.coroutines.flow.*

        @FlowPreview
        fun test() = runBlocking {
            flow {
                (0..2).forEach {
                    println("emit $it")
                    emit(it)
                }
            }.flatMapConcat { index ->
                flow<String> {
                    println("running flatMapConcat first $index")
                    emit("flatMapConcat first $index")
                    println("running flatMapConcat second $index")
                    emit("flatMapConcat second $index")
                    println("running flatMapConcat end $index")
                }
            }.collect {
                println("last collect result: $it")
            }
            println("after flow")
        }

        // Output:
        // emit 0
        // running flatMapConcat first 0
        // last collect result: flatMapConcat first 0
        // running flatMapConcat second 0
        // last collect result: flatMapConcat second 0
        // running flatMapConcat end 0
        // emit 1
        // running flatMapConcat first 1
        // last collect result: flatMapConcat first 1
        // running flatMapConcat second 1
        // last collect result: flatMapConcat second 1
        // running flatMapConcat end 1
        // emit 2
        // running flatMapConcat first 2
        // last collect result: flatMapConcat first 2
        // running flatMapConcat second 2
        // last collect result: flatMapConcat second 2
        // running flatMapConcat end 2
        // after flow

    This is equivalent to RxJava’s concatMap operator.

  • flatMapMerge

    flatMapMerge works like flatMapConcat but allows concurrency: it keeps taking values from upstream and collecting their new flows, up to the maximum concurrency, and passes their values downstream as they arrive. It corresponds to RxJava’s flatMap.

    Because flatMapMerge is concurrent, the order of emissions is not guaranteed.

        import kotlinx.coroutines.*
        import kotlinx.coroutines.flow.*

        @FlowPreview
        fun test() = runBlocking {
            flow {
                (0..2).forEach {
                    println("emit $it")
                    emit(it)
                }
            }.flatMapMerge { index ->
                flow<String> {
                    emit("flatMapMerge $index,1")
                    emit("flatMapMerge $index,2")
                }
            }.collect {
                println("last collect result: $it")
            }
        }

        // Output:
        // emit 0
        // emit 1
        // emit 2
        // last collect result: flatMapMerge 0,1
        // last collect result: flatMapMerge 0,2
        // last collect result: flatMapMerge 1,1
        // last collect result: flatMapMerge 1,2
        // last collect result: flatMapMerge 2,1
        // last collect result: flatMapMerge 2,2

    flatMapMerge accepts a concurrency parameter that limits how many new flows are collected at the same time. The default is 16.

    With concurrency = 1, it is equivalent to flatMapConcat.

However, the official documentation does not recommend these two operators for everyday code. Judging from the operator comments, the preference is to keep data flow operations linear, which makes them easier to read.

After all, flatMap-style operators are harder to reason about than map, and a map is usually sufficient. For sending an entire flow downstream there is also the FlowCollector extension function emitAll.

flatMapConcat is in fact implemented by calling emitAll on each new flow.

flatMapConcat and flatMapMerge are annotated with @FlowPreview: they are preview APIs, and backward compatibility is not guaranteed in later versions.

If you need to convert values into new flows, calling emitAll inside transform may be a better choice.
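A sketch of that alternative, assuming kotlinx-coroutines-core; expandWithEmitAll is an illustrative name. Because emitAll collects the whole inner flow before transform moves to the next upstream value, the result has the same order as flatMapConcat without relying on a preview API.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Each upstream index is expanded into an inner flow, forwarded in order
fun expandWithEmitAll(): Flow<String> = (0..2).asFlow().transform { index ->
    emitAll(flowOf("first $index", "second $index"))
}

fun main() = runBlocking {
    println(expandWithEmitAll().toList())
    // [first 0, second 0, first 1, second 1, first 2, second 2]
}
```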

Thread scheduling

Remember RxJava’s thread scheduling operators subscribeOn and observeOn?

They specify the threads on which the upstream and downstream parts of the stream run, respectively.

Flow is built on Kotlin coroutines, so switching threads means changing the dispatcher in the CoroutineContext.

However, code that emits data is not allowed to change the coroutine context itself by calling suspend functions such as withContext; within a flow, the coroutine context can only be changed with the flowOn operator.

The dispatcher specified by flowOn applies only to the part of the flow upstream of the operator.

Unlike subscribeOn, which plays a similar role but where only the call closest to the source takes effect, flowOn can be called several times.

Each flowOn applies its CoroutineContext from itself back up to the previous flowOn (or to the data source).

Operators downstream of the last flowOn, including collect, inherit the context of the outer collecting coroutine.

    import java.util.concurrent.Executors
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun test() = runBlocking {
        val myDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        flow {
            println("emit on ${Thread.currentThread().name}")
            emit("data")
        }.map {
            println("run first map on ${Thread.currentThread().name}")
            "$it map"
        }.flowOn(Dispatchers.IO)   // applies to flow {} and the first map
            .map {
                println("run second map on ${Thread.currentThread().name}")
                "${it},${it.length}"
            }.flowOn(myDispatcher) // applies to the second map only
            .collect {             // runs in the outer runBlocking context
                println("result $it on ${Thread.currentThread().name}")
            }
        myDispatcher.close()
    }

    // Output:
    // emit on DefaultDispatcher-worker-2 @coroutine#3
    // run first map on DefaultDispatcher-worker-2 @coroutine#3
    // run second map on pool-1-thread-1 @coroutine#2
    // result data map,8 on Test worker @coroutine#1

Compared with subscribeOn and observeOn, flowOn is more flexible to use.

To get an observeOn-like effect, switch the dispatcher of the collecting coroutine itself, for example by calling collect inside withContext in the outer coroutine.
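A sketch of that pattern, assuming kotlinx-coroutines-core on a plain JVM. Since Dispatchers.Main is not available there, a single-thread executor named "fake-main" is used as a hypothetical stand-in; observeOnLike is also an illustrative name. flowOn plays the subscribeOn role for the producer, and withContext decides where the collect block runs.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (producer thread name, collector thread name) for one emitted value
suspend fun observeOnLike(collectorDispatcher: CoroutineDispatcher): Pair<String, String> {
    val upstream = flow { emit(Thread.currentThread().name) } // producer
        .flowOn(Dispatchers.IO)                                // subscribeOn-like
    var result = "" to ""
    // observeOn-like: collect runs on the dispatcher of the enclosing withContext
    withContext(collectorDispatcher) {
        upstream.collect { producer -> result = producer to Thread.currentThread().name }
    }
    return result
}

fun main() {
    // Hypothetical stand-in for Dispatchers.Main on a plain JVM
    val fakeMain = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
        .asCoroutineDispatcher()
    val (producer, collector) = runBlocking { observeOnLike(fakeMain) }
    println("produced on $producer, collected on $collector")
    fakeMain.close()
}
```

On Android, the same shape would use Dispatchers.Main (or a lifecycle-aware scope) for the collector.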

Events

Flow also provides hook operators for data flows.

  • onStart performs an action before the upstream flow starts.

    Its block has FlowCollector as the receiver, so it can emit data.

    If onStart emits data, that data is sent downstream before any upstream values.

    Functionally, it is equivalent to RxJava’s startWith and doOnSubscribe combined, and more powerful.

    Its block is a suspend lambda, so it can call suspend functions such as delay, which makes it easy to delay the start of emission.

  • onEach

    Performs an action on each upstream value before it is passed downstream.

    Equivalent to doOnNext or doOnSuccess in RxJava, and its block is also a suspend lambda.

  • onCompletion

    Performs an action when the upstream flow completes, is cancelled, or fails with an exception.

    Unlike onComplete or doFinally in RxJava, the block of onCompletion also has FlowCollector as the receiver, so it can emit data.

    The block’s parameter is the exception that terminated the flow, or null if the flow completed normally.

    Note that onCompletion does not catch exceptions, and it cannot emit data when an exception has occurred.

    To catch exceptions and emit data, use the catch operator.

    Pay attention to where onCompletion is placed in the chain; see the retry operators below.

    This operator is equivalent to the imperative code:

        try {
            flow.collect { value -> ... }
        } finally {
            // onCompletion code block ...
        }

    But since you are using Flow for reactive programming, try to keep everything inside the flow.

  • onEmpty

    When the upstream flow completes without emitting any value, performs an action that can emit additional elements.

    Here, the upstream also includes data emitted by operators such as onStart and onCompletion.
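A small sketch that records the order in which these hooks run, assuming kotlinx-coroutines-core; eventLog and emptyCases are illustrative names. The opt-in annotation is there because onEmpty may be marked experimental depending on the library version.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order in which the hook operators run for flowOf(1, 2)
suspend fun eventLog(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2)
        .onStart {
            log += "onStart"
            emit(0)                 // reaches downstream before 1 and 2
        }
        .onEach { log += "onEach $it" }
        .onCompletion { cause -> log += "onCompletion cause=$cause" }
        .collect { log += "collect $it" }
    return log
}

// onEmpty fires only when nothing was emitted upstream (onStart included)
@OptIn(ExperimentalCoroutinesApi::class) // may be needed on older versions
suspend fun emptyCases(): List<List<Int>> = listOf(
    emptyFlow<Int>().onEmpty { emit(-1) }.toList(),
    emptyFlow<Int>().onStart { emit(5) }.onEmpty { emit(-1) }.toList(),
)

fun main() = runBlocking {
    eventLog().forEach(::println)
    println(emptyCases()) // [[-1], [5]]
}
```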

Exception handling

In the RxJava days, exception handling was arguably one of its most useful features: onError catches any exception thrown in the data stream. Flow naturally has an equivalent.

Although you can catch exceptions directly with a try-catch block, it is better to keep exceptions transparent: let them propagate out and handle them uniformly in the data flow.

  • catch

    Catches exceptions thrown in the upstream flow and allows new data to be emitted.

    Exceptions caught by catch are not passed further downstream.

    catch is best placed near the end of the chain, so that it catches every exception thrown upstream.

    It is similar to combining RxJava’s onErrorReturn and onError.

    However, exceptions thrown in the downstream collect cannot be caught by it.

    In that case, move part of the work in collect upstream into onEach, so that a downstream catch can capture the exception and emit error data.

    Finally, you can install a CoroutineExceptionHandler in the outer coroutine to handle any remaining exceptions.
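A minimal sketch of both catch patterns, assuming kotlinx-coroutines-core; withFallback and handledInOnEach are illustrative names. The first emits a fallback value like onErrorReturn; the second moves the per-item work from collect into onEach so that the downstream catch can see its failure.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// catch handles the upstream failure and emits a fallback value
suspend fun withFallback(): List<String> =
    flow {
        emit("a")
        throw IllegalStateException("boom")
    }
        .catch { e -> emit("fallback: ${e.message}") }
        .toList()

// Work moved from collect into onEach, so a downstream catch sees its failures
suspend fun handledInOnEach(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value ->
            if (value == 2) throw IllegalArgumentException("bad $value")
            log += "handled $value"
        }
        .catch { e -> log += "caught ${e.message}" }
        .collect()
    return log
}

fun main() = runBlocking {
    println(withFallback())    // [a, fallback: boom]
    println(handledInOnEach()) // [handled 1, caught bad 2]
}
```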

retry

As in RxJava, when an exception occurs in a data flow, the whole flow terminates and no further data can be emitted.

In some scenarios we don’t want the flow to end, so a retry mechanism is needed.

  • retry

    When the upstream flow throws and the retry condition is met, the exception is caught and the upstream flow is collected again. Once the number of retries is exhausted, the exception is propagated downstream.

  • retryWhen

    retry is implemented with retryWhen. After an exception is caught, if the retry condition is met, retryWhen collects the upstream flow again, and repeats this every time the exception recurs.

    If the retry condition is not met, the exception continues downstream.

  • If a catch operator sits upstream of retryWhen, exceptions are handled directly by that catch and never reach retryWhen. Usually, catch should be placed downstream to handle the exceptions that retryWhen rethrows.

This is equivalent to placing onErrorReturn upstream of retryWhen in RxJava.

  • When onCompletion is upstream of retryWhen, the exception passes through onCompletion first (its block runs but cannot emit data) before reaching retryWhen, and the same happens on every retry.

If you use the retryWhen and retry operators, place them as close to the data source as possible to avoid unexpected behavior.
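A sketch of retry and retryWhen with a downstream catch, assuming kotlinx-coroutines-core; the flaky producers and function names are made up for the example. Values emitted before a failure stay downstream, and once retries are exhausted the exception reaches the catch placed after retryWhen.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Fails on the first two attempts, then succeeds; retry re-collects the upstream
suspend fun retryUntilSuccess(): List<Int> {
    var attempts = 0
    return flow {
        attempts++
        emit(attempts)
        if (attempts < 3) throw IOException("flaky")
    }
        .retry(retries = 5L) { cause -> cause is IOException }
        .toList()
}

// Always fails: after two retries the exception reaches the downstream catch
suspend fun retryThenCatch(): List<Int> =
    flow {
        emit(0)
        throw IOException("down")
    }
        .retryWhen { cause, attempt -> cause is IOException && attempt < 2 }
        .catch { emit(-1) }
        .toList()

fun main() = runBlocking {
    println(retryUntilSuccess()) // [1, 2, 3]
    println(retryThenCatch())    // [0, 0, 0, -1]
}
```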

merge

Some scenarios require merging data streams. In RxJava you can use operators such as merge, concat and zip, and Flow provides similar capabilities.

  • combine

    Whenever either source emits, the latest values from both sources are merged by a function and passed downstream.

    Similar to RxJava’s combineLatest operator; in Flow, flows of any element types can be combined.

        val flow1 = flowOf("first", "second", "third").onEach { delay(50) }
        val flow2 = flowOf(1, 2, 3).onEach { delay(100) }
        flow1.combine(flow2) { first, second ->
            "$first $second"
        }.collect {
            println("result $it")
        }

        // When flow2 emits its first element, the latest value of flow1 is already its second element.
        // result second 1
        // result third 1
        // result third 2
        // result third 3

  • combineTransform

    The effect is the same as combine.

    The only difference is that combine emits the merged value immediately, whereas the block of combineTransform has FlowCollector as the receiver, so you decide whether (and what) to emit for each combination.

  • zip

    Merges the values of the two sources pairwise and sends them downstream; the resulting flow completes as soon as either source completes.

    The zip operator works as in RxJava, except that its function can be a suspend function.

        val flow1 = flowOf("first", "second", "third").onEach { delay(50) }
        val flow2 = flowOf(1, 2, 3, 4).onEach { delay(100) }
        flow1.zip(flow2) { first, second ->
            "$first $second"
        }.collect {
            println("result $it")
        }

        // flow1 has only three elements, so the fourth element of flow2 is discarded.
        // result first 1
        // result second 2
        // result third 3

combine and combineTransform support merging more than two sources, whereas zip currently supports only two.
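A deterministic sketch of the three operators, assuming kotlinx-coroutines-core; the function names are illustrative. Single-value sources are used for combine and combineTransform so the result does not depend on timing.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// zip pairs values one by one and stops when the shorter flow completes
suspend fun zipped(): List<String> =
    flowOf("first", "second", "third")
        .zip(flowOf(1, 2, 3, 4)) { s, n -> "$s $n" }
        .toList()

// The top-level combine overloads accept more than two flows
suspend fun combinedThree(): List<String> =
    combine(flowOf("a"), flowOf(1), flowOf(true)) { s, n, b -> "$s $n $b" }
        .toList()

// combineTransform lets the block decide what, and how often, to emit
suspend fun combineTransformed(): List<Int> {
    val combined: Flow<Int> = flowOf(1).combineTransform(flowOf(10)) { a, b ->
        emit(a)
        emit(a + b)
    }
    return combined.toList()
}

fun main() = runBlocking {
    println(zipped())             // [first 1, second 2, third 3]
    println(combinedThree())      // [a 1 true]
    println(combineTransformed()) // [1, 11]
}
```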

Common operators

There are many other operators. Here are just a few common ones, which behave essentially like their RxJava namesakes.

  • first

    Terminal operator that returns only the first element of the upstream flow, or the first element that satisfies a predicate; the remaining elements are discarded.

  • last

    Terminal operator that returns only the last element of the upstream flow, and throws a NoSuchElementException if the flow is empty.

    Use lastOrNull to get null instead for an empty flow.

  • filter

    Intermediate operator that filters upstream values, letting only those that satisfy the predicate continue. It is built on transform.

  • take

    Intermediate operator that passes only the specified number of upstream elements downstream and discards the rest.

  • debounce

    Intermediate operator that passes a value downstream only if no newer value arrives within the specified timeout, so only the latest value of each burst gets through; it is used to filter high-frequency upstream producers.

    In the RxJava era, the RxBinding library converted View events into data streams, and the debounce operator was used to filter EditText input and View clicks.

    The debounce operator is currently a preview API in Flow, with no guarantee of compatibility in later versions.

  • distinctUntilChanged

    Intermediate operator that filters out consecutive duplicate elements from the upstream flow, equivalent to RxJava’s operator of the same name.
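These operators can be exercised in a short sketch, assuming kotlinx-coroutines-core; commonOperators and debounced are illustrative names. The debounce timings leave wide margins (50 ms versus a 200 ms timeout versus a 300 ms pause) so the result should be stable.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun commonOperators(): List<Any?> = listOf(
    (1..5).asFlow().first { it > 2 },                         // 3
    emptyFlow<Int>().lastOrNull(),                            // null
    (1..10).asFlow().filter { it % 2 == 0 }.take(2).toList(), // [2, 4]
    flowOf(1, 1, 2, 2, 1).distinctUntilChanged().toList(),    // [1, 2, 1]
)

@OptIn(FlowPreview::class) // debounce is a preview API in 1.5.x
suspend fun debounced(): List<Int> =
    flow {
        emit(1)
        delay(50)  // 1 is superseded within the timeout
        emit(2)
        delay(300) // quiet period longer than the timeout, so 2 passes
        emit(3)    // the last value is emitted when the flow completes
    }
        .debounce(200)
        .toList()

fun main() = runBlocking {
    println(commonOperators()) // [3, null, [2, 4], [1, 2, 1]]
    println(debounced())       // [2, 3]
}
```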

Operator reuse

In RxJava, to reuse a common set of operations across multiple data streams, we can wrap them in a Transformer and apply it uniformly with the compose operator.

For example, common pre-processing of data returned from network requests:

    class MyTransformer<T>(...) : MaybeTransformer<T, T> {
        override fun apply(upstream: Maybe<T>): MaybeSource<T> {
            return upstream
                .subscribeOn(Schedulers.io())
                .map { ... } // pre-process the response
                .observeOn(AndroidSchedulers.mainThread())
                .onErrorReturn { ... } // emit error data when an exception occurs
        }
    }

    api.compose(MyTransformer(...)).subscribe(...)

Looking at Flow’s design, its authors evidently had the same idea early on, and Flow did have a compose operator. It has since been deprecated, because in Kotlin it is simply not needed.

Recall how Flow operators are defined: as extension functions. With a Kotlin extension function, we can define our own operator that calls the existing operators internally.

    fun <T> Flow<T>.preHandleHttpResponse(
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
    ): Flow<T> = map { response ->
        if (response.code == 0) response else throw MyHttpException(...)
    }.flowOn(dispatcher)
        .catch { error ->
            // emit(...) // emit data indicating the error
        }

    flow { ... }.preHandleHttpResponse().collect { ... }
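For a version that compiles and runs, here is a sketch under explicit assumptions: ApiResponse and ApiException are hypothetical types standing in for a real HTTP layer, and the error is turned into a fallback string rather than whatever error model a real app would use. The point it shows is that a reusable "operator" is just an extension function chaining existing operators.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical response and exception types
data class ApiResponse(val code: Int, val body: String)
class ApiException(val code: Int) : Exception("api error $code")

// A reusable operator: an extension function chaining map, flowOn and catch
fun Flow<ApiResponse>.preHandleHttpResponse(
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
): Flow<String> = map { response ->
    if (response.code == 0) response.body else throw ApiException(response.code)
}
    .flowOn(dispatcher)
    .catch { error -> emit("error: ${error.message}") } // fallback value on failure

fun main() = runBlocking {
    println(flowOf(ApiResponse(0, "ok")).preHandleHttpResponse().toList())      // [ok]
    println(flowOf(ApiResponse(500, "body")).preHandleHttpResponse().toList())  // [error: api error 500]
}
```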

cancel

In RxJava, subscribe returns a Disposable for cancelling the stream, or AutoDispose can bind the stream to a Lifecycle and dispose of it automatically.

Flow itself provides no cancellation API. Consuming a flow is a suspend call that must run in a Kotlin coroutine, so cancellation can be managed directly through the Job of the coroutine that collects it.
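A minimal sketch of cancelling collection through the collector’s Job, assuming kotlinx-coroutines-core; ticker and collectThenCancel are illustrative names. The exact number of values received depends on timing, so only its rough shape is stated.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// An endless producer; only cancelling the collecting coroutine stops it
fun ticker(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}

suspend fun collectThenCancel(): List<Int> = coroutineScope {
    val received = mutableListOf<Int>()
    val job = launch { ticker().collect { received += it } }
    delay(350)
    job.cancelAndJoin() // plays the role of Disposable.dispose()
    received
}

fun main() = runBlocking {
    println(collectThenCancel()) // typically [0, 1, 2, 3]
}
```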

Back pressure

Backpressure is an unavoidable problem in reactive programming.

In the producer-consumer model, backpressure arises when the producer produces data much faster than the consumer can consume it.

It only occurs in asynchronous scenarios, where the producer and the consumer run in separate threads.

Limiting the producer

Since RxJava 2, Flowable has been introduced specifically to solve the backpressure problem.

It uses an asynchronous buffer with a default size of 128, together with a BackpressureStrategy:

  • MISSING: no buffering or dropping is done.
  • ERROR: throws MissingBackpressureException when the buffer is full.
  • BUFFER: when the buffer is full, it keeps growing, potentially until an OOM.
  • DROP: when the buffer is full, new values are discarded.
  • LATEST: when the buffer is full, only the most recent value is kept, so the consumer always receives the latest one.

Flow provides similar backpressure buffering strategies.

  • buffer

    Creates a Channel-based buffer.

    A Channel can be thought of as a BlockingQueue for Kotlin coroutines, where both send and receive are suspend functions.

     public fun <T> Flow<T>.buffer(
         capacity: Int = BUFFERED,
         onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
     ): Flow<T>

    The capacity parameter is the buffer capacity; it defaults to BUFFERED, a constant defined in Channel.

    • RENDEZVOUS: no buffer; each value is handed over only when the producer and the consumer meet.
    • BUFFERED: a buffer of the default size, 64.
    • CONFLATED: equivalent to a capacity of 1 with the overflow policy forced to DROP_OLDEST.
    • A custom buffer size.

    The onBufferOverflow parameter is the overflow policy; it defaults to SUSPEND.

    • SUSPEND: when the buffer is full, the producer is suspended until there is free space in the buffer.
    • DROP_OLDEST: when the buffer is full, the oldest value is removed and the latest value is inserted.
    • DROP_LATEST: when the buffer is full, the latest value is discarded.
  • conflate

    A shorthand for buffer(CONFLATED).

  • flowOn

    In fact, after changing the coroutine context, flowOn also adds a default buffer internally.

    It is equivalent to the default buffer(), so it naturally supports backpressure.

        val startTime = System.currentTimeMillis()
        flow {
            (0..100).forEach {
                delay(50)
                val currTime = System.currentTimeMillis()
                println("emit $it on ${currTime - startTime} ms")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)   // equivalent to buffer(), with the producer on IO
            .onEach { delay(300) } // slow consumer
            .collect {
                val endTime = System.currentTimeMillis()
                println("result : $it on ${endTime - startTime} ms")
            }

        // Output:
        // emit 0 on 93 ms
        // emit 1 on 156 ms
        // emit 2 on 220 ms
        // emit 3 on 282 ms
        // emit 4 on 345 ms
        // emit 5 on 409 ms
        // result : 0 on 409 ms
        // emit 6 on 471 ms
        // ...
        // emit 9 on 657 ms
        // result : 1 on 719 ms
        // emit 10 on 719 ms
        // ...
        // (once the default 64-element buffer is full, the producer is suspended and slows down:
        //  each consumed value frees a slot, and the producer emits one more to fill it)
        // emit 79 on 5050 ms
        // result : 15 on 5081 ms
        // emit 80 on 5112 ms
        // emit 81 on 5174 ms
        // result : 16 on 5390 ms
        // emit 82 on 5451 ms
        // result : 17 on 5700 ms
        // ...
        // result : 34 on 11026 ms
        // emit 100 on 11090 ms
        // result : 35 on 11338 ms
        // result : 36 on 11651 ms
        // result : 37 on 11965 ms
        // ... (until consumption ends)

    Note the emit timestamps above: with buffer or flowOn, the upstream keeps producing concurrently instead of waiting for the slow consumer.

    • You can use this to run tasks concurrently, but buffer cannot specify a dispatcher.

    If flowOn above is replaced with conflate, the program prints:

     emit 0 on 99 ms
     emit 1 on 162 ms
     emit 2 on 224 ms
     emit 3 on 287 ms
     emit 4 on 351 ms
     emit 5 on 413 ms
     result : 0 on 413 ms
     emit 6 on 474 ms
     ...
     emit 9 on 660 ms
     result : 5 on 720 ms
     emit 10 on 720 ms
     ...
     emit 14 on 970 ms
     result : 9 on 1034 ms
     emit 15 on 1034 ms
     ...
     emit 19 on 1286 ms
     result : 14 on 1350 ms
     emit 20 on 1350 ms
     ...

    Now only the latest value is received: the buffer holds just one value, so each new value replaces the one that has not been consumed yet.

    This is similar to RxJava’s LATEST strategy.
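A smaller, self-contained sketch of the same effects, assuming kotlinx-coroutines-core; numbers, timeCollect and conflated are illustrative names. A producer emitting about every 100 ms is paired with a consumer that needs 300 ms per value; the timings in the comments are approximate.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Producer: roughly one value every 100 ms
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Consumer takes 300 ms per value; returns the total elapsed time
suspend fun timeCollect(source: Flow<Int>): Long = measureTimeMillis {
    source.collect { delay(300) }
}

// With conflate, values arriving while the consumer is busy are replaced by newer ones
suspend fun conflated(): List<Int> {
    val received = mutableListOf<Int>()
    numbers().conflate().collect {
        delay(300)
        received += it
    }
    return received
}

fun main() = runBlocking {
    println("sequential: ${timeCollect(numbers())} ms")          // roughly 1200 ms
    println("buffered:   ${timeCollect(numbers().buffer())} ms") // roughly 1000 ms
    println("conflated:  ${conflated()}")                        // typically [1, 3]
}
```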

Limiting the consumer

Flow also lets the consumer side choose which values to process under backpressure.

  • collectLatest

    Collects each element of the upstream flow in order. If a new element arrives before the current one has been processed, processing of the current element is cancelled.

    collectLatest does not care how long the upstream takes; an element only counts as unfinished when the processing inside the collectLatest block takes too long.

    Modifying the program above slightly:

        flow.collectLatest {
            delay(300) // the delay moves to the consumer side
            ...
        }

        // Output:
        // emit 0 on DefaultDispatcher-worker-1 @coroutine#3 (115 ms)
        // emit 1 on DefaultDispatcher-worker-1 @coroutine#3 (177 ms)
        // emit 2 on DefaultDispatcher-worker-1 @coroutine#3 (240 ms)
        // emit 3 on DefaultDispatcher-worker-1 @coroutine#3 (304 ms)
        // emit 4 on DefaultDispatcher-worker-1 @coroutine#3 (369 ms)
        // emit 5 on DefaultDispatcher-worker-1 @coroutine#3 (433 ms)
        // emit 6 on DefaultDispatcher-worker-1 @coroutine#3 (496 ms)
        // ...
        // (each new value cancels the unfinished processing of the previous one,
        //  so only the last value is processed to completion)
        // result : 100 on Test worker @coroutine#104 (6721 ms)
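A compact, deterministic variant of this behavior, assuming kotlinx-coroutines-core; latestOnly is an illustrative name. Processing starts for every value, but only the last one survives long enough to finish.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (values whose processing started, values whose processing finished)
suspend fun latestOnly(): Pair<List<Int>, List<Int>> {
    val started = mutableListOf<Int>()
    val finished = mutableListOf<Int>()
    flow {
        for (i in 1..3) {
            delay(100) // a new value every 100 ms
            emit(i)
        }
    }.collectLatest { value ->
        started += value
        delay(300)     // cancelled if a newer value arrives first
        finished += value
    }
    return started to finished
}

fun main() = runBlocking {
    println(latestOnly()) // ([1, 2, 3], [3])
}
```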

conclusion

Although RxJava is very convenient to use, its implementation and its operators are complex, which makes it hard to get started; even veterans with years of experience rarely dare say they fully understand it.

Compared with RxJava’s complex Java-based implementation, Flow is a native citizen of Kotlin, backed by the excellent infrastructure of the Kotlin language and Kotlin coroutines, so its API and implementation can be considerably simpler. After several iterations, Kotlin Flow is now mature.

Although some APIs are still experimental and may change in later versions, such changes usually amount to a deprecation warning plus a recommended equivalent replacement.

For everyday use, most of the operators listed above are already stable and cover the common cases. Flow’s design appears to have drawn on RxJava: its operators are very similar in function and naming, so anyone who has used RxJava will find it easy to pick up.

In general, Flow is the natural choice for asynchronous data operations with Kotlin coroutines.

Projects that use RxJava for asynchronous programming can also try gradually replacing it with Kotlin coroutines + Flow.

Due to limited space, more advanced Flow features will be explored in subsequent articles.
