In this tutorial, you’ll learn about reactive streams in Kotlin and build an app using two types of hot flows, SharedFlow and StateFlow.

Event streams have become standard on Android. For many years, RxJava has been the standard for reactive streams. Now, Kotlin offers its own reactive streams implementation, called Flow. Like RxJava, Kotlin Flow can create and react to streams of data. Also like RxJava, event streams can come from cold or hot publishers. The difference between the two is simple: a cold flow emits events only if there are subscribers, while a hot flow can emit new events even if nothing is subscribed to it. In this tutorial, you’ll learn about the hot flow implementations of Flow, called SharedFlow and StateFlow. More specifically, you’ll learn:

  • What a SharedFlow is.
  • What a StateFlow is and how it relates to SharedFlow.
  • How these hot flows compare to RxJava, Channels and LiveData.
  • How to use them on Android.

You might ask yourself: “Why use Kotlin’s SharedFlow and StateFlow instead of RxJava?” While RxJava gets the job done, some people like to describe it as “killing ants with a bazooka.” In other words, although the framework works, it’s easy to get carried away with all its capabilities. Doing so leads to overly complex solutions and code that is hard to understand. Kotlin Flow provides a more direct and specific implementation of reactive streams.

Getting Started

You’ll work on an app called CryptoStonks5000. The app has two screens. The first screen shows the user some cryptocurrencies, and the second shows the price movement of a cryptocurrency over the past 24 hours.

To learn about StateFlow and SharedFlow, you’ll:

  • Implement an event stream with SharedFlow to handle events shared between multiple screens.
  • Refactor CryptoStonks5000 to use StateFlow to handle the view state of the screens.

The project follows Clean Architecture and the MVVM pattern.

Build and run the project to make sure everything works. After that, it’s time to learn about SharedFlow!

SharedFlow

Before getting into the code, you should at least know what SharedFlow is.

At its core, a SharedFlow is a Flow. But it differs from the standard Flow implementation in two major ways:

  • It emits events even if you don’t call collect() on it. After all, it is a hot flow implementation.
  • It can have multiple subscribers.

Note that the term used here is “subscriber”, not “collector” as you would see with a regular Flow. This naming change is mainly because a SharedFlow never completes. In other words, when you call collect() on a SharedFlow, you are not collecting all of its events. Instead, you subscribe to the events that are emitted while the subscription exists.

Although this also means that a call to collect() on a SharedFlow never completes normally, the subscription can still be cancelled. As you might expect, this cancellation happens by cancelling the coroutine.

Note: Flow truncating operators such as take(count: Int) can force the collection of a SharedFlow to complete.
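To see the difference between a subscription that never completes and one cut short by a truncating operator, here is a minimal sketch you can run outside of Android. It assumes a recent kotlinx.coroutines (1.6 or later) on the classpath. The function name takeAndCancelDemo and the Int events are made up for illustration and are not part of the sample project.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns what the truncated subscriber received and
// whether the plain subscriber was still active at that point.
fun takeAndCancelDemo(): Pair<List<Int>, Boolean> = runBlocking {
    val events = MutableSharedFlow<Int>()

    // UNDISPATCHED starts each coroutine immediately, so both are
    // subscribed before the first event is emitted.
    val firstTwo = async(start = CoroutineStart.UNDISPATCHED) {
        events.take(2).toList() // take(2) completes this subscription after two events
    }
    val endless = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { } // never completes on its own
    }

    events.emit(1)
    events.emit(2)

    val received = firstTwo.await()
    val stillSubscribed = endless.isActive

    endless.cancelAndJoin() // cancelling the coroutine ends the subscription
    received to stillSubscribed
}
```

Calling takeAndCancelDemo() from a main function should show that the take(2) subscriber finishes with its two events while the plain subscriber only stops once its coroutine is cancelled.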

With that said, it’s time to code.

Handling Shared Events

You’ll mock a fake price notification system to mimic changes in coin values. It has to be fake, because the real thing is too volatile.

Users should be aware of these changes no matter which screen they are on. To make this possible, you’ll create a SharedFlow in a ViewModel shared by all screens.

In the Demo, find and open CoinsSharedViewModel.kt.

To start, you need to know how to create a SharedFlow. Well, today is your lucky day, because you’ll create two in a row. Add this code to the top of the class:

private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1

val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2

In this code:

  1. You call MutableSharedFlow() to create a mutable SharedFlow that emits events of type SharedViewEffects, a simple sealed class that models the possible events. Note that this is a private property. You’ll use it internally to emit events while exposing an immutable SharedFlow to make them visible externally (a common technique you’ve probably seen with LiveData as well).
  2. You create the public immutable SharedFlow mentioned above by calling asSharedFlow() on the mutable SharedFlow. This way, the immutable public property always reflects the value of the mutable private one.

Having these two properties is good practice. It gives you the freedom to emit anything you want internally via _sharedViewEffects, while external code can only react to these events by subscribing to sharedViewEffects. As a result, callers have no power to change the contents of the SharedFlow, which is a neat way to separate responsibilities and avoid mutability bugs.

Event Emission With SharedFlow

You now have your flows. Next, you need to emit something with them: price variations. CoinsSharedViewModel calls getPriceVariations() in its init block, but the method doesn’t do anything yet.

Add the following code to getPriceVariations():

viewModelScope.launch { // 1
  for (i in 1..100) { // 2
    delay(5000) // 3
    _sharedViewEffects.emit(SharedViewEffects.PriceVariation(i)) // 4
  }
}

This code does several things:

  1. Starts a coroutine.
  2. Runs a for loop from 1 to 100.
  3. Calls delay(), which waits five seconds between events and also checks for cancellation, so the loop stops if the coroutine is cancelled.
  4. Calls emit() on the mutable SharedFlow, passing it an instance of PriceVariation, which is one of the SharedViewEffects events. emit(value: T) is one of the two methods you can call to emit events on a SharedFlow. The other is tryEmit(value: T).

The difference between the two is that emit() is a suspending function and tryEmit() is not. This small difference leads to a huge difference in behavior between the two methods. To explain it, you need to dive into the replay cache and buffering of SharedFlow. Fasten your seat belt.
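Before diving into buffering, you can try the emitting loop outside of Android. The sketch below is only an approximation of getPriceVariations(): it assumes kotlinx.coroutines 1.6 or later, replaces viewModelScope with runBlocking, shortens the delay, and emits plain Int values instead of SharedViewEffects. The name firstThreeVariations is hypothetical.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for getPriceVariations(): same loop, shorter delay,
// and runBlocking instead of viewModelScope.
fun firstThreeVariations(): List<Int> = runBlocking {
    val variations = MutableSharedFlow<Int>()

    val producer = launch {
        for (i in 1..100) {
            delay(10)          // suspends, and throws CancellationException once the job is cancelled
            variations.emit(i) // suspends until the subscriber has received the value
        }
    }

    // Subscribe, keep the first three events, then stop listening.
    val received = variations.take(3).toList()

    producer.cancelAndJoin() // the loop ends here, long before reaching 100
    received
}
```

The loop has no explicit cancellation check. It stops because delay() and emit() are suspending calls that react to the cancelled coroutine.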

Replay and Buffering

MutableSharedFlow() takes three arguments:

public fun <T> MutableSharedFlow(
  replay: Int = 0, // 1
  extraBufferCapacity: Int = 0, // 2
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND // 3
): MutableSharedFlow<T>

Here’s what they’re used for:

  1. replay: The number of values replayed to new subscribers. It can’t be negative and defaults to zero.
  2. extraBufferCapacity: The number of values buffered in addition to replay. It can’t be negative and defaults to zero. The sum of this value and replay makes up the total buffer size of the SharedFlow.
  3. onBufferOverflow: The action to take when the buffer overflows. It can have one of three values: BufferOverflow.SUSPEND, BufferOverflow.DROP_OLDEST or BufferOverflow.DROP_LATEST. It defaults to BufferOverflow.SUSPEND.

Default Behavior

This can get hard to understand, so the original tutorial illustrates it with a short animation showing one possible behavior of a SharedFlow built with the default values. Assume the SharedFlow uses emit(value: T).

Step by step:

  • This SharedFlow has three events and two subscribers. The first event is emitted while there are no subscribers yet, so it is lost forever.
  • By the time the SharedFlow emits the second event, it already has one subscriber, which gets the event.
  • Before the third event, another subscriber appears, but the first subscriber is suspended and stays that way until after the event is emitted. This means emit() can’t deliver the third event to that subscriber. When this happens, the SharedFlow has two options: buffer the event and deliver it to the suspended subscriber when it resumes, or hit a buffer overflow if there isn’t enough buffer left for the event.
  • In this case, the total buffer (replay + extraBufferCapacity) is zero. In other words, the buffer overflows. Because onBufferOverflow is BufferOverflow.SUSPEND, the flow suspends until it can deliver the event to all subscribers.
  • When the subscriber resumes, so does the flow, delivering the event to all subscribers and carrying on with its work.
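The first two steps above can be reproduced in a few lines. This is a minimal sketch rather than the sample project’s code: it assumes kotlinx.coroutines 1.6 or later, uses Int events, and the name defaultConfigDemo is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo of a SharedFlow built with the default arguments.
fun defaultConfigDemo(): List<Int> = runBlocking {
    // replay = 0, extraBufferCapacity = 0, onBufferOverflow = SUSPEND
    val events = MutableSharedFlow<Int>()

    events.emit(1) // no subscribers yet: returns at once and the event is lost

    // UNDISPATCHED makes sure the subscription exists before the next emit.
    val subscriber = async(start = CoroutineStart.UNDISPATCHED) {
        events.take(2).toList()
    }

    events.emit(2) // suspends until the subscriber has received the value
    events.emit(3)

    subscriber.await()
}
```

The subscriber only sees the events emitted after it subscribed, since there is no replay cache to fall back on.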

Note: The SharedFlow specification forbids using anything other than onBufferOverflow = BufferOverflow.SUSPEND when the total buffer size is zero. And because tryEmit(value: T) never suspends, it can’t deliver an event to an active subscriber when you use it with the default replay and extraBufferCapacity values; it returns false instead. In other words, the only way to emit events to subscribers with tryEmit(value: T) is to have a total buffer size of at least one.
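Both rules in this note can be checked with a small experiment. The following is a sketch under the same assumption as before (kotlinx.coroutines 1.6 or later); the helper names tryEmitWithSubscriber and zeroBufferRejectsDropOldest are invented for the example.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: subscribes once to [flow], calls tryEmit a single
// time and reports whether the value was accepted.
fun tryEmitWithSubscriber(flow: MutableSharedFlow<Int>): Boolean = runBlocking {
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
    val accepted = flow.tryEmit(1)
    subscriber.cancel()
    accepted
}

// True when the constructor rejects a non-default overflow strategy
// combined with a total buffer size of zero.
fun zeroBufferRejectsDropOldest(): Boolean =
    runCatching { MutableSharedFlow<Int>(onBufferOverflow = BufferOverflow.DROP_OLDEST) }
        .exceptionOrNull() is IllegalArgumentException
```

With a subscriber in place, tryEmitWithSubscriber(MutableSharedFlow<Int>()) should return false, while passing MutableSharedFlow<Int>(extraBufferCapacity = 1) should return true.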

With Replay

Well, that’s not so bad. But what happens if the SharedFlow has a buffer? Here is an example with replay = 1.

Breaking it down:

  • The SharedFlow no longer suspends when it reaches the first event without any active subscribers. With replay = 1, the total buffer size is now one, so the flow buffers the first event and keeps going.
  • When it reaches the second event, there is no more room in the buffer, so it suspends.
  • The flow stays suspended until the subscriber resumes. As soon as it does, it gets the buffered first event along with the latest, second event. The SharedFlow resumes, but the first event is now gone forever, because the second event has taken its place in the replay cache.
  • Before the third event, a new subscriber appears. Thanks to replay, it also gets a copy of the latest event.
  • When the flow finally reaches the third event, both subscribers get a copy of it.
  • The SharedFlow buffers this third event and discards the previous one. Later, when a third subscriber shows up, it also gets a copy of the third event.
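The replay part of this scenario, where a late subscriber receives the most recent event, can be sketched as follows. It is a simplified case with no subscribers during emission, so nothing ever suspends; the String events and the name replayDemo are assumptions made for the example.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical demo: what does a subscriber that arrives late get with replay = 1?
fun replayDemo(): Pair<List<String>, String> = runBlocking {
    val events = MutableSharedFlow<String>(replay = 1)

    // No subscribers yet: each event only lands in the replay cache,
    // and the newer one pushes the older one out.
    events.emit("first")
    events.emit("second")

    val cached = events.replayCache // snapshot of what a new subscriber would be replayed
    val replayed = events.first()   // a late subscriber immediately gets the cached event
    cached to replayed
}
```

The replayCache property is handy for inspecting the cache without subscribing.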

With extraBufferCapacity and onBufferOverflow

The process is similar with extraBufferCapacity, but without the replay-like behavior. The third example shows a SharedFlow with extraBufferCapacity = 1 and onBufferOverflow = BufferOverflow.DROP_OLDEST.

In this case:

  • At first, the behavior is the same: with a suspended subscriber and a total buffer size of one, the SharedFlow buffers the first event.
  • The behavior starts to differ with the second event. Since onBufferOverflow = BufferOverflow.DROP_OLDEST, the SharedFlow drops the first event, buffers the second and carries on. Also, notice that the second subscriber does not get a copy of the buffered event. Remember, this SharedFlow has extraBufferCapacity = 1, but replay = 0.
  • The flow eventually reaches the third event, which the active subscriber receives. The flow then buffers this event, dropping the previous one.
  • Shortly after, the suspended subscriber resumes, which triggers the SharedFlow to deliver the buffered event to it and clear the buffer.
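Here is a small sketch of the same configuration with a single slow subscriber. It assumes kotlinx.coroutines on the classpath; the name dropOldestDemo is hypothetical, and the subscriber is "slow" only because it cannot run while the emitting coroutine keeps the thread.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical demo: which value survives when the buffer overflows?
fun dropOldestDemo(): Int = runBlocking {
    val events = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // Subscribed right away (UNDISPATCHED), but it cannot run again until
    // this coroutine suspends, which makes it a slow subscriber.
    val slowSubscriber = async(start = CoroutineStart.UNDISPATCHED) { events.first() }

    // tryEmit never suspends. The buffer holds one value, so each call
    // below evicts the value buffered by the previous one.
    events.tryEmit(1)
    events.tryEmit(2)
    events.tryEmit(3)

    slowSubscriber.await() // only the newest buffered value is left
}
```

Because nothing here ever suspends the emitter, DROP_OLDEST trades completeness for freshness: the slow subscriber skips straight to the latest event.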

Subscribing to Event Emissions

Good job! You now know how to create a SharedFlow and customize its behavior. There’s only one thing left to do: subscribe to it.

In the code, go to the coinhistory package in Demo and open CoinHistoryFragment.kt. At the top of the class, the shared ViewModel is declared and initialized:

private val sharedViewModel: CoinsSharedViewModel by activityViewModels { CoinsSharedViewModelFactory }

Since you want the SharedFlow to emit data no matter which screen you are on, you can’t bind the ViewModel to this particular Fragment. Instead, you want to bind it to the Activity so that it survives when you move from one Fragment to another. That’s why the code uses the by activityViewModels delegate. As for CoinsSharedViewModelFactory, don’t worry about it: every ViewModel factory in the app is already set up to inject any dependencies correctly.

Collecting the SharedFlow

Now that you have the shared ViewModel, you can use it. Find subscribeToSharedViewEffects() and subscribe to the SharedFlow by adding the following code:

viewLifecycleOwner.lifecycleScope.launchWhenStarted { // 1
  sharedViewModel.sharedViewEffects.collect { // 2
    when (it) {
      // 3
      is SharedViewEffects.PriceVariation -> notifyOfPriceVariation(it.variation)
    }
  }
}

This code has a few important details:

  1. The coroutine is scoped to the view rather than the Fragment. This ensures that the coroutine is alive only while the view is alive, even if the Fragment outlives it. The code creates the coroutine with launchWhenStarted instead of the more common launch. This way, the coroutine starts only when the lifecycle is at least in the STARTED state, suspends when the lifecycle drops to STOPPED, and is cancelled when the scope is cancelled. Using launch here could lead to crashes, because the coroutine would keep processing events even in the background.
  2. As you can see, subscribing to a SharedFlow is the same as subscribing to a regular Flow. The code calls collect() on the SharedFlow to subscribe to new events.
  3. The subscriber reacts to each SharedFlow event.

Always keep in mind that, even with launchWhenStarted, the SharedFlow keeps emitting events when it has no subscribers. So you always need to consider whether you are wasting resources. In this case, the event emission code is harmless, but things can get heavy, especially if you turn cold flows into hot ones with something like shareIn.

Note: Turning cold flows into hot ones is outside the scope of this tutorial. To be honest, it deserves a tutorial of its own. If you are interested, check out the last section of this tutorial for references on this topic.

Applying the Stream Data to the View

Back in the code, you can see that notifyOfPriceVariation() doesn’t exist yet. Add it as well:

private fun notifyOfPriceVariation(variation: Int) {
  val message = getString(R.string.price_variation_message, variation)
  showSnackbar(message)
}

Simple and easy. Build and run the app. Now, when you go to the coin history screen, you’ll see periodic Snackbar messages at the bottom. However, the SharedFlow only starts emitting once you enter that screen. Even though the CoinsSharedViewModel instance is bound to the Activity, it is only created the first time you visit the coin history screen.

You want every screen to know about price variations, so this isn’t ideal. To fix it, make the exact same changes in CoinListFragment:

  • Create the CoinsSharedViewModel instance in the same way.
  • Add the code to subscribeToSharedViewEffects().
  • Create notifyOfPriceVariation().

Build and run the app. You’ll now see the periodic Snackbar messages in CoinListFragment as well. As you switch screens, you’ll notice that the messages always show the next event, never a previous one. That’s because MutableSharedFlow() in CoinsSharedViewModel uses the default arguments, so nothing is replayed. Feel free to play with them and see how they affect the SharedFlow!

SharedFlow and Channels

Like SharedFlow, Channels represent hot streams. But that doesn’t mean SharedFlow will replace the Channels API, at least not entirely.

SharedFlow is designed to completely replace BroadcastChannel. SharedFlow is not only simpler and faster to use, but also has more features than BroadcastChannel. But keep in mind that other elements in the Channels API can and should still be used when it makes sense.

StateFlow

A StateFlow is structured like a SharedFlow. This is because StateFlow is simply a specialized subclass of SharedFlow. In fact, you can create a SharedFlow that behaves exactly like a StateFlow.

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(InitialState()) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

The code above creates a SharedFlow that only delivers the latest value to any new subscriber. Because of the distinctUntilChanged at the bottom, it only emits a value if it differs from the previous one. This is exactly what StateFlow does, which makes it ideal for holding and handling state.
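To compare the two side by side, here is a runnable sketch that feeds the same updates to a hand-built SharedFlow and to a real StateFlow. It assumes kotlinx.coroutines on the classpath and uses Int as the state type; the helper names firstTwoValues, sharedFlowAsState and realStateFlow are made up for the comparison.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: subscribes to [flow], then publishes 0 (a repeat of
// the initial value) and 1 through [publish], and returns the first two
// values the subscriber saw.
fun firstTwoValues(flow: Flow<Int>, publish: (Int) -> Unit): List<Int> = runBlocking {
    val seen = async(start = CoroutineStart.UNDISPATCHED) { flow.take(2).toList() }
    publish(0) // equal to the current value, so it should be filtered out
    publish(1)
    seen.await()
}

fun sharedFlowAsState(): List<Int> {
    val shared = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    shared.tryEmit(0) // emit the initial value
    return firstTwoValues(shared.distinctUntilChanged()) { shared.tryEmit(it) }
}

fun realStateFlow(): List<Int> {
    val state = MutableStateFlow(0) // the initial value is mandatory here
    return firstTwoValues(state) { state.value = it }
}
```

Both functions should return the same list: the initial value followed by the one real change, with the repeated 0 ignored.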

Handling App State

But there is an easier way to create a StateFlow, which you’ll use now. In the coinlist package, open CoinListFragmentViewModel.kt. This simple ViewModel uses LiveData to expose a view state class to CoinListFragment. The state class itself is fairly simple, with default values that match the initial view state:

data class CoinListFragmentViewState(
    val loading: Boolean = true,
    val coins: List<UiCoin> = emptyList()
)

The Fragment then updates the view with the current state by observing the LiveData.

// Code in CoinListFragment.kt
private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewModel.viewState.observe(viewLifecycleOwner) { updateUi(it, adapter) }
}

Start the refactoring by changing MutableLiveData to MutableStateFlow. So, in CoinListFragmentViewModel, go from:

private val _viewState = MutableLiveData(CoinListFragmentViewState())

To:

private val _viewState = MutableStateFlow(CoinListFragmentViewState())

Make sure to add the necessary import for MutableStateFlow. This is how you create a mutable StateFlow. Unlike SharedFlow, StateFlow requires an initial value or, in other words, an initial state. But because StateFlow is a specific implementation of SharedFlow, you can’t customize things like replay or extraBufferCapacity. Still, the general rules and constraints of SharedFlow apply.

Next, update the immutable LiveData accordingly, from:

val viewState: LiveData<CoinListFragmentViewState> get() = _viewState

To:

val viewState: StateFlow<CoinListFragmentViewState> get() = _viewState

Of course, you can also do:

val viewState = _viewState.asStateFlow()

Add the import for StateFlow. Whether it’s a SharedFlow or a StateFlow, you can create the immutable instance with either of these two options. The advantage of using asStateFlow() or asSharedFlow() is the extra safety of explicitly creating an immutable version of the flow. This avoids creating another mutable version by mistake.
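The difference between the two options shows up when a caller tries to cast the exposed flow back to its mutable type. The sketch below illustrates this with a hypothetical CounterHolder class and tryToMutate helper, neither of which belongs to the sample project.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Hypothetical holder exposing the same private state in both ways.
class CounterHolder {
    private val _count = MutableStateFlow(0)

    // Option 1: upcast. The declared type is read-only, but the object
    // behind it is still the mutable flow.
    val upcast: StateFlow<Int> get() = _count

    // Option 2: asStateFlow() wraps the mutable flow in a read-only view.
    val readOnly: StateFlow<Int> = _count.asStateFlow()
}

// Tries to write through whatever flow it is handed.
// Returns true if the write went through.
fun tryToMutate(flow: StateFlow<Int>): Boolean {
    val mutable = flow as? MutableStateFlow<Int> ?: return false
    mutable.value = 42
    return true
}
```

With this setup, tryToMutate(holder.upcast) should succeed and change the shared state, while tryToMutate(holder.readOnly) should fail because the wrapper is not a MutableStateFlow.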

Event Emission With StateFlow

One notable difference between SharedFlow and StateFlow is the way you emit events. You can still use emit and tryEmit with StateFlow, but don’t. Instead, do this:

mutableState.value = newState

The reason is that updates to value are always conflated, which means that even if you update it faster than subscribers can consume it, they only get the most recent value. One thing to keep in mind is that whatever you assign to value has to be different from what was there before: StateFlow compares the new value with the current one using equals() and ignores it if they are equal. For example, take this code:

data class State(
  var name: String = "",
  var age: Int = -1
)

val mutableState = MutableStateFlow<State>(State())

// ...

// newState and mutableState.value will reference the same object
val newState = mutableState.value 

// Reference is the same, so this is also changing mutableState.value!
newState.name = "Marc"

mutableState.value = newState

In this case, the StateFlow won’t emit the new value. Because newState references the very same object as the current value, the flow assumes the state hasn’t changed.

To make this work, you need to use immutable objects. For example:

data class State(
  val name: String = "",
  val age: Int = -1
)

val mutableState = MutableStateFlow<State>(State())

// ...

mutableState.value = State(name = "Marc")

With this, the StateFlow emits the state update correctly. Immutability saves the day once again.
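If you want to watch both cases from a subscriber’s point of view, the following sketch records every name a subscriber receives. It assumes kotlinx.coroutines 1.6 or later. MutablePerson, Person and the two function names are hypothetical and stand in for the State class above; update { } is used here as one possible way to replace the value with a modified copy.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

data class MutablePerson(var name: String = "")
data class Person(val name: String = "")

// Mutates the current state object in place and assigns it back.
fun namesSeenAfterInPlaceMutation(): List<String> = runBlocking {
    val state = MutableStateFlow(MutablePerson())
    val seen = mutableListOf<String>()
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it.name } // receives the initial state right away
    }

    val current = state.value
    current.name = "Marc"
    state.value = current // equal to the value already held, so nothing is emitted

    yield() // give the subscriber a chance to run
    subscriber.cancel()
    seen
}

// Replaces the state with a modified copy instead.
fun namesSeenAfterCopy(): List<String> = runBlocking {
    val state = MutableStateFlow(Person())
    val seen = mutableListOf<String>()
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it.name }
    }

    state.update { it.copy(name = "Marc") } // a new, unequal object: emitted

    yield()
    subscriber.cancel()
    seen
}
```

In the first function the subscriber should only ever see the initial empty name, even though the object it holds has silently changed. In the second, it should see the empty name followed by "Marc".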

Back in the code, the nice thing about replacing LiveData with StateFlow is that they both use a property called value, so nothing changes there.

There is one last change to make in CoinListFragmentViewModel, in the requestCoinList() method. You can now update the if condition at the start to:

if (viewState.value.coins.isNotEmpty()) return

You no longer need the ?, because value can’t be null. Also, you invert the condition by using isNotEmpty() instead of isNullOrEmpty() and dropping the ! at the beginning. This makes the code a bit easier to read.

If you try to build the app now, you’ll get an error in CoinListFragment saying there’s an unresolved reference to observe. StateFlow has no observe() method, so you need to refactor that as well.

Subscribing to State Updates

Open CoinListFragment.kt. Find observeViewStateUpdates() and update it to:

private fun observeViewStateUpdates(adapter: CoinAdapter) {
  viewLifecycleOwner.lifecycleScope.launchWhenStarted {
    viewModel.viewState.collect { updateUi(it, adapter) }
  }
}

This code looks a lot like what you did with SharedFlow, and it follows the same logic. Even so, you may worry about the StateFlow emitting values while the app is in the background. You don’t need to. It’s true that, because it’s scoped to viewModelScope, it emits for as long as the ViewModel exists, even if there are no subscribers. However, emitting with StateFlow is a lightweight operation: it just updates the value and notifies all subscribers. Besides, you probably do want the app to show the latest UI state when it comes to the foreground.

Build and run the application. Everything should work as before, because you’ve just refactored the code. Good job with StateFlow!

StateFlow and Channels

Just as SharedFlow can completely replace BroadcastChannel, StateFlow can completely replace ConflatedBroadcastChannel. There are a few reasons for this. StateFlow is simpler and more efficient than ConflatedBroadcastChannel. It also separates mutability from immutability better, with MutableStateFlow and StateFlow.

Hot Flows, RxJava and LiveData

You now know how SharedFlow and StateFlow work. But are they useful on Android?

While they may not bring anything “new”, they offer more direct and efficient alternatives. For example, wherever you’d use an RxJava PublishSubject, you can use a SharedFlow. And wherever you’d use a BehaviorSubject, you can use a StateFlow. In fact, StateFlow can even easily replace LiveData if hot event emission is not an issue.

Note: You can also convert SharedFlow and StateFlow objects to LiveData with the lifecycle-livedata-ktx library. The library provides an extension method, asLiveData(), that lets you convert a Flow and expose it as LiveData for use in your views. For more details, see the StateFlow, Flow, and LiveData section of the Android Developers article on StateFlow and SharedFlow.

So, in simpler terms:

  • If you have some kind of state management, you can use StateFlow.
  • Whenever you have a stream of events where it’s not a problem if events aren’t handled by every possible subscriber, or if past events are never handled at all, you can use SharedFlow.

Translation from the original: www.raywenderlich.com/22030171-re…