Kotlin Flow is basically an alternative to RxJava and provides a number of operators to process data. This article breaks them down and demonstrates their basic usage.

Deprecated methods are not listed. Use Ctrl + F for quick lookup.

Prerequisites

  • Cold flow 🥶

Cold means that no data is produced while there are no consumers.

  • Hot flow 🥵

Hot means that data is produced even when there are no observers.
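
The cold behaviour can be checked with a minimal sketch, assuming kotlinx-coroutines-core is on the classpath. The helper name countBuilderRuns is made up for this illustration. It counts how often the builder block runs before and after collecting.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the number of builder runs before and after collecting.
fun countBuilderRuns(): Pair<Int, Int> = runBlocking {
    var runs = 0
    val cold = flow {
        runs++          // executed each time a collector starts
        emit("hello")
    }
    val before = runs   // still 0: nothing is produced without a consumer
    cold.toList()       // first collector runs the builder
    cold.toList()       // second collector runs the builder again
    before to runs
}

fun main() {
    println(countBuilderRuns()) // (0, 2)
}
```

A hot flow such as StateFlow or SharedFlow behaves differently: values are produced regardless of whether anyone is collecting.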

Flow classification

  • Regular Flow

A regular Flow is a cold flow. Each collector triggers its own run of the flow, so a single run has only one observer.

// Create
val testFlow = flow<String> {
    emit("hello")
    emit("flow")
}

// Collect
coroutineScope.launch {
    testFlow.collect { value ->
        println(value)
    }
}

// Prints: hello flow
  • StateFlow

A stateful Flow that can have multiple observers; it is a hot flow. An initial value (initialState) must be passed in when it is constructed. It is often used to observe UI-related data, analogous to LiveData.

// Create
val uiState = MutableStateFlow<Result>(Result.Loading)

// Observe
coroutineScope.launch {
    uiState.collect { value ->
        println(value)
    }
}

// Assign
uiState.value = Result.Success

// Prints: Result.Loading Result.Success
  • SharedFlow

A more configurable relative of StateFlow. It can have multiple observers and is a hot flow. No initial value is required, and three optional parameters are available:

replay – the number of values replayed to new subscribers (cannot be negative, default is zero).

extraBufferCapacity – the number of values buffered in addition to replay. emit does not suspend while there is free buffer space (optional, cannot be negative, default is zero).

onBufferOverflow – configures what happens on buffer overflow (optional, default is to suspend the attempt to emit a value).

Using SharedFlow you can write a FlowEventBus.

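// Create. With the default extraBufferCapacity of zero, tryEmit fails while there are subscribers
val signEvent = MutableSharedFlow<String>(extraBufferCapacity = 2)

// Observe
coroutineScope.launch {
    signEvent.collect { value ->
        println(value)
    }
}

// Emit (after the collector has subscribed)
signEvent.tryEmit("hello")
signEvent.tryEmit("shared flow")

// Prints: hello shared flow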
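
The replay parameter can be illustrated with a small sketch, assuming kotlinx-coroutines-core is available. The function name replayDemo is invented for the example. With replay = 1, a subscriber that arrives late still receives the most recent value.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: emits before anyone subscribes, then reads as a late subscriber.
fun replayDemo(): String = runBlocking {
    // replay = 1 keeps the most recent value for subscribers that arrive later
    val events = MutableSharedFlow<String>(replay = 1)
    events.tryEmit("hello")        // no subscribers yet, the value goes to the replay cache
    events.tryEmit("shared flow")  // replaces "hello" in the cache
    events.first()                 // a late subscriber receives the cached value
}

fun main() {
    println(replayDemo()) // shared flow
}
```

With the default replay of zero, both values would be lost because nobody was subscribed when they were emitted.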

Operators

  • Intermediate operator
    • Generally used to perform some operation on the flow. It is not executed immediately, and the return value is still a Flow
  • Terminal operator
    • Triggers execution of the flow. The return value is not a Flow
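
The difference between the two kinds of operator can be sketched as follows, assuming kotlinx-coroutines-core. The name lazyOperatorsDemo and the log messages are made up for the illustration. The log shows that nothing in the pipeline runs until the terminal operator is called.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: records the order in which things happen.
fun lazyOperatorsDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Intermediate operators only describe the pipeline; nothing runs here
    val doubled = flowOf(1, 2)
        .onEach { log += "saw $it" }
        .map { it * 2 }
    log += "pipeline built"
    // The terminal operator starts the flow and returns a plain value, not a Flow
    val result = doubled.toList()
    log += "result $result"
    log
}

fun main() {
    println(lazyOperatorsDemo()) // [pipeline built, saw 1, saw 2, result [2, 4]]
}
```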

Create a Flow

  • flow

The basic way to create a Flow. Use emit to emit a single value and emitAll to emit another flow, similar to list.addAll(anotherList).

flow<Int> {
    emit(1)
    emit(2)
    emitAll(flowOf(1, 2, 3))
}
  • flowOf

Quickly create flow, similar to listOf()

val testFlow = flowOf(1, 2, 3)

launch {
    testFlow.collect { value ->
        print(value)
    }
}

// Prints: 1 2 3
  • asFlow

Converts other data into a regular flow, typically a collection.

listOf(1, 2, 3).asFlow()
  • callbackFlow

Converts a callback-based API into a flow, much as suspendCoroutine converts a callback into a suspending call.

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
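        override fun onNextValue(value: T) {
            // trySend does not suspend, so it can be called from a regular callback
            trySend(value)
                .onFailure { throwable ->
                    // Downstream has been cancelled or failed; can log here
                }
        }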

        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }

        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}
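
The snippet above depends on types that are not shown. A self-contained sketch of the same pattern follows, assuming kotlinx-coroutines-core. NumberCallback, NumberApi, numbersFrom and callbackFlowDemo are all hypothetical names invented for this illustration, and the fake API delivers its values synchronously to keep the example deterministic.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback interface, invented for this sketch.
interface NumberCallback {
    fun onNextValue(value: Int)
    fun onCompleted()
}

// Hypothetical callback-based API: delivers two values on registration, then completes.
class NumberApi {
    var registered = false
        private set

    fun register(callback: NumberCallback) {
        registered = true
        callback.onNextValue(1)
        callback.onNextValue(2)
        callback.onCompleted()
    }

    fun unregister(callback: NumberCallback) {
        registered = false
    }
}

fun numbersFrom(api: NumberApi): Flow<Int> = callbackFlow {
    val callback = object : NumberCallback {
        // trySend is non-suspending, so it is safe to call from a plain callback
        override fun onNextValue(value: Int) { trySend(value) }
        override fun onCompleted() { channel.close() }
    }
    api.register(callback)
    // Suspends until the channel is closed or the collector is cancelled, then cleans up
    awaitClose { api.unregister(callback) }
}

// Collects everything and reports whether the callback is still registered afterwards.
fun callbackFlowDemo(): Pair<List<Int>, Boolean> = runBlocking {
    val api = NumberApi()
    val values = numbersFrom(api).toList()
    values to api.registered
}

fun main() {
    println(callbackFlowDemo()) // ([1, 2], false)
}
```

The awaitClose block is what guarantees that the callback is unregistered once the flow finishes or the collector goes away.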
  • emptyFlow

Returns an empty stream.

emptyFlow<Int>()
  • channelFlow

A regular flow does not allow switching context (for example with withContext) inside its builder block, whereas channelFlow does.

// Build
val channelFlow = channelFlow<String> {
    send("hello")
    withContext(Dispatchers.IO) {
        send("channel flow")
    }
}

// Observe
coroutineScope.launch {
    channelFlow.collect { value ->
        println(value)
    }
}

Terminal operator

  • collect

Triggers the flow to run. This is the usual way to observe a flow.

launch {
    flowOf(1, 2, 3).collect { value ->
        print(value)
    }
}
// 1 2 3
  • collectIndexed

Collects the values together with their index.

launch {
    flowOf(1, 2, 3).collectIndexed { index, value ->
        print(value)
    }
}
// 1 2 3
  • collectLatest

Unlike collect, when a new value is emitted while the previous value is still being processed, the processing of the previous value is cancelled.

flow {
    emit(1)
    delay(50)
    emit(2)
}.collectLatest { value ->
    println("Collecting $value")
    delay(100) // Emulate work
    println("$value collected")
}
// Prints: Collecting 1, Collecting 2, 2 collected

Useful when only the most recent data matters and intermediate values can be discarded.

  • toCollection

Adds the emitted values to the given collection.

val array = arrayListOf(0)
launch {
    flow {
        emit(1)
        emit(2)
    }.toCollection(array)

    // Read the collection only after toCollection has finished
    array.forEach { value ->
        print(value)
    }
}
// Prints: 0 1 2
  • toList

Convert the result to a List

flow {
   emit(1)
   emit(2)
} .toList().forEach{value->
    print(value)
}
// 1 2
  • toSet

Convert the result to a Set

flow {
   emit(1)
   emit(1)
} .toSet().forEach{value->
    print(value)
}
// 1
  • launchIn

Triggers execution of the flow directly, without setting any action for the emitted values. The parameter is a CoroutineScope. It is usually not called on its own, but in combination with other operators such as onEach and onCompletion. The return value is a Job.

flow {
   emit(1)
   emit(2)
 }.launchIn(lifecycleScope)
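
The combination with onEach and onCompletion can be sketched as below, assuming kotlinx-coroutines-core. The runBlocking scope stands in for lifecycleScope so the example runs outside Android, and launchInDemo is a made-up name.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: the runBlocking scope replaces lifecycleScope here.
fun launchInDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = flowOf(1, 2)
        .onEach { log += "value $it" }        // per-element action
        .onCompletion { log += "completed" }  // runs when the flow ends
        .launchIn(this)                       // starts collecting and returns a Job
    job.join()                                // wait only so the demo can read the log
    log
}

fun main() {
    println(launchInDemo()) // [value 1, value 2, completed]
}
```

Because launchIn returns a Job, the collection can also be stopped later with job.cancel().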
  • last

Returns the last value emitted by the flow, throwing NoSuchElementException if the flow is empty.

val myFlow= flow {
   emit(1)
   emit(2)
 }

launch{
    print(myFlow.last())
}

// 2
  • lastOrNull

Returns the last value emitted by the flow, or null if the flow is empty.

val myFlow= emptyFlow<Int>()
launch{
    print(myFlow.lastOrNull())
}

// null
  • first

Returns the first value emitted by the flow, throwing NoSuchElementException if the flow is empty.

val myFlow= flow {
   emit(1)
   emit(2)
 }
launch{
    print(myFlow.first())
}

// 1
  • firstOrNull

Returns the first value emitted by the flow, or null if the flow is empty.

val myFlow= emptyFlow<Int>()
launch{
    print(myFlow.firstOrNull())
}
// null
  • single

Returns the single value emitted by the flow. Unlike first(), it throws an exception if the flow is empty or if more than one value is emitted.

val myFlow= flow {
     emit(1)
}

launch {
     print(myFlow.single()) // 1
}

val myFlow1= flow {
   emit(1)
   emit(2)
 }

launch {
   print(myFlow1.single()) // error
}
  • singleOrNull

Returns the single value emitted by the flow, or null if the flow is empty. In recent kotlinx.coroutines versions it also returns null, rather than throwing, when more than one value is emitted.

val myFlow= flow {
 emit(1)
}

launch  {
 print(myFlow.singleOrNull()) // 1
}
  • count

Returns the number of values emitted by the flow, similar to list.size. Note: it does not work with a SharedFlow, because a SharedFlow never completes.

val myFlow= flow {
   emit(1)
   emit(2)
}
launch{
    print(myFlow.count())
}
//2
  • fold

Starts from the initial value and applies the operation to each emitted value, passing the result on as the accumulator for the next step.

val sum = flowOf(2, 3, 4).fold(1) { result, value -> result + value }
// sum = 10, i.e. 1 + 2 + 3 + 4
  • reduce

Similar to fold, but without an initial value: the first emitted value is used as the starting accumulator.

val result= flowOf(1, 2, 3)
                .reduce { acc, value ->
                     acc + value
                }
 // result = 6, i.e. 1 + 2 + 3

Callback operator

  • onStart

Called before the upstream flow starts. You can emit extra elements here or do other work, such as sending analytics events.

flow<Result>{
   emit(Result.Success)
}.onStart{
   emit(Result.Loading)
}
  • onCompletion

Called when the flow is cancelled or completes. You can emit elements here, send analytics events, and so on.

flow<Result>{
   emit(Result.Success)
}.onCompletion{
   emit(Result.End)
}
  • onEach

Called before each element is passed from upstream to downstream.

flow<Int> {
    emit(1)
    emit(2)
    emit(3)
}.onEach { value ->
    println(value)
}.launchIn(lifecycleScope)
  • onEmpty

Called when the flow completes without emitting any elements. Can be used to supply fallback data.

emptyFlow<String>().onEmpty {
    emit("fallback data")
}.launchIn(lifecycleScope)
  • onSubscription

An operator exclusive to SharedFlow (StateFlow is a special implementation of SharedFlow). It is called after the subscription has been established. This differs from onStart: because SharedFlow is a hot flow, values sent to the shared flow while the onStart action is running may not be received downstream.

val state = MutableSharedFlow<String>().onSubscription {
    emit("onSubscription")
}

launch {
    state.collect { value ->
        println(value)
    }
}
// Prints: onSubscription

Transformation operator

  • map

Transforms each emitted value; the return value of the lambda is the value sent downstream.

flow {
    emit(1)
    emit(2)
}.map { value ->
    value * 2
}.collect { value ->
    println(value)
}
// Prints: 2 4
  • mapLatest

Like collectLatest: when a new value is emitted while the previous transformation is still running, the previous transformation is cancelled.

flow {
    emit("a")
    delay(100)
    emit("b")
}.mapLatest { value ->
    println("Started computing $value")
    delay(200)
    "Computed $value"
}.collect { value ->
    print(value)
}
// Prints: Started computing a, Started computing b, Computed b
  • mapNotNull

Emits only the values that are not null after mapping.

flow {
    emit("a")
    emit("b")
}.mapNotNull { value ->
    if (value != null) {
        value
    } else {
        null
    }
}.collect { value ->
    print(value)
}
  • transform

Transforms the emitted values. Unlike map, the receiver of the transform lambda is a FlowCollector, so it is very flexible: a value can be transformed, skipped, or emitted multiple times.

flow {
    emit(1)
    emit(2)
}.transform { value ->
    if (value == 1) {
        emit("value :$value*2")
    }
    emit("transform :$value")
}.collect { value ->
    println(value)
}
// Prints: value :1*2, transform :1, transform :2
  • transformLatest

Like mapLatest: when a new value is emitted while the previous transformation is still running, the previous transformation is cancelled.

flow {
    emit("a")
    delay(100)
    emit("b")
}.transformLatest { value ->
    emit(value)
    delay(200)
    emit(value + "_last")
}.collect { value ->
    println(value)
}
// Prints: a, b, b_last
  • transformWhile

The lambda returns a Boolean: if it returns false, the transformation stops and the flow completes; if it returns true, it continues.

flow {
    emit("a")
    emit("b")
}.transformWhile { value ->
    emit(value)
    true
}.collect { value ->
    println(value)
}
// Result: a b

flow {
    emit("a")
    emit("b")
}.transformWhile { value ->
    emit(value)
    false
}.collect { value ->
    println(value)
}
// Result: a
  • asStateFlow

Converts a MutableStateFlow into a read-only StateFlow. Often used when exposing properties.

private val _uiState = MutableStateFlow<UIState>(Loading)

val uiState = _uiState.asStateFlow()
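
The exposure pattern can be sketched in a runnable form as follows, assuming kotlinx-coroutines-core. CounterHolder is a hypothetical class invented for the illustration; in an Android app the same shape would typically live in a ViewModel.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Hypothetical holder class, named for illustration only.
class CounterHolder {
    private val _count = MutableStateFlow(0)          // writable, kept private
    val count: StateFlow<Int> = _count.asStateFlow()  // read-only view for callers

    fun increment() {
        _count.value += 1
    }
}

fun main() {
    val holder = CounterHolder()
    holder.increment()
    holder.increment()
    println(holder.count.value)                   // 2
    println(holder.count is MutableStateFlow<*>)  // false: callers cannot cast back and write
}
```

Exposing the result of asStateFlow rather than the MutableStateFlow itself keeps all writes inside the owning class.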
  • asSharedFlow

Converts a MutableSharedFlow into a read-only SharedFlow. Often used when exposing properties.

private val _uiState = MutableSharedFlow<UIState>()

val uiState = _uiState.asSharedFlow()
  • receiveAsFlow

Converts a Channel into a Flow. It can have multiple observers, but values are not multicast: each value is received by only one of the observers, which may take turns.

private val _event = Channel<Event>()

val event = _event.receiveAsFlow()
  • consumeAsFlow

Converts a Channel into a Flow that can be collected only once. Collecting it from multiple observers crashes with an IllegalStateException.

private val _event = Channel<Event>()

val event = _event.consumeAsFlow()
  • withIndex

Wraps each value in an IndexedValue.

flow {
    emit("a")
    emit("b")
}.withIndex().collect {
    print("${it.index}: ${it.value}")
}
// Result: 0: a 1: b
  • scan

Similar to fold, except that fold returns only the final result, while scan returns a flow that emits the initial value and the result of each step.

flowOf(1, 2, 3).scan(0) { acc, value ->
    acc + value
}.collect {
    print(it)
}
// 0 1 3 6
// acc is the result of the previous step, value is the emitted value
// 0 is the initial value
// 1 is 0 + 1 = 1
// 3 is 1 + 2 = 3
// 6 is 3 + 3 = 6
  • produceIn

Converts the flow into a ReceiveChannel. Not commonly used. Note: a Channel is made up of a ReceiveChannel and a SendChannel.

flowOf(1, 2, 3).produceIn(this)
               .consumeEach { value->
                    print(value)
               }
               
//1 2 3
  • runningFold

Unlike fold, it returns a new flow that emits the initial value and the result of each step.

flowOf(1, 2, 3).runningFold(1){ acc, value ->
     acc + value
 } .collect { value->
     print(value)
 }
 // 1 2 4 7
  • runningReduce

Unlike reduce, it returns a new flow that emits the result of each step. Like reduce, it takes no initial value.

flowOf(1, 2, 3).runningReduce { acc, value ->
     acc + value
 } .collect { value->
     print(value)
 }
 // 1 3 6
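
The accumulating operators are easiest to compare side by side. The sketch below assumes kotlinx-coroutines-core, and accumulateDemo is a made-up name. It applies fold, scan and runningReduce to the same input.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: same input and same operation, three operators.
fun accumulateDemo(): Triple<Int, List<Int>, List<Int>> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    // Terminal operator: returns only the final result
    val folded = numbers.fold(0) { acc, value -> acc + value }
    // Intermediate operator: emits the initial value plus the result of each step
    val scanned = numbers.scan(0) { acc, value -> acc + value }.toList()
    // Intermediate operator: emits the result of each step, with no initial value
    val reduced = numbers.runningReduce { acc, value -> acc + value }.toList()
    Triple(folded, scanned, reduced)
}

fun main() {
    println(accumulateDemo()) // (6, [0, 1, 3, 6], [1, 3, 6])
}
```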
  • shareIn

Converts a regular flow into a SharedFlow. It takes three parameters:

scope: CoroutineScope – the coroutine scope in which sharing is started.

started: SharingStarted – controls when sharing starts and stops.

replay: Int = 0 – the number of old values replayed to new subscribers.

started has three options:

Eagerly – sharing starts immediately and never stops.

Lazily – sharing starts when the first subscriber appears and never stops.

WhileSubscribed – sharing starts when the first subscriber appears, stops immediately when the last subscriber disappears (by default), and keeps the replay cache permanently (by default).

WhileSubscribed has the following optional parameters:

stopTimeoutMillis – the delay in milliseconds between the last subscriber disappearing and the sharing coroutine stopping. The default is zero (stop immediately).

replayExpirationMillis – the delay in milliseconds between the sharing coroutine stopping and the replay cache being reset.

val share = flowOf(1, 2, 3).shareIn(this, SharingStarted.Eagerly)

// There can be multiple observers
share.collect { value ->
    print(value)
}
  • stateIn

Converts a regular flow into a StateFlow. It takes three parameters:

scope – the coroutine scope in which sharing is started.

started – the policy that controls when sharing starts and stops.

initialValue – the initial value of the state flow.

val  state = flowOf(Success).stateIn(lifecycleScope,SharingStarted.Eagerly,Loading)


state.collect{value->
  print(value)
}
// Loading  Success

stateIn and shareIn are usually used to turn flows that come from other sources into observable hot flows; they are not normally applied to a literal flow as in the examples above.

Filter operator

  • filter

Keeps only the values that satisfy the condition.

flow {
    emit("a")
    emit("b")
}.filter { value ->
    value == "a"
}.collect { value ->
    print(value)
}
// Result: a
  • filterIsInstance

Keeps only the values of the given type.

flow {
    emit("a")
    emit("b")
    emit(1)
}.filterIsInstance<String>().collect { value ->
    print(value)
}
// Result: a b
  • filterNot

Keeps only the values that do not satisfy the condition; it is the inverse of filter.

flow {
    emit("a")
    emit("b")
}.filterNot {
    it == "a"
}.collect { value ->
    print(value)
}
// Result: b
  • filterNotNull

Keeps only the values that are not null.

flow<String?> {
    emit("a")
    emit(null)
    emit("b")
}.filterNotNull().collect { value ->
    print(value)
}
// Result: a b
  • drop

Takes a count: Int parameter and drops the first count values.

flow {
    emit(1)
    emit(2)
    emit(3)
}.drop(2).collect { value ->
    print(value)
}
// Result: 3
  • dropWhile

This operator is a little special, and it differs from filter. It drops values while the condition holds; once it finds the first value that does not satisfy the condition, it emits that value and all values after it. If the first value does not satisfy the condition, everything is emitted.

flow {
    emit(3)
    emit(1) // first value that does not satisfy the condition
    emit(2)
    emit(4)
}.dropWhile {
    it == 3
}.collect { value ->
    print(value)
}
// Result: 1 2 4

flow {
    emit(1) // first value that does not satisfy the condition
    emit(2)
    emit(3)
    emit(4)
}.dropWhile {
    it == 3
}.collect { value ->
    print(value)
}
// Result: 1 2 3 4
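
The difference from filter shows up when a matching value reappears later in the flow. A minimal sketch, assuming kotlinx-coroutines-core, with dropWhileDemo as an invented name:

```kotlin
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: the value 3 appears both at the start and later on.
fun dropWhileDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val source = flowOf(3, 3, 1, 3, 4)
    // dropWhile stops checking after the first non-matching value, so the later 3 survives
    val dropped = source.dropWhile { it == 3 }.toList()
    // filterNot checks every value, so every 3 is removed
    val filtered = source.filterNot { it == 3 }.toList()
    dropped to filtered
}

fun main() {
    println(dropWhileDemo()) // ([1, 3, 4], [1, 4])
}
```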
  • take

Returns the first n elements

flow {
    emit(1)
    emit(2)
    emit(3)
}.take(2).collect { value ->
    print(value)
}
// Result: 1 2
  • takeWhile

This also finds the first value that does not satisfy the condition, but emits the values before it, the opposite of dropWhile.

If the first value does not satisfy the condition, the result is an empty flow.

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.takeWhile {
    it < 3
}.collect { value ->
    print(value)
}
// Result: 1 2

flow {
    emit(3) // the first value does not satisfy the condition
    emit(1)
    emit(2)
    emit(4)
}.takeWhile {
    it < 3
}.onEmpty {
    print("empty")
}.collect { value ->
    print(value)
}
// Result: empty
  • debounce

Debouncing. A value is emitted only if no newer value arrives within the given timeout; values that are followed too quickly by a newer one are dropped. Well suited to search-suggestion scenarios.

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)

 // 3 4 5
  • sample

Sampling. Given a period, emits only the most recent value emitted within each period.

flow {
    repeat(10) {
        emit(it)
        delay(110)
    }
}.sample(200)
// 1 3 5 7 9
// The first window ends at 200 ms and yields 1, the second ends at 400 ms and yields 3, and so on
  • distinctUntilChangedBy

A deduplication operator. It compares each value with the previous one and drops it if the two are considered duplicates.

keySelector: (T) -> Any? specifies the key used for the comparison.

A bit like RecyclerView's DiffUtil mechanism.

flowOf(
    Funny(name = "Tom", age = 8),
    Funny(name = "Tom", age = 12),
    Funny(name = "Tom", age = 12)
).distinctUntilChangedBy { it.name } .collect { value ->
     print(value.toString())
}

// Funny(name=Tom, age=8)
  • distinctUntilChanged

A deduplication filter and a simplified form of distinctUntilChangedBy: if two consecutive values are equal, the second is skipped.

flowOf(1, 1, 3, 1).distinctUntilChanged().collect { value ->
    print(value)
}
// 1 3 1

Combination operator

  • combine

Combines the most recently emitted values for each stream.

val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s } .collect {
  println(it) // Will print "1a 2a 2b 2c"
}
  • combineTransform

combine + transform

val numberFlow = flowOf(1, 2).onEach { delay(10) }
val stringFlow = flowOf("a", "b", "c").onEach { delay(15) }

numberFlow.combineTransform(stringFlow) { number, string ->
    emit("$number :$string")
}.collect { value ->
    println(value)
}
// Result: 1 :a, 2 :a, 2 :b, 2 :c
  • merge

Merge multiple streams into a single stream. Can be used for multilevel cache loading

val numberFlow = flowOf(1, 2).onEach { delay(10) }
val stringFlow = flowOf("a", "b", "c").onEach { delay(15) }

listOf(numberFlow,stringFlow).merge()
                             .collect { value ->
                                 print(value)
                             }


// 1 a 2 b c
  • flattenConcat

Flattens a flow of flows into a single flow, collecting the inner flows one after another in order. It is an extension function on Flow<Flow<T>>.

flow {
    emit(flowOf(1, 2, 3))
    emit(flowOf(4, 5, 6))
 } .flattenConcat().collect { value->
     print(value)
 }

// 1 2 3 4 5 6
  • flattenMerge

Similar to flattenConcat, but you can set the number of flows that are collected concurrently.

It takes one parameter, concurrency: Int. When concurrency == 1 it behaves like flattenConcat; when it is greater than 1 the inner flows are collected concurrently.

flow {
    emit(flowOf(1, 2, 3).flowOn(Dispatchers.IO))
    emit(flowOf(4, 5, 6).flowOn(Dispatchers.IO))
    emit(flowOf(7, 8, 9).flowOn(Dispatchers.IO))
}.flattenMerge(3).collect { value ->
    print(value)
}
// 1 2 3 7 8 9 4 5 6 (the order is not fixed)
  • flatMapConcat

A combination operator equivalent to map + flattenConcat: it maps each value to a flow, then flattens the resulting flows into a single flow in order.

flowOf(1, 2, 3).flatMapConcat {
     flowOf(it.toString() + " map")
 } .collect { value ->
     println(value)
 }

// 1 map 
// 2 map
// 3 map
  • flatMapLatest

As with the other Latest operators, if the next value comes and the transformation is not finished, cancel it.

Equivalent to transformLatest + emitAll

flow {
     emit("a")
     delay(100)
     emit("b")
 }.flatMapLatest { value ->
     flow {
         emit(value)
         delay(200)
         emit(value + "_last")
     }
 }.collect { value ->
     print(value)
 }

 // a b b_last
  • flatMapMerge

Also a combination operator, equivalent to map + flattenMerge. For that reason it also has a concurrency: Int parameter to limit concurrency.

flowOf("a","b","c","d","e","f").flatMapMerge(3) { value ->
     flow {
         emit(value)
     } .flowOn(Dispatchers.IO)
 }.collect { value ->
     print(value)
 }

// b a c d e f (the order is not fixed)
  • zip

Combines two flows by pairing their values one to one. As soon as either flow completes, the whole flow completes.

val flow = flowOf(1, 2, 3).onEach { delay(10) }

val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }

flow.zip(flow2) { i, s -> i.toString() + s }.collect {
    println(it) 
}

 // Will print "1a 2b 3c"

Functional operator

  • cancellable

Checks on each emission whether the collecting coroutine has been cancelled and, if so, throws a CancellationException.

val job = flowOf(1, 3, 5, 7).cancellable().onEach { value ->
    print(value)
}.launchIn(lifecycleScope)

// Cancel
job.cancel()
  • catch

Catches exceptions from upstream; exceptions thrown downstream are not caught.

Upstream refers to the part of the flow before this operator.

Downstream refers to the part of the flow after this operator.

flow<Int> {
    throw IOException("")
}.catch { e ->
    if (e is IOException) {
        //...
    }
}
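
The upstream and downstream distinction can be sketched as follows, assuming kotlinx-coroutines-core. catchDemo and the fallback value -1 are invented for the illustration. The first pipeline fails before catch, the second fails after it.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: returns the recovered values and whether the second pipeline failed.
fun catchDemo(): Pair<List<Int>, Boolean> = runBlocking {
    // Upstream failure: catch sees it and can emit a fallback value
    val recovered = flow<Int> {
        emit(1)
        throw IOException("upstream failure")
    }.catch { emit(-1) }.toList()

    // Downstream failure: thrown after catch, so catch does not handle it
    val downstreamFailed = runCatching {
        flowOf(1)
            .catch { emit(-1) }
            .onEach { error("downstream failure") }
            .collect()
    }.isFailure

    recovered to downstreamFailed
}

fun main() {
    println(catchDemo()) // ([1, -1], true)
}
```

To cover a failing step, place catch after that step in the chain.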
  • retryWhen

Conditionally retry. The lambda takes two arguments: one is the exception cause, and the other is the index of the current retry (starting from 0).

The lambda returns a Boolean: true to retry again, false to stop retrying.

flow<Int> {
    print("doing")
    throw IOException("")
 } .retryWhen { cause,attempt->
     if(attempt > 4){
        return@retryWhen false
     }
    cause is IOException
 }
  • retry

A retry mechanism that re-executes the flow when an exception occurs. A simplified version of retryWhen.

retries: Long = Long.MAX_VALUE specifies the maximum number of retries; the optional predicate decides whether to retry for a given exception (it returns true by default).

flow<Int> {
  throw IOException("")     
 }.retry(3) { e ->
  if(e is IOException){
      true
  }else {
      false
  }
}



flow<Int> {
  throw IOException("")     
 }.retry(3)
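
The retry snippets above always fail, so they never show a successful retry. The sketch below, assuming kotlinx-coroutines-core, uses a flow that fails twice and then succeeds. retryDemo and the executions counter are made up for the illustration.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: returns the collected values and how often the builder ran.
fun retryDemo(): Pair<List<String>, Int> = runBlocking {
    var executions = 0
    val result = flow<String> {
        executions++
        // Fail on the first two executions, succeed on the third
        if (executions < 3) throw IOException("execution $executions failed")
        emit("ok")
    }.retryWhen { cause, attempt ->
        // attempt starts at 0; retry only IO failures, and at most four times
        cause is IOException && attempt < 4
    }.toList()
    result to executions
}

fun main() {
    println(retryDemo()) // ([ok], 3)
}
```

Each retry re-runs the whole upstream flow from the beginning, which is why the builder executes three times here.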
  • buffer

If an operator's code takes a considerable amount of time to execute, you can use the buffer operator to run the upstream part in a separate coroutine.

capacity: Int = BUFFERED – the buffer capacity.

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND – the action taken when the buffer overflows.

There are three options: SUSPEND suspends the emitter, DROP_OLDEST drops the oldest value, and DROP_LATEST drops the newest value.

flowOf("A", "B", "C")
    .onEach { println("1$it") }
    .collect { println("2$it") }

Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--

flowOf("A", "B", "C")
    .onEach { println("1$it") }
    .buffer() // <--------------- buffer between onEach and collect
    .collect { println("2$it") }

P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }

                      |
                      | channel               // buffer()
                      V

Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
  • conflate

Keeps only the latest value; internally it is buffer(CONFLATED).

flow {
    repeat(30) {
        delay(100)
        emit(it)
    }
}.conflate().onEach {
    delay(1000)
}.collect { value ->
    print(value)
}
// 0 7 15 22 29 (results are not fixed)
  • flowOn

Specifies the context (and therefore the dispatcher and threads) in which the upstream operations run. Use it whenever you want to switch the execution thread.

flow.map { ... } // Will be executed in IO
    .flowOn(Dispatchers.IO) // Affects only the operators above it
    .collect { ... } // Runs in the collector's context
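
The effect of flowOn can be observed by recording thread names. A minimal sketch, assuming kotlinx-coroutines-core on the JVM, with flowOnDemo as an invented name. The exact thread names depend on the runtime, so the code does not hard-code them.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: returns the upstream thread name and the collector thread name.
fun flowOnDemo(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var collectorThread = ""
    flow {
        // The builder is upstream of flowOn, so it runs on the IO dispatcher
        upstreamThread = Thread.currentThread().name
        emit(1)
    }
        .flowOn(Dispatchers.IO)
        .collect {
            // The collector stays in the caller's context (the runBlocking thread here)
            collectorThread = Thread.currentThread().name
        }
    upstreamThread to collectorThread
}

fun main() {
    val (upstream, collector) = flowOnDemo()
    println("upstream on $upstream, collector on $collector")
}
```

On a typical JVM run the two names differ, with the upstream on a dispatcher worker thread and the collector on the calling thread.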

Conclusion

Those are the basic uses of Kotlin Flow's operators; pick them as needed in real-world scenarios. For example, debounce suits search, retry suits network requests, SharedFlow suits communication between components, and combine suits merging data. Used well, they improve development efficiency.