Collections, Sequences, Channels, and Flows

Channel

Introduction

Channel is conceptually very similar to BlockingQueue. The difference is that it replaces the blocking put operation with a suspending send, and the blocking take operation with a suspending receive. A Channel is safe for concurrent use and can connect coroutines so that they can communicate with each other.

Basic usage

val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer
launch {
    while (true) {
        println(channel.receive())
    }
}

Since a Channel is essentially a queue, shouldn't it have a buffer? When the buffer is full and nobody calls receive to take an element out, send suspends. So let's look at how the buffer of a Channel is defined:

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        else -> ArrayChannel(capacity)
    }

  • RENDEZVOUS is 0. The word describes a "be there or be square" kind of meeting: the sender waits right there until the receiver shows up. In other words, if the consumer never calls receive, the very first send in the producer suspends.

  • UNLIMITED is easy to understand: it is backed by LinkedListChannel, which is much like LinkedBlockingQueue.

  • CONFLATED means "merged": this kind of Channel has a buffer of exactly one element, and every new element replaces the old one.

  • BUFFERED creates a buffer of the default capacity, which is 64; any other positive value is used directly as the buffer size.
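To make the capacity options a bit more tangible, here is a minimal sketch. It assumes kotlinx.coroutines 1.5 or later (for the non-suspending trySend and tryReceive); the helper names acceptedSends and conflatedSurvivor are made up for this example. No receiver is ever started, so the result only shows how many elements each kind of buffer can take in.

```kotlin
import kotlinx.coroutines.channels.Channel

// Tries to send 1, 2, 3 without suspending and reports which sends were accepted.
fun acceptedSends(capacity: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    return listOf(1, 2, 3).map { channel.trySend(it).isSuccess }
}

// For a conflated channel, returns the element that is left after three sends.
fun conflatedSurvivor(): Int? {
    val channel = Channel<Int>(Channel.CONFLATED)
    listOf(1, 2, 3).forEach { channel.trySend(it) }
    return channel.tryReceive().getOrNull()
}

fun main() {
    println(acceptedSends(Channel.RENDEZVOUS)) // nobody is receiving, so nothing is accepted
    println(acceptedSends(2))                  // two slots, so the third send is rejected
    println(acceptedSends(Channel.UNLIMITED))  // every send is accepted
    println(conflatedSurvivor())               // only the newest element is kept
}
```

With the suspending send, a rejected element would simply suspend the producer instead of returning a failure.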

But a Channel is a hot stream: its producer runs even when there are no consumers, and a value you miss is one you will probably never receive. Since a Channel is like a BlockingQueue, send() and receive() are enqueue and dequeue operations, so multiple consumers compete for the elements:

val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer 1
launch {
    while (true) {
        println("~~~" + channel.receive())
    }
}

//consumer 2
launch {
    while (true) {
        println("!!!" + channel.receive())
    }
}

Partial output:

~~~0
~~~1
!!!2
~~~3
!!!4
~~~5
!!!6
~~~7
!!!8
~~~9

As you can see, the two consumers basically take values alternately. What if every consumer should receive every value? Use BroadcastChannel:

val channel = BroadcastChannel<Int>(Channel.BUFFERED)
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer 1
launch {
    // Open the subscription once, so that a new one is not created for every element
    val subscription = channel.openSubscription()
    while (true) {
        println("~~~" + subscription.receive())
    }
}

//consumer 2
launch {
    val subscription = channel.openSubscription()
    while (true) {
        println("!!!" + subscription.receive())
    }
}

Partial output:

~~~1
!!!1
~~~2
!!!2
~~~3
!!!3
~~~4
!!!4

It is also important to note that channels need to be closed manually.
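Here is a minimal sketch of closing a channel by hand, assuming kotlinx.coroutines is on the classpath (receiveUntilClosed is a name invented for this example). The producer closes the channel once it is done, which is what lets the consumer's for loop finish.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun receiveUntilClosed(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 0 until 3) channel.send(i)
        channel.close() // without this, the loop below would suspend forever
    }
    val received = mutableListOf<Int>()
    // Iteration ends once the channel is closed and all elements are consumed.
    for (item in channel) received += item
    received
}

fun main() {
    println(receiveUntilClosed())
}
```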

Channel version of the sequence generator

Sequence cannot take advantage of the higher-level concepts of the coroutine framework, and it is clearly not thread-safe either, whereas a Channel can be used in concurrent scenarios.

launch {
    val channel = produce(Dispatchers.Unconfined) {
        send(1)
        send(2)
    }

    for (item in channel) {
        println("got : $item")
    }
}

However, even if no one “consumes” a Channel, its value will still be produced, which will cause certain waste. So what about Sequence + Channel?

Flow

Introduction

Flow is a new set of APIs added in Kotlin Coroutines 1.2.0 alpha. Also called Asynchronous Flow, it is the product of combining Kotlin coroutines with the reactive programming model.

What is reactive programming

Reactive programming is based on the observer pattern; it is a declarative style of programming oriented around data streams and the propagation of change. Put another way: reactive programming is programming with asynchronous data streams. [Reactive Programming]

What problem does Flow solve

Asynchronous suspend functions can return a single value, so how do we return multiple asynchronously computed values? This is the problem that Kotlin Flow addresses.

Comparison with Channel

  • Flow is “cold” 🥶. Like Sequence, it runs only when a terminal operator is reached, but it differs from Sequence as follows ↓
  • Flow is reactive: the producer calls back into the consumer (with Sequence it is the consumer that asks the producer for the next value).
  • It is built on coroutines and therefore provides all the benefits of structured concurrency and cancellation.
  • It has a rich set of operators.

The operators on Channel are deprecated as of Kotlin 1.4 and will be removed in the future.

How to use

There are several ways to build a Flow. Here is the simplest:

viewModelScope.launch {
    // Build the flow
    val testFlow = flow { emit(1) }
    // Consume the flow
    testFlow.collect { println(it) }
}

Why is it cold

Merely creating a Flow produces nothing: no consumption means no production, and consuming it several times means producing several times. Production and consumption always go hand in hand. A cold stream is one that produces only when it is consumed, in contrast to a Channel, whose sender does not depend on the receiver.
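The "cold" behaviour can be checked with a small counter. This is only a sketch, assuming kotlinx.coroutines is available; countProducerRuns is a made-up helper that counts how often the body of the flow builder actually runs.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun countProducerRuns(collections: Int): Int = runBlocking {
    var producerRuns = 0
    val numbers = flow {
        producerRuns++ // runs once per collection, never at creation time
        emit(1)
        emit(2)
    }
    // toList() is a terminal operator, so each call triggers one production run.
    repeat(collections) { numbers.toList() }
    producerRuns
}

fun main() {
    println(countProducerRuns(0)) // never collected, so the producer never ran
    println(countProducerRuns(2)) // collected twice, so the producer ran twice
}
```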

Flow is an interface with a single suspend function, collect, which is the terminal operator:

public interface Flow<out T> {    
    public suspend fun collect(collector: FlowCollector<T>)
}

The emitter is a FlowCollector with a single suspend function called emit

public interface FlowCollector<in T> {  
    public suspend fun emit(value: T)
}

Internally, the whole mechanism between the collector and the emitter is nothing more than function calls on both sides; the suspend keyword is what adds the magic.
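To see that it really is just function calls, here is a toy re-implementation. This is a sketch for illustration, not the library's actual source: MiniFlow, MiniCollector, and miniFlow are hypothetical names, and the real Flow adds context checks and cancellation handling on top. It assumes a Kotlin version in which fun interfaces may declare suspend functions.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical miniature counterparts of FlowCollector and Flow.
fun interface MiniCollector<T> {
    suspend fun emit(value: T)
}

fun interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Building a flow only stores the block; nothing runs until collect is called.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    MiniFlow<T> { collector -> collector.block() }

fun collectDoubled(): List<Int> = runBlocking {
    val results = mutableListOf<Int>()
    val source = miniFlow<Int> {
        emit(1) // a plain call into the lambda passed to collect below
        emit(2)
    }
    source.collect { value -> results += value * 2 }
    results
}

fun main() {
    println(collectDoubled())
}
```

Calling collect runs the builder block with the collector as its receiver, and every emit inside the block is a direct call to the collector's lambda.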

Threads

  • Use flowOn() to switch the thread on which the flow executes. The dispatcher given to flowOn affects the logic that comes before it (upstream).
fun main() = runBlocking {
    flow {
        emit("Context")
        println(" emit on ${Thread.currentThread().name}")
    }
    .flowOn(Dispatchers.IO)
    .map {
        println(" map on ${Thread.currentThread().name}")
        it + " Preservation"
    }
    .flowOn(Dispatchers.Default)
    .collect { value ->
        println(" collect on ${Thread.currentThread().name}")
        println(value)
    }
}

Output:

 emit on DefaultDispatcher-worker-2
 map on DefaultDispatcher-worker-1
 collect on main
 Context Preservation

Exception handling

The catch operator never catches or handles exceptions that occur downstream ⬇️ of it; it only catches exceptions that occur upstream ⬆️.

flow {
    emit(1)
    throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
    println("caught error: $t")
}
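The upstream-only rule can be checked with two small functions. This is a sketch under the assumption that kotlinx.coroutines is available; upstreamFailure and downstreamFailure are names made up for the example.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The exception is thrown upstream of catch, so catch handles it.
fun upstreamFailure(): List<Int> = runBlocking {
    flow {
        emit(1)
        throw ArithmeticException("Div 0")
    }.catch { emit(-1) } // replace the failure with a fallback value
        .toList()
}

// The exception is thrown downstream of catch, so it escapes to the caller.
fun downstreamFailure(): String = runBlocking {
    try {
        flow { emit(1) }
            .catch { emit(-1) }
            .onEach { check(it != 1) { "downstream failed" } }
            .toList()
        "completed"
    } catch (e: IllegalStateException) {
        "escaped: ${e.message}"
    }
}

fun main() {
    println(upstreamFailure())
    println(downstreamFailure())
}
```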

Start and completion

Use onStart and onCompletion if you want to do something at the beginning and at the end of a stream. onCompletion is similar to the finally block of try … catch … finally: it is called whether or not an exception occurred earlier, and its t parameter is the exception that was not caught earlier (null if the stream completed normally).

flow {
    emit(1)
}.onStart {
    println("smart")
}.onCompletion { t: Throwable? ->
    println("caught error: $t")
}.collect {
    println(it)
}

Output:

smart
1
caught error: null

Flow is designed so that exceptions stay transparent as they pass through its operators. For that reason, using try/catch inside the flow builder is prohibited 🚫:

Wrong:

flow {
    try {
        emit(1)
        throw ArithmeticException("Div 0")
    } catch (t: Throwable) {
        println("caught error: $t")
    } finally {
        println("finally.")
    }
}

Terminal operators

In the previous examples we used collect to consume the data of a Flow. collect is the most basic terminal operator. Besides collect there are other common terminal operators, which fall into three groups:

  1. Conversions to a collection type, including toList, toSet, and so on.
  2. Aggregations, including reduce and fold, which reduce a Flow to a single value, and single, singleOrNull, first, and so on, which fetch a single element.
  3. Operators that produce no result, such as collect() and launchIn().

In fact, there is an easy way to tell whether something is a terminal operator: because the consuming side of a Flow must run inside a coroutine, terminal operators are suspend functions. (launchIn is the exception: it is not a suspend function, because it takes a CoroutineScope and launches the collection in it.)

Cancelling a Flow

Flow itself provides no cancel operation. Consuming a Flow relies on terminal operators such as collect, which must be called inside a coroutine, so cancelling a Flow mainly comes down to the state of the coroutine in which the terminal operator runs.

val job = launch {
    val intFlow = flow {
        (1..3).forEach {
            delay(1000)
            emit(it)
        }
    }

    intFlow.collect { println(it) }
}

delay(2500)
job.cancel()

Other Flow creation methods

flow { … } is the basic way to create streams, and there are other builders that make stream declarations easier:

  • The flowOf builder defines a stream that emits a fixed set of values.
  • The .asFlow() extension functions convert various collections and sequences into streams.
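A quick sketch of these builders side by side, assuming kotlinx.coroutines is available (builderResults is just a name for this example):

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun builderResults(): List<List<Int>> = runBlocking {
    listOf(
        flowOf(1, 2, 3).toList(),           // fixed set of values
        listOf(4, 5).asFlow().toList(),     // from a collection
        sequenceOf(6, 7).asFlow().toList(), // from a sequence
        (8..9).asFlow().toList()            // from a range
    )
}

fun main() {
    println(builderResults())
}
```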

Inside flow { … } you cannot switch dispatchers at will, because the emit function is not thread-safe:

flow {
    withContext(Dispatchers.IO){  //error
        emit(2)
    }
    emit(1)
}.collect {
    println(it)
}

To switch dispatchers while elements are being produced, use the channelFlow function to create the Flow:

channelFlow {
    send(1)
    withContext(Dispatchers.IO) {
        send(2)
    }
}
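As a runnable sketch of the channelFlow variant (assuming kotlinx.coroutines; emitAcrossDispatchers is a made-up name), both elements arrive at the collector in order even though the second one is sent from Dispatchers.IO:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun emitAcrossDispatchers(): List<Int> = runBlocking {
    channelFlow {
        send(1)
        withContext(Dispatchers.IO) {
            send(2) // allowed here: elements travel through a channel
        }
    }.toList()
}

fun main() {
    println(emitAcrossDispatchers())
}
```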

SharedFlow

What if I want a “hot” flow? SharedFlow exists to solve this problem. Previously this was usually done with a BroadcastChannel converted via asFlow, but that implementation is not elegant and is too tightly coupled to Channel. Hence the introduction of SharedFlow in kotlinx.coroutines 1.4. It is a “hot” stream and can have multiple subscribers.

Simple use:

val broadcasts=MutableSharedFlow<String>()

viewModelScope.launch{
      broadcasts.emit("Hello")
      broadcasts.emit("SharedFlow")
}


lifecycleScope.launch{
    broadcasts.collect { 
       print(it)
    }
}
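Because a SharedFlow is hot, it matters whether a subscriber is already there when a value is emitted. The sketch below (assuming kotlinx.coroutines; lateSubscriberSees is a hypothetical helper) emits first and subscribes afterwards, once with the default replay of 0 and once with a replay of 1.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Emits before anyone subscribes, then reports what a late subscriber
// receives within 200 ms (null if nothing arrives).
fun lateSubscriberSees(replay: Int): String? = runBlocking {
    val broadcasts = MutableSharedFlow<String>(replay = replay)
    broadcasts.emit("Hello") // returns at once: there is no subscriber to wait for
    withTimeoutOrNull(200) { broadcasts.first() }
}

fun main() {
    println(lateSubscriberSees(replay = 0)) // the value was dropped
    println(lateSubscriberSees(replay = 1)) // the value was replayed
}
```

So with the default configuration, make sure the collector is already subscribed before the values you care about are emitted, or configure a replay cache.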

StateFlow

StateFlow is a special variant of SharedFlow, which in turn is a special type of Kotlin data flow. StateFlow is the closest thing to LiveData because:

  • It always has a value.
  • It holds only a single value.
  • It supports multiple observers (so the data stream is shared).
  • It always replays only the latest value to new subscribers, regardless of the number of active observers.

StateFlow should be used when exposing the state of the UI to a view. This is a safe and efficient observer designed to accommodate UI state.

In short, it is similar to LiveData, but better!

StateFlow emits only when the value has been updated and actually differs from the previous one. Put simply, take two values x and y, where x is the value emitted earlier and y is the value about to be emitted: if x == y nothing happens, and only if x != y is the new value emitted. Simple use:

val stateFlow = MutableStateFlow<UIState>(UIState.Loading) // Initial state
stateFlow.value = UIState.Error
launch {
    stateFlow.collect { ... }
}
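The "equal values are skipped" rule can be observed with a small sketch. It assumes kotlinx.coroutines and relies on runBlocking's single-threaded event loop, where yield() gives the collector a chance to run between updates; observedStates is a name invented here.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun observedStates(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val collector = launch { state.collect { seen += it } }
    for (next in listOf(1, 1, 2)) {
        yield()            // let the collector process the current value
        state.value = next // assigning an equal value changes nothing
    }
    yield()
    collector.cancel()     // a StateFlow never completes by itself
    seen
}

fun main() {
    println(observedStates())
}
```

The second assignment of 1 does not reach the collector, because it is equal to the value that is already there.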

See StateFlow and SharedFlow for more information

Backpressure

Wherever there is reactive programming there is a backpressure problem. First, what is backpressure? It is the problem that arises when producers produce data faster than consumers can consume it. Thanks to suspend functions, however, backpressure can be managed transparently in Kotlin coroutines: when a flow's collector is overwhelmed, it can simply suspend the emitter and resume it once it is ready to accept more elements. To make sure that no data is lost, we can additionally add a buffer to ease the problem:

flow {
  List(100) {
    emit(it)
  }
}.buffer()

We can also specify a size for the buffer. However, if we merely add a buffer without fixing the root cause of the problem, data will always pile up. (Like the message cache pool of our cricket chat room.)

The root cause of the problem is the mismatch between production and consumption rates. In addition to directly optimizing consumer performance, we can also adopt some trade-offs.

The first is conflate: new data overwrites old data. For example:

flow {
    List(100) {
        emit(it)
    }
}.conflate()
.collect { value ->
    println("Collecting $value")
    delay(100)
    println("$value collected")
}

We rapidly emitted 100 elements but received only two, although the result may differ from run to run:

Collecting 1
1 collected
Collecting 99
99 collected

The second is collectLatest. As the name implies, only the latest data is processed. This may look the same as conflate, but it is quite different: new data does not overwrite old data. Every element is processed, except that if a later element arrives before the previous one has been fully processed, the processing of the previous one is cancelled. Here is the previous example again, slightly modified:

flow {
    List(100) {
        emit(it)
    }
}.collectLatest { value ->
    println("Collecting $value")
    delay(100)
    println("$value collected")
}

Results:

Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
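The same behaviour can be captured in lists instead of printed lines. This is a sketch assuming kotlinx.coroutines (startedAndFinished is a made-up name); it uses five elements instead of a hundred to keep the result short.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.runBlocking

// Returns the values whose processing started and the values whose processing finished.
fun startedAndFinished(): Pair<List<Int>, List<Int>> = runBlocking {
    val started = mutableListOf<Int>()
    val finished = mutableListOf<Int>()
    (0 until 5).asFlow().collectLatest { value ->
        started += value
        delay(100) // cancelled here whenever a newer value arrives
        finished += value
    }
    started to finished
}

fun main() {
    val (started, finished) = startedAndFinished()
    println("started:  $started")
    println("finished: $finished")
}
```

Processing starts for every value, but only the last one gets past the delay.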

In addition to collectLatest, there are mapLatest, flatMapLatest, and so on, all for this purpose.

Flow in real projects

In recent years Google has backed Flow in libraries such as Room, DataStore, and Paging 3, so what are you waiting for? Learn it and use it.

Converting an ordinary suspend request

A Flow can emit multiple values, so the UI state becomes entirely data-driven. For example, the Follow button can show Loading first and the result afterwards:

@WorkerThread
fun getObservableUserEvents(userId: String?): Flow<Result<ObservableUserEvents>> {
    return flow {
        emit(Result.Loading)
        if (userId == null) {
            emit(sessionRepository.getSessions())
        }
    }
}

Retry mechanism

I want to add a retry mechanism to a network request:

override suspend fun getTrendsList() = flow<Result<xxx>> {
   ...
   emit(Result.Success(result))
}.retry(2).catch { e ->
   emit(Result.Error(e))
}
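To see how retry(2) and catch play together, here is a self-contained sketch. It assumes kotlinx.coroutines; requestWithRetry and its failures parameter are hypothetical and only simulate a request that fails a given number of times before succeeding.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simulated request: fails `failures` times, then succeeds.
fun requestWithRetry(failures: Int): List<String> = runBlocking {
    var attempts = 0
    flow {
        attempts++
        if (attempts <= failures) throw IOException("attempt $attempts failed")
        emit("success after $attempts attempts")
    }.retry(2) // re-collect the upstream at most twice after a failure
        .catch { e -> emit("error: ${e.message}") } // reached only when retries are used up
        .toList()
}

fun main() {
    println(requestWithRetry(failures = 2)) // the two retries are enough
    println(requestWithRetry(failures = 3)) // the third failure reaches catch
}
```

Each retry runs the whole flow builder again, which is why the attempt counter lives outside of it.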

LifecycleScope meets flow

Search has several tabs, all of which listen for the search trigger, but only one tab is expected to react at a time. In a ViewPager, a Fragment that is in onPause still receives LiveData callbacks. lifecycleScope plus Flow solves this problem, because launchWhenResumed runs the block only while the lifecycle is at least in the RESUMED state:

lifecycleScope.launchWhenResumed{
    searchRequestFlow.collect{request->
        doSearch(request)
    }
}

Since Lifecycle 2.4.0 there is a new API, repeatOnLifecycle, which takes a target Lifecycle state. Instead of merely suspending when the lifecycle leaves that state, it cancels the coroutine, and it starts the block again when the lifecycle returns to that state:

lifecycleScope.launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.navigationActions.collect { ... }
    }
}

The official recommendation is to collect flows with the repeatOnLifecycle API rather than inside the launchWhenX APIs. This is because the latter suspend the coroutine instead of cancelling it when the Lifecycle is STOPPED, so upstream flows stay active in the background, where they may emit new items and consume resources.

“Data backflow”? Not a problem

Previously we would use LiveData to send an Event to interact with the UI or run some logic, but when the page is recreated the LiveData is bound again and immediately delivers a callback, which triggers the logic a second time. To solve this, back when Flow and Channel were not yet stable, Google's sample wrapped the payload in an Event that records whether it has already been handled:

open class Event<out T>(private val content: T) {

    var hasBeenHandled = false
        private set // Allow external read but not write

    /** Returns the content and prevents its use again. */
    fun getContentIfNotHandled(): T? {
        return if (hasBeenHandled) {
            null
        } else {
            hasBeenHandled = true
            content
        }
    }

    /** Returns the content, even if it's already been handled. */
    fun peekContent(): T = content
}

However, this is only a simple wrapper that supports a single observer. To use it in complex scenarios you would need to design a manager for multiple observers. With SharedFlow this is not a problem; after all, LiveData was not designed for this, it was designed to bind to the UI.

Because SharedFlow is a hot stream, events are broadcast to an unknown number of subscribers. When there is no subscriber, any published event is dropped immediately. It is a design pattern for events that must be handled immediately or not at all. Example:

val scrollToEvent: SharedFlow<ScheduleScrollEvent> =
    loadSessionsResult.combineTransform(currentEventIndex) { result, currentEventIndex ->
       emit(ScheduleScrollEvent(currentEventIndex))
    }.shareIn(viewModelScope, WhileViewSubscribed, replay = 0) 

Sometimes a Channel can also be used to send events, depending on the business scenario: a Channel delivers each event to a single subscriber. Once the Channel's buffer is full and there is no subscriber, it suspends event publishing until a subscriber appears. By default, published events are never dropped. (However, you can also configure it with no buffer, or to buffer only one event.)

Example:

// SIDE EFFECTS: Navigation actions
private val _navigationActions = Channel<NavigationAction>(capacity = Channel.CONFLATED)
val navigationActions = _navigationActions.receiveAsFlow()

debounce

The search box listens to the input field and searches as you type. Here we debounce to avoid sending too many search-suggestion requests:

val query = MutableStateFlow<String?>(null)

fun onTextChanged(text: String) {
    query.value = text
}

launch {
    query.debounce(100).collect { text ->
        text?.let { doSearch(text) }
    }
}

Multiplexing

Request the cache and the network at the same time. If the network returns first, update the cache and cancel the coroutine; if the cache returns first, send its data to the UI and keep running until the network data arrives.

listOf(
    async { dataSource.getCacheData() },
    async { dataSource.getRemoteData() }
).map { deferred ->
    flow { emit(deferred.await()) }
}.merge().onEach { result ->
    if (result.requestType == RequestType.NETWORK) {
        if (isActive) {
            _source.postValue(result)
        }
        if (result is Result.Success) {
            result.data?.let { newData ->
                // Update the cache
                dataSource.flushCache(newData)
            }
            cancel()
        }
    } else {
        // Cache data
        if (result is Result.Success) {
            if (isActive) {
                _source.postValue(result)
            }
        }
    }
}.onCompletion {
    isPreLoading.set(false)
}.launchIn(this)

Combining multiple streams

  • Zip

Takes one element from each stream at a time. As soon as one of the streams completes, the resulting stream completes and cancel is called on the remaining stream.

val flow = flowOf("4K display", "2K display", "1080P display")
val flow2 = flowOf("Xiao Ming", "Xiao Chen", "Xiao Hong", "Xiao Shiyilang")
flow.zip(flow2) { i, s -> i + " sent to " + s }.collect { println(it) }

Output:

4K display sent to Xiao Ming
2K display sent to Xiao Chen
1080P display sent to Xiao Hong
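The pairing rule of zip, including the fact that the leftover element of the longer stream is never used, can also be checked with a tiny sketch (assuming kotlinx.coroutines; zipped is a made-up name):

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

fun zipped(): List<String> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val letters = flowOf("a", "b", "c", "d") // "d" has no partner and is dropped
    numbers.zip(letters) { n, s -> "$n$s" }.toList()
}

fun main() {
    println(zipped())
}
```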
  • Combine

Combines the most recently emitted value of each stream and produces its value with the transform function:

val flow = flowOf("Tom", "Jack", "Lucifer")
val flow2 = flowOf("Xiao Ming", "Xiao Chen", "Xiao Hong", "Xiao Shiyilang")
flow.combine(flow2) { i, s -> i + " and " + s + " shake hands" }.collect { println(it) }

Output:

Tom and Xiao Ming shake hands
Jack and Xiao Ming shake hands
Jack and Xiao Chen shake hands
Lucifer and Xiao Chen shake hands
Lucifer and Xiao Hong shake hands
Lucifer and Xiao Shiyilang shake hands

If we add a delay to the emissions of the first flow:

val flow = flowOf("Tom", "Jack", "Lucifer").onEach { delay(10) }
val flow2 = flowOf("Xiao Ming", "Xiao Chen", "Xiao Hong", "Xiao Shiyilang")
flow.combine(flow2) { i, s -> i + " and " + s + " shake hands" }.collect { println(it) }

Output:

Tom and Xiao Shiyilang shake hands
Jack and Xiao Shiyilang shake hands
Lucifer and Xiao Shiyilang shake hands

Because of the delay on the first stream, by the time it emits, the second stream has already emitted everything, so the latest value of the second stream is always “Xiao Shiyilang”. That is why the output looks like this.

Flow has fewer operators than RxJava, but they cover most scenarios. The remaining operators are left for you to explore. For more operators, see Kotlin Flow

I wrote an EventBus with SharedFlow

SharedFlow supports replay and multiple observers, comes with all the coroutine features, and combined with Lifecycle you have a FlowEventBus!

[Kotlin] Just a few lines of code?! Let's write a FlowEventBus with SharedFlow

References

Bennyhuo blog

Coroutines Flow best practices | based on the Android Dev Summit app

Kotlin Chinese site: Flow

StateFlow and SharedFlow
