If you’re a third-party library author, you might want to use Coroutines and flows to make your Java callback-based libraries more kotlin-based. On the other hand, if you’re an API consumer, you might prefer to plug in a Coroutines style API to make it kotlin-friendly and make the development logic more linear.

Today to see how to use a Coroutine and Flow under the simplified API, and how to use suspendCancellableCoroutine callbackFlow API and build your own coroutines style adapter.

Callbacks

Callbacks are a very common solution for asynchronous communication. In fact, we use them as a solution for the Java programming language in most Java scenarios. However, Callbacks do have some drawbacks: this design leads to nested Callbacks, which can lead to unintelligible code, and exception handling can be complicated.

In Kotlin, you can use Coroutines to simplify Callbacks, but to do so you need to build your own adapter to convert old Callbacks into Kotlin-style Coroutines.

Build the Adapter

In coroutines, Kotlin provides suspendCancellableCoroutine to fit One – shot back, at the same time provides callbackFlow callback to fit the data flow scenarios.

In the following scenario, a simple Callbacks example is used to demonstrate this transformation.

One-shot async calls

Suppose we have a “netapi.getData” function that returns a Data Callback. In the coroutine scenario, we want it to return a suspend function.

So, we design an extension function for NetAPI that returns the suspend function for Location, as shown below.

suspend fun NetAPI.awaitGetData(): Data
Copy the code

Since this is a One – shot of the asynchronous operation, we can use suspendCancellableCoroutine function, suspendCancellableCoroutine passed as a parameter to its execution code block, and then to suspend the execution of the current Coroutine, And wait for the signal to continue. Coroutine resumes execution when the resume or resumeWithException method in its Continuation object is called.

// NetAPI extension function to return Data suspend fun netapi.awaitGetData (): Data = / / can create a cancelled suspendCancellableCoroutine suspendCancellableCoroutine < Data > {continuation - > val callback =  object : NetCallback { override fun success(data: Data) {// Resume coroutine returns Data continuation. Resume (Data)} Override fun error(e: String) { // Resume the coroutine continuation.resumeWithException(e) } } addListener(callback) // End suspendCancellableCoroutine block execution until the call in at any time adjust the continuation parameter}Copy the code

Note that: Coroutines repository can find suspendCancellableCoroutine version (namely suspendCoroutine) cannot be cancelled, But always best suspendCancellableCoroutine to handle Coroutine Scope to cancel.

The principle behind suspendCancellableCoroutine

From the internal implementation, use suspendCancellableCoroutine suspendCoroutineUninterceptedOrReturn Coroutine in suspend function to obtain the Continuation. This Continuation object is intercepted by a CancellableContinuation that can be used to control the life cycle of the current Coroutine.

After that, passed to suspendCancellableCoroutine lambda will be implemented, if the lambda returns a result, Coroutine will recover immediately, or will be suspended, Until the CancellableContinuation is manually restored from the lambda.

The source code is shown below.

public suspend inline fun <T> suspendCancellableCoroutine(
  crossinline block: (CancellableContinuation<T>) -> Unit
): T =
  // Get the Continuation object of the coroutine that it's running this suspend function
  suspendCoroutineUninterceptedOrReturn { uCont ->

    // Take over the control of the coroutine. The Continuation's been
    // intercepted and it follows the CancellableContinuationImpl lifecycle now
    val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
    /* ... */
 
    // Call block of code with the cancellable continuation
    block(cancellable)
        
    // Either suspend the coroutine and wait for the Continuation to be resumed
    // manually in `block` or return a result if `block` has finished executing
    cancellable.getResult()
  }
Copy the code

Streaming data

If we want to fetch multiple data streams (using the netapi.getDatalist function), we need to create a data Flow using Flow. The ideal API would look something like this.

fun NetAPI.getDataListFlow(): Flow<Data>
Copy the code

To convert the callback-based streaming API to Flow, we need to use the callbackFlow builder that creates flows. In the callbackFlow lambda, we are in the context of a Coroutine and, therefore, can call the suspend function. Unlike the Flow builder, callbackFlow allows you to send values from different CoroutineContext via the send function, or send values outside of coroutines via the Offer function.

Typically, flow adapters that use callbackFlow follow these three common steps.

  • Create a callback to add elements to the flow using offer.
  • Register the callback.
  • Wait for the consumer to cancel the loop and unregister the callback.

The sample code is shown below.

