If you’re a library author, you might want to make it easier for users to invoke your Java – or callback-based apis when using Kotlin coroutines and Flow. In addition, if you are a consumer of the API, you might be willing to adapt third-party API interfaces to coroutines to make them more Kotlin friendly.
This article will introduce how to use coroutines and Flow to simplify the API, and how to use suspendCancellableCoroutine and callbackFlow apis to create your own adapter. For curious readers, these apis are dissected to help you understand how they work underneath.
If you prefer to watch the video, you can click here.
Check existing coroutine adapters
Before you write your own wrapper for an existing API, check to see if an adapter or extension method already exists for your use case. Below are some libraries that contain coroutine adapters of common types.
The Future type
For future types, Java 8 integrates CompletableFuture, while Guava integrates ListenableFuture. This is not all, and you can search online to see if there is an adapter for your Future type.
// Wait for execution of the CompletionStage to complete without blocking the thread
suspend fun <T> CompletionStage<T>.await(a): T
// Wait for ListenableFuture to complete without blocking the thread
suspend fun <T> ListenableFuture<T>.await(a): T
Copy the code
Using these functions, you can get rid of the callback and suspend the coroutine until the result of the future is returned.
Reactive Stream
For reactive flow libraries, there is integration for RxJava, Java 9 apis, and reactive flow libraries:
// Convert the given reactive Publisher to a Flow
fun <T : Any> Publisher<T>.asFlow(a): Flow<T>
Copy the code
These functions convert a reactive Flow to a Flow.
For Android API
For the Jetpack library or Android platform API, you can refer to the list of Jetpack KTX libraries. There are over 20 libraries with KTX versions that make up the Java apis you are familiar with. These include SharedPreferences, ViewModels, SQLite, and Play Core.
The callback
Callbacks are a very common practice when implementing asynchronous communication. In fact, we use callbacks as the default solution for the Java programming language in our background thread task running guide. However, callbacks have a number of drawbacks: this design can lead to convoluted nesting of callbacks. At the same time, error handling is more complicated because there is no easy way to propagate it. In Kotlin, you can simply invoke callbacks using coroutines, but only if you create your own adapter.
Create your own adapter
If you can’t find an adapter that fits your use case, it’s more straightforward to write your own. For one-off asynchronous calls, can use suspendCancellableCoroutine API; For streaming data, you can use the callbackFlow API.
As an exercise, the following example will use the Fused Location Provider API from Google Play Services to retrieve Location data. The API interface is simple, but it uses callbacks to perform asynchronous operations. When logic gets complicated, these callbacks tend to make code unreadable, and we can use coroutines to get rid of them.
If you want to explore other solutions, you can be inspired by the source code linked to the function above.
One-time asynchronous call
The Fused Location Provider API provides the getLastLocation method to get the last known Location. The ideal API for coroutines is a suspend function that directly returns the exact result.
Note: This API returns a value ofTaskAnd there are already corresponding onesThe adapter. We’ll use it as an example for learning purposes.
We can obtain better by create extension function for FusedLocationProviderClient API:
suspend fun FusedLocationProviderClient.awaitLastLocation(a): Location
Copy the code
Since this is a one-time asynchronous operations, we use suspendCancellableCoroutine functions: one for the association Cheng Ku create hang function of the underlying building block.
SuspendCancellableCoroutine performs as a parameter of the incoming code block, then waiting for continue signal during suspends the execution of coroutines. The coroutine is resumed when the resume or resumeWithException methods in the coroutine Continuation object are called. About the Continuation of more information, please refer to: Kotlin Vocabulary | reveal coroutines suspend in the modifier.
We use callbacks that can be added to the getLastLocation method to restore the coroutine at the appropriate time. See the following implementation:
/ / FusedLocationProviderClient extension function, return to the last known location
suspend fun FusedLocationProviderClient.awaitLastLocation(a): Location =
// Create a new removable routine
suspendCancellableCoroutine<Location> { continuation ->
// Add a listener to resume coroutine execution
lastLocation.addOnSuccessListener { location ->
// Restore the coroutine and return to the position
continuation.resume(location)
}.addOnFailureListener { e ->
// Recover coroutines by throwing an exception
continuation.resumeWithException(e)
}
The end of the / / suspendCancellableCoroutine block. This is going to suspend the coroutine
// Until a callback uses the continuation argument
}
Copy the code
Note: although the association Cheng Ku also contains the version cannot be cancelled coroutines builder (namely suspendCoroutine), but the best is always choose to use suspendCancellableCoroutine processing coroutines scope of cancelled and spread from the underlying API to cancel the event.
SuspendCancellableCoroutine principle
Internally, suspendCancellableCoroutine use suspendCoroutineUninterceptedOrReturn Continuation in hang function of coroutines. This Continuation object is intercepted by a CancellableContinuation object, which from this point on controls the life cycle of the coroutine (its implementation has Job functionality, but with some limitations).
Next, pass to suspendCancellableCoroutine lambda expressions will be executed. If the lambda returns a result, the coroutine is immediately restored; Otherwise, the coroutine will remain suspended until the CancellableContinuation is manually restored by the lambda.
You can see what happens with my comments in the code snippet below (the original implementation) :
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T- > >)Unit
): T =
// Gets the Continuation object of the coroutine running this suspended function
suspendCoroutineUninterceptedOrReturn { uCont ->
// take over coroutines. The Continuation has been intercepted,
/ / the following will follow the life cycle of CancellableContinuationImpl
val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
/ *... * /
Call a block of code with a cancelable Continuation
block(cancellable)
// Suspend the coroutine and wait for the Continuation to be resumed in the block, or return the result when the block ends execution
cancellable.getResult()
}
Copy the code
Want to know more about the working principle of the suspended function, please refer to this article: Kotlin Vocabulary | reveal coroutines suspend in the modifier.
Streaming data
If we instead want the user’s device to periodically receive location updates (using the requestLocationUpdates function) as it moves in the real world, we need to use Flow to create data flows. An ideal API would look something like this:
fun FusedLocationProviderClient.locationFlow(a): Flow<Location>
Copy the code
To convert the callback-based API to Flow, you can use the callbackFlow Flow builder to create a new Flow. The interior of a callbackFlow lambda expression is in the context of a coroutine, which means that it can call suspended functions. Unlike the Flow flow builder, channelFlow can use the Offer method to send data outside of a different CoroutineContext or coroutine.
Typically, building a flow adapter using callbackFlow follows these three steps:
- Create a callback that adds elements to the flow using offer;
- Register callback;
- Wait for the consumer to fetch the cancellation program and cancel the callback.
Applying the above steps to the current use case, we get the following implementation:
/ / send the position update to consumers fun FusedLocationProviderClient. LocationFlow () = callbackFlow < Location > {/ / to create a new Flow. This code is executed in the coroutine. Val callback = object: LocationCallback() {override fun onLocationResult(result: LocationResult?) { result ? For (location in result.locations) {try {offer(location) // Send location to flow} catch (t: Throwable) {// Location cannot be sent to flow}}}} // 2. Register callbacks and get location updates by calling requestLocationUpdates. requestLocationUpdates( createLocationRequest(), callback, Looper.getmainlooper ()).addonFailureListener {e -> close(e) // Close flow if an error occurs} // 3. Wait for the consumer to fetch the cancellation program and cancel the callback. This process suspends the coroutine until the Flow is closed. AwaitClose {// Clean up the code here removeLocationUpdates(callback)}}Copy the code
The inner workings of callbackFlow
Internally, the callbackFlow uses a channel. A channel is conceptually similar to a blocking queue — it is configured to specify capacity: the number of elements that can be buffered. A channel created in callbackFlow has a default capacity of 64 elements. If you add a new element to a full channel, send suspends the producer until there is room for the new element in the channel, since offer does not add the element to the channel and immediately returns false.
AwaitClose internals
It is interesting to note that awaitClose suspendCancellableCoroutine is for internal use. You can take a look at this with my comments in the following code snippet (see the original implementation) :
public suspend fun ProducerScope< * >.awaitClose(block: () -> Unit = {}){...try {
// Suspend the coroutine with a cancelable continuation
suspendCancellableCoroutine<Unit> { cont ->
// The coroutine is successfully recovered only if the Flow or Channel is closed, otherwise it remains suspended
invokeOnClose { cont.resume(Unit)}}}finally {
// The caller's cleanup code is always executed
block()
}
}
Copy the code
Reuse Flow
Unless an additional intermediate operator is used (e.g., conflate), Flow is cold and inert. This means that the building block is executed each time the flow terminal operator is called. For our use case, since adding a new location listener is inexpensive, this feature is not a big deal. The same cannot be said for other implementations.
You can reuse the same flow across multiple collectors and turn a cold flow into a hot flow using the shareIn intermediate operator.
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
...
}.shareIn(
// Let flow follow applicationScope
applicationScope,
// Sends the last sent element to the new collector
replay = 1.// Keep producers active while there are active subscribers
started = SharingStarted.WhileSubscribed()
)
Copy the code
You can through the article the cancellation, and abnormal in the coroutines | reside task, rounding to learn more about using applicationScope best practice in the application.
You should consider creating coroutine adapters to make your API or existing API clean, readable, and Kotlin compliant. First check whether the existing adapters are available, and if not, you can use suspendCancellableCoroutine for one-time call; Or use callbackFlow to create your own adapter for streaming data.
You can get started with the topics covered in this article by codelab: Creating the Kotlin extension library.