Producers in asynchronous data streams can produce more data than consumers need, which is where rate limiting comes in handy. Common rate-limiting scenarios in app development include search box debouncing, click event debouncing, and preventing excessive refreshes. This article uses these three scenarios as a thread to explore how each can be implemented and the principles behind the implementation.
This article assumes familiarity with the basics of Flow. They are covered in detail in Kotlin asynchronous | Flow scenarios and principle; the key points are quoted below:
- An asynchronous data flow can be understood as data generated sequentially on a timeline and can be used to express multiple continuous asynchronous processes.
- Asynchronous data flows can also be understood using a “producer/consumer” model, as if there is a pipeline between the producer and the consumer, with the producer inserting data from one end of the pipeline and the consumer fetching data from the other. Because of the pipeline, the data is in order, on a first-in, first-out basis.
- In Kotlin, a suspend method expresses a single asynchronous process, while Flow expresses multiple consecutive asynchronous processes. Flow is a cold stream: it does not emit data until the moment it is collected, so a cold stream is “declarative”.
- The moment a Flow is collected, data starts being produced and emitted, and it is passed to the consumer through the stream collector FlowCollector. Streams and stream collectors are paired concepts. A stream is a set of data produced in sequence; the data is emitted through a stream collector, which acts as a container for the stream data (although it does not hold any data) and defines how the data is passed to the consumer.
- An intermediate consumer can be inserted between a producer and a consumer in an asynchronous data flow. The intermediate consumer establishes an intercept-and-forward mechanism on the stream: it builds a new downstream stream that produces data by collecting upstream data and forwarding it to a lambda that is able to emit data. A stream with multiple intermediate consumers is like a “nesting doll”, with each downstream stream wrapped around its upstream stream. In this way the intermediate consumer intercepts the original data and can change it in any way before forwarding it to the downstream consumer.
- All consumers who can trigger the action of collecting data are called end consumers, which are like sparks lighting firecrackers, causing the flow of intermediate consumer dolls to be collected one by one from the outside to the inside (from the downstream to the upstream) and finally transmitted to the original flow, triggering the emission of data.
- By default, production and consumption of data in a stream happen on the same thread. However, flowOn() can change the thread on which the upstream part of the stream runs without affecting the thread on which the downstream part runs. In Flow, the operations that produce and consume data are wrapped in lambdas marked with suspend, so with coroutines it is easy to produce and consume data asynchronously.
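As a small illustration of the last point, the sketch below records the thread that produces a value and the thread that consumes it. It assumes kotlinx.coroutines on the JVM; the function name producerAndConsumerThreads() is made up for this example and is not part of the original article.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (thread that produced the value, thread that consumed it).
fun producerAndConsumerThreads(): Pair<String, String> = runBlocking {
    flow {
        // Upstream of flowOn(): runs on a Dispatchers.Default worker thread.
        // Nothing here runs until the stream is collected (cold stream).
        emit(Thread.currentThread().name)
    }
        .flowOn(Dispatchers.Default)
        // Downstream of flowOn(): runs in the collecting coroutine, i.e. on runBlocking's thread.
        .map { producerThread -> producerThread to Thread.currentThread().name }
        .first()
}

fun main() {
    val (producer, consumer) = producerAndConsumerThreads()
    println("produced on $producer, consumed on $consumer")
}
```

The two names differ because flowOn() only moves the part of the chain above it; everything below it stays in the collector's context.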
Search box debouncing
“Type in the search box, then click the search button, after a period of wait, search results are presented in a list.” This is how apps used to search a long time ago.
The search experience is much better now: there is no need to click a search button, because the search is triggered automatically as you type.
To do this, listen for changes in the input field:
// Build the listener
val textWatcher = object : android.text.TextWatcher {
    override fun afterTextChanged(s: Editable?) {}
    override fun beforeTextChanged(text: CharSequence?, start: Int, count: Int, after: Int) {}
    override fun onTextChanged(text: CharSequence?, start: Int, before: Int, count: Int) {
        search(text.toString())
    }
}
// Set the input box content listener
editText.addTextChangedListener(textWatcher)
// Access the network to search
fun search(key: String) {}
This implementation has the drawback of issuing many wasted network requests. For example, when searching for “kotlin Flow”, onTextChanged() is called back 10 times, triggering 10 network requests, of which only the last one is useful.
An obvious optimization is to send the request only once the user has stopped typing. But there is no callback that notifies the business layer that the user has stopped typing…
The only option is to set a timeout: if the user has not typed anything for a given period, they are considered to have stopped typing.
However, this is fairly complicated to implement: a countdown must be started after every change to the input box content. If no further change occurs before the countdown reaches zero, a request is issued with the current content; otherwise the countdown is reset and starts again.
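For illustration only, the countdown-and-reset logic just described might be sketched with a plain coroutine Job, without Flow. The class Debouncer and the function simulateTyping() are hypothetical names, not part of the original article, and the sketch assumes kotlinx.coroutines and single-threaded use.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the most recently submitted action once no new
// action has been submitted for timeoutMillis. Not thread-safe.
class Debouncer(private val scope: CoroutineScope, private val timeoutMillis: Long) {
    private var countdown: Job? = null

    fun submit(action: () -> Unit) {
        countdown?.cancel()          // new input: reset the running countdown
        countdown = scope.launch {
            delay(timeoutMillis)     // the countdown itself
            action()                 // reached zero without being reset
        }
    }
}

// Simulates three keystrokes 100 ms apart, then a pause.
fun simulateTyping(): List<String> = runBlocking {
    val requests = mutableListOf<String>()
    val debouncer = Debouncer(this, timeoutMillis = 300)
    for (text in listOf("k", "ko", "kot")) {
        debouncer.submit { requests += text }  // stands in for search(text)
        delay(100)                             // next keystroke after 100 ms
    }
    delay(500)                                 // the user stops typing
    requests
}

fun main() {
    println(simulateTyping()) // [kot]
}
```

Only the last input triggers a request, because each earlier countdown is cancelled before it reaches zero. Even this small version has to manage job lifetime and cancellation by hand.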
In the rush of requirement iterations, is there really time to implement such an intricate little feature?
Kotlin’s Flow encapsulates this functionality for us.
Reconsider the scenario above in terms of streams: the input box is the producer of the stream data, and each content change produces a new item on the stream. But not every item needs to be consumed, so the stream has to be rate-limited: any item that is followed by another one within a short interval is discarded, and an item is emitted only when no new item arrives for some time after it was produced.
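The rule above can be observed on a simulated stream of keystrokes. The sketch below is only an illustration: it uses the debounce() operator from kotlinx.coroutines, and the function names simulatedTyping() and debouncedTyping() as well as the delays are made up for this example.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical keystroke stream: two bursts of typing separated by a 500 ms pause.
fun simulatedTyping(): Flow<String> = flow {
    emit("k"); delay(100)
    emit("ko"); delay(100)
    emit("kot"); delay(500)    // pause longer than the debounce timeout
    emit("kotl"); delay(100)
    emit("kotlin")             // the stream ends here
}

@OptIn(FlowPreview::class)
fun debouncedTyping(): List<String> = runBlocking {
    simulatedTyping()
        .debounce(300)         // drop every item followed by another one within 300 ms
        .toList()
}

fun main() {
    println(debouncedTyping()) // [kot, kotlin]
}
```

"kot" survives because nothing follows it for 300 ms, and "kotlin" is emitted because it is the last item when the stream completes.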
Kotlin has a number of predefined rate-limiting operators, and debounce() is a good fit for the current scenario. To use debounce(), we first have to convert the callback into a stream:
// Build the input box text change stream
fun EditText.textChangeFlow(): Flow<CharSequence> = callbackFlow {
    // Build the input box listener
    val watcher = object : TextWatcher {
        override fun afterTextChanged(s: Editable?) {}
        override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}
        // Emit data to the stream after the text has changed
        override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
            s?.let { offer(it) }
        }
    }
    // Set the input box listener
    addTextChangedListener(watcher)
    // Suspend to keep the stream alive; remove the listener when the stream is closed
    awaitClose { removeTextChangedListener(watcher) }
}
This adds an extension method to EditText that builds a stream of input text changes.
callbackFlow {} is a predefined top-level method for turning callbacks into streams. You build the callback instance inside it and register it, then call offer() in the callback method that produces data in order to emit that data. (From kotlinx.coroutines 1.5 onward, offer() is deprecated in favor of trySend().) In the current scenario, each text change in the input box is emitted as stream data.
The last call in the callbackFlow {} lambda, awaitClose {}, is essential: it suspends the current coroutine and keeps the stream alive waiting for data; otherwise the stream would be closed as soon as the lambda completes.
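The same pattern can be tried outside Android. The sketch below is a minimal example under stated assumptions: TickSource is a hypothetical callback-based API standing in for a view or SDK listener, collectThreeTicks() is a made-up helper, and trySend() is used in place of offer(), as recent kotlinx.coroutines versions expect.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API with a single listener slot.
class TickSource {
    private var listener: ((Int) -> Unit)? = null
    fun setListener(l: ((Int) -> Unit)?) { listener = l }
    fun fire(value: Int) { listener?.invoke(value) }
}

// Turn the callback into a stream.
fun TickSource.tickFlow(): Flow<Int> = callbackFlow {
    setListener { value -> trySend(value) }   // every callback becomes one stream item
    awaitClose { setListener(null) }          // suspend until closed, then unregister
}

fun collectThreeTicks(): List<Int> = runBlocking {
    val source = TickSource()
    val collected = async { source.tickFlow().take(3).toList() }
    delay(100)                           // give the collector time to register the listener
    (1..5).forEach { source.fire(it) }   // the callback fires five times
    collected.await()                    // take(3) ends the collection after three items
}

fun main() {
    println(collectThreeTicks()) // [1, 2, 3]
}
```

When take(3) ends the collection, the block passed to awaitClose {} runs and the listener is unregistered, which is the cleanup role it also plays for the TextWatcher.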
The text change stream can then be used like this:
editText.textChangeFlow() // Build the input box text change stream
    .filter { it.isNotEmpty() } // Filter out empty content to avoid pointless network requests
    .debounce(300) // 300 ms debounce
    .flatMapLatest { searchFlow(it.toString()) } // A new search replaces the old one
    .flowOn(Dispatchers.IO) // Run the search on a background thread
    .onEach { updateUi(it) } // Receive the search results and update the UI
    .launchIn(mainScope) // Collect search results on the main thread
// Update the UI
fun updateUi(it: List<String>) {}
// Access the network to search
suspend fun search(key: String): List<String> { TODO("network request") }
// Convert the search keyword into a stream of search results
fun searchFlow(key: String) = flow { emit(search(key)) }
Here filter() is an intermediate consumer of the stream:
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}
filter() uses transform() to build a downstream stream that collects the upstream data and filters it through the predicate; only data that satisfies the predicate is emitted. For a detailed explanation of transform(), see Kotlin advanced | the use of asynchronous data stream Flow scenarios.
flatMapLatest() is also an intermediate consumer. flatMap converts each piece of upstream data into a new stream; in this case it converts a search keyword, through a network request, into a stream of search results, Flow<List<String>>. Latest means that if a new search request arrives before the previous one has returned, the previous one is cancelled, so the results shown always correspond to the latest input.
The source of flatMapLatest() is as follows:
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
    ChannelFlowTransformLatest(transform, this)
internal class ChannelFlowTransformLatest<T, R>(
    private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
        ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
    override suspend fun flowCollect(collector: FlowCollector<R>) {
        assert { collector is SendingCollector }
        flowScope {
            var previousFlow: Job? = null
            // Collect upstream data
            flow.collect { value ->
                // 1. If new data arrives, cancel the previous one
                previousFlow?.apply {
                    cancel(ChildCancelledException())
                    join()
                }
                // 2. Start the coroutine to process the current data
                previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
                    collector.transform(value)
                }
            }
        }
    }
}
When data is collected, a new coroutine is started each time to perform the data transformation operation, and the coroutine’s Job is recorded, and the last Job is cancelled when the next data arrives.
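The cancel-the-previous-job behaviour can be observed with a simulated search. This is a sketch under stated assumptions: fakeSearch() is a hypothetical stand-in for the network request, latestResults() is a made-up helper, and the delays are arbitrary.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a network search that takes 200 ms.
fun fakeSearch(key: String): Flow<String> = flow {
    delay(200)
    emit("results for $key")
}

@OptIn(ExperimentalCoroutinesApi::class)
fun latestResults(): List<String> = runBlocking {
    flow {
        emit("k"); delay(50)    // the next keyword arrives before the search for "k" returns
        emit("ko"); delay(50)   // same for "ko"
        emit("kot")             // last keyword: its search is allowed to finish
    }
        .flatMapLatest { fakeSearch(it) }
        .toList()
}

fun main() {
    println(latestResults()) // [results for kot]
}
```

The searches for "k" and "ko" are cancelled after 50 ms each, so only the results for the latest keyword reach the collector.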
launchIn() in the demo scenario is an end consumer:
// Start a coroutine and collect data in it
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect()
}
// Collect data with an empty collector
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
// The empty collector is a FlowCollector that no longer emits data downstream
internal object NopCollector : FlowCollector<Any?> {
    override suspend fun emit(value: Any?) {
        // does nothing
    }
}
Using launchIn() hides the details of launching the coroutine to collect data internally, so it keeps the external code as a neat chained call. The following two pieces of code are equivalent:
// Option 1: launch a coroutine and collect inside it
mainScope.launch {
    editText.textChangeFlow()
        .filter { it.isNotEmpty() }
        .debounce(300)
        .flatMapLatest { searchFlow(it.toString()) }
        .flowOn(Dispatchers.IO)
        .collect { updateUi(it) }
}

// Option 2: onEach {} + launchIn()
editText.textChangeFlow()
    .filter { it.isNotEmpty() }
    .debounce(300)
    .flatMapLatest { searchFlow(it.toString()) }
    .flowOn(Dispatchers.IO)
    .onEach { updateUi(it) }
    .launchIn(mainScope)
But since launchIn() no longer sends data downstream, it is generally used in conjunction with onEach {} to complete the consumption of data.
Click event debouncing
The response logic to a click event in an app is usually a pop-up or a network request.
If you click twice in a row at a very fast speed, two screens will pop up or two network requests will be made.
To avoid this, click events need to be debounced, that is, only the first click within a given time interval gets a response. This can be done as follows:
val FAST_CLICK_THRESHOLD = 300L
fun View.onDebounceClickListener(block: (View) -> Unit) {
    // If it is not a fast click, run the click logic
    setOnClickListener { if (!it.isFastClick()) block(it) }
}
// Determine whether this is a fast click
fun View.isFastClick(): Boolean {
    val currentTime = SystemClock.elapsedRealtime()
    // Compare against the time of the last effective click
    if (currentTime - triggerTime >= FAST_CLICK_THRESHOLD) {
        triggerTime = currentTime
        return false
    } else {
        return true
    }
}
// Record the time of the last effective click
private var View.triggerTime: Long
    get() = getTag(R.id.click_trigger) as? Long ?: 0L
    set(value) = setTag(R.id.click_trigger, value)
Three extensions are defined to implement click event debouncing. The time of each effective click is saved in a View tag; on every click, the current time is compared with the time of the last effective click, and the click is allowed only if the difference exceeds the threshold.
Reconsider the scenario in terms of streams: each click event is a new item on the stream. To rate-limit the stream, emit the first item and then discard everything that follows within the time window, until a new time window begins.
Unfortunately, Kotlin doesn’t provide a built-in operator for this, but it’s easy to write one:
fun <T> Flow<T>.throttleFirst(thresholdMillis: Long): Flow<T> = flow {
    var lastTime = 0L // Time of the last emission
    // Collect upstream data
    collect { upstream ->
        // The current time
        val currentTime = System.currentTimeMillis()
        // If the time difference exceeds the threshold, emit the data and record the time
        if (currentTime - lastTime > thresholdMillis) {
            lastTime = currentTime
            emit(upstream)
        }
    }
}
throttleFirst() builds a downstream stream with flow {} and collects the upstream data, emitting an item only when the time elapsed since the last emitted item exceeds the threshold.
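The effect can be checked on a simulated stream of clicks. The sketch below repeats the throttleFirst() operator so that it is self-contained; the helper throttledClicks() and the click timings are made up for this example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same idea as the operator above: emit an item only if more than
// thresholdMillis have passed since the last emitted item.
fun <T> Flow<T>.throttleFirst(thresholdMillis: Long): Flow<T> = flow {
    var lastTime = 0L
    collect { upstream ->
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastTime > thresholdMillis) {
            lastTime = currentTime
            emit(upstream)
        }
    }
}

fun throttledClicks(): List<String> = runBlocking {
    flow {
        emit("click 1"); delay(50)    // opens a 300 ms window
        emit("click 2"); delay(50)    // 50 ms after click 1: discarded
        emit("click 3"); delay(400)   // 100 ms after click 1: discarded
        emit("click 4")               // 500 ms after click 1: new window
    }
        .throttleFirst(300)
        .toList()
}

fun main() {
    println(throttledClicks()) // [click 1, click 4]
}
```

Unlike debounce(), the first item passes immediately and later items do not extend the window.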
Then organize the click events into a stream:
fun View.clickFlow() = callbackFlow {
    setOnClickListener { offer(Unit) }
    awaitClose { setOnClickListener(null) }
}
It can be used like this:
view.clickFlow()
    .throttleFirst(300)
    .onEach { /* Respond to the click event */ }
    .launchIn(mainScope)
Preventing excessive refreshes
Imagine a live-streaming room with a million viewers that shows a list of recently joined viewers. Each new viewer is reported via the onUserIn(uid: String) callback; the user’s information then has to be fetched over the network using the uid and shown in the audience list.
In a room of that size, hundreds or thousands of viewers may join every second, and hundreds or thousands of network requests per second would be excessive.
The rate-limiting scheme proposed by the product team: refresh the list once per second, showing only the last 5 people who joined the room within that second.
Reframe the scenario in terms of streams: the onUserIn() callback is the producer of the stream data. Rate limiting here means that in each fixed time interval only the last N items are emitted and the rest are discarded.
Kotlin provides a built-in operator, sample(), which emits only the most recent item in each interval (the N = 1 case):
// Convert the callback to a stream
fun userInFlow() = callbackFlow {
    val callback = object : UserCallback() {
        override fun onUserIn(uid: String) { offer(uid) }
    }
    setCallback(callback)
    awaitClose { setCallback(null) }
}
// Rate-limit the audience list updates
userInFlow()
    .sample(1000)
    .onEach { fetchUser(it) }
    .flowOn(Dispatchers.IO)
    .onEach { updateAudienceList() }
    .launchIn(mainScope)
(This implementation makes the same mistake as the earlier countdown Flow. Can you spot it? It will be discussed in detail in a later article.)
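To see in isolation how sample() thins out a stream, here is a minimal sketch on simulated data. The function name sampledJoins(), the item values, and the timings are made up for illustration, and kotlinx.coroutines is assumed.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class)
fun sampledJoins(): List<Int> = runBlocking {
    flow {
        delay(100)
        repeat(6) { viewer ->
            emit(viewer)       // viewers 0..5 join at about 100, 300, 500, 700, 900, 1100 ms
            delay(200)
        }
    }
        .sample(400)           // every 400 ms, emit only the most recent viewer
        .toList()
}

fun main() {
    println(sampledJoins()) // [1, 3, 5]
}
```

Each 400 ms interval contains two joins and only the later one is emitted. Showing the last 5 viewers per interval, as in the product requirement, would need additional buffering on top of this.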
Conclusion
Thinking about the following scenarios in terms of asynchronous data streams makes them easy to solve:
- Search box debouncing: discard every item that is followed too closely by another one, until no new item arrives for a period of time after some item is produced.
- Click event debouncing: emit the first item, then discard everything that follows within the time window, until a new time window begins.
- Preventing excessive refreshes: in each fixed time interval, emit only the last N items and discard the rest.
The rate-limiting schemes above can be distinguished along two dimensions:
- Whether data is emitted at a fixed time interval.
- Whether new data restarts the countdown.
Rate-limiting scheme | Fixed interval | Restarts countdown |
---|---|---|
Search box debouncing | false | true |
Click event debouncing | false | false |
Preventing excessive refreshes | true | false |
- Search box debouncing: as long as input continues, no data is emitted, so there is no fixed interval at which data is emitted. The countdown restarts with each new piece of data.
- Click event debouncing: as long as no click occurs, no data is emitted, so there is no fixed interval at which data is emitted. The countdown starts when the first piece of data is produced and is not restarted by subsequent events; during the countdown, all data except the first piece is discarded.
- Preventing excessive refreshes: data is emitted at a fixed interval, no matter how the incoming data is spaced, and new data does not restart the countdown.
Recommended reading
- Kotlin base | entrusted and its application
- Kotlin basic grammar | refused to noise
- Kotlin advanced | not variant, covariant and inverter
- Kotlin combat | after a year, with Kotlin refactoring a custom controls
- Kotlin combat | kill shape with syntactic sugar XML file
- Kotlin base | literal-minded Kotlin set operations
- Kotlin source | magic weapon to reduce the complexity of code
- Why Kotlin coroutines | CoroutineContext designed indexed set? (a)
- Kotlin asynchronous | Flow principles and application scenarios
- Kotlin asynchronous | Flow application scenarios and the principle of current limit