I recently read several articles about Flow written by developers on Medium and found them very good, so I recommend them to you.
1
Original link: proandroiddev.com/using-lived…
Recently, I’ve been looking for best practices for Kotlin Flow in the MVVM architecture. After answering a question about LiveData and Flow, I decided to write this article. In it, I’ll explain how to use Flow and LiveData together in the MVVM pattern. Then we’ll see how to change the theme of an application by using Flow.
Sample project: github.com/fgiris/Live…
What is Flow?
Flow is a reactive stream in the coroutines library that can emit multiple values over time, whereas a suspend function can return only one.
Although using Flow looks very similar to using LiveData, it has more advantages, such as:
- It is inherently asynchronous, with structured concurrency
- Simple data transformation with operators such as map and filter
- Easy to test
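As a rough illustration of these points, here is a minimal sketch that runs outside Android. The `numbers` source and the `evenSquares` helper are made up for this example, and `runBlocking` is used only so the snippet can run from `main`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source: emits 1..5 with a short pause between values.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(10) // suspends without blocking the thread
        emit(i)
    }
}

// Keeps the even values, squares them, and gathers the results into a list.
fun evenSquares(): List<Int> = runBlocking {
    numbers()
        .filter { it % 2 == 0 }
        .map { it * it }
        .toList()
}

fun main() {
    println(evenSquares()) // [4, 16]
}
```

Because the result is an ordinary list, checking it in a unit test is a one-line assertion, which is part of what makes flows easy to test.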
How do I use Flow in MVVM
If your application has an MVVM architecture, you usually have a data layer (repository, data source, etc.), a ViewModel, and a View (Fragment or Activity). You might use LiveData to transfer and transform data between these layers. But what is the main purpose of LiveData? Is it designed for data transformation?
LiveData was never designed to be a fully fledged reactive stream builder
- Jose Alcérreca at the Android Dev Summit 2019
Because LiveData is a lifecycle-aware component, it is best used in the View and ViewModel layers. But what about the data layer? I think the biggest problem with using LiveData in the data layer is that all data transformations are done on the main thread unless you start a coroutine and do the work inside it. This is why you might prefer suspend functions in the data layer.
Suppose you want to get weather forecast data from the network. Using a suspend function in your repository might look something like this.
class WeatherForecastRepository @Inject constructor() {
    suspend fun fetchWeatherForecast(): Result<Int> {
        // Since you can only return one value from a suspend function,
        // you have to set the loading state before calling fetchWeatherForecast
        // Fake API call
        delay(1000)
        // Return fake success data
        return Result.Success((0..20).random())
    }
}
You can call this function from the ViewModel using the viewModelScope.
class WeatherForecastOneShotViewModel @Inject constructor(
    val weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = MutableLiveData<Result<Int>>()
    val weatherForecast: LiveData<Result<Int>>
        get() = _weatherForecast

    fun fetchWeatherForecast() {
        // Set value as loading
        _weatherForecast.value = Result.Loading
        viewModelScope.launch {
            // Fetch and update weather forecast LiveData
            _weatherForecast.value = weatherForecastRepository.fetchWeatherForecast()
        }
    }
}
This works well for one-shot requests that run each time the function is called. But what about consuming a stream of data?
This is where Flow comes into play. If you want to get real-time updates from your server, you can use Flow without worrying about resource leaks, because structured concurrency forces you to collect it inside a scope that can cancel it.
Let’s change our repository to return a Flow.
class WeatherForecastRepository @Inject constructor() {
    /**
     * This method is used to make a one-shot request to get
     * fake weather forecast data
     */
    fun fetchWeatherForecast(): Flow<Result<Int>> = flow {
        emit(Result.Loading)
        // Fake API call
        delay(1000)
        // Send a random fake weather forecast value
        emit(Result.Success((0..20).random()))
    }

    /**
     * This method is used to get a data stream of fake weather
     * forecast data in real time
     */
    fun fetchWeatherForecastRealTime(): Flow<Result<Int>> = flow {
        emit(Result.Loading)
        // Fake data stream
        while (true) {
            delay(1000)
            // Send a random fake weather forecast value
            emit(Result.Success((0..20).random()))
        }
    }
}
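To try the repository flows above outside Android, a sketch like the following may help. The article never shows the `Result` type, so the sealed class here is an assumed shape inferred from how it is used; the helper functions `oneShot` and `firstThreeOfStream` are likewise made up, and the delays are shortened.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Assumed shape of the article's Result type (not shown in the original).
sealed class Result<out T> {
    object Loading : Result<Nothing>()
    object Error : Result<Nothing>()
    data class Success<T>(val data: T) : Result<T>()
}

// One-shot request: emits Loading, then a single Success, then completes.
fun fetchWeatherForecast(): Flow<Result<Int>> = flow {
    emit(Result.Loading)
    delay(100) // fake API call
    emit(Result.Success((0..20).random()))
}

// Endless stream: emits Loading, then a new Success every 100 ms.
fun fetchWeatherForecastRealTime(): Flow<Result<Int>> = flow {
    emit(Result.Loading)
    while (true) {
        delay(100)
        emit(Result.Success((0..20).random()))
    }
}

// Collects the whole one-shot flow.
fun oneShot(): List<Result<Int>> = runBlocking {
    fetchWeatherForecast().toList()
}

// take(3) cancels the endless upstream once three values have arrived.
fun firstThreeOfStream(): List<Result<Int>> = runBlocking {
    fetchWeatherForecastRealTime().take(3).toList()
}

fun main() {
    println(oneShot())
    println(firstThreeOfStream())
}
```

Note that the endless `while (true)` loop does not leak: once `take(3)` has what it needs, the flow is cancelled and the loop stops at its next suspension point.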
We can now emit multiple values from the repository. You can convert a Flow to LiveData in the ViewModel using the asLiveData extension function.
class WeatherForecastOneShotViewModel @Inject constructor(
    weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = weatherForecastRepository
        .fetchWeatherForecast()
        .asLiveData(viewModelScope.coroutineContext) // Use viewModel scope for auto cancellation
    val weatherForecast: LiveData<Result<Int>>
        get() = _weatherForecast
}
This looks similar to using LiveData because there is no data transformation. Let’s look at getting real-time updates from the repository.
class WeatherForecastDataStreamViewModel @Inject constructor(
    weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = weatherForecastRepository
        .fetchWeatherForecastRealTime()
        .map {
            // Do some heavy operation. This operation will be done in the
            // scope in which this flow is collected. In our case it is the scope
            // passed to the asLiveData extension function
            // This operation will not block the UI
            delay(1000)
            it
        }
        .asLiveData(
            // Use Default dispatcher for CPU intensive work and
            // viewModel scope for auto cancellation when viewModel
            // is destroyed
            Dispatchers.Default + viewModelScope.coroutineContext
        )
    val weatherForecast: LiveData<Result<Int>>
        get() = _weatherForecast
}
When you get real-time weather forecast data, all transformations in the map function are done asynchronously in the context in which the Flow is collected.
Note: If you are not using Flow in your repository, you can implement the same data transformation with the liveData builder.
private val _weatherForecast = liveData {
    val response = weatherForecastRepository.fetchWeatherForecast()
    // Do some heavy operation with response
    delay(1000)
    // Placeholder: the original snippet leaves the actual transformation out
    val transformedResponse = response
    emit(transformedResponse)
}
Going back to the real-time data stream, we can see that the text field keeps being updated while the Flow is observed, so the UI is not blocked.
class WeatherForecastDataStreamFragment : DaggerFragment() {
    ...
    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)
        // Obtain viewModel
        viewModel = ViewModelProviders.of(
            this,
            viewModelFactory
        ).get(WeatherForecastDataStreamViewModel::class.java)
        // Observe weather forecast data stream
        viewModel.weatherForecast.observe(viewLifecycleOwner, Observer {
            when (it) {
                Result.Loading -> {
                    Toast.makeText(context, "Loading", Toast.LENGTH_SHORT).show()
                }
                is Result.Success -> {
                    // Update weather data
                    tvDegree.text = it.data.toString()
                }
                Result.Error -> {
                    Toast.makeText(context, "Error", Toast.LENGTH_SHORT).show()
                }
            }
        })
        lifecycleScope.launch {
            while (true) {
                delay(1000)
                // Update text
                tvDegree.text = "Not blocking"
            }
        }
    }
}
It will look something like this:
Change the theme of your application with Flow
Since Flow can emit real-time updates, we can treat the user’s input as an update and send it through a Flow. To do this, let’s create a theme data source that has a theme channel for broadcasting updates.
class ThemeDataSource @Inject constructor(
    private val sharedPreferences: SharedPreferences
) {
    private val themeChannel: ConflatedBroadcastChannel<Theme> by lazy {
        ConflatedBroadcastChannel<Theme>().also { channel ->
            // When there is an access to theme channel
            // get the current theme from shared preferences
            // and send it to consumers
            val theme = sharedPreferences.getString(
                Constants.PREFERENCE_KEY_THEME,
                null
            ) ?: Theme.LIGHT.name // Default theme is light
            channel.offer(Theme.valueOf(theme))
        }
    }

    @FlowPreview
    fun getTheme(): Flow<Theme> {
        return themeChannel.asFlow()
    }

    fun setTheme(theme: Theme) {
        // Save theme to shared preferences
        sharedPreferences
            .edit()
            .putString(Constants.PREFERENCE_KEY_THEME, theme.name)
            .apply()
        // Notify consumers
        themeChannel.offer(theme)
    }
}

// Used to change the theme of the app
enum class Theme {
    DARK, LIGHT
}
As you can see, the themeChannel is not accessible from outside the class; it is exposed only after being converted to a Flow.
It is better to consume theme updates at the Activity level, because all updates from other fragments can be safely observed.
Let’s get the theme update in the ViewModel.
class MainViewModel @Inject constructor(
    private val themeDataSource: ThemeDataSource
) : ViewModel() {
    // Whenever there is a change in theme, it will be
    // converted to live data
    private val _theme: LiveData<Theme> = themeDataSource
        .getTheme()
        .asLiveData(viewModelScope.coroutineContext)
    val theme: LiveData<Theme>
        get() = _theme

    fun setTheme(theme: Theme) {
        themeDataSource.setTheme(theme)
    }
}
And you can easily observe this in an Activity.
class MainActivity : DaggerAppCompatActivity() {
    ...
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        ...
        observeTheme()
    }

    private fun observeTheme() {
        // Observe and update app theme if any changes happen
        viewModel.theme.observe(this, Observer { theme ->
            when (theme) {
                Theme.LIGHT -> AppCompatDelegate.setDefaultNightMode(AppCompatDelegate.MODE_NIGHT_NO)
                Theme.DARK -> AppCompatDelegate.setDefaultNightMode(AppCompatDelegate.MODE_NIGHT_YES)
            }
        })
    }
}
All that remains is to handle the button click in the Fragment.
class MainFragment : DaggerFragment() {
    private lateinit var viewModel: MainViewModel
    ...
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        ...
        btnDarkMode.setOnClickListener {
            // Enable dark mode
            viewModel.setTheme(Theme.DARK)
        }
    }
}
There you go! We just changed the theme with Flow.
2
Original link: proandroiddev.com/using-lived…
In Part 1, we saw how to use Flow in the repository layer and how to use Flow and LiveData to change the theme of your application. In this article, we’ll see how to remove LiveData (even MediatorLiveData) and use only Flow in all layers. We’ll also delve into common Flow operators such as map, filter, transform, and so on. Finally, we will implement the search bar example that Sean McQuillan gave in “Fragmented Podcast 187: Coroutines with Manuel Vivo & Sean McQuillan”, using Channel and Flow.
Say 👋 to LiveData
Using LiveData ensures that you don’t leak any resources when the lifecycle owner is destroyed. What if I told you that you could get almost the same benefits with Flow? (I’ll explain later why it is only almost.)
Let’s see how we can do this.
The repository
The repository layer remains the same because we are already returning Flow.
/**
 * This method is used to get data stream of fake weather
 * forecast data in real time with 1000 ms delay
 */
fun fetchWeatherForecastRealTime(): Flow<Result<Int>> = flow {
    // Fake data stream
    while (true) {
        delay(1000)
        // Send a random fake weather forecast data
        emit(Result.Success((0..20).random()))
    }
}
ViewModel
We no longer need to convert the Flow to LiveData with asLiveData; we can use the Flow directly in the ViewModel.
This is what it looked like before.
class WeatherForecastDataStreamViewModel @Inject constructor(
    weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = weatherForecastRepository
        .fetchWeatherForecastRealTime()
        .map {
            // Do some heavy operation. This operation will be done in the
            // scope in which this flow is collected. In our case it is the scope
            // passed to the asLiveData extension function
            // This operation will not block the UI
            delay(1000)
            it
        }
        .asLiveData(
            // Use Default dispatcher for CPU intensive work and
            // viewModel scope for auto cancellation when viewModel
            // is destroyed
            Dispatchers.Default + viewModelScope.coroutineContext
        )
    val weatherForecast: LiveData<Result<Int>>
        get() = _weatherForecast
}
Using only Flow, it becomes:
class WeatherForecastDataStreamFlowViewModel @Inject constructor(
    weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecast = weatherForecastRepository
        .fetchWeatherForecastRealTime()
    val weatherForecast: Flow<Result<Int>>
        get() = _weatherForecast
}
But wait. The map step is missing, so let’s add it back and convert Celsius to Fahrenheit while mapping.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .map {
        // Do some heavy mapping
        delay(500)
        // Let's add an additional mapping to convert
        // celsius degree to Fahrenheit
        if (it is Result.Success) {
            val fahrenheitDegree = convertCelsiusToFahrenheit(it.data)
            Result.Success(fahrenheitDegree)
        } else it // Do nothing if result is loading or error
    }

/**
 * This function converts given [celsius] to Fahrenheit.
 *
 * Fahrenheit degree = Celsius degree * 9 / 5 + 32
 *
 * @return Fahrenheit integer for [celsius]
 */
private fun convertCelsiusToFahrenheit(celsius: Int) = celsius * 9 / 5 + 32
You might want to show loading in the user interface, so onStart is the perfect place to do it.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { emit(Result.Loading) }
    .map { ... }
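A small stand-alone sketch of the onStart behaviour, using plain strings instead of the article's Result type. The `withLoading` helper and its values are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// onStart runs before the upstream flow emits anything, so the
// "Loading" marker always arrives first.
fun withLoading(): List<String> = runBlocking {
    flowOf("12", "15")
        .onStart { emit("Loading") }
        .toList()
}

fun main() {
    println(withLoading()) // [Loading, 12, 15]
}
```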
If you want to filter the values, go ahead. You have the filter operator.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .filter {
        // There could be millions of data when filtering
        // Do some filtering
        delay(2000)
        // Let's add an additional filtering to take only
        // data which is less than 10
        if (it is Result.Success) {
            it.data < 10
        } else true // Do nothing if result is loading or error
    }
    .map { ... }
You can also transform data using the transform operator, which gives you the flexibility to emit as many times as you want for a single input value.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .filter { ... }
    .map { ... }
    .transform {
        // Let's send only even numbers
        if (it is Result.Success) {
            if (it.data % 2 == 0) {
                val evenDegree = it.data
                emit(Result.Success(evenDegree))
                // You can call emit as many times as you want in transform
                // This makes transform different from the filter operator
            }
        } else emit(it) // Pass loading and error results through unchanged
    }
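To make the difference between filter and transform concrete, here is a minimal sketch with plain integers. The helper names and the output strings are invented for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// filter can only keep or drop each value.
fun filtered(): List<Int> = runBlocking {
    (1..4).asFlow()
        .filter { it % 2 == 0 }
        .toList()
}

// transform can drop a value (no emit), change its type,
// or emit several values for one input.
fun transformed(): List<String> = runBlocking {
    (1..4).asFlow()
        .transform<Int, String> { value ->
            if (value % 2 == 0) {
                emit("even $value")
                emit("half ${value / 2}")
            }
        }
        .toList()
}

fun main() {
    println(filtered())    // [2, 4]
    println(transformed()) // [even 2, half 1, even 4, half 2]
}
```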
Since a Flow is sequential, the total time needed to collect a value is the sum of the execution times of all operators. If you have a long-running operator, you can use buffer so that all operators before buffer run in a separate coroutine instead of the coroutine that collects the Flow. This makes the overall execution faster.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .filter { ... }
    // onStart and filter will be executed on a different
    // coroutine than the one this flow is collected in
    .buffer()
    // The following map and transform will be executed on the same
    // coroutine in which this flow is collected
    .map { ... }
    .transform { ... }
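The effect of buffer can be observed with a rough timing sketch. The `source` flow and the 100 ms delays are arbitrary choices for this example, and the measured times vary from machine to machine, so treat the numbers in the comments as approximate.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical slow producer: each value takes about 100 ms to produce.
fun source(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Producer and consumer take turns: about 3 * (100 + 100) ms in total.
fun timeWithoutBuffer(): Long = runBlocking {
    measureTimeMillis {
        source().collect { delay(100) } // slow consumer
    }
}

// With buffer() the producer runs in its own coroutine, so producing
// and consuming overlap: about 100 + 3 * 100 ms in total.
fun timeWithBuffer(): Long = runBlocking {
    measureTimeMillis {
        source().buffer().collect { delay(100) }
    }
}

fun main() {
    println("without buffer: ${timeWithoutBuffer()} ms")
    println("with buffer: ${timeWithBuffer()} ms")
}
```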
What if you don’t want to collect the same value multiple times? Then you can use the distinctUntilChanged operator, which only sends values that are different from the previous value.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .distinctUntilChanged()
    .filter { ... }
    .buffer()
    .map { ... }
    .transform { ... }
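A minimal sketch of distinctUntilChanged with made-up temperature readings. Note that only consecutive repeats are dropped; a value that reappears later is emitted again.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Drops a value only when it equals the value emitted just before it.
fun distinctReadings(): List<Int> = runBlocking {
    flowOf(18, 18, 19, 19, 18)
        .distinctUntilChanged()
        .toList()
}

fun main() {
    println(distinctReadings()) // [18, 19, 18]
}
```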
Suppose you want to cache the modified data before displaying it in the user interface. You can use the onEach operator to do that work for each value.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .distinctUntilChanged()
    .filter { ... }
    .buffer()
    .map { ... }
    .transform { ... }
    .onEach {
        // Do something with the modified data. For instance
        // save the modified data to cache
        println("$it has been modified and reached until onEach operator")
    }
If you are doing heavy work in your operators, you can change the execution context of all the operators above it (the upstream) by using the flowOn operator. Operators placed after flowOn, and the collector itself, still run in the collector's context.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .distinctUntilChanged()
    .filter { ... }
    .buffer()
    .map { ... }
    .transform { ... }
    .onEach { ... }
    .flowOn(Dispatchers.Default) // Changes the context of the upstream flow
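flowOn affects only the operators written before it. The sketch below records which thread runs a map placed above flowOn and one placed below it; the `threadNames` helper is made up for this example, and the exact thread names depend on the runtime.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (thread of the map above flowOn, thread of the map below flowOn).
fun threadNames(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var downstreamThread = ""
    flowOf(1)
        .map {
            upstreamThread = Thread.currentThread().name
            it
        }
        .flowOn(Dispatchers.Default) // applies to the map above only
        .map {
            downstreamThread = Thread.currentThread().name
            it
        }
        .collect()
    upstreamThread to downstreamThread
}

fun main() {
    val (upstream, downstream) = threadNames()
    println("upstream ran on: $upstream")     // a Dispatchers.Default worker
    println("downstream ran on: $downstream") // the thread that called runBlocking
}
```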
What about errors? Simply use the catch operator to catch any exception thrown in the upstream flow, that is, by the operators placed before it.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .distinctUntilChanged()
    .filter { ... }
    .buffer()
    .map { ... }
    .transform { ... }
    .onEach { ... }
    .flowOn(Dispatchers.Default)
    .catch { throwable ->
        // Catches exceptions from the whole upstream flow
        // Any error that occurs after this catch operator
        // will not be caught here
        println(throwable)
    }
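The following sketch shows both sides of that rule: catch handles an exception thrown above it and can emit a fallback value, while an exception thrown by an operator placed after catch escapes to the caller. All names and messages here are invented for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The exception is thrown upstream of catch, so it is handled and
// replaced by a fallback value.
fun recoverUpstream(): List<String> = runBlocking {
    flow<String> {
        emit("21")
        throw IllegalStateException("sensor offline")
    }
        .catch { e -> emit("error: ${e.message}") }
        .toList()
}

// The exception is thrown by an operator placed after catch, so
// catch never sees it and it propagates to the collector.
fun downstreamEscapes(): Boolean = runBlocking {
    try {
        flowOf(1)
            .catch { println("not reached") }
            .map<Int, Int> { throw IllegalStateException("thrown after catch") }
            .collect()
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() {
    println(recoverUpstream())   // [21, error: sensor offline]
    println(downstreamEscapes()) // true
}
```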
What if we have another flow to merge with the _weatherForecast flow? (You can think of this as a MediatorLiveData with multiple LiveData sources.) You can merge any number of flows using the merge function.
private val _weatherForecast = weatherForecastRepository
    .fetchWeatherForecastRealTime()
    .onStart { ... }
    .distinctUntilChanged()
    .filter { ... }
    .buffer()
    .map { ... }
    .transform { ... }
    .onEach { ... }
    .flowOn(Dispatchers.Default)
    .catch { ... }

private val _weatherForecastOtherDataSource = weatherForecastRepository
    .fetchWeatherForecastRealTimeOtherDataSource()

// Merge flows when consumer gets
val weatherForecast: Flow<Result<Int>>
    get() = merge(_weatherForecast, _weatherForecastOtherDataSource)
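A minimal sketch of merge with two made-up sources. Values are forwarded as they arrive, so the interleaving depends on timing; with the delays below the usual order is A1, B1, A2, but the sketch does not rely on it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Both sources are collected concurrently and their values are
// forwarded into a single flow as they arrive.
fun mergedReadings(): List<String> = runBlocking {
    val first = flow<String> {
        delay(50)
        emit("A1")
        delay(100)
        emit("A2")
    }
    val second = flow<String> {
        delay(100)
        emit("B1")
    }
    merge(first, second).toList()
}

fun main() {
    println(mergedReadings())
}
```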
Finally, our ViewModel looks something like this.
@ExperimentalCoroutinesApi
class WeatherForecastDataStreamFlowViewModel @Inject constructor(
    weatherForecastRepository: WeatherForecastRepository
) : ViewModel() {
    private val _weatherForecastOtherDataSource = weatherForecastRepository
        .fetchWeatherForecastRealTimeOtherDataSource()

    private val _weatherForecast = weatherForecastRepository
        .fetchWeatherForecastRealTime()
        .onStart { emit(Result.Loading) }
        .distinctUntilChanged()
        .filter {
            // There could be millions of data when filtering
            // Do some filtering
            delay(2000)
            // Let's add an additional filtering to take only
            // data which is less than 10
            if (it is Result.Success) {
                it.data < 10
            } else true // Do nothing if result is loading or error
        }
        .buffer()
        .map {
            // Do some heavy mapping
            delay(500)
            // Let's add an additional mapping to convert
            // celsius degree to Fahrenheit
            if (it is Result.Success) {
                val fahrenheitDegree = convertCelsiusToFahrenheit(it.data)
                Result.Success(fahrenheitDegree)
            } else it // Do nothing if result is loading or error
        }
        .transform {
            // Let's send only even numbers
            if (it is Result.Success) {
                if (it.data % 2 == 0) {
                    val evenDegree = it.data
                    emit(Result.Success(evenDegree))
                }
            } else emit(it) // Pass loading and error results through unchanged
        }
        .onEach {
            // Do something with the modified data. For instance
            // save the modified data to cache
            println("$it has modified and reached until onEach operator")
        }
        .flowOn(Dispatchers.Default) // Changes the context of the upstream flow
        .catch { throwable ->
            // Catches exceptions from the whole upstream flow
            // Any error that occurs after this catch operator
            // will not be caught here
            println(throwable)
        }

    // Merge flows when consumer gets
    val weatherForecast: Flow<Result<Int>>
        get() = merge(_weatherForecast, _weatherForecastOtherDataSource)

    /**
     * This function converts given [celsius] to Fahrenheit.
     *
     * Fahrenheit degree = Celsius degree * 9 / 5 + 32
     *
     * @return Fahrenheit integer for [celsius]
     */
    private fun convertCelsiusToFahrenheit(celsius: Int) = celsius * 9 / 5 + 32
}
The only thing left is to collect the flow in the Fragment.
class WeatherForecastDataStreamFlowFragment : DaggerFragment() {
    ...
    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)
        // Obtain viewModel
        viewModel = ViewModelProviders.of(
            this,
            viewModelFactory
        ).get(WeatherForecastDataStreamFlowViewModel::class.java)
        // Consume data when fragment is started
        lifecycleScope.launchWhenStarted {
            // Since collect is a suspend function it needs to be called
            // from a coroutine scope
            viewModel.weatherForecast.collect {
                when (it) {
                    Result.Loading -> {
                        Toast.makeText(context, "Loading", Toast.LENGTH_SHORT).show()
                    }
                    is Result.Success -> {
                        // Update weather data
                        tvDegree.text = it.data.toString()
                    }
                    Result.Error -> {
                        Toast.makeText(context, "Error", Toast.LENGTH_SHORT).show()
                    }
                }
            }
        }
    }
}
These are just some of the Flow operators. You can find the full list of operators here.
kotlin.github.io/kotlinx.cor…
Note: Removing LiveData adds extra work for handling configuration changes. To keep data across configuration changes, you need to cache the most recent values. You can see how the Dropbox Store repository handles caching here.
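One possible way to cache the most recent value, assuming a kotlinx.coroutines version that includes StateFlow (an API that is not used anywhere in this article), is sketched below. `ForecastHolder` is a hypothetical stand-in for a ViewModel, not a class from the sample project.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical holder that survives while the UI is recreated.
class ForecastHolder {
    // The mutable side stays private; the UI only sees a read-only StateFlow.
    private val _forecast = MutableStateFlow<Int?>(null)
    val forecast: StateFlow<Int?> = _forecast.asStateFlow()

    fun update(value: Int) {
        _forecast.value = value
    }
}

// A collector that subscribes late still receives the latest value at once.
fun lateCollectorSeesLatest(): Int? = runBlocking {
    val holder = ForecastHolder()
    holder.update(12)
    holder.update(17)
    holder.forecast.first()
}

fun main() {
    println(lateCollectorSeesLatest()) // 17
}
```

Because a StateFlow always holds a current value, a Fragment recreated after rotation could start collecting and immediately redraw with the last forecast instead of waiting for the next emission.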
Search bar using Channel and Flow
In the podcast, Sean McQuillan gives an example of how to create a search bar using Channel and Flow. The idea is to have a search bar with a filtered list. Every time the user types something into the search bar, the list is filtered by the text in the search bar. This is done by sending the text values to a channel and observing that channel as a Flow.
To illustrate this example, let’s have a list of cities and a search bar. In the end, it’s going to look something like this.
We’re going to have an EditText in the Fragment. Whenever the text is updated, we will send it to a channel stored in the ViewModel.
etCity.doAfterTextChanged {
    val key = it.toString()
    // Set loading indicator
    pbLoading.show()
    // Offer the current text to channel
    viewModel.cityFilterChannel.offer(key)
}
When the channel is updated to the latest value, we filter the cities and send the list to the subscribers.
class SearchCityViewModel @Inject constructor() : ViewModel() {
    val cityList = listOf(
        "Los Angeles", "Chicago", "Indianapolis", "Phoenix", "Houston",
        "Denver", "Las Vegas", "Philadelphia", "Portland", "Seattle"
    )

    // Channel to hold the text value inside search box
    val cityFilterChannel = ConflatedBroadcastChannel<String>()

    // Flow which observes channel and sends filtered list
    // whenever there is a update in the channel. This is
    // observed in UI to get filtered result
    val cityFilterFlow: Flow<List<String>> = cityFilterChannel
        .asFlow()
        .map {
            // Filter cities with new value
            val filteredCities = filterCities(it)
            // Do some heavy work
            delay(500)
            // Return the filtered list
            filteredCities
        }

    override fun onCleared() {
        super.onCleared()
        // Close the channel when ViewModel is destroyed
        cityFilterChannel.close()
    }

    /**
     * This function filters [cityList] if a city contains
     * the given [key]. If key is an empty string then this
     * function does not do any filtering.
     *
     * @param key Key to filter out the list
     *
     * @return List of cities containing the [key]
     */
    private fun filterCities(key: String): List<String> {
        return cityList.filter { it.contains(key) }
    }
}
Then, just watch the changes in the Fragment.
lifecycleScope.launchWhenStarted {
    viewModel.cityFilterFlow.collect { filteredCities ->
        // Hide the progress bar
        pbLoading.hide()
        // Set filtered items
        adapter.setItems(filteredCities)
    }
}
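The same filtering idea can be tried outside Android with a small sketch. It swaps the article's ConflatedBroadcastChannel for a MutableStateFlow, which plays a similar "holds the latest text" role in newer kotlinx.coroutines versions; that substitution and the `resultsFor` helper are assumptions of this example, not the author's code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val cityList = listOf(
    "Los Angeles", "Chicago", "Indianapolis", "Phoenix", "Houston",
    "Denver", "Las Vegas", "Philadelphia", "Portland", "Seattle"
)

// Same rule as the article: keep cities containing the key (case-sensitive).
fun filterCities(key: String): List<String> =
    cityList.filter { it.contains(key) }

// Holds the latest search text and maps it to a filtered list.
fun resultsFor(query: String): List<String> = runBlocking {
    val queryState = MutableStateFlow("")
    val results: Flow<List<String>> = queryState.map { key -> filterCities(key) }
    queryState.value = query // what the text-changed listener would do
    results.first()          // what the UI collector would receive
}

fun main() {
    println(resultsFor("Las")) // [Las Vegas]
    println(resultsFor("").size) // 10
}
```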
OK, we just implemented a search and filtering mechanism using channels and flows 👊.
3
Original link: proandroiddev.com/using-lived…
The third article is mainly about testing Flow. I suspect it is of little practical use to most teams in China, so interested readers can read it on their own.
I would also like to recommend my website, xuyisheng.top, which focuses on Android, Kotlin, and Flutter. You are welcome to visit.