Flow, the Coroutine version of RxJava, is as easy to switch threads as RxJava. This paper makes a simple comparison between the two in multi-threaded scenarios.

1. RxJava


Let’s start by reviewing thread switching in RxJava

As above, RxJava uses subscriberOn to switch threads with observeOn

subscribeOn

SubscribeOn is used to determine which thread to subscribe to, which in the case of Cold flow determines the thread that emits the data. There are two points to note in use:

  1. When there is only one on the call chainsubscribeOn, can appear in any position

The effect is the same: the data is emitted after the IO thread subscribes

  1. When there are multiple calls on the chainsubscribeOn, only the first one takes effect:

The second subscribeOn above doesn’t make sense

observeOn

ObserveOn is used to determine which thread to respond to:

  1. observeOnDetermines the thread that calls the upstream and downstream operators of the chain

The green line above will run on the main thread

  1. withsubscribeOnDifferent, multiple calls are allowed on the chainobserveOnAnd each of them is valid

The blue and green parts above are switched to different threads due to the presence of observeOn

just

One mistake that beginners to RxJava often make is in observable. just(…) Do time-consuming tasks. Just does not accept a lambda, so it is executed immediately, not subscribeOn

As above, loadDataSync() does not execute on IO,

To execute in IO, use Observable.deffer{}

flatMap

In conjunction with RxJava thread switching described above, look at the following code

If we want to execute loadData(ID) concurrently, this is written incorrectly.

Subscribe (IO ()) means that its upstream data is sequentially emitted in a single thread. So while flatMap{} returns multiple Observables that subscribe to a single thread, multiple loadData always run on the same thread.

The code can be modified to achieve concurrent execution:

Subscribe threads are specified separately by subscribeOn when subscribing to the Observable returned by flatMap.

Other operators like flatMap that involve multiple Observable subscriptions (such as merge, zip, etc.) need to pay attention to their respective subscribeOn threads to prevent unexpected behaviors.

2. Flow


Let’s look at the thread switch for Flow.

Flow is a thread switch based on CoroutineContext, so this section requires a basic understanding of Croutine.

flowOnSimilar to subscribeOn of RxJava, there is no operator corresponding to observeOn in Flow, becausecollectIs a suspend function that must be set inCoroutineScopeSo the response thread is executed byCoroutineContextA decision. Let’s say you do it in maincollect, then the response thread isDispatcher.Main

flowOn

saidflowOnSimilar to thesubscribeOnBecause they can both be used to determine upstream threadsIn the code above,flowOnThe previous code will be executed in IO.

withsubscribeOnThe difference is that flowOn allows multiple occurrences, each affecting its previous operationsThe code above, you can see it by colorflowOnArea of influence

launchIn

collectIs the suspend function, so subsequent code does not continue because the coroutine is suspendedSo the above code may not meet expectations because the first onecollectWe can’t get to the second one until we finish.

The correct way to write it is for eachcollectA single coroutineOr uselaunchIn“Is written more elegantly launchInDoes not suspend coroutines, so with RxJavasubscribeCloser.

As the name suggests, launchIn is just the syntactic sugar of the chain-call launch from the previous example.

flowOf

flowOfSimilar to theObservable.just()Note that the contents of flowOf are executed immediately and are not affectedflowOnimpact

Calculate () is expected to run on IO, using flow{}

flatMapMerge

flatMapMergeSimilar RxJavaflatMap As shown above, the flatMap of two items respectively forms two items, that is, a total of four data are emitted, and the log output is as follows:

inner: pool-2-thread-2 @coroutine#4
inner: pool-2-thread-3 @coroutine#5
inner: pool-2-thread-3 @coroutine#5
inner: pool-2-thread-2 @coroutine#4
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
Copy the code

It can be seen from logs that flowOn is written outside of the flatMapMerge, while inner logs can be printed on multiple threads (all from pool2). This is different from flatMap, which can only run on fixed threads in the same thread pool.

If you write flowOn inside the flatMapMerge

The results are as follows:

inner: pool-2-thread-2 @coroutine#6
inner: pool-2-thread-1 @coroutine#7
inner: pool-2-thread-2 @coroutine#6
inner: pool-2-thread-1 @coroutine#7
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
Copy the code

Inner is still printed on multiple threads and flowOn is processed in the flatMapMerge regardless of whether it is written inside or outside the flatMapMerge.

