Kotlin Flow is basically an alternative to RxJava and provides a number of operators to process data. This article breaks them down and demonstrates their basic usage.
Deprecated methods are not listed. Use Ctrl + F for quick lookup.
Background knowledge
- Cold flow 🥶
Cold means that no data is produced while there are no consumers.
- Hot flow 🥵
Hot means that data is produced even when there are no observers.
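The difference can be sketched in a few lines. This is a minimal illustration, assuming kotlinx.coroutines is on the classpath; the function names coldRuns and hotValueWithoutObservers are hypothetical and exist only for this example. A cold flow runs its builder block once per collector and not at all when nobody collects, while a hot flow such as MutableStateFlow holds a value whether or not anyone observes it.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: how often the builder ran before any collection
// and after two collections.
fun coldRuns(): Pair<Int, Int> = runBlocking {
    var builderRuns = 0
    val cold = flow {
        builderRuns++                 // executed once per collector
        emit("hello")
    }
    val beforeCollect = builderRuns   // still 0: nothing has been produced yet
    cold.toList()                     // first collector
    cold.toList()                     // second collector re-runs the builder
    beforeCollect to builderRuns
}

// Hypothetical helper: a hot flow accepts a new value with no observers at all.
fun hotValueWithoutObservers(): Int {
    val hot = MutableStateFlow(0)
    hot.value = 1                     // produced although nobody is collecting
    return hot.value
}
```

Calling coldRuns() returns (0, 2) and hotValueWithoutObservers() returns 1.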
Flow classification
- Regular Flow
A regular Flow has only one observer per collection. It is a cold flow.
// Create
val testFlow = flow<String> {
    emit("hello")
    emit("flow")
}
// Collect
coroutineScope.launch {
    testFlow.collect { value ->
        println(value)
    }
}
// Prints: hello flow
- StateFlow
A stateful Flow that can have multiple observers. It is a hot flow. An initial value (initialState) must be passed to the constructor. It is often used to observe UI-related data, analogous to LiveData.
// Create
val uiState = MutableStateFlow(Result.Loading)
// Collect
coroutineScope.launch {
    uiState.collect { value ->
        println(value)
    }
}
// Assign
uiState.value = Result.Success
// Prints: Result.Loading Result.Success
- SharedFlow
A more configurable relative of StateFlow. It can have multiple observers and is a hot flow. No initial value is required, and three optional parameters are available:
replay - the number of values replayed to new subscribers (cannot be negative, defaults to zero).
extraBufferCapacity - the number of values buffered in addition to replay. emit does not suspend while there is free buffer space (optional, cannot be negative, defaults to zero).
onBufferOverflow - configures the action taken on buffer overflow (optional, defaults to suspending the attempt to emit a value).
Using SharedFlow you can write a FlowEventBus.
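// Create. With the default parameters (no replay, no extra buffer) tryEmit cannot
// deliver to an active subscriber, so a small buffer is configured here.
val signEvent = MutableSharedFlow<String>(extraBufferCapacity = 2)
// Collect
coroutineScope.launch {
    signEvent.collect { value ->
        println(value)
    }
}
// Emit (values emitted before the collector has subscribed are lost)
signEvent.tryEmit("hello")
signEvent.tryEmit("shared flow")
// Prints: hello shared flow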
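The FlowEventBus idea can be sketched as follows. This is only an illustration under stated assumptions: the object FlowEventBus, its post function, the buffer size of 16, and busDemo are hypothetical names and choices, not a library API. The collector is started with CoroutineStart.UNDISPATCHED so that it is already subscribed before the events are posted.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical event bus built on SharedFlow.
object FlowEventBus {
    // The extra buffer lets tryEmit succeed without suspending while subscribers are busy.
    private val _events = MutableSharedFlow<String>(extraBufferCapacity = 16)
    val events: SharedFlow<String> = _events.asSharedFlow()

    // Returns false if the buffer is full and the event could not be posted.
    fun post(event: String): Boolean = _events.tryEmit(event)
}

fun busDemo(): List<String> = runBlocking {
    // UNDISPATCHED runs the collector until its first suspension,
    // so the subscription exists before anything is posted.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        FlowEventBus.events.take(2).toList()
    }
    FlowEventBus.post("hello")
    FlowEventBus.post("shared flow")
    received.await()
}
```

busDemo() returns [hello, shared flow]. Events posted while nobody is subscribed are dropped, because replay is left at zero in this sketch.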
Operators
- Intermediate operators
Generally used to set up some operation. They are not executed immediately, and the return value is still a Flow.
- Terminal operators
They trigger execution of the flow. The return value is not a Flow.
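A small sketch of this distinction, assuming kotlinx.coroutines; lazyPipeline and the seen list are hypothetical names used only for the illustration. The intermediate operators onEach and map merely describe the pipeline, and nothing runs until the terminal operator toList is called.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun lazyPipeline(): Pair<Int, List<Int>> = runBlocking {
    val seen = mutableListOf<Int>()
    // Intermediate operators: each call returns a new Flow and executes nothing yet.
    val pipeline = flowOf(1, 2, 3)
        .onEach { seen += it }
        .map { it * 2 }
    val seenBeforeTerminal = seen.size   // 0: the pipeline has not run
    // Terminal operator: runs the flow and returns a List, not a Flow.
    val result = pipeline.toList()
    seenBeforeTerminal to result
}
```

lazyPipeline() returns (0, [2, 4, 6]).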
Create a Flow
- flow
The basic way to create a Flow. Use emit to emit a single value and emitAll to emit a whole flow, similar to list.addAll(anotherList).
flow<Int> {
    emit(1)
    emit(2)
    emitAll(flowOf(1, 2, 3))
}
- flowOf
Quickly create flow, similar to listOf()
val testFlow = flowOf(1, 2, 3)
launch {
    testFlow.collect { value ->
        print(value)
    }
}
// Prints: 1 2 3
- asFlow
Converts other data into a regular flow, typically a collection.
listOf(1, 2, 3).asFlow()
- callbackFlow
Converts a callback-based API into a flow, similar in spirit to suspendCoroutine.
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            // onNextValue is not a suspend function, so use the blocking send variant
            trySendBlocking(value)
                .onFailure { throwable ->
                    // Downstream has been cancelled or failed; can log here
                }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    // Suspends until the flow is closed or cancelled, then unregisters the callback
    awaitClose { api.unregister(callback) }
}
- emptyFlow
Returns an empty stream.
emptyFlow<Int>()
- channelFlow
A regular flow builder does not allow switching the coroutine context inside the builder block (for example, emitting from withContext); channelFlow does.
// Build
val testChannelFlow = channelFlow<String> {
    send("hello")
    withContext(Dispatchers.IO) {
        send("channel flow")
    }
}
// Collect
coroutineScope.launch {
    testChannelFlow.collect { value ->
        println(value)
    }
}
Terminal operators
- collect
Triggers the flow to run. This is the usual way to collect values.
launch {
    flowOf(1, 2, 3).collect { value ->
        print(value)
    }
}
// Prints: 1 2 3
- collectIndexed
Collects the values together with their index.
launch {
    flowOf(1, 2, 3).collectIndexed { index, value ->
        println("$index: $value")
    }
}
// Prints: 0: 1, 1: 2, 2: 3
- collectLatest
Unlike collect, when a new value is emitted while the previous value is still being processed, the processing of the previous value is cancelled.
flow {
    emit(1)
    delay(50)
    emit(2)
}.collectLatest { value ->
    println("Collecting $value")
    delay(100) // Emulate work
    println("$value collected")
}
// Prints: Collecting 1, Collecting 2, 2 collected
Useful when only the most recent data matters and intermediate values can be discarded.
- toCollection
Adds the emitted values to the given collection.
val array = arrayListOf(0)
launch {
    flow {
        emit(1)
        emit(2)
    }.toCollection(array)
    // Read the collection only after toCollection has returned
    array.forEach { value ->
        print(value)
    }
}
// Prints: 0 1 2
- toList
Convert the result to a List
flow {
    emit(1)
    emit(2)
}.toList().forEach { value ->
    print(value)
}
// Prints: 1 2
- toSet
Convert the result to a Set
flow {
    emit(1)
    emit(1)
}.toSet().forEach { value ->
    print(value)
}
// Prints: 1
- launchIn
Directly triggers execution of the flow in the given CoroutineScope, without a collect action. It is usually not used on its own but together with operators such as onEach and onCompletion. It returns a Job.
flow {
    emit(1)
    emit(2)
}.launchIn(lifecycleScope)
- last
Returns the last value emitted by the flow; throws NoSuchElementException if the flow is empty.
val myFlow = flow {
    emit(1)
    emit(2)
}
launch {
    print(myFlow.last())
}
// Prints: 2
- lastOrNull
Returns the last value emitted by the flow, or null if the flow is empty.
val myFlow = emptyFlow<Int>()
launch {
    print(myFlow.lastOrNull())
}
// Prints: null
- first
Returns the first value emitted by the flow; throws NoSuchElementException if the flow is empty.
val myFlow = flow {
    emit(1)
    emit(2)
}
launch {
    print(myFlow.first())
}
// Prints: 1
- firstOrNull
Returns the first value emitted by the flow, or null if the flow is empty.
val myFlow = emptyFlow<Int>()
launch {
    print(myFlow.firstOrNull())
}
// Prints: null
- single
Returns the single value emitted by the flow. Unlike first(), it throws an exception if the flow is empty or emits more than one value.
val myFlow = flow {
    emit(1)
}
launch {
    print(myFlow.single()) // 1
}
val myFlow1 = flow {
    emit(1)
    emit(2)
}
launch {
    print(myFlow1.single()) // throws: more than one value
}
- singleOrNull
Returns the single value emitted by the flow, or null if the flow is empty. In current kotlinx.coroutines versions it also returns null when more than one value is emitted; older versions threw an exception in that case.
val myFlow = flow {
    emit(1)
}
launch {
    print(myFlow.singleOrNull()) // 1
}
- count
Returns the number of values emitted by the flow, similar to list.size. Note: it is not usable on a SharedFlow, which never completes, so count() would never return.
val myFlow = flow {
    emit(1)
    emit(2)
}
launch {
    print(myFlow.count())
}
// Prints: 2
- fold
Accumulates values starting from the initial value, passing the result of each step as the accumulator of the next.
val sum = flowOf(2, 3, 4)
    .fold(1) { result, value ->
        result + value
    }
// sum = 10, i.e. 1 + 2 + 3 + 4
- reduce
Similar to fold, but without an initial value: the first element serves as the initial accumulator.
val result = flowOf(1, 2, 3)
    .reduce { acc, value ->
        acc + value
    }
// result = 6, i.e. 1 + 2 + 3
Callback operators
- onStart
Called before the upstream flow starts. You can emit extra elements or do other work, such as sending analytics (tracking) events.
flow<Result> {
    emit(Result.Success)
}.onStart {
    emit(Result.Loading)
}
- onCompletion
Called when the flow is cancelled or completes. You can emit elements, send analytics events, and so on.
flow<Result> {
    emit(Result.Success)
}.onCompletion {
    emit(Result.End)
}
- onEach
Called on each element before it is emitted downstream.
flow<Int> {
    emit(1)
    emit(2)
    emit(3)
}.onEach { value ->
    println(value)
}.launchIn(lifecycleScope)
- onEmpty
Called when the flow completes without emitting any elements. It can be used to supply fallback data.
emptyFlow<String>().onEmpty {
    emit("fallback data")
}.launchIn(lifecycleScope)
- onSubscription
An operator exclusive to SharedFlow (StateFlow is a specialized SharedFlow). It is called after the subscription has been established. This differs from onStart: because a SharedFlow is hot, a value emitted into the shared flow from onStart may be missed by the collector, since the subscription is not yet registered at that point.
val state = MutableSharedFlow<String>().onSubscription {
    emit("onSubscription")
}
launch {
    state.collect { value ->
        println(value)
    }
}
// Prints: onSubscription
Transformation operators
- map
Transforms each emitted value; the return value of the lambda is what gets emitted downstream.
flow {
    emit(1)
    emit(2)
}.map { value ->
    value * 2
}.collect { value ->
    println(value)
}
// Prints: 2 4
- mapLatest
Like collectLatest: when a new value arrives before the previous transformation has finished, the previous transformation is cancelled.
flow {
    emit("a")
    delay(100)
    emit("b")
}.mapLatest { value ->
    println("Started computing $value")
    delay(200)
    "Computed $value"
}.collect { value ->
    print(value)
}
// Prints: Started computing a, Started computing b, Computed b
- mapNotNull
Emits only the values that are non-null after mapping.
flow {
    emit("a")
    emit("b")
}.mapNotNull { value ->
    if (value != null) {
        value
    } else {
        null
    }
}.collect { value ->
    print(value)
}
- transform
Transforms the emitted values. Unlike map, the receiver of the transform lambda is a FlowCollector, so it is very flexible: a value can be transformed, skipped, or emitted multiple times.
flow {
    emit(1)
    emit(2)
}.transform { value ->
    if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
}.collect { value ->
    println(value)
}
// Prints: value :1*2, transform :1, transform :2
- transformLatest
Like mapLatest: when a new value arrives before the previous transformation has finished, the previous transformation is cancelled.
flow {
    emit("a")
    delay(100)
    emit("b")
}.transformLatest { value ->
    emit(value)
    delay(200)
    emit(value + "_last")
}.collect { value ->
    println(value)
}
// Prints: a b b_last
- transformWhile
The transform lambda returns a Boolean: if it returns false, no further values are transformed and the flow completes; if it returns true, execution continues.
flow {
    emit("a")
    emit("b")
}.transformWhile { value ->
    emit(value)
    true
}.collect { value ->
    println(value)
}
// Prints: a b
--------------------
flow {
    emit("a")
    emit("b")
}.transformWhile { value ->
    emit(value)
    false
}.collect { value ->
    println(value)
}
// Prints: a
- asStateFlow
Converts a MutableStateFlow into a read-only StateFlow. Often used when exposing properties.
private val _uiState = MutableStateFlow<UIState>(Loading)
val uiState = _uiState.asStateFlow()
- asSharedFlow
Converts a MutableSharedFlow into a read-only SharedFlow. Often used when exposing properties.
private val _uiState = MutableSharedFlow<UIState>()
val uiState = _uiState.asSharedFlow()
- receiveAsFlow
Converts a Channel into a Flow. It can have multiple observers, but it is not multicast: each value is delivered to only one of the observers, so they receive values in turn.
private val _event = Channel<Event>()
val event = _event.receiveAsFlow()
- consumeAsFlow
Converts a Channel into a Flow, but the flow can be collected only once; collecting it a second time throws IllegalStateException (a crash)!
private val _event = Channel<Event>()
val event = _event.consumeAsFlow()
- withIndex
Wraps each value in an IndexedValue.
flow {
    emit("a")
    emit("b")
}.withIndex().collect {
    print("${it.index}: ${it.value}")
}
// Prints: 0: a 1: b
- scan
Similar to fold, except that fold returns only the final result, while scan returns a flow that emits the initial value and the result of each step.
flowOf(1, 2, 3).scan(0) { acc, value ->
    acc + value
}.collect {
    print(it)
}
// Prints: 0 1 3 6
// acc is the result of the previous step and value is the emitted value.
// 0 is the initial value, 1 is 0 + 1, 3 is 1 + 2, 6 is 3 + 3.
- produceIn
Converts the flow into a ReceiveChannel; not commonly used. Note: the Channel interface combines ReceiveChannel and SendChannel.
flowOf(1, 2, 3).produceIn(this)
    .consumeEach { value ->
        print(value)
    }
// Prints: 1 2 3
- runningFold
Unlike fold, it returns a new flow that emits the initial value and the result of each step.
flowOf(1, 2, 3).runningFold(1) { acc, value ->
    acc + value
}.collect { value ->
    print(value)
}
// Prints: 1 2 4 7
- runningReduce
Unlike reduce, it returns a new flow that emits the result of each step. It takes no initial value.
flowOf(1, 2, 3).runningReduce { acc, value ->
    acc + value
}.collect { value ->
    print(value)
}
// Prints: 1 3 6
- shareIn
Converts a regular flow into a SharedFlow. It takes three parameters:
scope: CoroutineScope - the coroutine scope in which sharing is started.
started: SharingStarted - controls when sharing starts and stops.
replay: Int = 0 - the number of old values replayed to new subscribers.
The started parameter can be one of:
Eagerly - sharing starts immediately and never stops.
Lazily - sharing starts when the first subscriber appears and never stops.
WhileSubscribed - sharing starts when the first subscriber appears, stops immediately when the last subscriber disappears (by default), and keeps the replay cache forever (by default).
WhileSubscribed has the following optional parameters:
stopTimeoutMillis - the delay in milliseconds between the disappearance of the last subscriber and the stopping of the sharing coroutine. The default is zero (stop immediately).
replayExpirationMillis - the delay in milliseconds between the stopping of the sharing coroutine and the resetting of the replay cache. The default is Long.MAX_VALUE (keep the replay cache forever).
val share = flowOf(1, 2, 3).shareIn(this, SharingStarted.Eagerly)
// There can be multiple observers
share.collect { value ->
    print(value)
}
- stateIn
Converts a regular flow into a StateFlow. It takes three parameters:
scope - the coroutine scope in which sharing is started
started - the strategy that controls when sharing starts and stops
initialValue - the initial value of the state flow
val state = flowOf(Success).stateIn(lifecycleScope, SharingStarted.Eagerly, Loading)
state.collect { value ->
    print(value)
}
// Prints: Loading Success
stateIn and shareIn are usually used to adapt flows that come from other sources so they can be observed; they are not normally used in the way shown above.
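A more typical shape might look like the following sketch, in which a flow from another source is exposed as a StateFlow by a holder object. CounterHolder, stateInDemo, the 5-second timeout, and the flowOf(1, 2, 3) source are all assumptions made for the illustration; in an Android app the holder would more likely be a ViewModel using its own scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical holder standing in for something like a ViewModel.
class CounterHolder(scope: CoroutineScope, source: Flow<Int>) {
    // The upstream is collected only while there are subscribers,
    // plus a 5 second grace period after the last one leaves.
    val state: StateFlow<Int> = source.stateIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
        initialValue = 0
    )
}

fun stateInDemo(): Pair<Int, Int> = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    val holder = CounterHolder(scope, flowOf(1, 2, 3))
    val initial = holder.state.value              // 0: the upstream has not started
    val latest = holder.state.first { it == 3 }   // subscribing starts the upstream
    scope.cancel()                                // stop the sharing coroutine
    initial to latest
}
```

stateInDemo() returns (0, 3): the state holds the initial value until the first subscriber appears, then follows the source.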
Filter operators
- filter
Keeps the values that meet the condition.
flow {
    emit("a")
    emit("b")
}.filter { value ->
    value == "a"
}.collect { value ->
    print(value)
}
// Prints: a
- filterIsInstance
Keeps the values of the given type.
flow {
    emit("a")
    emit("b")
    emit(1)
}.filterIsInstance<String>().collect { value ->
    print(value)
}
// Prints: a b
- filterNot
Keeps the values that do not meet the condition; the inverse of filter.
flow {
    emit("a")
    emit("b")
}.filterNot {
    it == "a"
}.collect { value ->
    print(value)
}
// Prints: b
- filterNotNull
Keeps only the non-null values.
flow {
    emit("a")
    emit(null)
    emit("b")
}.filterNotNull().collect { value ->
    print(value)
}
// Prints: a b
- drop
The parameter count is an Int; the first count values are discarded.
flow {
    emit(1)
    emit(2)
    emit(3)
}.drop(2).collect { value ->
    print(value)
}
// Prints: 3
- dropWhile
This operator is a little special, and it is different from filter! It finds the first value that does not meet the condition and emits that value and every value after it. If the very first value does not meet the condition, all values are emitted.
flow {
    emit(3)
    emit(1) // first value that does not meet the condition
    emit(2)
    emit(4)
}.dropWhile {
    it == 3
}.collect { value ->
    print(value)
}
// Prints: 1 2 4
flow {
    emit(1) // the first value already fails the condition
    emit(2)
    emit(3)
    emit(4)
}.dropWhile {
    it == 3
}.collect { value ->
    print(value)
}
// Prints: 1 2 3 4
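To make the contrast with filter concrete, here is a short side-by-side sketch, assuming kotlinx.coroutines; the function name dropWhileVsFilter and the sample values are hypothetical. dropWhile removes only the leading run of matching values, whereas filterNot removes every matching value.

```kotlin
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun dropWhileVsFilter(): Pair<List<Int>, List<Int>> = runBlocking {
    val source = flowOf(3, 1, 3, 4)
    // Only the leading 3 is dropped; the later 3 passes through.
    val dropped = source.dropWhile { it == 3 }.toList()
    // Every 3 is removed, wherever it appears.
    val filtered = source.filterNot { it == 3 }.toList()
    dropped to filtered
}
```

dropWhileVsFilter() returns ([1, 3, 4], [1, 4]).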
- take
Emits only the first n elements.
flow {
    emit(1)
    emit(2)
    emit(3)
}.take(2).collect { value ->
    print(value)
}
// Prints: 1 2
- takeWhile
Again it finds the first value that does not meet the condition, but it emits only the values before it, the opposite of dropWhile.
If the very first value does not meet the condition, the result is an empty flow.
flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.takeWhile {
    it < 3
}.collect { value ->
    print(value)
}
// Prints: 1 2
flow {
    emit(3) // the first value already fails the condition
    emit(1)
    emit(2)
    emit(4)
}.takeWhile {
    it < 3
}.onEmpty {
    print("empty")
}.collect { value ->
    print(value)
}
// Prints: empty
- debounce
Debouncing. A value is emitted only if no newer value arrives within the given timeout, so only the latest value of a burst gets through and the others are dropped. Suitable for search-suggestion scenarios.
flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)
// Produces: 3 4 5
- sample
Sampling. For a given period, only the most recently emitted value within each period is passed on.
flow {
    repeat(10) {
        emit(it)
        delay(110)
    }
}.sample(200)
// Produces: 1 3 5 7 9
// Each 200 ms window keeps only its latest value: 0 and 1 fall into the first
// window (1 is kept), 2 and 3 into the second (3 is kept), and so on.
- distinctUntilChangedBy
A deduplication operator: it compares consecutive values and drops a value whose key equals that of the previous one.
keySelector: (T) -> Any? specifies the key used for the comparison.
A bit like RecyclerView's DiffUtil mechanism.
flowOf(
    Funny(name = "Tom", age = 8),
    Funny(name = "Tom", age = 12),
    Funny(name = "Tom", age = 12)
).distinctUntilChangedBy { it.name }.collect { value ->
    print(value.toString())
}
// Prints: Funny(name=Tom, age=8)
- distinctUntilChanged
A simplified form of distinctUntilChangedBy that compares the values themselves. If two consecutive values are equal, the second one is skipped.
flowOf(1, 1, 3, 1).distinctUntilChanged().collect { value ->
    print(value)
}
// Prints: 1 3 1
Combination operators
- combine
Combines the most recently emitted values for each stream.
val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
    println(it) // Will print "1a 2a 2b 2c"
}
- combineTransform
combine + transform
val numberFlow = flowOf(1, 2).onEach { delay(10) }
val stringFlow = flowOf("a", "b", "c").onEach { delay(15) }
numberFlow.combineTransform(stringFlow) { number, string ->
    emit("$number :$string")
}.collect { value ->
    println(value)
}
// Prints: 1 :a, 2 :a, 2 :b, 2 :c
- merge
Merges multiple flows into a single flow. It can be used for multilevel cache loading.
val numberFlow = flowOf(1, 2).onEach { delay(10) }
val stringFlow = flowOf("a", "b", "c").onEach { delay(15) }
listOf(numberFlow, stringFlow).merge()
    .collect { value ->
        print(value)
    }
// Prints: 1 a 2 b c
- flattenConcat
Flattens a flow of flows into a single flow sequentially. It is an extension function on Flow<Flow<T>>.
flow {
    emit(flowOf(1, 2, 3))
    emit(flowOf(4, 5, 6))
}.flattenConcat().collect { value ->
    print(value)
}
// Prints: 1 2 3 4 5 6
- flattenMerge
Similar to flattenConcat, but you can set the number of flows that are collected concurrently.
It has one parameter, concurrency: Int. When concurrency == 1 it behaves like flattenConcat; when concurrency is greater than 1, the flows are collected concurrently.
flow {
    emit(flowOf(1, 2, 3).flowOn(Dispatchers.IO))
    emit(flowOf(4, 5, 6).flowOn(Dispatchers.IO))
    emit(flowOf(7, 8, 9).flowOn(Dispatchers.IO))
}.flattenMerge(3).collect { value ->
    print(value)
}
// Prints something like: 1 2 3 7 8 9 4 5 6 (the order is not fixed)
- flatMapConcat
A combined operator equivalent to map + flattenConcat: map turns each value into a flow, and flattenConcat
flattens those flows, in order, into a single flow.
flowOf(1, 2, 3).flatMapConcat {
    flowOf(it.toString() + " map")
}.collect { value ->
    println(value)
}
// 1 map
// 2 map
// 3 map
- flatMapLatest
As with the other Latest operators, if the next value arrives before the previous transformation has finished, the previous transformation is cancelled.
Equivalent to transformLatest + emitAll.
flow {
    emit("a")
    delay(100)
    emit("b")
}.flatMapLatest { value ->
    flow {
        emit(value)
        delay(200)
        emit(value + "_last")
    }
}.collect { value ->
    print(value)
}
// Prints: a b b_last
- flatMapMerge
Another combined operator that exists for convenience: map + flattenMerge. That is why it has a concurrency: Int parameter to limit the number of concurrently collected flows.
flowOf("a", "b", "c", "d", "e", "f").flatMapMerge(3) { value ->
    flow {
        emit(value)
    }.flowOn(Dispatchers.IO)
}.collect { value ->
    print(value)
}
// Prints something like: b a c d e f (the order is not fixed)
- zip
Combines two flows by pairing their values one by one. As soon as one of the flows completes, the whole result completes.
val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
flow.zip(flow2) { i, s -> i.toString() + s }.collect {
    println(it)
}
// Will print "1a 2b 3c"
Functional operators
- cancellable
On each emission, checks whether the collecting coroutine has been cancelled and, if so, throws a CancellationException.
val job = flowOf(1, 3, 5, 7)
    .cancellable()
    .onEach { value ->
        print(value)
    }.launchIn(lifecycleScope)
// Cancel
job.cancel()
- catch
Catches exceptions thrown upstream; exceptions thrown downstream are not caught.
Upstream refers to the flow before this operator.
Downstream refers to the flow after this operator.
flow<Int> {
    throw IOException("")
}.catch { e ->
    if (e is IOException) {
        // ...
    }
}
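The upstream/downstream rule can be checked with a small sketch, assuming kotlinx.coroutines; catchUpstream and catchMissesDownstream are hypothetical names, and emitting -1 as a fallback is just an illustrative choice. The first function recovers from an upstream failure, and the second shows that an exception thrown below catch escapes to the caller.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun catchUpstream(): List<Int> = runBlocking {
    flow<Int> {
        emit(1)
        throw IOException("upstream failure")
    }.catch { e ->
        if (e is IOException) emit(-1)   // recover with a fallback value
    }.toList()
}

fun catchMissesDownstream(): Boolean = runBlocking {
    try {
        flow<Int> { emit(1) }
            .catch { emit(-1) }                                    // sees only what is above it
            .onEach { throw IllegalStateException("downstream") }  // below catch: not caught
            .toList()
        false
    } catch (e: IllegalStateException) {
        true   // the exception escaped to the caller
    }
}
```

catchUpstream() returns [1, -1] and catchMissesDownstream() returns true.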
- retryWhen
Retries conditionally. The lambda takes two arguments: the exception cause and the index of the current retry attempt (starting from 0).
The lambda returns a Boolean: true to retry again, false to stop retrying.
flow<Int> {
    print("doing")
    throw IOException("")
}.retryWhen { cause, attempt ->
    if (attempt > 4) {
        return@retryWhen false
    }
    cause is IOException
}
- retry
A retry mechanism that re-executes the flow when an exception occurs. A simplified version of retryWhen.
retries: Long = Long.MAX_VALUE specifies the number of retries, and the predicate lambda decides whether to retry (it returns true by default).
flow<Int> {
    throw IOException("")
}.retry(3) { e ->
    if (e is IOException) {
        true
    } else {
        false
    }
}
flow<Int> {
    throw IOException("")
}.retry(3)
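A runnable sketch of retry, assuming kotlinx.coroutines; retryDemo, the attempts counter, and the rule that the third attempt succeeds are all made up for the illustration. The builder block is re-executed from the start on every retry.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun retryDemo(): Pair<Int, List<String>> = runBlocking {
    var attempts = 0
    val values = flow<String> {
        attempts++                       // the whole block runs again on each retry
        if (attempts < 3) throw IOException("attempt $attempts failed")
        emit("ok after $attempts attempts")
    }.retry(retries = 3) { cause ->
        cause is IOException             // retry only I/O failures
    }.toList()
    attempts to values
}
```

retryDemo() returns (3, [ok after 3 attempts]): two failed attempts are retried and the third one emits.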
- buffer
If an operator's code takes a considerable amount of time to execute, you can use the buffer operator to run it in a separate coroutine.
capacity: Int = BUFFERED - the buffer capacity
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND - the action taken on overflow
There are three options: SUSPEND suspends the emitter, DROP_OLDEST drops the oldest value, and DROP_LATEST drops the newest value.
flowOf("A", "B", "C")
    .onEach { println("1$it") }
    .collect { println("2$it") }

Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--

flowOf("A", "B", "C")
    .onEach { println("1$it") }
    .buffer() // <--------------- buffer between onEach and collect
    .collect { println("2$it") }

P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }
                      |
                      | channel               // buffer()
                      V
Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
- conflate
Keeps only the latest value; internally it is buffer(CONFLATED).
flow {
    repeat(30) {
        delay(100)
        emit(it)
    }
}.conflate()
    .onEach { delay(1000) }
    .collect { value ->
        print(value)
    }
// Prints something like: 0 7 15 22 29 (the exact values are not fixed)
- flowOn
Specifies the context (dispatcher) in which the upstream operations run. Use it when you want to switch the execution thread!
flow.map { ... }            // Will be executed in IO
    .flowOn(Dispatchers.IO) // Affects only the operators above it
    .collect { ... }
Conclusion
That covers the basic usage of Kotlin Flow's operators; pick the ones a real-world scenario calls for. For example, debounce suits search scenarios, retry suits network requests, SharedFlow suits communication between components, and combine suits merging data. Used well, they improve development efficiency.