preface

Substituting Android’s LiveData: StateFlow or SharedFlow? By Patrick Steiger

Kotlin Coroutines recently introduced two Flow types, SharedFlow and StateFlow, and the Android community is beginning to consider the possibility of using these new types as alternatives to LiveData. The two main reasons for this are:

  • 1. LiveData is tightly bound to the UI (there is no natural way to offload work to worker threads).
  • 2. LiveData is tightly bound to the Android platform.

We can conclude from these two facts that, in Clean Architecture terms, LiveData works well in the presentation layer but does not fit the domain layer, which should be platform-independent (a pure Kotlin/Java module). It is also a poor fit for the data layer (repository implementations and data sources), since data access should normally be delegated to worker threads.

However, we cannot just replace LiveData with pure Flow. The main problems with using pure Flow as an alternative to LiveData are:

  • 1. Flow is stateless (there is no .value to read).
  • 2. Flow is declarative (cold): a flow builder only describes what the flow is, and the flow is materialized only when it is collected. A new flow is effectively instantiated for each collector, which means expensive upstream work such as database access runs redundantly for every collector.
  • 3. Flow itself knows nothing about the Android lifecycle and does not automatically pause and resume collectors when the lifecycle state changes.

These should not be seen as inherent flaws in Flow: they are simply characteristics that make it a poor substitute for LiveData, and they can be powerful in other contexts.
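Point (2) is easy to observe off-device. The following is a minimal sketch, assuming only the kotlinx.coroutines library; the helper name countUpstreamRuns is hypothetical and the counter merely stands in for an expensive query.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects the same cold flow `collectors` times and reports how often its body ran.
fun countUpstreamRuns(collectors: Int): Int = runBlocking {
    var upstreamRuns = 0
    val cold: Flow<Int> = flow {
        upstreamRuns++   // stands in for an expensive database or network call
        emit(1)
        emit(2)
    }
    // Each terminal operator call is a separate collection of the cold flow.
    repeat(collectors) { cold.toList() }
    upstreamRuns
}

fun main() {
    println(countUpstreamRuns(collectors = 3)) // prints 3
}
```

The flow body runs once per collection, so three collectors trigger the upstream work three times.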

For (3), we can already use the LifecycleCoroutineScope extensions (such as launchWhenStarted) to launch the coroutines that collect our flows; these collectors are automatically suspended and resumed in sync with the component’s Lifecycle.

Note: In this article, we use collection and observation as synonyms. Collection is the preferred term for Kotlin Flows (we collect a Flow) and observation is the preferred term for Android LiveData (we observe a LiveData).

But what about (1) accessing the current state, and (2) materializing only once for N >= 1 collectors and dematerializing for 0 collectors?

SharedFlow and StateFlow now offer solutions to both of these problems.

Let’s take an example. Our use case is fetching nearby locations. We assume the Firebase Realtime Database is used together with the GeoFire library, which allows querying for nearby locations.

Use LiveData end-to-end

Let’s first show the use of LiveData from the data source all the way to the view. The data source is responsible for connecting to the Firebase Realtime Database through a GeoQuery. When we receive onGeoQueryReady or onGeoQueryError, we update the LiveData value with the aggregate of the locations that entered, exited, or moved since the last onGeoQueryReady.

@Singleton
class NearbyUsersDataSource @Inject constructor() {
    // Ideally, those should be constructor-injected.
    val geoFire = GeoFire(FirebaseDatabase.getInstance().getReference("geofire"))
    val geoLocation = GeoLocation(0.0, 0.0)
    val radius = 100.0
    
    val geoQuery = geoFire.queryAtLocation(geoLocation, radius)
    
    // Listener for receiving GeoLocations
    val listener: GeoQueryEventListener = object : GeoQueryEventListener {
        val map = mutableMapOf<Key, GeoLocation>()
        override fun onKeyEntered(key: String, location: GeoLocation) {
            map[key] = location
        }
        override fun onKeyExited(key: String) {
            map.remove(key)
        }
        override fun onKeyMoved(key: String, location: GeoLocation) {
            map[key] = location
        }
        override fun onGeoQueryReady() {
            _locations.value = State.Ready(map.toMap())
        }
        override fun onGeoQueryError(e: DatabaseError) {
            _locations.value = State.Error(map.toMap(), e.toException())
        }
    }

    // Listen for changes only while observed
    private val _locations = object : MutableLiveData<State>() {
        override fun onActive() {
            geoQuery.addGeoQueryEventListener(listener)
        }

        override fun onInactive() {
            geoQuery.removeGeoQueryEventListener(listener)
        }
    }

    // Expose read-only LiveData
    val locations: LiveData<State> by this::_locations
    
    sealed class State(open val value: Map<Key, GeoLocation>) {
        data class Ready(
            override val value: Map<Key, GeoLocation>
        ) : State(value)
        
        data class Error(
            override val value: Map<Key, GeoLocation>,
            val exception: Exception
        ) : State(value)
    }
}

Then, our Repository, ViewModel, and Activity should be simple:

@Singleton
class NearbyUsersRepository @Inject constructor(
    // Must be a property (not a plain parameter) to be visible in the getter below.
    private val nearbyUsersDataSource: NearbyUsersDataSource
) {
    val locations get() = nearbyUsersDataSource.locations
}
class NearbyUsersViewModel @ViewModelInject constructor(
    // Must be a property (not a plain parameter) to be visible in the getter below.
    private val nearbyUsersRepository: NearbyUsersRepository
) : ViewModel() {

    val locations get() = nearbyUsersRepository.locations
}
@AndroidEntryPoint
class NearbyUsersActivity : AppCompatActivity() {
    
    private val viewModel: NearbyUsersViewModel by viewModels()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        viewModel.locations.observe(this) { state: State ->
            // Update views with the data.
        }
    }
}

This approach may work fine until you decide to make the domain layer, which contains the repository interfaces, platform-independent (as it should be). Also, once you need to offload work to worker threads in the data sources, you will find there is no easy, idiomatic way to do it with LiveData.

Use Flow on data sources and repositories

Let’s convert the data source to use Flow. There is a flow builder, callbackFlow, that converts a callback into a cold Flow. When the Flow is collected, it runs the block passed to the builder, adds the GeoQuery listener, and reaches awaitClose, where it suspends until the Flow is closed (that is, until nobody is collecting it or it is cancelled because of an exception). When closed, it removes the listener and the Flow is dematerialized.

@Singleton
class NearbyUsersDataSource @Inject constructor() {
    // Ideally, those should be constructor-injected.
    val geoFire = GeoFire(FirebaseDatabase.getInstance().getReference("geofire"))
    val geoLocation = GeoLocation(0.0, 0.0)
    val radius = 100.0
    
    val geoQuery = geoFire.queryAtLocation(geoLocation, radius)
    
    private fun GeoQuery.asFlow() = callbackFlow {
        val listener: GeoQueryEventListener = object : GeoQueryEventListener {
            val map = mutableMapOf<Key, GeoLocation>()
            override fun onKeyEntered(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onKeyExited(key: String) {
                map.remove(key)
            }
            override fun onKeyMoved(key: String, location: GeoLocation) {
                map[key] = location
            }
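            // The callbacks are not suspending, so send with trySend
            // (callbackFlow's ProducerScope has no emit).
            override fun onGeoQueryReady() {
                trySend(State.Ready(map.toMap()))
            }
            override fun onGeoQueryError(e: DatabaseError) {
                trySend(State.Error(map.toMap(), e.toException()))
            }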
        }
        
        addGeoQueryEventListener(listener)
        
        awaitClose { removeGeoQueryEventListener(listener) }
    }

    val locations: Flow<State> = geoQuery.asFlow()
    
    sealed class State(open val value: Map<Key, GeoLocation>) {
        data class Ready(
            override val value: Map<Key, GeoLocation>
        ) : State(value)
        
        data class Error(
            override val value: Map<Key, GeoLocation>,
            val exception: Exception
        ) : State(value)
    }
}
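The callbackFlow mechanics can be tried without Firebase. The sketch below assumes only kotlinx.coroutines (1.5 or later, for trySend); TickSource is a hypothetical callback-based API standing in for GeoQuery, not part of any library.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based source, playing the role of GeoQuery.
class TickSource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(listener: (Int) -> Unit) { listeners.add(listener) }
    fun removeListener(listener: (Int) -> Unit) { listeners.remove(listener) }
    fun publish(value: Int) = listeners.toList().forEach { it(value) }
}

// The listener is registered on collection and removed when the flow is closed or cancelled.
fun TickSource.asFlow(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking {
    val source = TickSource()
    val collected = async { source.asFlow().take(2).toList() }
    // Wait until the flow body has run and registered its listener.
    while (source.listenerCount == 0) yield()
    source.publish(1)
    source.publish(2)
    println("collected=${collected.await()} listeners=${source.listenerCount}")
}
```

Nothing is registered until the flow is collected, and the awaitClose block unregisters the listener once the collector has taken its two values.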

Our Repository and ViewModel need no changes, but our Activity now receives a Flow instead of a LiveData, so it has to adapt: instead of observing the LiveData, we collect the Flow.

@AndroidEntryPoint
class NearbyUsersActivity : AppCompatActivity() {
    
    private val viewModel: NearbyUsersViewModel by viewModels()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        lifecycleScope.launchWhenStarted {
            viewModel.locations.collect {
                // Update views with the data.
            }
        }
    }
}

We use launchWhenStarted {} to collect the Flow, so the coroutine starts only when the Activity reaches the onStart() lifecycle callback and is automatically suspended when it reaches onStop(). This is similar to the automatic lifecycle handling that LiveData gives us.

Note: You can choose to keep using LiveData in the presentation layer. In that case, you can easily convert a Flow to LiveData in the ViewModel with the Flow<T>.asLiveData() extension function. This decision has consequences that we discuss below, where we show that using SharedFlow and StateFlow end-to-end is more versatile and may fit your architecture better.

What’s the problem with using Flow in the View layer?

The first problem with this approach is life cycle handling, which LiveData automatically handles for us. In the example above, we achieved similar behavior by using launchWhenStarted {}.

But there is another problem: because flows are declarative and only run (materialize) when collected, with multiple collectors a new flow runs for each one, completely independent of the others. Depending on the work being done (for example, database or network operations), this can be very inefficient. It can even lead to incorrect state if we rely on the operation being performed only once. In our example, a new GeoQuery listener would be added for each collector: probably not a critical issue, but certainly a waste of memory and CPU cycles.

Note: If the repository Flow is converted to LiveData with Flow<T>.asLiveData() in the ViewModel, the LiveData becomes the sole collector of the Flow, and there is only one Flow collection no matter how many observers exist in the presentation layer. However, for this architecture to work, you need to guarantee that every other component accesses the LiveData from the ViewModel and never the Flow directly from the repository. This can be a challenge, depending on how decoupled your app is: all components that need the repository would now depend on an Activity instance to get a ViewModel instance, and the scope of those components would have to be limited accordingly.

No matter how many collectors we have in the View layer, we only need one GeoQuery listener. We can do this by sharing the flow between all collectors.

SharedFlow

SharedFlow is a flow that can be shared among multiple collectors, so that only one flow is effectively run (materialized) for all simultaneous collectors. If you define a SharedFlow that accesses the database and it is collected by multiple collectors, the database access runs only once and the resulting data is shared with all of them.

StateFlow can also be used to achieve the same behavior: it is a specialized SharedFlow with a .value (its current state) and a specific set of SharedFlow configurations (constraints). We will discuss these constraints later. There is an operator that converts any Flow into a SharedFlow:

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope, 
    started: SharingStarted, 
    replay: Int = 0
): SharedFlow<T>

Let’s apply this to our data source.

The scope is where all the computation for materializing the flow takes place. Since our data source is a @Singleton, we can use the lifecycleScope of the application process, a LifecycleCoroutineScope that is created when the process is created and destroyed only when the process is destroyed.

For the started parameter, we can use SharingStarted.WhileSubscribed(), which makes the Flow start sharing (materialize) only when the number of subscribers goes from 0 to 1, and stop sharing when it goes from 1 to 0. This is similar to the LiveData behavior we implemented earlier by adding the GeoQuery listener in the onActive() callback and removing it in the onInactive() callback. We could also configure it to start eagerly (materialized immediately and never dematerialized) or lazily (materialized on first collection and never dematerialized), but we want it to stop collecting from the upstream database when nothing downstream is collecting.

A note on terminology: just as we use the term “observer” for LiveData and “collector” for cold flows, we use the term “subscriber” for SharedFlow.

For the replay parameter, we can use 1: new subscribers then receive the last emitted value immediately upon subscription.

@Singleton
class NearbyUsersDataSource @Inject constructor() {
    // Ideally, those should be constructor-injected.
    val geoFire = GeoFire(FirebaseDatabase.getInstance().getReference("geofire"))
    val geoLocation = GeoLocation(0.0, 0.0)
    val radius = 100.0
    
    val geoQuery = geoFire.queryAtLocation(geoLocation, radius)
    
    private fun GeoQuery.asFlow() = callbackFlow {
        val listener: GeoQueryEventListener = object : GeoQueryEventListener {
            val map = mutableMapOf<Key, GeoLocation>()
            override fun onKeyEntered(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onKeyExited(key: String) {
                map.remove(key)
            }
            override fun onKeyMoved(key: String, location: GeoLocation) {
                map[key] = location
            }
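            // The callbacks are not suspending, so send with trySend
            // (callbackFlow's ProducerScope has no emit).
            override fun onGeoQueryReady() {
                trySend(State.Ready(map.toMap()))
            }
            override fun onGeoQueryError(e: DatabaseError) {
                trySend(State.Error(map.toMap(), e.toException()))
            }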
        }
        
        addGeoQueryEventListener(listener)
        
        awaitClose { removeGeoQueryEventListener(listener) }
    }.shareIn(
         ProcessLifecycleOwner.get().lifecycleScope,
         SharingStarted.WhileSubscribed(),
         1
    )

    val locations: Flow<State> = geoQuery.asFlow()
                     
    sealed class State(open val value: Map<Key, GeoLocation>) {
        data class Ready(
            override val value: Map<Key, GeoLocation>
        ) : State(value)
        
        data class Error(
            override val value: Map<Key, GeoLocation>,
            val exception: Exception
        ) : State(value)
    }
}

It may help to think of a SharedFlow as a flow collector itself, one that materializes the cold upstream flow into a hot flow and shares the collected values among the many downstream collectors. It is a middleman between the cold upstream flow and the multiple downstream collectors.
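The sharing behavior can be checked without Android. This is a minimal sketch assuming only kotlinx.coroutines; countUpstreamStarts is a hypothetical helper, the counter stands in for adding the GeoQuery listener, and a plain CoroutineScope replaces the process-wide lifecycleScope used in the article.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns how many times the cold upstream was started while
// `subscribers` concurrent collectors each took three values.
suspend fun countUpstreamStarts(subscribers: Int): Int = coroutineScope {
    var upstreamStarts = 0
    val upstream = flow {
        upstreamStarts++          // stands in for "add the GeoQuery listener"
        var tick = 0
        while (true) {
            emit(tick++)
            delay(10)
        }
    }
    // A scope that outlives individual subscribers (assumption: it replaces
    // ProcessLifecycleOwner.get().lifecycleScope from the article).
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed(), replay = 1)

    // All subscribers collect concurrently from the same shared flow.
    List(subscribers) { async { shared.take(3).toList() } }.awaitAll()
    sharingScope.cancel()
    upstreamStarts
}

fun main() = runBlocking {
    println("upstream starts with 2 subscribers: ${countUpstreamStarts(2)}")
}
```

Because both subscribers overlap in time, the subscriber count never drops back to zero between them, so WhileSubscribed() starts the upstream a single time.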

Now, we might think our Activity needs no adjustment. Wrong! There is a catch: when a flow is collected in a coroutine launched with launchWhenStarted {}, the coroutine is suspended on onStop() and resumed on onStart(), but it remains subscribed to the flow. For MutableSharedFlow, this means MutableSharedFlow.subscriptionCount does not change for suspended coroutines. To take advantage of SharingStarted.WhileSubscribed(), we actually need to unsubscribe on onStop() and subscribe again on onStart(). This means cancelling the collecting coroutine and recreating it.

Let’s create a class for this purpose:

@PublishedApi
internal class ObserverImpl<T> (
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>,
    private val collector: suspend (T) -> Unit
) : DefaultLifecycleObserver {

    private var job: Job? = null

    override fun onStart(owner: LifecycleOwner) {
        job = owner.lifecycleScope.launch {
            flow.collect {
                collector(it)
            }
        }
    }

    override fun onStop(owner: LifecycleOwner) {
        job?.cancel()
        job = null
    }

    init {
        lifecycleOwner.lifecycle.addObserver(this)
    }
}

inline fun <reified T> Flow<T>.observe(
    lifecycleOwner: LifecycleOwner,
    noinline collector: suspend (T) -> Unit
) {
    ObserverImpl(lifecycleOwner, this, collector)
}

inline fun <reified T> Flow<T>.observeIn(
    lifecycleOwner: LifecycleOwner
) {
    ObserverImpl(lifecycleOwner, this, {})
}

Now we can adjust the Activity to use the .observeIn(LifecycleOwner) extension we just created:

@AndroidEntryPoint
class NearbyUsersActivity : AppCompatActivity() {
    
    private val viewModel: NearbyUsersViewModel by viewModels()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        viewModel
            .locations
            .onEach { /* new locations received */ }
            .observeIn(this)
    }
}

The collector coroutine created with observeIn(LifecycleOwner) is destroyed when the LifecycleOwner’s Lifecycle reaches the CREATED state (right before onStop() is called) and is recreated once it reaches the STARTED state (right after onStart() is called).

Note: Why the CREATED state? Shouldn’t it be STOPPED? It sounds counterintuitive at first, but it makes sense. Lifecycle.State has only the following states: CREATED, DESTROYED, INITIALIZED, RESUMED, STARTED. There are no STOPPED or PAUSED states. When the lifecycle reaches onPause(), instead of moving to a new state it goes back to the STARTED state. When it reaches onStop(), it goes back to the CREATED state.
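Later AndroidX Lifecycle releases (lifecycle-runtime-ktx 2.4.0 and up) provide repeatOnLifecycle, which gives the same cancel-on-stop, relaunch-on-start behavior without a hand-written observer. The following is only a sketch of how it could be used here; collectWhileStarted is a hypothetical helper name, not an AndroidX API, and the code needs the Android runtime to run.

```kotlin
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

// Hypothetical helper: collects the flow only while the owner is at least STARTED.
fun <T> Flow<T>.collectWhileStarted(
    owner: LifecycleOwner,
    collector: suspend (T) -> Unit
): Job = owner.lifecycleScope.launch {
    // The block is cancelled when the lifecycle drops below STARTED and
    // launched again when it returns to STARTED, so the subscription
    // really ends on onStop() and WhileSubscribed() can stop the upstream.
    owner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        this@collectWhileStarted.collect { collector(it) }
    }
}
```

In an Activity this could be called as viewModel.locations.collectWhileStarted(this) { /* update views */ }.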



Now we have a data source that materializes once but shares its data with all of its subscribers.

Once there are no subscribers, its upstream collection stops and restarts when the first subscriber reappears.

It does not depend on the Android platform and is not tied to the main thread (flow transformations can run on other threads simply by applying the .flowOn() operator: .flowOn(Dispatchers.IO) or .flowOn(Dispatchers.Default)).

But what if I end up needing access to the current state of the Flow without collecting it?

If we really need to access the state of a Flow using .value, as we do with LiveData, we can use StateFlow, which is a specialized, constrained SharedFlow.

Just as a SharedFlow is produced with shareIn, a StateFlow is produced with stateIn:

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope, 
    started: SharingStarted, 
    initialValue: T
): StateFlow<T>

As you can see from the method parameters, there are two basic differences between shareIn() and stateIn():

1. stateIn() does not support customizing replay. A StateFlow is a SharedFlow with a fixed replay = 1. This means new subscribers immediately receive the current state upon subscription.

2. stateIn() requires an initial value. This means that if you don’t have an initial value at that point, you need to either make the StateFlow type T nullable or use a sealed class to represent an empty initial value.
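Both points can be seen in a small off-device sketch, assuming only kotlinx.coroutines. The helper name readStateTwice is hypothetical, and null is used as the "no value yet" initial state; a sealed class would work equally well.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Reads .value before and after the upstream has produced its first value.
suspend fun readStateTwice(): Pair<Int?, Int?> = coroutineScope {
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val upstream: Flow<Int?> = flow {
        delay(10)   // the real value arrives a little later
        emit(3)
    }
    // Nullable type: null is the explicit initial value required by stateIn.
    val state: StateFlow<Int?> = upstream.stateIn(sharingScope, SharingStarted.Eagerly, null)

    val before = state.value        // readable without collecting
    state.first { it != null }      // suspend until the upstream has emitted
    val after = state.value
    sharingScope.cancel()
    before to after
}

fun main() = runBlocking {
    println(readStateTwice()) // prints (null, 3)
}
```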

Which to choose, StateFlow or SharedFlow?

An easy way to answer this question is to try to answer some other questions:

“Do I really need to access the current state of the flow at any given time using myFlow.value?” If the answer to this question is “no”, then SharedFlow can be considered.

“Do I need support to emit and collect duplicate values?” If the answer to this question is “yes”, SharedFlow is required.

“Do I need to replay more than just the latest value to new subscribers?” If the answer to this question is “yes”, SharedFlow is required.

As we’ve seen, StateFlow is not automatically the right answer for everything.

1. It ignores (conflates) repeated values, and this is not configurable. Sometimes repeated values must not be ignored, for example: a connection attempt stores its result in a flow and must be retried after every failure.

2. In addition, it requires an initial value. Since SharedFlow has no .value, it does not need to be instantiated with an initial value: collectors simply suspend until the first value appears, and nobody can read a .value before any value arrives. If you have no initial value for a StateFlow, you must make the StateFlow type nullable (T?) and use null as the initial value (or use a sealed class to represent an empty initial value).

3. In addition, you may need to tune the replay value. SharedFlow can replay the last n values to new subscribers. StateFlow has a fixed replay value of 1: it only shares the current state value.

Both support the SharingStarted configuration (Eagerly, Lazily, or WhileSubscribed()). I usually use SharingStarted.WhileSubscribed() and destroy and recreate all my collectors in the Activity’s onStop()/onStart(), so that upstream collection from the data source stops while the user is not actively using the app.

The constraints that StateFlow imposes on SharedFlow may not suit you; you may want to tune the behavior and choose SharedFlow instead. Personally, I rarely need to access myFlow.value and I enjoy the flexibility of SharedFlow, so I usually choose SharedFlow.

An example using SharedFlow

Consider the following wrapper around the Google Billing Client library. We have a MutableSharedFlow, billingClientStatus, that stores the current status of the connection to the billing service. We set its initial value to SERVICE_DISCONNECTED. We collect billingClientStatus, and when it is not OK, we try to connect to the billing service with startConnection(). If the connection attempt fails, we emit SERVICE_DISCONNECTED.

In this example, if billingClientStatus were a MutableStateFlow instead of a MutableSharedFlow, then when its value is already SERVICE_DISCONNECTED and we try to set it to the same value (the connection retry failed), it would ignore the update and therefore never attempt to reconnect again.

@Singleton
class Biller @Inject constructor(
    @ApplicationContext private val context: Context,
) : PurchasesUpdatedListener, BillingClientStateListener {
    
    private var billingClient: BillingClient =
        BillingClient.newBuilder(context)
            .setListener(this)
            .enablePendingPurchases()
            .build()
        
    private val billingClientStatus = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    
    override fun onBillingSetupFinished(result: BillingResult) {
        billingClientStatus.tryEmit(result.responseCode)
    }

    override fun onBillingServiceDisconnected() {
        billingClientStatus.tryEmit(BillingClient.BillingResponseCode.SERVICE_DISCONNECTED)
    }
    
    // ...
    
    // Suspend until billingClientStatus == BillingClient.BillingResponseCode.OK
    private suspend fun requireBillingClientSetup(): Boolean =
        withTimeoutOrNull(TIMEOUT_MILLIS) {
            billingClientStatus.first { it == BillingClient.BillingResponseCode.OK }
            true
        } ?: false
   
    init {
        billingClientStatus.tryEmit(BillingClient.BillingResponseCode.SERVICE_DISCONNECTED)
        billingClientStatus.observe(ProcessLifecycleOwner.get()) {
            when (it) {
                BillingClient.BillingResponseCode.OK -> with (billingClient) {
                    updateSkuPrices()
                    handlePurchases()
                }
                else -> {
                    delay(RETRY_MILLIS)
                    billingClient.startConnection(this@Biller)
                }
            }
        }
    }

    private companion object {
        private const val TIMEOUT_MILLIS = 2000L
        private const val RETRY_MILLIS = 3000L
    }
}

In this case, we need to use SharedFlow, which supports emitting successive repeated values.
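The difference can be reproduced without the Billing library. The sketch below assumes only kotlinx.coroutines; deliveredCounts is a hypothetical helper and the integer 1 is a stand-in for the SERVICE_DISCONNECTED status code. The SharedFlow uses the same configuration as billingClientStatus above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Emits the same status twice into a StateFlow and a SharedFlow and returns
// how many times each collector saw it: (from StateFlow, from SharedFlow).
suspend fun deliveredCounts(): Pair<Int, Int> = coroutineScope {
    val disconnected = 1   // stand-in for SERVICE_DISCONNECTED
    val state = MutableStateFlow(0)
    val shared = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    var fromState = 0
    var fromShared = 0
    val collectors = listOf(
        launch { state.collect { if (it == disconnected) fromState++ } },
        launch { shared.collect { if (it == disconnected) fromShared++ } }
    )
    repeat(2) {
        delay(10)                     // let the collectors handle the previous value
        state.value = disconnected    // the second assignment equals the current value
        shared.tryEmit(disconnected)  // emitted again regardless of equality
    }
    delay(10)
    collectors.forEach { it.cancel() }
    fromState to fromShared
}

fun main() = runBlocking {
    println(deliveredCounts()) // prints (1, 2)
}
```

The StateFlow collector is notified once because the second, equal value is conflated, while the SharedFlow collector is notified for both emissions, which is what a retry loop needs.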

Conclusion

This article mainly introduces SharedFlow and StateFlow and some attempts to use them in place of LiveData. It is a first translation and many shortcomings remain; please read it alongside the original text, and corrections for any errors are welcome.