It’s a good idea to learn Kotlin’s coroutines and channels before learning Flow, which ships as part of the kotlinx.coroutines library. Conceptually, Flow is consistent with RxJava, so if you are already familiar with RxJava there is little barrier to learning it — you can think of Flow as Kotlin’s version of RxJava. A Channel is a data pipe used for communication between coroutines, and some Flow operators are implemented on top of Channel.
The official Kotlin documentation mentions that Flow was inspired by RxJava, which is why it looks so similar — except that Flow is built on Kotlin and on coroutines.
For users, Flow’s API is friendlier, more intuitive, and more concise than RxJava’s.
Flow documentation: Coroutines-asynchronous flow
Why learn flow?
Advantages:
- Flow is an official, first-class part of Kotlin and integrates with it seamlessly
- You only need the kotlin coroutine library to use Flow; its method count and footprint are small, and since it is itself built on coroutines, it integrates with them perfectly
- A simple, friendly API — the names literally describe what the operators do, so you can understand them without reading the source
- Custom operators are relatively easy to write (look at some of the built-in operator implementations; they are nowhere near as complex as RxJava’s)
Disadvantages:
- Operators are not as rich as RxJava’s, though they are sufficient, and some are still in preview
- You need kotlinx.coroutines >= 1.3.0, the first release in which Flow is stable
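If you haven’t added the dependency yet, a minimal Gradle declaration looks like this (the version number is illustrative; check the latest release):

```kotlin
// build.gradle.kts (module level) — the version shown is illustrative
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9")
}
```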
Basic concepts and usage
Like RxJava, Flow is stream-based data processing built on the observer pattern. The examples below are compared against RxJava for ease of understanding.
Let’s start with an example of RxJava:
Observable.create<Int> { e ->
    listOf(1, 2, 3).forEach { e.onNext(it) }
}.subscribe { println(it) }
// log:
// 1
// 2
// 3
This code emits the list data in turn and then prints it out.
An Observable is cold by default: it emits nothing until subscribed. create builds the data source, and subscribing activates the emission.
Let’s look at an implementation of flow with the same logic:
flow {
    listOf(1, 2, 3).forEach { emit(it) }
}.collect { println(it) }
// log:
// 1
// 2
// 3
It’s easy to get started just by following the pattern. The flow builder creates the data source, corresponding to RxJava’s create, and collect corresponds to RxJava’s subscribe. Likewise, a flow is cold by default: data flows only after collect is called.
Flow uses emit to emit data, corresponding to onNext in RxJava.
Flow also has a handy way to create a data flow: flowOf()
- flowOf(1): equivalent to Observable.just(1)
- flowOf(1, 2, 3): equivalent to Observable.fromArray(1, 2, 3); emits 1, 2, 3 in sequence
Switching coroutines
Using RxJava to do thread switching for a network request (a sledgehammer for a nut ^_^):
fun getHttp(){
httpObservable.subscribeOn(IoThread)
.observeOn(MainThread)
.subscribe{
// success
}
}
So how does Flow switch threads? Compared to RxJava, Flow keeps it simple: it provides a single flowOn function for switching coroutine contexts. Everything upstream of flowOn runs in the context it specifies:
suspend fun getFlow() {
    flow {
        // runs in dispatcher1
        listOf(1, 2, 3).forEach { emit(it) }
    }.flowOn(dispatcher1)
        .map {
            // runs in dispatcher2
            "$it"
        }
        .flowOn(dispatcher2)
        .collect {
            // which coroutine collect runs in depends on who calls the flow
            println(it)
        }
}
You can see that, apart from collect, each flowOn() affects the coroutine context of the operators upstream of it. Which coroutine collect runs in depends on which coroutine you call getFlow from.
launch(dispatcher3) {
    // collect runs in dispatcher3
    getFlow()
}
Now that you’ve learned how to switch coroutines, you can replace your RxJava network requests:
fun getHttp() {
    launch(mainDispatcher) {
        flow<HttpModel> {
            // initiate the request
        }.flowOn(ioDispatcher)
            .collect {
                // success
            }
    }
}
Catching exceptions
With Retrofit, a 404 simply throws an HttpException and the application crashes. RxJava is fine in this respect: most errors are automatically caught for us, and anything it can’t catch can be customized via RxJavaPlugins. So how do you catch errors in Flow?
Flow provides an operator: catch()
flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.map {
    if (it > 2) {
        throw NullPointerException("should not be greater than 2")
    } else {
        it
    }
}.catch { e ->
    println(e)
}.collect { println(it) }
// log:
// 1
// 2
// java.lang.NullPointerException: should not be greater than 2
The catch operator catches upstream errors, and once an upstream error occurs, the data stream no longer emits downstream. So the above code will only receive 1 and 2.
I find this a bit more flexible: you catch exceptions on demand. RxJava catches all errors indiscriminately, and sometimes exceptions are silently swallowed by it — your program ends up with exceptions that shouldn’t be there, you never hear about them, and they can be hard to track down.
If an error is caught and a value is emitted, it is equivalent to RxJava’s onErrorReturn()
flow.catch{
emit(0)
}
Callbacks
In RxJava we often use various lifecycle callbacks. Does Flow have them? It does!
- onStart: the data stream starts emitting
- onEach: called for each value emitted by the stream
- onCompletion: the data stream finishes emitting
- onEmpty: the data stream completes without emitting anything
- catch: an exception occurred
These callbacks can drive business-level hints. For example, onStart can show a loading indicator; onEmpty means the requested data is empty, so show an empty state; catch means the request failed; onCompletion means the request finished, so hide the loading indicator.
flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.catch { e ->
    // an exception occurred
    showToast("loading error")
}.onEmpty {
    // no data was emitted
    showToast("no data")
}.onStart {
    showToast("start loading")
}.onEach {
    showToast("loading $it")
}.onCompletion { e ->
    if (e == null) {
        showToast("loading finished")
    } else {
        showToast("loading failed $e")
    }
}.collect {
    showToast("got $it")
    println(it)
}
Back pressure
Definition of back pressure: the upstream emits data faster than the downstream can process it — for example, the upstream emits one value every 1 s while the downstream takes 2 s per value.
The same idea appears in thread pools’ saturation/rejection policies (RejectedExecutionHandler): producers produce faster than consumers can consume.
Flow cannot escape back pressure either: the downstream cannot control how fast the upstream emits, so the downstream must take countermeasures.
Dealing with back pressure boils down to caching data and deciding how to consume the cache.
Let’s recall RxJava’s back pressure policy: RxJava’s back pressure policy
- MISSING: no strategy is applied; if the downstream can't keep up, a MissingBackpressureException is thrown downstream
- ERROR: throws a MissingBackpressureException directly when the downstream can't keep up
- BUFFER: enlarges the cache without bound, losing no data
- DROP: when the cache is full, new data is discarded
- LATEST: when the cache is full, the latest value always replaces the end of the cache
Flow’s strategy is particularly simple, using a single operator: buffer(capacity, onBufferOverflow):
- capacity: the cache size
- onBufferOverflow: the cache overflow strategy:
  - SUSPEND: suspends the emitter instead of discarding data. Equivalent in effect to RxJava’s BUFFER, but thanks to coroutine suspension it does not grow the cache without bound
  - DROP_OLDEST: discards the oldest value and keeps the new one. Equivalent to RxJava’s LATEST
  - DROP_LATEST: discards the new value. Equivalent to RxJava’s DROP
      buffered values
/-----------------------\                       queued emitters
                                               /----------------------\
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
If buffer size = 4:
- DROP_OLDEST: [1,2,3,4] <- 5,6,7 arrive: 1,2,3 are discarded -> [4,5,6,7]
- DROP_LATEST: [1,2,3,4] <- 5,6,7 arrive: 5,6,7 are discarded -> [1,2,3,4]
A series of operators related to back pressure are derived from buffer:
- conflate(): keeps only the latest value, equivalent to buffer(0, DROP_OLDEST)
- collectLatest(): similar to conflate, but instead of new values overwriting old ones, every value starts being processed; if the previous value hasn’t finished processing when a new one arrives, its processing is cancelled
- mapLatest: the map counterpart of collectLatest
- flatMapLatest: the flatMap counterpart of collectLatest
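To make collectLatest concrete, here is a minimal sketch (the delays are illustrative): processing of a value is cancelled as soon as the next value arrives, so only the last value is fully processed.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        emit(1)
        delay(100) // the producer is faster than the collector
        emit(2)
    }.collectLatest { value ->
        println("start $value")
        delay(300) // slower than the producer
        println("done $value") // only reached for the last value
    }
}
// prints: start 1, start 2, done 2 — the processing of 1 is cancelled
```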
Comparing Flow with RxJava shows the strategies are essentially the same — all based on caching. Flow just never throws an error, and the default SUSPEND cannot run out of memory.
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    // capacity must be >= 0, or BUFFERED / CONFLATED
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    // CONFLATED conflicts with a non-default onBufferOverflow
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // desugar CONFLATED capacity to (0, DROP_OLDEST)
    var capacity = capacity
    var onBufferOverflow = onBufferOverflow
    if (capacity == CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

flow {
    repeat(10000) { emit(it) }
}.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    .collect { println("backpressureDemo :$it") }
// log
// backpressureDemo :0
// backpressureDemo :9990
// backpressureDemo :9991
// backpressureDemo :9992
// backpressureDemo :9993
// backpressureDemo :9994
// backpressureDemo :9995
// backpressureDemo :9996
// backpressureDemo :9997
// ...

flow {
    repeat(10000) { emit(it) }
}.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_LATEST)
    .collect { println("backpressureDemo :$it") }
// log
// backpressureDemo :0
// backpressureDemo :1
// backpressureDemo :2
// backpressureDemo :3
// backpressureDemo :4
// backpressureDemo :5
// backpressureDemo :6
// backpressureDemo :7
// backpressureDemo :8
// backpressureDemo :9
// backpressureDemo :10
You can see that Flow’s back-pressure API is designed to be extremely simple.
By default we don’t need to do anything: thanks to coroutine suspension, back pressure is handled for us and values are simply processed in order, one by one. Compare that with RxJava, where Flowable is awkward to write and forces you to choose a back-pressure strategy up front — switch to Flow!
Operators
Except for flowOn, almost every RxJava operator has a Flow equivalent. Flow operators are divided into several categories, and each category has its own .kt file in the source.
There are a lot of operators, so I won’t go over them all, but I’ll start with a few of the most common ones in RxJava.
map
Corresponds to RxJava’s map.
Very simple: the transformation operator, converting A -> B.
flowOf(A).map{
B
}
flatMapConcat (currently in preview; may change in future versions)
Corresponds to RxJava’s concatMap.
Flattens the values in the inner flows and emits them in order; the output order matches the order of production.
// emit [1, 2, 3, 4, 5]; each value maps to an inner flow
flow {
    (1..5).forEach { emit(it) }
}.flatMapConcat { n ->
    flow {
        repeat(n) { emit(it) }
    }
}.collect { println(it) }
// output:
// 0, 0,1, 0,1,2, 0,1,2,3, 0,1,2,3,4
You can see that the output above is completely ordered.
flatMapMerge (currently in preview; may change in future versions)
Corresponds to RxJava’s flatMap.
Similar to flatMapConcat, but emits concurrently.
launch(Dispatchers.IO) {
    flow {
        (1..5).forEach { emit(it) }
    }.flatMapMerge { n ->
        flow {
            repeat(n) { emit(it) }
        }
    }.collect { println(it) }
}
// output (interleaved; order not guaranteed):
// 0, 0, 0, 1, 1, 2, ...
The output is out of order — but only on a multi-threaded dispatcher like IO. If you launch on a single-threaded coroutine, the output is still sequential.
The internal implementation of flatMapConcat and flatMapMerge is similar.
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
zip
Equivalent to RxJava’s Observable.zip()
flow {
delay(2000)
emit(2)
}.zip(flow {
delay(1000)
emit("2")
}){a,b->
println(a)
println(b)
a to b
}.collect {
println(it)
}
Waits until both flows have emitted, then combines the values into a single flow for unified emission.
buffer
Corresponds to RxJava’s Observable.buffer(), except that RxJava bundles the cached data into a List before emitting, while Flow still emits values one by one.
Caches data — see the back pressure section above. Flow folds back-pressure handling into the buffer operator.
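A small sketch of the difference (the expected behavior is noted in comments):

```kotlin
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // RxJava's Observable.buffer(2) would emit the lists [1, 2] and [3];
    // Flow's buffer() only decouples producer and consumer speeds,
    // and values are still delivered one by one.
    flowOf(1, 2, 3)
        .buffer()
        .collect { println(it) } // prints 1, 2, 3 individually
}
```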
debounce
Debouncing, equivalent to Observable.debounce: emits only the last value within a given time window.
flow {
    repeat(5) { emit(it) }
}.debounce(200).collect { println(it) }
// output: 4
sample (currently in preview; may change in future versions)
Equivalent to RxJava’s sample.
Periodically takes the last value emitted within each time window.
val flow = flow {
    emit("A")
    delay(1500)
    emit("B")
    delay(500)
    emit("C")
    delay(250)
    emit("D")
    delay(2000)
    emit("E")
}
// sample every 1000 ms
val result = flow.sample(1000).toList()
assertEquals(listOf("A", "C", "D"), result)
The example above emits A, B, C, D, E at various intervals. Dividing the timeline into 1000 ms windows:
1000 2000 3000 4000 5000
|-----|-----|-----|------|-----|
A B C D E
You can visually pick out the last value emitted within each 1000 ms window. Note that sampling stops when the upstream flow completes: E arrives in the last window, but the flow ends before the next sampling tick, so E is never emitted, regardless of whether anything after E falls within 1000 ms.
At the second 1000 ms tick, C lands exactly on the window boundary, and there is no official explanation for this edge case. Judging by the actual run, C is treated as the last value of the second window.
Notably, the official test case is written with the expected result [A, B, D], while the actual run yields [A, C, D].
This feels like a bug. sample is still in preview, so it is not recommended for use yet.
Hot flows
Definitions of cold and hot:
- Cold: the data source does not emit until an observer subscribes
- Hot: the data source can emit at any time, regardless of subscribers
RxJava, like Flow, is cold by default.
RxJava’s PublishSubject is a hot data source (RxBus is built on it). Coroutines use Channels to pass data between them, so a Channel is inherently a hot data pipe: it simply sends data.
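A minimal sketch of a Channel acting as a hot pipe — the producer sends whether or not a receiver is listening yet (the capacity and delay are illustrative):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 3)
    launch {
        // producer: sends data regardless of subscribers
        repeat(3) { channel.send(it) }
        channel.close()
    }
    delay(100) // the receiver shows up late; buffered values are still delivered
    for (x in channel) println(x) // prints 0, 1, 2
}
```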
ReceiveChannel and BroadcastChannel (preview)
Breaking the Kotlin Coroutine (9) – Channel
- ReceiveChannel: one-to-one transmission; a value can be received by only one receiver
- BroadcastChannel: one-to-many transmission; the same value can be received by all receivers
A Channel can be converted into a hot flow with receiveAsFlow or consumeAsFlow.
- receiveAsFlow: the flow can be consumed repeatedly (collect can be called multiple times on the same flow)
- consumeAsFlow: the flow can be consumed only once (collect can be called only once on the same flow)
fun receiveChannel() {
    val receiveContext = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    val producer = GlobalScope.produce {
        var count = 0
        while (true) {
            send("hello-$count")
            count++
            delay(1000)
        }
    }
    val consumeAsFlow = producer.receiveAsFlow()
    GlobalScope.launch(receiveContext) {
        consumeAsFlow.collect { println("A$it") }
    }
    GlobalScope.launch {
        consumeAsFlow.collect { println("B$it") }
    }
}
// output:
// Ahello-0  Ahello-1  Bhello-2  Ahello-3  Bhello-4  Ahello-5  Bhello-6
// Ahello-7  Bhello-8  Ahello-9  Bhello-10 Ahello-11 Bhello-12 Ahello-13
// Bhello-14 Ahello-15
As you can see, a ReceiveChannel delivers values exclusively: the same element can only be consumed by one consumer.
ReceiveChannel suits one-to-one consumption scenarios, where each value is consumed exactly once.
With the help of the asFlow extension function, we can convert a BroadcastChannel into a hot flow
fun broadcastChannel() {
    val receiveContext = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    val producer = GlobalScope.broadcast<String> {
        var count = 0
        while (true) {
            send("hello-$count")
            count++
            delay(1000)
        }
    }
    val consumeAsFlow = producer.asFlow()
    GlobalScope.launch(receiveContext) {
        consumeAsFlow.collect { println("A$it") }
    }
    GlobalScope.launch {
        consumeAsFlow.collect { println("B$it") }
    }
}
// output:
// Ahello-0  Ahello-1  Bhello-1  Ahello-2  Bhello-2  Ahello-3  Bhello-3
// Ahello-4  Bhello-4  Ahello-5  Bhello-5  Ahello-6  Bhello-6  Ahello-7
// Bhello-7  Ahello-8  Bhello-8
You can see that the output of broadcast is one-to-many, and the same element is consumed by multiple consumers.
BroadcastChannel is suitable for one-to-many consumption scenarios, where a data is consumed only n times, n = the number of consumers.
Most of the BroadcastChannel-related API is marked ExperimentalCoroutinesApi and may still change.
SharedFlow
There is a type of Flow called SharedFlow that implements this hot emission.
SharedFlow functions almost the same as BroadcastChannel, but its implementation does not use the Channel API, and it returns a Flow directly. It has the following features:
- A cleaner API, since the Channel API is not used internally
- Supports configuring replay and buffering
- Provides a read-only SharedFlow and a writable MutableSharedFlow
- A SharedFlow cannot be closed, whereas a Channel can be closed explicitly
SharedFlow is similar to Channel. Let’s write a simple Flow-based EventBus to see how MutableSharedFlow is used:
object FlowEventBus {
    // use MutableSharedFlow as the event channel
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow()              // publicly exposed as read-only shared flow

    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

// receive events
GlobalScope.launch(receiveContext) {
    FlowEventBus.events.collect { println(it.name) }
}

// send events
GlobalScope.launch(sendContext) {
    repeat(20) {
        delay(2000)
        FlowEventBus.produceEvent(Event("a$it"))
    }
}
MutableSharedFlow takes three parameters
- replay: the number of previously emitted values to cache and re-emit to new subscribers; like RxJava’s BehaviorSubject, this can be used to implement sticky events
- extraBufferCapacity: the buffer size beyond replay
- onBufferOverflow: the strategy when the buffer is full
extraBufferCapacity and onBufferOverflow behave the same as described in the buffer section above; replay behaves like RxJava’s BehaviorSubject.
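A minimal sketch of a replay-based sticky event (the names are illustrative): a subscriber that arrives after the emission still receives the cached value.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val sticky = MutableSharedFlow<String>(replay = 1)
    sticky.emit("last-event") // emitted before anyone subscribes; cached by replay
    sticky.take(1).collect { println(it) } // a late subscriber still gets "last-event"
}
```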
So the best way to implement a hot flow is SharedFlow.
Here is the flowbus I learned from shareFlow implementation: github.com/lwj1994/flo…
Conclusion
As part of the coroutines library, Flow works seamlessly with coroutines and inherits the power of reactive architecture. With a simple, clear API, it can completely replace RxJava.
References
coroutines-Asynchronous Flow
Crack Kotlin coroutine (11) -flow
Cold flows, hot channels
StateFlow and SharedFlow