// Send consumer Data updates fun netapi.getDatalistFlow () = callbackFlow<Data> {// A new Flow is currently created in a coroutine scope // 1. Val callback = object: NetCallback() {override fun success(result: result?) { result ? : return // Ignore null responses for (data in result.datas) {try {offer(data) // Add elements to flow} catch (t: {// Exception handling}}}} // 2. RequestDataUpdates (callback).addonFailureListener {e -> close(e)} Suspend the current coroutine by waiting for the consumer to cancel the loop and unregister the callback until the flow is closed awaitClose {// Remove listener removeLocationUpdates(callback)}}Copy the code

The principle behind callbackFlow

Inside the coroutine, the callbackFlow uses a channel, which is very similar in concept to a blocking queue. Channels have capacity configurations that limit the number of buffered elements.

The default size of a channel created in callbackFlow is 64 elements. When you try to add a new element to an already full channel, send suspends the data provider until there is room for the new element to join the channel. Offer does not add the relevant element to a channel and will immediately return false.

The principle behind awaitClose

AwaitClose implementation principle and suspendCancellableCoroutine actually is the same, the reference down the comments in the code below.

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
  ...
  try {
    // Suspend the coroutine with a cancellable continuation
    suspendCancellableCoroutine<Unit> { cont ->
      // Suspend forever and resume the coroutine successfully only 
      // when the Flow/Channel is closed
      invokeOnClose { cont.resume(Unit) }
    }
  } finally {
    // Always execute caller's clean up code
    block()
  }
}
Copy the code

Have a purpose?

What is the point of converting a callback-based API into a data stream? Take the most common view. setOnClickListener, which can be seen as either a one-shot scenario or a data stream scenario.

Let’s turn it into a suspendCancellableCoroutine form, the code is shown below.

suspend fun View.awaitClick(block: () -> Unit): View = suspendCancellableCoroutine { continuation -> setOnClickListener { view -> if (view == null) { Continuation. ResumeWithException (Exception (" error ")} else {block () continuation. Resume (view)}}} to use:  lifecycleScope.launch { binding.test.awaitClick { Toast.makeText(this@MainActivity, "loading", Toast.LENGTH_LONG).show() } }Copy the code

Yeah, it’s kind of a long story, except for farting with your pants down. Let’s change it to a data flow scenario.

fun View.clickFlow(): Flow<View> {return callbackFlow {setOnClickListener {trySend(it) // Offer function Deprecated, TrySend instead of} awaitClose {setOnClickListener(null)}}}  lifecycleScope.launch { binding.test.clickFlow().collect { Toast.makeText(this@MainActivity, "loading", Toast.LENGTH_LONG).show() } }Copy the code

All right, the farts are out.

It can be found that in this scenario, it is not useful to impose this mode, but will make others think that you are mentally retarded.

So what scenarios do you need to use? Let’s think about why we need a Callbback.

Most Callback hell scenarios are asynchronous requests that block or stream data output, so this is just a Callback that calls a closure. It’s not really a Callback, it’s just a lambda, so let’s look at another example.

You now have a TextView that displays input from an Edittext. Such a scenario is an explicit data Flow scenario, using the afterTextChanged callback in the TextWatcher of the Edittext, which we rewrite as a Flow, as shown below.

fun EditText.afterTextChangedFlow(): Flow<Editable? > { return callbackFlow { val watcher = object : TextWatcher { override fun afterTextChanged(s: Editable?) { trySend(s) } override fun beforeTextChanged(s: CharSequence? , start: Int, count: Int, after: Int) {} override fun onTextChanged(s: CharSequence? , start: Int, before: Int, count: Int) {}} addTextChangedListener (watcher) awaitClose {removeTextChangedListener (watcher)}}} to use:  lifecycleScope.launch { with(binding) { test.afterTextChangedFlow().collect { show.text = it } } }Copy the code

It’s kind of interesting, I didn’t write a callback, but I also got the data flow, well, it’s kind of forced.

But once this becomes a Flow, this is really interesting, this is a Flow, and we can do a lot of interesting things with Flow operators.

For example, we can limit the flow of the input box. This scenario is very common, such as search. The content entered by the user will be searched automatically, but it cannot be searched as soon as the content is entered, which will produce a large number of invalid search content.

In the past, similar requirements were mostly handled in the RxJava way, but now we have Flow, and we can still do this function in a scenario full of football program apis.

We can just add Debounce.

lifecycleScope.launch { with(binding) { test.afterTextChangedFlow() .buffer(Channel.CONFLATED) .debounce(300) .collect { Show. The text = it / / to point business processing viewModel. GetSearchResult (it)}}}Copy the code

You can even add a backpressure strategy and a Debounce to complete the data collection after the stream stops.

Of course you can also write buffers and debounces directly to the Flow returned by afterTextChangedFlow as the default for the current scenario.

References:

Medium.com/androiddeve…

I would like to recommend my website xuyisheng. Top/focusing on Android-Kotlin-flutter welcome you to visit