butflatMapMergeThere is a difference. Look at the following two pieces of code

The color shows the scope of flowOn’s influence, tracing up to flowOf

3. Summary


Both RxJava Observables and Coroutine flows support thread switching. The related apis are compared as follows:

Thread pool scheduling Thread operator Synchronous data source creation Asynchronous create Concurrent execution
RxJava Schedulers (io(), computation(), mainThread()) subscribeOn, observeOn just deffer{} flatMap(inner subscribeOn)
Flow Dispatchers (IO, Default, Main) flowOn flowOf flow{} flatMapMerge(inner or outer flowOn)

Finally, take a look at an example of how to migrate code from RxJava to Flow

RxJava

The RxJava code looks like this:

Using theSchedulersThe definition is as follows:

Code execution result:

1: pool-1-thread-1
1: pool-1-thread-1
1: pool-1-thread-1
2: pool-3-thread-1
2: pool-3-thread-1
2: pool-3-thread-1
inner 1: pool-4-thread-1
inner 1: pool-4-thread-2
inner 1: pool-4-thread-1
inner 1: pool-4-thread-1
inner 1: pool-4-thread-2
inner 1: pool-4-thread-2
inner 1: pool-4-thread-3
inner 2: pool-5-thread-1
inner 2: pool-5-thread-2
3: pool-5-thread-1
inner 2: pool-5-thread-2
inner 1: pool-4-thread-3
inner 2: pool-5-thread-2
inner 2: pool-5-thread-3
3: pool-5-thread-1
3: pool-5-thread-1
3: pool-5-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
inner 1: pool-4-thread-3
end: pool-6-thread-1
3: pool-5-thread-1
inner 2: pool-5-thread-1
3: pool-5-thread-1
inner 2: pool-5-thread-3
inner 2: pool-5-thread-1
end: pool-6-thread-1
3: pool-5-thread-3
3: pool-5-thread-3
end: pool-6-thread-1
inner 2: pool-5-thread-3
3: pool-5-thread-3
end: pool-6-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
Copy the code

The code is longer and color coded to help clarify thread relationships

It is clear after coloring. It should be noted that since the data source was switched in the flatMap and the thread was switched at the same time, the thread that printed 3 was not S2 but S4

Flow

The prime minister creates the corresponding Dispatcher

Then change the code to Flow, following the following principles

  • RxJava throughobserveOnSwitch the thread of subsequent code
  • The Flow throughflowOnSwitch the thread of leading code

The print result is as follows:

1: pool-1-thread-1 @coroutine#6
1: pool-1-thread-1 @coroutine#6
1: pool-1-thread-1 @coroutine#6
2: pool-2-thread-2 @coroutine#5
2: pool-2-thread-2 @coroutine#5
2: pool-2-thread-2 @coroutine#5
inner 1: pool-3-thread-1 @coroutine#10
inner 1: pool-3-thread-2 @coroutine#11
inner 1: pool-3-thread-3 @coroutine#12
inner 1: pool-3-thread-2 @coroutine#11
inner 1: pool-3-thread-3 @coroutine#12
inner 2: pool-4-thread-3 @coroutine#9
inner 1: pool-3-thread-1 @coroutine#10
inner 1: pool-3-thread-3 @coroutine#12
inner 1: pool-3-thread-2 @coroutine#11
inner 2: pool-4-thread-1 @coroutine#7
inner 2: pool-4-thread-2 @coroutine#8
inner 2: pool-4-thread-1 @coroutine#7
inner 2: pool-4-thread-3 @coroutine#9
inner 1: pool-3-thread-1 @coroutine#10
3: pool-4-thread-1 @coroutine#3
inner 2: pool-4-thread-3 @coroutine#9
inner 2: pool-4-thread-2 @coroutine#8
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
inner 2: pool-4-thread-2 @coroutine#8
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
inner 2: pool-4-thread-1 @coroutine#7
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
Copy the code

As you can see from the log, the timing of 1, 2, and 3 and the concurrency of inner1 and inner2 are consistent with RxJava.

4. FIN


Flow can completely replace the ability of RxJava in thread switching, and the subscribeOn and observeOn operations conform to the two-to-one flowOn, with lower learning cost. With the flow operator variety becoming more complete, future Android/Kotlin development can say goodbye to RxJava 👋🏻