Flow, the coroutine counterpart of RxJava, makes thread switching as easy as RxJava does. This article briefly compares the two in multi-threaded scenarios.
1. RxJava
Let's start by reviewing thread switching in RxJava.
RxJava switches threads with subscribeOn and observeOn.
subscribeOn
subscribeOn determines the thread on which the subscription happens, which for a cold stream also determines the thread that emits the data. There are two points to note:
- When there is only one subscribeOn in the call chain, it can appear at any position. The effect is the same: the data is emitted on the IO thread once the subscription has been made there.
- When there are multiple subscribeOn calls in the chain, only the first one (the one closest to the source) takes effect. A second subscribeOn further down the chain does not change the emitting thread.
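The rule about multiple subscribeOn calls can be checked with a small experiment. This is a minimal sketch, assuming RxJava 3 is on the classpath; the function name `emittingThread` is made up for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns the name of the thread on which the source actually ran.
fun emittingThread(): String =
    Observable.fromCallable { Thread.currentThread().name } // evaluated at subscription time
        .subscribeOn(Schedulers.io())          // closest to the source: decides the emitting thread
        .subscribeOn(Schedulers.computation()) // only moves the subscribe call itself; the source stays on io
        .blockingFirst()

fun main() {
    println(emittingThread())
}
```

The printed name should belong to the io scheduler rather than the computation scheduler, because the subscription travels from the bottom of the chain to the top and the last hop before the source wins.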
observeOn
observeOn determines the thread on which results are handled:
- observeOn determines the thread that runs the operators downstream of it in the chain. For example, everything that follows an observeOn targeting the main scheduler runs on the main thread.
- Unlike subscribeOn, observeOn may be called multiple times in a chain, and every call takes effect. Each observeOn switches the part of the chain that follows it to its own thread.
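The following sketch illustrates two observeOn calls in one chain, each moving the operators after it to a different scheduler. It assumes RxJava 3; `observedThreads` and the choice of the computation and single schedulers are illustrative only.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

// Records which thread each stage of the chain runs on.
fun observedThreads(): List<String> {
    val seen = CopyOnWriteArrayList<String>()
    Observable.just(1)
        .observeOn(Schedulers.computation())
        .doOnNext { seen.add("first: " + Thread.currentThread().name) }  // runs on computation
        .observeOn(Schedulers.single())
        .doOnNext { seen.add("second: " + Thread.currentThread().name) } // runs on single
        .blockingSubscribe() // wait for completion so the list is fully populated
    return seen
}

fun main() {
    observedThreads().forEach(::println)
}
```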
just
One mistake that RxJava beginners often make is doing time-consuming work inside Observable.just(…). just does not accept a lambda, so its argument is evaluated immediately on the calling thread and is not affected by subscribeOn.
For example, in Observable.just(loadDataSync()).subscribeOn(io()), loadDataSync() does not execute on the IO thread.
To execute it on IO, use Observable.defer {}.
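A minimal sketch of the difference, assuming RxJava 3. Here `loadDataSync` is a hypothetical stand-in for a blocking call; it simply returns the name of the thread it ran on so the effect is visible.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical blocking call: reports the thread it was executed on.
fun loadDataSync(): String = Thread.currentThread().name

// just(): the argument is evaluated here, on the caller's thread, before subscribeOn matters.
fun eagerThread(): String =
    Observable.just(loadDataSync())
        .subscribeOn(Schedulers.io())
        .blockingFirst()

// defer {}: the lambda runs at subscription time, so subscribeOn decides the thread.
fun deferredThread(): String =
    Observable.defer { Observable.just(loadDataSync()) }
        .subscribeOn(Schedulers.io())
        .blockingFirst()

fun main() {
    println("just:  " + eagerThread())
    println("defer: " + deferredThread())
}
```

Observable.fromCallable { loadDataSync() } is another way to get the same lazy behavior for a single value.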
flatMap
With the thread-switching rules above in mind, consider a chain in which flatMap calls loadData(id) and a single subscribeOn(io()) is applied to the chain as a whole.
If we want to execute loadData(id) concurrently, this is written incorrectly.
subscribeOn(io()) means that the upstream data is emitted sequentially on a single thread. So although flatMap {} returns multiple Observables, they are all subscribed on that one thread, and the loadData calls always run on the same thread.
The code can be modified to achieve concurrent execution:
specify the subscription thread separately, with subscribeOn, on each Observable returned inside flatMap.
Other operators that, like flatMap, involve multiple Observable subscriptions (such as merge and zip) need the same attention to the subscribeOn thread of each source, to prevent unexpected behavior.
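The two variants can be compared side by side. This is a sketch under stated assumptions: RxJava 3, and a hypothetical `loadData` that sleeps briefly and reports the thread it ran on.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical slow call: returns "<id>@<thread name>".
fun loadData(id: Int): String {
    Thread.sleep(100)
    return "$id@${Thread.currentThread().name}"
}

// Outer subscribeOn only: every inner Observable is subscribed on the same io thread.
fun sequentialThreads(): Set<String> =
    Observable.just(1, 2, 3)
        .flatMap { id -> Observable.fromCallable { loadData(id) } }
        .subscribeOn(Schedulers.io())
        .map { it.substringAfter('@') }
        .toList().blockingGet().toSet()

// Inner subscribeOn: each inner Observable gets its own io worker, so the calls overlap.
fun concurrentThreads(): Set<String> =
    Observable.just(1, 2, 3)
        .flatMap { id ->
            Observable.fromCallable { loadData(id) }.subscribeOn(Schedulers.io())
        }
        .map { it.substringAfter('@') }
        .toList().blockingGet().toSet()

fun main() {
    println("outer subscribeOn: " + sequentialThreads())
    println("inner subscribeOn: " + concurrentThreads())
}
```

The first set should contain a single thread name, while the second should contain several, since the inner subscriptions are started almost at the same moment and each one keeps its worker busy for the duration of the sleep.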
2. Flow
Let's look at thread switching in Flow.
Flow switches threads based on CoroutineContext, so this section assumes a basic understanding of coroutines.
flowOn
flowOn is similar to RxJava's subscribeOn. Flow has no operator corresponding to observeOn, because collect is a suspend function that must be called within a CoroutineScope, so the thread that handles the results is determined by that scope's CoroutineContext. For example, if you call collect on main, the results are handled on Dispatchers.Main.
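The point that the collector's thread comes from the calling coroutine can be sketched as follows. This assumes kotlinx.coroutines on the classpath; a named single-thread executor stands in for Dispatchers.Main so the example runs outside Android, and `collectorThread` is a made-up name.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

// Collects a flow inside a coroutine bound to a named thread and reports where collect ran.
fun collectorThread(): String =
    Executors.newSingleThreadExecutor { r -> Thread(r, "collector-thread") }
        .asCoroutineDispatcher()
        .use { dispatcher -> // use {} closes the dispatcher and shuts the executor down
            runBlocking(dispatcher) {
                var name = ""
                // No operator chooses this thread; it comes from the coroutine's context.
                flowOf(1).collect { name = Thread.currentThread().name }
                name
            }
        }

fun main() {
    println(collectorThread())
}
```

When coroutine debug mode is enabled, the thread name carries an extra suffix such as " @coroutine#1", which is why logs in this style show that tag.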
flowOn is described as similar to subscribeOn because both can be used to determine the upstream thread: the code that precedes a flowOn(Dispatchers.IO) call is executed on IO.
Unlike subscribeOn, flowOn may appear multiple times in a chain, and each occurrence affects the operators that precede it, back to the previous flowOn. That span is the area of influence of each flowOn.
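The area of influence of each flowOn can be made visible by recording the thread at every stage. This is a minimal sketch assuming kotlinx.coroutines; the dispatcher names `pool-a` and `pool-b` and the function `flowOnRegions` are illustrative, not taken from the original code.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors

// Records the thread used by the builder, the map operator, and the collector.
fun flowOnRegions(): List<String> {
    val poolA = Executors.newSingleThreadExecutor { r -> Thread(r, "pool-a") }.asCoroutineDispatcher()
    val poolB = Executors.newSingleThreadExecutor { r -> Thread(r, "pool-b") }.asCoroutineDispatcher()
    val seen = CopyOnWriteArrayList<String>()
    fun record(step: String) { seen.add("$step on ${Thread.currentThread().name}") }
    try {
        runBlocking {
            flow {
                record("emit")
                emit(1)
            }
                .flowOn(poolA)            // affects the flow { } builder above
                .map { record("map"); it }
                .flowOn(poolB)            // affects only map, back to the previous flowOn
                .collect { record("collect") } // runs in the runBlocking context
        }
    } finally {
        poolA.close()
        poolB.close()
    }
    return seen
}

fun main() {
    flowOnRegions().forEach(::println)
}
```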
launchIn
collect is a suspend function, so the code after it does not continue while the coroutine is suspended. Code that calls collect twice in a row in the same coroutine may therefore not behave as expected, because the second collect is not reached until the first one finishes.
The correct approach is to start a separate coroutine for each collect, or to use launchIn, which reads more elegantly. launchIn does not suspend the calling coroutine, so it is closer to RxJava's subscribe.
As the name suggests, launchIn is just syntactic sugar that lets the launch from the previous approach be written as part of the call chain.
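A minimal sketch of collecting two flows at the same time with launchIn, assuming kotlinx.coroutines. The flow `ticks`, the labels, and the function name `collectBoth` are invented for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Starts two collectors without suspending between them and returns the order of received items.
fun collectBoth(): List<String> {
    val seen = CopyOnWriteArrayList<String>()
    runBlocking {
        val ticks = flow {
            repeat(2) { i ->
                delay(50)
                emit(i)
            }
        }
        // launchIn(scope) is shorthand for scope.launch { flow.collect() }.
        ticks.onEach { seen.add("A$it") }.launchIn(this)
        ticks.onEach { seen.add("B$it") }.launchIn(this)
        // runBlocking returns only after both launched collectors have completed.
    }
    return seen
}

fun main() {
    println(collectBoth())
}
```

Because neither launchIn call suspends, the items of the two collectors interleave instead of all A items arriving before the first B item.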
flowOf
flowOf is similar to Observable.just(). Note that the arguments of flowOf are evaluated immediately and are not affected by flowOn.
If calculate() is expected to run on IO, use flow {} instead.
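The difference between the two builders can be sketched like this, assuming kotlinx.coroutines. `calculate` is a hypothetical function that reports its thread, and a named executor (`io-pool`) stands in for the IO dispatcher so the result is easy to recognize.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

// Hypothetical computation: reports the thread it was executed on.
fun calculate(): String = Thread.currentThread().name

// Returns (thread used with flowOf, thread used with flow { }).
fun calculateThreads(): Pair<String, String> {
    val io = Executors.newSingleThreadExecutor { r -> Thread(r, "io-pool") }.asCoroutineDispatcher()
    try {
        return runBlocking {
            // The argument is evaluated here, in the caller, before flowOn can matter.
            val eager = flowOf(calculate()).flowOn(io).first()
            // The builder body runs when the flow is collected, on the flowOn dispatcher.
            val deferred = flow { emit(calculate()) }.flowOn(io).first()
            eager to deferred
        }
    } finally {
        io.close()
    }
}

fun main() {
    println(calculateThreads())
}
```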
flatMapMerge
flatMapMerge is similar to RxJava's flatMap.
In the example, flatMapMerge turns each of two items into two items, so four values are emitted in total, and the log output is as follows:
inner: pool-2-thread-2 @coroutine#4
inner: pool-2-thread-3 @coroutine#5
inner: pool-2-thread-3 @coroutine#5
inner: pool-2-thread-2 @coroutine#4
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
collect: pool-1-thread-2 @coroutine#2
The log shows that even though flowOn is written outside flatMapMerge, the inner logs are printed on multiple threads (all from pool 2). This differs from RxJava's flatMap, where an outer subscribeOn keeps the inner work on one fixed thread of the pool.
If you write flowOn inside flatMapMerge instead,
the results are as follows:
inner: pool-2-thread-2 @coroutine#6
inner: pool-2-thread-1 @coroutine#7
inner: pool-2-thread-2 @coroutine#6
inner: pool-2-thread-1 @coroutine#7
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
collect: pool-1-thread-3 @coroutine#2
The inner logs are still printed on multiple threads: the work inside flatMapMerge runs on the flowOn dispatcher regardless of whether flowOn is written inside or outside flatMapMerge.
The two placements are not identical, however. They differ in the scope that flowOn influences: an outer flowOn applies to everything upstream of it, tracing all the way up to flowOf, whereas an inner flowOn applies only to the inner flow.
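As a rough illustration of the outer placement, the sketch below puts flowOn after flatMapMerge and records the threads on which the inner flows run. It assumes kotlinx.coroutines; the opt-in annotations are listed because flatMapMerge has carried different opt-in markers across library versions, and `innerThreads` is a made-up name.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors

// Collects the names of the threads that execute the inner flows.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun innerThreads(): Set<String> {
    val pool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    val seen = ConcurrentHashMap.newKeySet<String>()
    try {
        runBlocking {
            flowOf(1, 2)
                .flatMapMerge { id ->
                    flow {
                        // Strip the " @coroutine#N" suffix that debug mode appends.
                        seen.add(Thread.currentThread().name.substringBefore(" @"))
                        delay(50)
                        emit(id)
                    }
                }
                .flowOn(pool) // outer placement: covers flatMapMerge, the inner flows, and flowOf
                .collect()
        }
    } finally {
        pool.close()
    }
    return seen
}

fun main() {
    println(innerThreads())
}
```

Every recorded name should come from the pool, and more than one name may appear because flatMapMerge collects the inner flows concurrently. Moving flowOn(pool) onto the inner flow { } would restrict its effect to that inner flow.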
3. Summary
Both RxJava Observables and coroutine Flows support thread switching. The related APIs compare as follows:
| | Thread pool scheduling | Thread-switching operators | Synchronous data source creation | Asynchronous creation | Concurrent execution |
|---|---|---|---|---|---|
| RxJava | Schedulers (io(), computation(), mainThread()) | subscribeOn, observeOn | just | defer{} | flatMap (inner subscribeOn) |
| Flow | Dispatchers (IO, Default, Main) | flowOn | flowOf | flow{} | flatMapMerge (inner or outer flowOn) |
Finally, here is an example of migrating code from RxJava to Flow.
RxJava
The RxJava code looks like this:
The Schedulers it uses are defined as follows:
Code execution result:
1: pool-1-thread-1
1: pool-1-thread-1
1: pool-1-thread-1
2: pool-3-thread-1
2: pool-3-thread-1
2: pool-3-thread-1
inner 1: pool-4-thread-1
inner 1: pool-4-thread-2
inner 1: pool-4-thread-1
inner 1: pool-4-thread-1
inner 1: pool-4-thread-2
inner 1: pool-4-thread-2
inner 1: pool-4-thread-3
inner 2: pool-5-thread-1
inner 2: pool-5-thread-2
3: pool-5-thread-1
inner 2: pool-5-thread-2
inner 1: pool-4-thread-3
inner 2: pool-5-thread-2
inner 2: pool-5-thread-3
3: pool-5-thread-1
3: pool-5-thread-1
3: pool-5-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
inner 1: pool-4-thread-3
end: pool-6-thread-1
3: pool-5-thread-1
inner 2: pool-5-thread-1
3: pool-5-thread-1
inner 2: pool-5-thread-3
inner 2: pool-5-thread-1
end: pool-6-thread-1
3: pool-5-thread-3
3: pool-5-thread-3
end: pool-6-thread-1
inner 2: pool-5-thread-3
3: pool-5-thread-3
end: pool-6-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
end: pool-6-thread-1
The code is fairly long, so it is color coded to clarify the thread relationships.
With the coloring, the relationships are clear. Note that because flatMap switches the data source and the thread at the same time, the thread that prints 3 is not S2 but S4.
Flow
First, create the corresponding Dispatchers.
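The original dispatcher definitions are not reproduced here. As a hedged sketch, and assuming the dispatchers are backed by dedicated thread pools (the "pool-N-thread-M" names in the logs suggest default executor threads), one way to build such a dispatcher with kotlinx.coroutines is shown below. The pool size and the function name are assumptions.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Runs a trivial task on a dispatcher backed by its own thread pool and returns the thread name.
fun threadOfDedicatedDispatcher(): String =
    Executors.newFixedThreadPool(3)  // pool size chosen arbitrarily for the sketch
        .asCoroutineDispatcher()
        .use { dispatcher ->         // closing the dispatcher shuts the pool down
            runBlocking {
                withContext(dispatcher) { Thread.currentThread().name }
            }
        }

fun main() {
    println(threadOfDedicatedDispatcher())
}
```

On the RxJava side, Schedulers.from(executor) plays the equivalent role of wrapping an executor.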
Then change the code to Flow, following these principles:
- RxJava uses observeOn to switch the thread of the code that follows it.
- Flow uses flowOn to switch the thread of the code that precedes it.
The print result is as follows:
1: pool-1-thread-1 @coroutine#6
1: pool-1-thread-1 @coroutine#6
1: pool-1-thread-1 @coroutine#6
2: pool-2-thread-2 @coroutine#5
2: pool-2-thread-2 @coroutine#5
2: pool-2-thread-2 @coroutine#5
inner 1: pool-3-thread-1 @coroutine#10
inner 1: pool-3-thread-2 @coroutine#11
inner 1: pool-3-thread-3 @coroutine#12
inner 1: pool-3-thread-2 @coroutine#11
inner 1: pool-3-thread-3 @coroutine#12
inner 2: pool-4-thread-3 @coroutine#9
inner 1: pool-3-thread-1 @coroutine#10
inner 1: pool-3-thread-3 @coroutine#12
inner 1: pool-3-thread-2 @coroutine#11
inner 2: pool-4-thread-1 @coroutine#7
inner 2: pool-4-thread-2 @coroutine#8
inner 2: pool-4-thread-1 @coroutine#7
inner 2: pool-4-thread-3 @coroutine#9
inner 1: pool-3-thread-1 @coroutine#10
3: pool-4-thread-1 @coroutine#3
inner 2: pool-4-thread-3 @coroutine#9
inner 2: pool-4-thread-2 @coroutine#8
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
inner 2: pool-4-thread-2 @coroutine#8
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
3: pool-4-thread-1 @coroutine#3
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
end: pool-5-thread-1 @coroutine#2
inner 2: pool-4-thread-1 @coroutine#7
3: pool-4-thread-1 @coroutine#3
end: pool-5-thread-1 @coroutine#2
As the log shows, the ordering of 1, 2, and 3 and the concurrency of inner 1 and inner 2 are consistent with the RxJava version.
4. FIN
Flow can fully replace RxJava for thread switching, and it folds the two operators subscribeOn and observeOn into a single flowOn, which lowers the learning cost. As the set of Flow operators becomes more complete, future Android/Kotlin development can say goodbye to RxJava 👋🏻