The previous articles introduced some basic uses of Flow and the Channel behind it.

This is the fourth article in the Kotlin coroutine series.

This article continues to explore more advanced Flow features from the perspective of an RxJava user, to cover more usage scenarios.

Kotlin coroutine series navigation:

Scratch the Kotlin coroutine

Kotlin Flow guide (1) Basic applications

Kotlin Flow guide (2) ChannelFlow

SharedFlow and StateFlow

Kotlin version: 1.5.31

Coroutine version: 1.5.2



SharedFlow

Those familiar with RxJava will know Subject. Unlike streams such as Observable, which only start sending data after subscribe is called, a Subject can send data regardless of whether anyone has subscribed.

A Subject is a hot stream that can both send and receive data: it is a producer and a consumer at the same time.

Flow has a corresponding feature: SharedFlow.

Compared with a regular Flow, SharedFlow is a shareable data stream:

  • Multiple subscribers can share the same data stream.
  • It sends data without waiting for anyone to call collect.
  • It can cache historical data.
  • Its functions for sending data are thread-safe.

SharedFlow itself is defined simply as a stream that only allows subscribing to data, plus a cache of historical values (replayCache).

Just like List and MutableList in Kotlin, where one is read-only and the other is readable and writable, SharedFlow has a mutable counterpart, MutableSharedFlow, which adds the ability to send data.

Create

Let’s start by looking at how MutableSharedFlow is created.

  • replay

    The size of the history (replay) cache.

    • The latest values are cached; once the history cache is full, the oldest element is removed.
    • When a new consumer subscribes, the cached history elements are sent to it in order before any new elements.
  • extraBufferCapacity

    The capacity of an extra buffer on top of the history cache, used to enlarge the overall internal buffer.

  • onBufferOverflow

    The buffer overflow (backpressure) strategy. The default is the familiar BufferOverflow.SUSPEND, which suspends emit, and so stops sending data, while the buffer is full and slow subscribers have not caught up.

    The other strategies (DROP_OLDEST, DROP_LATEST) are only allowed when at least one of replay or extraBufferCapacity is greater than 0; otherwise the factory function throws an IllegalArgumentException.
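A minimal sketch of the three parameters, assuming kotlinx.coroutines 1.5.x on the JVM; the function names are hypothetical. It sends with tryEmit while nobody is subscribed, so only the replay cache is involved, and it also shows that a non-SUSPEND strategy is rejected when both replay and extraBufferCapacity are 0.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical demo: what the replay cache holds after five emissions.
fun replayConfigDemo(): List<Int> {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 2,                                   // keep the two most recent values
        extraBufferCapacity = 1,                      // one extra slot for slow subscribers
        onBufferOverflow = BufferOverflow.DROP_OLDEST // never suspend, drop the oldest value instead
    )
    for (i in 1..5) {
        // Non-suspending send; with no subscribers it always succeeds
        sharedFlow.tryEmit(i)
    }
    return sharedFlow.replayCache // only the latest `replay` values survive
}

// Hypothetical demo: a non-SUSPEND strategy needs replay > 0 or extraBufferCapacity > 0.
fun invalidConfigRejected(): Boolean = try {
    MutableSharedFlow<Int>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
    false
} catch (e: IllegalArgumentException) {
    true
}
```

With these settings the replay cache ends up holding only the last two values sent.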

Basic usage

Unlike Flow and ChannelFlow, which send data through a FlowCollector or a ProducerScope, MutableSharedFlow can send data by itself, which makes using it feel much like using a MutableList day to day.

 fun test() = runBlocking {
     val sharedFlow = MutableSharedFlow<String>()
     val produce = launch(Dispatchers.IO) {
         for (i in 0..50) {
             sharedFlow.emit("data$i")
             delay(50)
         }
     }
 }

Although no consumer has subscribed yet, the data is still sent; since there is no history cache, all of it is simply thrown away.
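To make the "thrown away" behaviour concrete, here is a small sketch (hypothetical function name, kotlinx.coroutines 1.5.x assumed): with replay = 0 and no subscribers, tryEmit still reports success, but nothing is kept.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical demo: sending with no subscribers and no replay cache.
fun noSubscriberDemo(): Pair<Boolean, List<String>> {
    val sharedFlow = MutableSharedFlow<String>() // replay = 0, extraBufferCapacity = 0
    val accepted = sharedFlow.tryEmit("data0")   // never suspends when nobody is subscribed
    return accepted to sharedFlow.replayCache    // the value was not retained anywhere
}
```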

Usually we don't want consumers to be able to send data; we only want to expose the stream for external subscription. In this case, we call the asSharedFlow function to convert the mutable MutableSharedFlow into a read-only SharedFlow.
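A common way to apply this is to keep the MutableSharedFlow private and expose only the read-only view. The MessageProducer class and producerDemo function below are hypothetical; this is a sketch assuming kotlinx.coroutines 1.5.x.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow

// Hypothetical producer: only this class can send, everyone else can only subscribe.
class MessageProducer {
    private val _messages = MutableSharedFlow<String>(replay = 1)

    // asSharedFlow wraps the mutable flow in a read-only SharedFlow view
    val messages: SharedFlow<String> = _messages.asSharedFlow()

    fun send(message: String): Boolean = _messages.tryEmit(message)
}

// Hypothetical demo: the exposed flow sees the data but is not a MutableSharedFlow.
fun producerDemo(): Pair<List<String>, Boolean> {
    val producer = MessageProducer()
    producer.send("hello")
    return producer.messages.replayCache to (producer.messages is MutableSharedFlow<*>)
}
```

Because the exposed object is a wrapper rather than the mutable flow itself, callers cannot cast it back and send data.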

 fun test() = runBlocking {
     ...
     val readOnlySharedFlow = sharedFlow.asSharedFlow()
     val scope = CoroutineScope(SupervisorJob())
     delay(100)
     val job1 = scope.launch {
         readOnlySharedFlow.map {
             delay(100)
             "$it receive 1"
         }.collect {
             println("collect1 result : $it")
         }
     }
     delay(1000)
     job1.cancel()
 }

 collect1 result : data3 receive 1
 collect1 result : data4 receive 1
 collect1 result : data5 receive 1
 collect1 result : data6 receive 1
 collect1 result : data7 receive 1
 collect1 result : data8 receive 1
 collect1 result : data9 receive 1
 collect1 result : data10 receive 1
 collect1 result : data11 receive 1

We simulate an external consumer that subscribes after a 100 ms delay; it only receives the data sent after it subscribed.

As a subtype of Flow, SharedFlow can also use Flow's intermediate operators.

This is equivalent to PublishSubject in RxJava.

If the replay property is set to 2 when SharedFlow is created, the latest two values will be cached and the result will be:

 val sharedFlow = MutableSharedFlow<String>(replay = 2)

 collect1 result : data1 receive 1
 collect1 result : data2 receive 1
 collect1 result : data3 receive 1
 collect1 result : data4 receive 1
 collect1 result : data5 receive 1
 collect1 result : data6 receive 1
 collect1 result : data7 receive 1
 collect1 result : data8 receive 1

The two values sent before the subscription are also collected by the consumer. This is equivalent to ReplayRelay.createWithSize<String>(2) in the RxJava world (RxRelay).
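The late-subscriber behaviour can be checked in a self-contained sketch, assuming kotlinx.coroutines 1.5.x; lateSubscriberDemo is a hypothetical name. Five values are sent before anyone subscribes, then a subscriber that takes three values joins and one more value is sent.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo: a late subscriber first gets the replay cache, then new values.
fun lateSubscriberDemo(): List<String> = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 2)
    // Sent before anyone subscribes: only data3 and data4 stay in the replay cache
    for (i in 0..4) sharedFlow.emit("data$i")

    // Late subscriber that stops after three values
    val late = async { sharedFlow.take(3).toList() }

    // Wait until the subscriber is registered before sending a new value
    sharedFlow.subscriptionCount.first { it > 0 }
    sharedFlow.emit("data5")

    late.await()
}
```

Waiting on subscriptionCount avoids a race where data5 would be sent before the subscriber exists and end up only in the replay cache.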

collect is a suspending function that keeps waiting for upstream data, whether or not the upstream ever sends any, so on a SharedFlow it never completes.

Therefore, with SharedFlow, note that any code after collect in the consumer's coroutine will never run.
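A quick way to see that collect on a SharedFlow never returns is to bound it with a timeout. This sketch assumes kotlinx.coroutines 1.5.x; the function name is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical demo: code placed after collect never runs.
fun collectNeverReturnsDemo(): Pair<String?, Boolean> = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    var reachedAfterCollect = false
    val result = withTimeoutOrNull(200) {
        sharedFlow.collect { }     // suspends until the coroutine is cancelled
        reachedAfterCollect = true // never reached
        "completed"
    }
    result to reachedAfterCollect  // the timeout cancelled the collector
}
```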

 fun test() = runBlocking {
     ...
     delay(200)
     val job2 = scope.launch {
         readOnlySharedFlow.map {
             "$it receive 2"
         }.collect {
             println("collect2 result : $it")
         }
     }
     delay(1000)
     job1.cancel()
     job2.cancel()
 }

When a new consumer is added, it also keeps receiving the data newly sent upstream, until its own coroutine is cancelled.

 collect1 result : data3 receive 1
 collect2 result : data5 receive 2
 collect1 result : data4 receive 1
 collect2 result : data6 receive 2
 ...
 collect2 result : data13 receive 2
 collect1 result : data12 receive 1
 collect2 result : data14 receive 2
 collect1 result : data13 receive 1

But what if job2's consumer throws an exception:

 readOnlySharedFlow.map {
     if (it == "data6") throw Exception("test Exception")
     "$it receive 2"
 }

 collect1 result : data3 receive 1
 collect2 result : data5 receive 2
 collect1 result : data4 receive 1

 test Exception
 java.lang.Exception: test Exception
 ...

When an uncaught exception occurs in a consumer, there are two cases, depending on the Job in the context of the coroutine where the consumer runs:

  • If the coroutine scope's context holds a plain Job, the exception propagates upward and cancels the scope, including the other consumers in it; under runBlocking, as in the test program, the exception is rethrown and the program crashes.
  • If the coroutine scope's context holds a SupervisorJob, only the consumer's own coroutine is affected; other consumers keep receiving data.

Exceptions thrown by consumers never affect the upstream shared data flow. Even so, every SharedFlow subscriber should handle exceptions, either with the catch operator or with a CoroutineExceptionHandler installed in the coroutine.
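A sketch of the catch approach, assuming kotlinx.coroutines 1.5.x; catchDemo is a hypothetical name, and a replay cache is used only so both consumers see the same ten values deterministically. Note that catch only handles exceptions from operators upstream of it, such as map, not exceptions thrown inside the collect block itself.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo: one consumer fails and recovers with catch, the other is unaffected.
fun catchDemo(): Pair<List<String>, List<String>> = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 10)
    for (i in 0..9) sharedFlow.emit("data$i") // all ten values stay in the replay cache

    val consumer1 = async {
        sharedFlow
            .map { if (it == "data6") throw IllegalStateException("test Exception") else it }
            .catch { e -> emit("caught: ${e.message}") } // handles the upstream failure
            .toList()                                    // completes after the catch block
    }
    val consumer2 = async { sharedFlow.take(10).toList() }

    consumer1.await() to consumer2.await()
}
```

After catch handles the failure, consumer1's flow completes normally and its subscription to the SharedFlow ends, while consumer2 still receives every value.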

Cold flow to hot flow

In RxJava, a Subject's hot stream can be converted into a Flowable with the toFlowable function, but a function for the opposite direction, turning a cold stream into a hot one, does not appear to be provided in the same way.

SharedFlow provides exactly that cold-to-hot conversion: shareIn.

The started parameter of shareIn specifies the start and stop strategy of the newly created shared data flow:

  • Eagerly

    Starts collecting the upstream data source immediately, before any consumer subscribes, and keeps collecting until the coroutine scope is cancelled. Consumers receive only the history cache and the data sent after they subscribe.

  • Lazily

    Waits for the first consumer to subscribe before starting the upstream data source. The first consumer is guaranteed to receive all data, while later consumers receive only the history cache and subsequent data.

    Once started, the upstream keeps being collected until the coroutine scope is cancelled, even if all consumers leave.

  • WhileSubscribed

    An enhanced version of Lazily: it also waits for the first consumer to subscribe before starting the data source.

    In addition, it can be configured to stop the upstream of the shared flow after the last subscriber leaves (immediately by default) and to expire the history cache after that (by default the cache is kept forever).

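    public fun WhileSubscribed(
        stopTimeoutMillis: Long = 0,                  // delay before stopping the upstream after the last subscriber leaves, in ms
        replayExpirationMillis: Long = Long.MAX_VALUE // how long to keep the history cache after the upstream stops, in ms
    ): SharingStarted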
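A minimal sketch of shareIn, whose signature is shareIn(scope, started, replay). It assumes kotlinx.coroutines 1.5.x; shareInDemo and the upstreamStarts counter are hypothetical. Two subscribers share one upstream collection under SharingStarted.Lazily, and replay = 3 lets both see all three values regardless of exactly when they subscribe.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: one upstream collection shared by two subscribers.
fun shareInDemo(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    var upstreamStarts = 0
    // A cold flow: normally every collector would re-run this block
    val cold = flow {
        upstreamStarts++
        for (i in 1..3) {
            emit(i)
            delay(10)
        }
    }
    // Separate scope for the sharing coroutine so it can be cancelled at the end
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = cold.shareIn(sharingScope, SharingStarted.Lazily, replay = 3)

    val first = async { shared.take(3).toList() }
    val second = async { shared.take(3).toList() }
    val result = Triple(first.await(), second.await(), upstreamStarts)
    sharingScope.cancel()
    result
}
```

Swapping Lazily for SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000) is a common choice when the upstream should stop shortly after the last subscriber leaves.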

By using shareIn and stateIn, a cold Flow that spends resources every time it fetches from its data source can be converted into a SharedFlow or StateFlow, giving one-to-many event distribution and avoiding the cost of running the source once per collector.

Note that each call to shareIn creates a new SharedFlow instance, which stays in memory until it is garbage collected.

It is best to perform the conversion as few times as possible, and not to call these functions every time some function is invoked.
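To illustrate the advice, here is a hypothetical repository that converts once in a property versus on every call. It is a sketch assuming kotlinx.coroutines 1.5.x; PriceRepository, its members, and sharingReuseDemo are made up.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical repository, for illustration only.
class PriceRepository(private val scope: CoroutineScope) {
    // Stand-in for an expensive cold source such as a network or sensor stream
    private fun priceUpdates(): Flow<Int> = flow { emit(42) }

    // Preferred: converted once; every caller shares the same SharedFlow
    val prices: SharedFlow<Int> =
        priceUpdates().shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)

    // Avoid: every call creates a new SharedFlow with its own sharing coroutine
    fun pricesEveryCall(): SharedFlow<Int> =
        priceUpdates().shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)
}

// Hypothetical demo: compare instance identity of the two approaches.
fun sharingReuseDemo(): Pair<Boolean, Boolean> = runBlocking {
    val scope = CoroutineScope(coroutineContext + Job())
    val repo = PriceRepository(scope)
    val result = (repo.prices === repo.prices) to (repo.pricesEveryCall() === repo.pricesEveryCall())
    scope.cancel()
    result
}
```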

For more on SharingStarted usage scenarios, see Usage instructions for the Flow operators shareIn and stateIn.

SharedFlow implementation analysis

ChannelFlow implements thread-safe communication between coroutines through a Channel. What does SharedFlow's internal implementation look like?

SharedFlow is created by the MutableSharedFlow factory function, which creates a SharedFlowImpl object internally; SharedFlowImpl caches data directly in an array.

Sending data

MutableSharedFlow sends data through its emit and tryEmit functions.

tryEmit locks with synchronized internally, so sending data is thread-safe.

History cache

What exactly is the history cache when replay is non-zero? Let's go deeper into tryEmitLocked.

As you can see, the new element is actually added to the cache array in the enqueueLocked function.

Each time a new element is added, the capacity of the cache array is checked. The array is created with a small default capacity when the first element arrives, and whenever it is full it is expanded to twice its current size.

The history cache is the list of the latest replay elements, taken from the cache array buffer on every call.

Of course, the cache array buffer does not keep data indefinitely.

In tryEmitLocked, if the number of buffered elements, bufferSize, exceeds the total capacity allowed to be retained, bufferCapacity, dropOldestLocked is called to discard the oldest cached element.

bufferCapacity is replay + extraBufferCapacity, so with the default parameters it is 0; it is only coerced to Int.MAX_VALUE when that sum overflows.

dropOldestLocked discards data simply by setting the array element at that index to null. The array itself is never shrunk; because it is indexed circularly, the freed slot is reused, and the array only has to grow again when the number of retained elements reaches its size.
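The buffer mechanics described above can be modelled in a few lines of plain Kotlin. This is a simplified, hypothetical model, not the real SharedFlowImpl: it only tracks a replay cache with no subscribers, uses an assumed initial array size of 2, and takes a lock on every mutation the way tryEmit does.

```kotlin
// Simplified, hypothetical model of an array-backed replay buffer, NOT the real SharedFlowImpl.
class ReplayBufferModel<T : Any>(private val replay: Int) {
    private val lock = Any()
    private var buffer: Array<Any?> = arrayOfNulls<Any>(2) // small initial array (assumed size)
    private var head = 0L // absolute index of the oldest retained element
    private var size = 0  // number of retained elements

    val capacity: Int get() = synchronized(lock) { buffer.size }

    fun add(value: T) {
        synchronized(lock) {                   // every mutation happens under one lock
            if (size == buffer.size) grow()    // array full: double its size
            buffer[slot(head + size)] = value
            size++
            while (size > replay) dropOldest() // keep at most `replay` elements
        }
    }

    @Suppress("UNCHECKED_CAST")
    fun replayCache(): List<T> = synchronized(lock) {
        List(size) { i -> buffer[slot(head + i)] as T }
    }

    // Circular indexing: absolute index -> array position
    private fun slot(index: Long): Int = (index % buffer.size).toInt()

    private fun grow() {
        val newBuffer = arrayOfNulls<Any>(buffer.size * 2)
        for (i in 0 until size) {
            val index = head + i
            newBuffer[(index % newBuffer.size).toInt()] = buffer[slot(index)]
        }
        buffer = newBuffer
    }

    private fun dropOldest() {
        buffer[slot(head)] = null // null out the slot; the array is not shrunk
        head++
        size--
    }
}

// Hypothetical demo: keep adding values and look at the array size.
fun replayBufferDemo(): Pair<List<Int>, Int> {
    val model = ReplayBufferModel<Int>(replay = 2)
    for (i in 1..100) model.add(i)
    return model.replayCache() to model.capacity
}
```

In this model the array grows once, to four slots, and then stays that size however many values are added, because dropped slots are nulled and reused.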

Collecting data

Subscribing to a Flow naturally goes through the collect function.

The internal logic is clear: a Slot is allocated and bound to the collector, then a loop keeps trying to take a value from the cache array and sends it downstream through emit.

The collect of SharedFlow is an infinite loop that keeps trying to take values from the cache, so collect stays suspended until its coroutine is cancelled.

The Slot is a collector-state object bound to a consumer of the data stream; it records the cache array index of the next value to deliver.

In SharedFlow's parent class AbstractSharedFlow, these collector-state objects are themselves kept in an array, and the allocateSlot function allocates one and binds it to a collector.

When a collector is bound, the index recorded in its Slot is set to the start of the replay region, that is, replay elements before the latest value, so the downstream receives the history cache first.

Back in tryTakeValue, which takes an element from the cache array: it is also synchronized, and after taking the element it advances the Slot's index by 1.

If there is no element to send at that point, the suspend function awaitValue is called and the collector suspends, waiting for data to be added to the cache array.
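The Slot bookkeeping can likewise be sketched as a hypothetical model, not the real SharedFlowSlot: each collector holds an index into an append-only log (unbounded here for simplicity, an assumption), starts at the beginning of the replay region, and advances by one per value taken. A null result stands for the point where the real collector would call awaitValue and suspend.

```kotlin
// Simplified, hypothetical model of per-collector slots, NOT the real SharedFlowSlot.
class SlotIndexModel<T : Any>(private val replay: Int) {
    private val lock = Any()
    private val values = ArrayList<T>() // unbounded log for simplicity (an assumption)

    // One Slot per collector: the index of the next value it should receive.
    class Slot(var index: Int)

    fun emit(value: T) {
        synchronized(lock) { values.add(value) }
    }

    // A new collector starts at the beginning of the replay region.
    fun allocateSlot(): Slot = synchronized(lock) {
        Slot(maxOf(0, values.size - replay))
    }

    // Like tryTakeValue: return the value at the slot's index and advance it by one,
    // or null when nothing is available (the real collector would suspend instead).
    fun tryTakeValue(slot: Slot): T? = synchronized(lock) {
        if (slot.index < values.size) values[slot.index++] else null
    }
}

// Hypothetical demo: a collector that joins after three values with replay = 2.
fun slotDemo(): List<String?> {
    val model = SlotIndexModel<String>(replay = 2)
    listOf("a", "b", "c").forEach { model.emit(it) }
    val slot = model.allocateSlot()                 // starts at "b"
    val taken = mutableListOf<String?>()
    repeat(3) { taken += model.tryTakeValue(slot) } // "b", "c", then null
    model.emit("d")
    taken += model.tryTakeValue(slot)               // the new value is picked up
    return taken
}
```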

Summary

SharedFlow makes one-to-many data streams possible while keeping the convenience of Flow operators.

However, SharedFlow is not built on Channel internally; it is built on an array plus synchronized.

Compared with the Channel-based ChannelFlow:

  • Both are thread-safe: Channel uses ReentrantLock+CAS, while SharedFlow uses synchronized.

  • Internally, a Channel uses a lock-free doubly linked list whose nodes are CAS references, while SharedFlow uses an array, which makes it easy to take the history cache out of it.

  • SharedFlow is a hot stream that allows multiple consumers to subscribe to the same data flow; ChannelFlow is a cold stream that only allows a single consumer to subscribe.

  • Because SharedFlow can send data before anyone subscribes, some data may be missed. ChannelFlow, on the other hand, only starts sending data after the downstream subscribes, and by default the consumer receives all of it.

  • The infinite loops inside them focus on different sides.

    • ChannelFlow loops on the producer side, sending data into the Channel, and provides the close function to close the Channel actively.
    • SharedFlow loops on the consumer side, waiting for data sent upstream, and leaves shutdown entirely to the coroutine it runs in.

Although the EventBus concept has become unpopular through overuse, SharedFlow works just as well as RxBus and EventBus did for such scenarios, and it is thread-safe.

With strict conventions that limit its scope, using it as a dedicated event distribution mechanism for a specific scenario is a good choice, for example fetching location data once and distributing it to all subscribers.
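A sketch of such a narrowly scoped event bus, assuming kotlinx.coroutines 1.5.x. AppEventBus, on, LocationUpdate, Logout, and eventBusDemo are all hypothetical names. extraBufferCapacity lets post use the non-suspending tryEmit, and filterIsInstance lets each subscriber pick only the event types it cares about.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical application-wide event bus, for illustration only.
object AppEventBus {
    private val _events = MutableSharedFlow<Any>(extraBufferCapacity = 64)
    val events: SharedFlow<Any> = _events.asSharedFlow()
    val subscriberCount: StateFlow<Int> = _events.subscriptionCount

    // Non-suspending; fails only if slow subscribers have filled the buffer
    fun post(event: Any): Boolean = _events.tryEmit(event)
}

// Each subscriber only sees events of the requested type.
inline fun <reified T> AppEventBus.on(): Flow<T> = events.filterIsInstance<T>()

data class LocationUpdate(val lat: Double, val lng: Double)
data class Logout(val reason: String)

// Hypothetical demo: two subscribers, each receiving its own event type.
fun eventBusDemo(): Pair<LocationUpdate, Logout> = runBlocking {
    val location = async { AppEventBus.on<LocationUpdate>().first() }
    val logout = async { AppEventBus.on<Logout>().first() }
    AppEventBus.subscriberCount.first { it >= 2 } // wait until both have subscribed
    AppEventBus.post(LocationUpdate(31.2, 121.5))
    AppEventBus.post(Logout("token expired"))
    location.await() to logout.await()
}
```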

StateFlow

In the RxJava era, when we needed multiple consumers to subscribe to only the latest value and all subsequent values, while also being able to read the latest value without subscribing, we had BehaviorSubject, or later LiveData with its narrower responsibility.

Flow offers a more convenient class for this scenario: StateFlow.

StateFlow is actually a subtype of SharedFlow. It also comes in read-only and mutable variants: StateFlow and MutableStateFlow.

In terms of interface definitions, StateFlow and LiveData are very similar.

Similarities:

  • Both allow multiple consumers
  • Both have read-only and mutable types
  • Both always hold only one state value
  • Both support DataBinding (StateFlow requires a newer version)

Differences from LiveData:

  • An initial value is mandatory
  • Supports CAS-style assignment
  • Filters out duplicate values by default
  • Null-safety checks on the value
  • Rich asynchronous data stream operations from Flow
  • No Lifecycle support by default; Flow's collect is a suspend function that waits for the data stream to deliver data
  • Thread safety: LiveData's postValue can also be called asynchronously, but it may lose data.
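A few of these differences can be seen directly on MutableStateFlow. This sketch assumes kotlinx.coroutines 1.5.x and a hypothetical function name: the initial value is mandatory, compareAndSet only succeeds when the expected value matches, and the replay cache always contains exactly the current value.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical demo of MutableStateFlow's CAS-style assignment.
fun stateFlowCasDemo(): List<Any> {
    val state = MutableStateFlow(1)        // an initial value is required
    state.value = 2                        // plain assignment, thread-safe
    val first = state.compareAndSet(2, 3)  // succeeds: the current value is 2
    val second = state.compareAndSet(2, 4) // fails: the current value is now 3
    return listOf(first, second, state.value, state.replayCache)
}
```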

Setting aside LiveData's Lifecycle support, StateFlow comes out ahead on essentially every point.

Usage

As a subtype of SharedFlow, StateFlow is used in essentially the same way as its parent.

StateFlow is created with a factory function of the same name, but unlike SharedFlow, it must be given an initial value.

MutableStateFlow cannot be configured with a buffer; in effect it always has exactly one, which caches only the latest value.

As before, we want to prevent external code from sending unwanted data and expose only a read-only StateFlow. For this we use asStateFlow.

 fun test() = runBlocking {
     val stateFlow = MutableStateFlow(1)
     val readOnlyStateFlow = stateFlow.asStateFlow()

     // Simulate an external subscriber that subscribes immediately
     val job0 = launch {
         readOnlyStateFlow.collect { println("collect0 : $it") }
     }
     delay(50)
     launch {
         for (i in 1..3) {
             println("wait emit $i")
             stateFlow.emit(i)
             delay(50)
         }
     }
     // Simulate subscribing from a new page
     delay(200)
     val job1 = launch {
         readOnlyStateFlow.collect { println("collect1 : $it") }
     }
     val job2 = launch {
         readOnlyStateFlow.collect { println("collect2 : $it") }
     }
     println("get value : ${readOnlyStateFlow.value}")
     delay(200)
     job0.cancel()
     job1.cancel()
     job2.cancel()
 }

 collect0 : 1
 wait emit 1
 wait emit 2
 collect0 : 2
 wait emit 3
 collect0 : 3
 get value : 3
 collect1 : 3
 collect2 : 3

As you can see, a subscriber that subscribes before any data is sent first receives the initial value.

The first new value is filtered out because it equals the current value.

Later subscribers receive only the latest value.

If a StateFlow subscriber runs in a coroutine, it is best to give it a dedicated coroutine: collect stays suspended forever, so subsequent code in that coroutine will never run.
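The duplicate filtering can be reproduced in a small sketch (kotlinx.coroutines 1.5.x assumed, hypothetical function name). The collector runs in its own coroutine because collect never returns, and short delays give it a chance to run after each assignment.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: StateFlow skips values equal to the current one.
fun distinctDemo(): List<Int> = runBlocking {
    val stateFlow = MutableStateFlow(0)
    val received = mutableListOf<Int>()

    // Collect in a dedicated coroutine, since collect never returns
    val job = launch { stateFlow.collect { received += it } }
    delay(10) // the collector starts and receives the current value, 0

    for (v in listOf(0, 1, 1, 2)) {
        stateFlow.value = v // 0 and the second 1 equal the current value and are dropped
        delay(10)           // let the collector run
    }
    job.cancel()
    received
}
```

Without the delays, rapid assignments could also be conflated, so a slow collector may skip intermediate values, not just duplicates.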

Cold flow to hot flow

StateFlow also has an operator function, stateIn, that converts a cold Flow into a hot flow.

The only difference from shareIn is that an initial value must be provided, and the shared data flow produced by stateIn caches only the latest value.
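A sketch of stateIn, assuming kotlinx.coroutines 1.5.x; stateInDemo and its cold flow are hypothetical. The overload used here takes scope, started, and the required initialValue; until the upstream emits, value returns that initial value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: convert a cold flow into a StateFlow with stateIn.
fun stateInDemo(): Pair<Int, Int> = runBlocking {
    val cold = flow {
        delay(10)
        emit(1)
        delay(10)
        emit(2)
    }
    val scope = CoroutineScope(coroutineContext + Job())
    // initialValue is mandatory; Eagerly starts collecting the upstream right away
    val state: StateFlow<Int> = cold.stateIn(scope, SharingStarted.Eagerly, initialValue = 0)

    val initial = state.value            // still 0: the upstream has not emitted yet
    val latest = state.first { it == 2 } // suspends until the upstream has emitted 2
    scope.cancel()
    initial to latest
}
```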

StateFlow implementation analysis

Unlike SharedFlow, StateFlow has no internal cache array. Instead, it keeps the state value in an atomic reference, always holding only the latest value.

Since only one value is kept, it can be read and assigned directly through the value property.

Sending data

Both tryEmit and emit go through the value setter, which eventually calls the updateState function to perform a CAS-style state assignment.

The function looks long, but it cleverly coordinates updates with a sequence number protected by synchronized. An even sequence means no update is in progress: the caller makes it odd, then loops outside the lock to notify collectors, and makes it even again once no further update has arrived in the meantime.

If the sequence is already odd, another update is in progress; the caller only bumps the sequence by 2, keeping it odd, so that the ongoing loop delivers the new value, and skips the rest of the process.
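The parity trick is easier to follow in a stripped-down model. The class below is a simplified, hypothetical model of the idea, not the real StateFlowImpl: `deliver` stands in for calling makePending on every slot, and a re-entrant update from inside `deliver` plays the role of a concurrent update arriving from another thread.

```kotlin
// Simplified, hypothetical model of the parity-sequence idea, NOT the real StateFlowImpl.
class SequencedStateModel<T : Any>(initial: T, private val deliver: (T) -> Unit) {
    private val lock = Any()
    private var state: T = initial
    private var sequence = 0 // even: idle, odd: an update is being delivered

    val value: T get() = synchronized(lock) { state }
    val currentSequence: Int get() = synchronized(lock) { sequence }

    fun update(newState: T) {
        var curSequence = 0
        synchronized(lock) {
            if (state == newState) return // equal value: nothing to deliver
            state = newState
            if ((sequence and 1) == 1) {
                sequence += 2             // delivery in progress: signal it, keep it odd
                return
            }
            sequence++                    // even -> odd: this caller becomes the deliverer
            curSequence = sequence
        }
        while (true) {
            deliver(value)                // deliver outside the lock
            synchronized(lock) {
                if (sequence == curSequence) {
                    sequence = curSequence + 1 // nothing new arrived: odd -> even, done
                    return
                }
                curSequence = sequence    // a newer update arrived: deliver again
            }
        }
    }
}

// Hypothetical demo: a second update arrives while the first is being delivered.
fun sequenceDemo(): Triple<List<Int>, Int, Int> {
    val delivered = mutableListOf<Int>()
    lateinit var model: SequencedStateModel<Int>
    model = SequencedStateModel(0) { v ->
        delivered += v
        if (v == 1) model.update(2) // simulates a concurrent update
    }
    model.update(1)
    return Triple(delivered, model.currentSequence, model.value)
}
```

The nested update only bumps the sequence and returns; the outer loop notices the change and delivers the newer value itself, so collectors are notified in order without a storm of parallel wake-ups.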

Collector state

StateFlowImpl's parent class is also AbstractSharedFlow. Unlike SharedFlow, the collector-state object here is another implementation of AbstractSharedFlowSlot: StateFlowSlot.

This Slot also holds an atomic (CAS) reference internally, which can be in one of four states:

  • null: the slot is free and can be allocated to a collector
  • NONE: the slot has been allocated to a collector
  • PENDING: the upstream has updated the value, which is waiting to be delivered to the collector
  • CancellableContinuationImpl: the collector is suspended, waiting for upstream data

While updating the state value, StateFlow iterates over all allocated slots and calls makePending on each. makePending uses CAS to mark an idle slot as PENDING, or, if the collector is suspended, resets the slot to NONE and resumes it, so that the collector gets ready to receive the data.
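The four states and makePending can be sketched with an AtomicReference. This is a simplified, hypothetical model, not the real StateFlowSlot: the Waiter class stands in for a CancellableContinuationImpl, and calling its resume callback stands in for resuming the suspended collector.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Stands in for the suspended collector's continuation (hypothetical).
class Waiter(val resume: () -> Unit)

// Simplified, hypothetical model of the four slot states, NOT the real StateFlowSlot.
class StateSlotModel {
    private companion object {
        val NONE = Any()    // allocated to a collector
        val PENDING = Any() // a new value is waiting to be delivered
    }

    private val state = AtomicReference<Any?>(null) // null: free

    fun allocate(): Boolean = state.compareAndSet(null, NONE)

    // Called by the updater for every slot after the value changes.
    fun makePending() {
        while (true) {
            val current = state.get()
            when {
                current == null -> return     // free slot: skip it
                current === PENDING -> return // already pending
                current === NONE -> if (state.compareAndSet(current, PENDING)) return
                else -> if (state.compareAndSet(current, NONE)) { // a suspended collector
                    (current as Waiter).resume()
                    return
                }
            }
        }
    }

    // Called by the collector: was an update pending? Either way the slot goes back to NONE.
    fun takePending(): Boolean = state.getAndSet(NONE) === PENDING

    // The collector "suspends" only if nothing is pending yet.
    fun await(waiter: Waiter): Boolean = state.compareAndSet(NONE, waiter)
}

// Hypothetical demo walking one slot through its states.
fun slotStateDemo(): List<Boolean> {
    val slot = StateSlotModel()
    val log = mutableListOf<Boolean>()
    log += slot.allocate()    // free -> NONE
    log += slot.takePending() // nothing pending yet
    slot.makePending()        // NONE -> PENDING
    log += slot.takePending() // PENDING -> NONE
    var resumed = false
    log += slot.await(Waiter { resumed = true }) // NONE -> waiter
    slot.makePending()        // waiter -> NONE, and the waiter is resumed
    log += resumed
    return log
}
```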

Collecting data

A Flow consumer collects data with collect.

As with SharedFlow, the consumer's subscription loops, waiting for upstream data.

On the first subscription, StateFlow immediately sends the latest value. After that, before each send it filters out duplicates and applies a null-safety check.

Similarly, it then calls awaitPending, which suspends the collector's coroutine until it is resumed when the upstream sends new data.

Binding to the lifecycle

Back in the RxJava era, if you called subscribe directly in a view, receiving data updates while the view was in the background might not behave as expected.

So after LiveData appeared, data subscriptions in the view layer were handed over to LiveData, while RxJava was gradually relegated to the data layer for data logic processing.

Now, calling collect in a view to subscribe to a Flow naturally has the same problem as RxJava.

Therefore, we need the Flow not to be subscribed to, or not to deliver data to the consumer, while the view is in the background.

LifecycleCoroutineScope

In the early days, the Lifecycle library provided the extension property coroutineScope.

This view-specific LifecycleCoroutineScope is cancelled when the view is destroyed, and it provides several launchWhenX functions.

 public val Lifecycle.coroutineScope: LifecycleCoroutineScope
     
 public abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
     ...
     public fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
         lifecycle.whenCreated(block)
     }
     
     public fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
         lifecycle.whenStarted(block)
     }
     
     public fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
         lifecycle.whenResumed(block)
     }
     ...
 }

Take launchWhenStarted as an example: it suspends execution of the coroutine in which the Flow consumer runs while the Lifecycle state is below Lifecycle.State.STARTED. (Image from the Internet)

In other words, the logic inside collect in the consumer's coroutine only resumes once the view reaches onStart, and is suspended while the view is in the background.

However, for Flow data streams, even while the consumer-side processing is suspended, the producer-side data source keeps running. In scenarios where the data source runs continuously, this can waste resources, so these functions are currently not officially recommended.

repeatOnLifecycle

With the Lifecycle runtime KTX library (lifecycle-runtime-ktx) updated to 2.4.0, Lifecycle provides a new extension function, repeatOnLifecycle.

Looking at the implementation, the internal logic is simple: it switches the CoroutineContext to the UI main thread, launches a coroutine that subscribes to the data stream when the lifecycle enters the given state, and cancels that coroutine, unsubscribing, when the lifecycle drops below that state.

Comparison with the other approaches (image from the official documentation):

For a shared data flow like StateFlow, each new subscription only receives the latest value, which is closer to how LiveData is used.

With the release of repeatOnLifecycle, there are plans to remove the launchWhenX functions.

The design rationale of repeatOnLifecycle is described in the story behind designing the repeatOnLifecycle API.

The Lifecycle library also provides an intermediate Flow operator, flowWithLifecycle, which internally calls repeatOnLifecycle inside a callbackFlow.

callbackFlow is itself implemented on top of a Channel, which acts as a buffer for the upstream data.
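As a reminder of what callbackFlow looks like, here is a sketch that adapts a listener-based source into a Flow (FakeLocationSource, updates, and callbackDemo are hypothetical; kotlinx.coroutines 1.5.x assumed). trySend pushes each callback value into the underlying Channel, and awaitClose unregisters the listener when the collector goes away.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API, for illustration only.
class FakeLocationSource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }
    fun push(value: Int) = listeners.toList().forEach { it(value) }
}

// Adapts the callbacks into a Flow; values are buffered in callbackFlow's Channel.
fun FakeLocationSource.updates(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { trySend(it) } // non-suspending send into the Channel
    addListener(listener)
    awaitClose { removeListener(listener) }       // runs when the collector stops
}

// Hypothetical demo: collect three values, then check the listener was removed.
fun callbackDemo(): Pair<List<Int>, Int> = runBlocking {
    val source = FakeLocationSource()
    val result = async { source.updates().take(3).toList() }
    while (source.listenerCount == 0) yield() // wait until the listener is registered
    (1..5).forEach { source.push(it) }        // buffered; take(3) keeps the first three
    result.await() to source.listenerCount
}
```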

  • For lifecycle control of a single Flow, the flowWithLifecycle operator saves boilerplate.
  • If you need to control the lifecycle of several Flows at once, repeatOnLifecycle is recommended instead, to avoid creating a Channel for each one.
  • For a cold Flow, using flowWithLifecycle directly is not recommended, to avoid recreating the data source multiple times.

Flows can then easily be bound to the view lifecycle in an Activity or Fragment:

 override fun onCreate(savedInstanceState: Bundle?) {
     super.onCreate(savedInstanceState)
     // Simulate a state stream exposed by the ViewModel
     viewModel.readOnlyStateFlow
         // Subscribe upstream in onStart, unsubscribe in onStop
         .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
         .onEach {
             // do something
         }
         // Runs in the main-thread coroutine scope, cancelled automatically when the view is destroyed
         .launchIn(lifecycleScope)
 }

Conclusion

This completes shared data flows, the final piece of the Kotlin Flow puzzle.

  • SharedFlow, which can retain a history cache and otherwise delivers only new data, is a good choice for one-to-many event distribution.
  • StateFlow overlaps with the role of the original LiveData: it always holds only the latest data and is better suited to state updates.

With repeatOnLifecycle restricting subscriptions to the view lifecycle, StateFlow can fully replace LiveData for updating the view's state display, and it also supports sticky data.

Where LiveData used to be wrapped to handle one-off, non-sticky events, SharedFlow with replay set to 0 can take over that responsibility.
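The difference in stickiness can be checked without any Android code. In this sketch (kotlinx.coroutines 1.5.x assumed, hypothetical function name), a late subscriber gets nothing from a replay = 0 SharedFlow but immediately gets the latest value from a StateFlow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical demo: one-off events (replay = 0) versus sticky state.
fun stickinessDemo(): Pair<String?, String> = runBlocking {
    val events = MutableSharedFlow<String>() // replay = 0: not sticky
    val state = MutableStateFlow("idle")     // always holds the latest value: sticky

    events.emit("toast: saved")              // nobody is subscribed, so the event is dropped
    state.value = "loaded"

    // A late subscriber, e.g. a view recreated after a configuration change
    val lateEvent = withTimeoutOrNull(100) { events.first() }
    val lateState = state.first()
    lateEvent to lateState
}
```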

Of course, if you don't need Flow operators or thread safety, a single-responsibility class like LiveData for handling view state updates is still a good choice. Simplicity also means it is less error-prone and easier to maintain.

After all, StateFlow depends on Kotlin coroutines, whereas LiveData can be subscribed to directly with an Observer, which is convenient.

In general, Kotlin Flow brings great new options, from processing data-source logic to subscribing to state and events in the view layer.

Only by understanding the mechanisms behind them can we use them flexibly in the right scenarios.

References

StateFlow and SharedFlow

A safer way to collect Android UI data streams

Don't just follow the crowd: comparing the usage scenarios of LiveData, StateFlow, and SharedFlow

Migrate from LiveData to Kotlin data streams

Usage instructions for the Flow operators shareIn and stateIn