The previous article introduced some basic uses of Flow, enough for most everyday scenarios.

This is the third article in Kotlin’s coroutine series.

This article looks at Channel, the quiet sibling working behind Flow, and at its extension ChannelFlow, and explores how both are implemented.

Some of this content is still in the experimental API stage and may be changed later.

Kotlin coroutine series related articles navigation

Scratch the Kotlin coroutine

Kotlin Flow guide (1) Basic applications

Kotlin Flow guide (2)

Kotlin Flow Guide (3) SharedFlow and StateFlow

Kotlin version: 1.5.31

Coroutine version: 1.5.2



Channel

A Channel is conceptually similar to a BlockingQueue. It is a concurrency-safe FIFO buffer queue used for communication between multiple coroutines.

The current version is built on a lock-free doubly linked list. A root node that is never removed joins the head and tail of the list into a ring.

  • New elements are added to the left of the root node, that is, at the tail of the queue.
  • Elements are taken from the right of the root node, that is, from the head of the queue.
  • The left (previous) and right (next) references of each node are atomic references updated with CAS.

By default, both sending to and receiving from a Channel are suspend functions.

For the same Channel object, more than one coroutine is allowed to send data, and more than one coroutine is allowed to receive data.

Unlike Flow, a Channel is a hot stream, and it does not support Flow's stream operators.

Even when nothing is consuming, the producer side still starts sending data and keeps running.

The Channel interface itself only defines constants. The actual functionality is declared in the two interfaces it extends, SendChannel and ReceiveChannel.

Create

Channel itself is an interface and cannot be instantiated directly, but a top-level function with the same name creates one and acts as a factory method.

Its capacity parameter is the buffer capacity, usually one of the constants defined in Channel.

  • RENDEZVOUS

    The default. Lock-free and unbuffered: an element is handed over only when a consumer calls receive; otherwise the send operation suspends.

    When the overflow policy is not BufferOverflow.SUSPEND, an ArrayChannel with a buffer capacity of 1 is created instead.

  • CONFLATED

    The queue holds a single element, and the onBufferOverflow parameter must be BufferOverflow.SUSPEND.

    When the buffer is full, the stored element is replaced by the newest one and the previous element is discarded.

    The implementation class is ConflatedChannel.

    Internally, send and receive operations are guarded by a ReentrantLock, so they are thread-safe.

  • UNLIMITED

    Unlimited capacity. The buffer queue keeps growing as elements arrive, until memory runs out (OOM).

    The implementation is lock-free and send never suspends.

  • BUFFERED

    By default, creates a buffer queue with a capacity of 64 elements. When the buffer is full, send suspends until space becomes available.

    The implementation class is ArrayChannel, which guards send and receive operations with a ReentrantLock and is thread-safe.

  • Custom capacity

    If capacity is 1 and onBufferOverflow is BufferOverflow.DROP_OLDEST, a ConflatedChannel is created as the implementation class, because it behaves the same way as CONFLATED.

    All other cases create an ArrayChannel.
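The effect of the capacity parameter can be observed with trySend, which never suspends. The following is a minimal sketch rather than a definitive reference: the helper trySendTwice is a name made up for this illustration, and the results noted in the comments assume that no receiver is waiting on any of the channels.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: reports whether two consecutive non-suspending sends succeeded.
fun trySendTwice(channel: Channel<Int>): List<Boolean> =
    listOf(channel.trySend(1).isSuccess, channel.trySend(2).isSuccess)

fun main() {
    // RENDEZVOUS: no buffer and no waiting receiver, so both attempts fail.
    println(trySendTwice(Channel<Int>(Channel.RENDEZVOUS)))    // [false, false]

    // Capacity 1 with the default SUSPEND policy: the second element does not fit.
    println(trySendTwice(Channel<Int>(capacity = 1)))          // [true, false]

    // CONFLATED: always accepts and keeps only the newest element.
    val conflated = Channel<Int>(Channel.CONFLATED)
    println(trySendTwice(conflated))                           // [true, true]
    println(conflated.tryReceive().getOrNull())                // 2

    // Capacity 1 with DROP_OLDEST behaves like CONFLATED.
    val dropOldest = Channel<Int>(1, BufferOverflow.DROP_OLDEST)
    println(trySendTwice(dropOldest))                          // [true, true]
    println(dropOldest.tryReceive().getOrNull())               // 2

    // UNLIMITED: every element is accepted.
    println(trySendTwice(Channel<Int>(Channel.UNLIMITED)))     // [true, true]
}
```

With a suspending send instead of trySend, the failing cases above would suspend the caller rather than return a failed result.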

Producers

SendChannel is the producer side, which sends data.

  • send

    A suspend function that adds an element to the queue. When the buffer queue is full, it suspends the coroutine and resumes it once the queue has room for the element.

     public suspend fun send(element: E)
  • trySend

    Tries to add an element to the queue without suspending and returns a ChannelResult describing the outcome.

     public fun trySend(element: E): ChannelResult<Unit>
  • close

    Closes the queue. The operation is idempotent: only the first call takes effect, and repeated calls do nothing.

     public fun close(cause: Throwable? = null): Boolean
  • isClosedForSend

    Experimental API. true means the Channel has been closed and no longer accepts elements.
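The producer-side functions above can be tried together in a few lines. This is a small sketch under stated assumptions: the function name sendAndClose and the log strings are invented for the illustration, and everything runs on the single thread of runBlocking so the order of events is predictable.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical demo: records what send, trySend and close do on a capacity-1 channel.
fun sendAndClose(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(capacity = 1)

    log += "trySend(1): ${channel.trySend(1).isSuccess}"    // the buffer has room
    log += "trySend(2): ${channel.trySend(2).isSuccess}"    // the buffer is full

    val sender = launch {
        channel.send(2)               // suspends here because the buffer is full
        log += "send(2) completed"
    }
    yield()                           // let the sender run until it suspends

    log += "received ${channel.receive()}"   // frees a slot and resumes the sender
    sender.join()

    log += "first close: ${channel.close()}"     // only the first call returns true
    log += "second close: ${channel.close()}"
    log += "trySend after close: closed=${channel.trySend(3).isClosed}"
    log
}

fun main() {
    sendAndClose().forEach(::println)
}
```

Note that close does not discard what is already buffered: the element 2 is still in the queue and could be received after the channel is closed.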

ProducerScope

SendChannel also has a subinterface, ProducerScope, which represents a coroutine scope that is allowed to send data.

It is still an experimental API and is not recommended for external use.

produce, an extension function of CoroutineScope, is provided to quickly start a producer coroutine and return a ReceiveChannel.

Internally, it creates a Channel inside the coroutine builder and sends data through it.

  • awaitClose

    A suspend extension function of ProducerScope. It suspends until the Channel is closed and runs the given block just before finishing, usually to release resources.

    After awaitClose is called, the Channel must be closed by an explicit call to SendChannel's close; otherwise the coroutine stays suspended until its coroutine scope is cancelled.
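As an illustration of produce and awaitClose working together, here is a minimal sketch. The function name produceDemo and the log messages are made up for this example, and the file-level @OptIn is included on the assumption that both APIs are still marked experimental in the coroutine version in use.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical demo: a producer sends three values, then waits in awaitClose.
fun produceDemo(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        // The lambda runs with ProducerScope as receiver, so send() is available directly.
        val numbers = produce<Int> {
            for (i in 1..3) send(i)
            // Suspends until the channel is closed or cancelled, then runs the block.
            awaitClose { log += "producer cleaned up" }
        }
        repeat(3) { log += "received ${numbers.receive()}" }
        // Without this call the producer would stay suspended in awaitClose
        // and runBlocking would never return.
        numbers.cancel()
    }
    return log
}

fun main() {
    produceDemo().forEach(::println)
}
```

Here the consumer ends the exchange with cancel; calling close from inside the producer would release awaitClose in the same way.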

Consumers

ReceiveChannel represents the consumer side and is only responsible for receiving data.

  • receive

    A suspend function that takes and removes an element from the buffer queue, suspending the coroutine if the queue is empty.

    Calling receive on a closed Channel that has no remaining elements throws ClosedReceiveChannelException.

     public suspend fun receive(): E
  • receiveCatching

    A suspend function that does the same as receive but does not throw when the Channel has been closed. It returns a ChannelResult wrapping the received element together with the status of the operation.

     public suspend fun receiveCatching(): ChannelResult<E>
  • tryReceive

    Tries to take an element from the buffer queue without suspending and returns a ChannelResult that wraps the element and indicates the result of the operation.

     public fun tryReceive(): ChannelResult<E>
  • cancel

    Called when the receiver stops receiving data. It removes all elements from the buffer queue and stops SendChannel from sending, internally calling SendChannel's close function.

    Use this function with care. Normally, the lifetime of a Channel should be controlled by SendChannel.

    After all, it is rare for the students to dismiss the class while the teacher is still lecturing.

  • iterator

    Returns an iterator over the Channel, which allows elements to be received with a for loop.

     public operator fun iterator(): ChannelIterator<E>

    The iterator's hasNext is a suspend function, so the loop can only be used inside a coroutine.

     public interface ChannelIterator<out E> {
         public suspend operator fun hasNext(): Boolean
         ...
     }

    The elements in the buffer queue are taken out one by one, in order.

  • actor and ActorScope

    Obsolete APIs annotated with @ObsoleteCoroutinesApi. They are the consumer-side counterparts of produce and ProducerScope.

    These two APIs are reportedly going to be redesigned, so they are not used here.

     public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E>

     public fun <E> CoroutineScope.actor(...) : SendChannel<E>
  • consume

    An extension function of ReceiveChannel. If the Channel fails or the block finishes, cancel is called to close the receiving side of the Channel.
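The differences between the receiving functions are easiest to see around the moment a Channel is closed. The sketch below is illustrative only; the function name receiveDemo and the log strings are assumptions of this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical demo: compares tryReceive, receive, the for loop and receiveCatching.
fun receiveDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (i in 1..3) channel.trySend(i)

    log += "tryReceive -> ${channel.tryReceive().getOrNull()}"   // does not suspend
    log += "receive -> ${channel.receive()}"

    channel.close()
    // Elements sent before close are still delivered; the loop ends once the
    // closed channel is empty.
    for (value in channel) log += "for loop -> $value"

    // On a closed, empty channel receiveCatching reports the state instead of throwing.
    val result = channel.receiveCatching()
    log += "receiveCatching -> closed=${result.isClosed}, value=${result.getOrNull()}"

    // Plain receive throws in the same situation.
    try {
        channel.receive()
    } catch (e: ClosedReceiveChannelException) {
        log += "receive -> ClosedReceiveChannelException"
    }
    log
}

fun main() {
    receiveDemo().forEach(::println)
}
```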

ChannelResult

ChannelResult is an inline class that only represents the result of a Channel operation and carries the element, if any.

Summary

In the current version, Channel serves only as infrastructure: a producer-consumer buffer queue for communication between multiple coroutines.

Inside Flow, a Channel is involved whenever the coroutine dispatcher is switched or a backpressure buffer is needed.

Select expression

Whenever Channel comes up, a special mechanism in Kotlin coroutines deserves a mention: the select expression.

Kotlin coroutines provide a special suspend function, select, which waits on several suspending operations at the same time and resumes with the value of whichever completes first.

At first glance, its implementation looks very similar to suspendCoroutine, which is used to adapt callback APIs to coroutines: it first tries to obtain a result and otherwise suspends until one arrives.

However, its builder parameter is a function type with SelectBuilder as the receiver.

This makes select read more like a Kotlin DSL: the block passed to select is not a suspend function and can only call the clauses defined in SelectBuilder.

select executes exactly one of its clauses, and the value of the last statement in that clause's block becomes the result of the whole expression.

  • select gives priority to the first clause. If the first clause cannot be selected, the next one is tried, and so on.
  • If a completely fair choice among clauses is needed, use selectUnbiased.

To take part in a select expression, a clause must be a function or property whose type belongs to the SelectClause family.
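The priority rule in the list above can be checked with two Deferred values that are already complete, so that both clauses are ready at the same moment. This is only a sketch; the function name pickFirstListed and the values "a" and "b" are invented for the illustration.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical demo: both clauses are ready immediately, so the order in which
// they are listed decides the result.
fun pickFirstListed(): String = runBlocking {
    val a = CompletableDeferred("a")
    val b = CompletableDeferred("b")
    select<String> {
        b.onAwait { it }   // listed first, therefore preferred
        a.onAwait { it }
    }
}

fun main() {
    println(pickFirstListed())   // b
}
```

Replacing select with selectUnbiased would make the choice between the two ready clauses random, which is why no fixed result can be stated for that variant.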

Deferred choice

Deferred, the type returned when a coroutine is started with async, defines onAwait as a select clause of type SelectClause1.

It has the same effect as await, except that inside a select expression it can wait on several coroutines at once and return the value of whichever finishes first.

     import kotlinx.coroutines.*
     import kotlinx.coroutines.selects.select

     fun testSelect() = runBlocking {
         val d1 = async { delay(60); 1 }
         val d2 = async { delay(50); 2 }
         val d3 = async { delay(70); 3 }
         // select resumes with the clause whose Deferred completes first
         val data = select<Int> {
             d3.onAwait { value ->
                 println("d3 first result : $value")
                 value
             }
             d1.onAwait { value ->
                 println("d1 first result : $value")
                 value
             }
             d2.onAwait { value ->
                 println("d2 first result : $value")
                 value
             }
         }
         println("result : $data")
     }

     // Output:
     // d2 first result : 2
     // result : 2

Because the second Deferred, d2, is the first to produce a value, select returns its result.

Channel selection

Similarly, Channel defines members that return SelectClause types.

  • onSend: the select counterpart of send. Its parameter is the element to send, and once the clause is selected, the callback receives the Channel instance that performed the send.
  • onReceive: the select counterpart of receive. The callback receives the element taken from the buffer queue.
  • onReceiveCatching: the select counterpart of receiveCatching. The callback receives a ChannelResult describing the outcome of taking an element from the buffer queue.

Select expressions on Channels are typically used to switch between a primary and a backup Channel.

     import kotlinx.coroutines.*
     import kotlinx.coroutines.channels.*
     import kotlinx.coroutines.selects.select

     fun testSelectChannel() = runBlocking {
         val slowChannel = Channel<Int>(
             capacity = 1, onBufferOverflow = BufferOverflow.SUSPEND
         )
         val fastChannel = Channel<Int>(
             capacity = 3, onBufferOverflow = BufferOverflow.SUSPEND
         )
         // Producer coroutine
         launch(Dispatchers.IO) {
             for (i in 1..5) {
                 if (!isActive) break
                 select<Unit> {
                     // The argument is the element to send
                     slowChannel.onSend(i) { channel ->
                         // The lambda parameter is the currently selected channel
                         println("slow channel selected $channel send $i")
                         delay(50)
                     }
                     fastChannel.onSend(i) { channel ->
                         println("fast channel selected $channel send $i")
                     }
                 }
             }
             slowChannel.close()
             fastChannel.close()
         }
         // Consumer coroutine
         launch {
             while (isActive && !slowChannel.isClosedForSend && !fastChannel.isClosedForSend) {
                 // Use the value carried by ChannelResult as the value of the select expression
                 val result = select<Int> {
                     slowChannel.onReceiveCatching {
                         println("slowChannel is receive ${it.getOrNull()}")
                         delay(100)
                         it.getOrNull() ?: -1
                     }
                     fastChannel.onReceiveCatching {
                         println("fastChannel is receive ${it.getOrNull()}")
                         it.getOrNull() ?: -1
                     }
                 }
                 println("receive result : $result")
             }
         }
         delay(500)
     }

In the code above, fastChannel acts as the backup: when slowChannel cannot accept an element, the element is sent through fastChannel, and the same logic applies on the receiving side. The program prints:

     slowChannel receive 1
     slow channel selected ArrayChannel@1cc4b438{EmptyQueue}(buffer:capacity=1,size=1) send 1
     slow channel selected ArrayChannel@1cc4b438{EmptyQueue}(buffer:capacity=1,size=1) send 2
     receive result : 1
     slowChannel receive 2
     slow channel selected ArrayChannel@1cc4b438{EmptyQueue}(buffer:capacity=1,size=1) send 3
     fast channel selected ArrayChannel@580f2a18{EmptyQueue}(buffer:capacity=2,size=1) send 4
     fast channel selected ArrayChannel@580f2a18{EmptyQueue}(buffer:capacity=2,size=2) send 5
     receive result : 2
     slowChannel receive 3
     receive result : 3
     fastChannel receive 4
     receive result : 4
     fastChannel receive 5
     receive result : 5
     // When the Channel is closed, the element carried by ChannelResult is null
     slowChannel receive null
     receive result : -1

ChannelFlow

Although Channel can communicate safely across multiple coroutines, it cannot perform complex stream operations and is cumbersome to use.

Kotlin Flow, on the other hand, offers flexible stream operators, but it is not thread-safe and does not even allow switching the coroutine context while emitting.

The two complement each other, so why not combine them? That is how ChannelFlow came about.

However, the ChannelFlow class itself is an internal API and cannot be used directly from outside.

How flowOn works

The buffer and flowOn operators introduced in the previous article are both implemented internally by ChannelFlowOperator, which inherits from ChannelFlow.

Let's take flowOn as an example and see how it switches the CoroutineContext internally.

ChannelFlowOperatorImpl is an implementation class of ChannelFlowOperator. It only implements its parent's flowCollect, which collects the upstream data.

flowCollect, in turn, is called from collectTo in the parent class ChannelFlowOperator.

ChannelFlowOperator also overrides collect to run some fast-path checks and avoid creating a Channel unnecessarily. When the coroutine dispatcher does need to change, it falls back to the collect implementation of its own parent class.

The buffer operator also creates a ChannelFlowOperatorImpl. The difference is that buffer lets the caller set capacity, whereas flowOn fixes it to Channel.OPTIONAL_CHANNEL.

So where does the ProducerScope parameter of collectTo come from? Keep tracing back to the parent class ChannelFlow.

The collect of ChannelFlow simply creates a new coroutine in the collector's current coroutine context and sends the data downstream through emitAll.

The collectTo implemented by the subclass is, in effect, the lambda block of the produce function introduced earlier: a Channel producer coroutine is created in the new coroutine context and adds the upstream data to the Channel's buffer queue.

The ReceiveChannel returned by produce is then handed to emitAll.

emitAll is just an extension function of FlowCollector that loops over the values in the Channel and emits them downstream.

Summary

Each call to the flowOn operator therefore creates a new Channel internally and starts a new coroutine on the newly specified dispatcher, which sends all upstream data into the buffer queue.

The backpressure behavior of the buffer queue is determined by the default overflow policy, BufferOverflow.SUSPEND.

On the downstream side, a new coroutine is created on the dispatcher of the collector. Acting as the consumer, it loops over the values in the Channel's buffer queue and emits them downstream.

This is why the thread switching performed by flowOn only applies to the upstream flow.
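The upstream-only effect of flowOn can be made visible by recording thread names on both sides of the operator. The following is a rough sketch, assuming a JVM target; the function name threadPairs is made up for the example, and the exact thread names depend on the environment, so none are stated here.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: each pair holds (thread that emitted, thread that mapped).
fun threadPairs(): List<Pair<String, String>> = runBlocking {
    flow {
        // Upstream of flowOn: runs on a Dispatchers.Default worker thread.
        repeat(2) { emit(Thread.currentThread().name) }
    }
        .flowOn(Dispatchers.Default)
        // Downstream of flowOn: runs in the collector's context (the runBlocking thread).
        .map { upstreamThread -> upstreamThread to Thread.currentThread().name }
        .toList()
}

fun main() {
    threadPairs().forEach { (upstream, downstream) ->
        println("emitted on $upstream, mapped on $downstream")
    }
}
```

The values cross from one thread to the other through the Channel that flowOn creates, which matches the producer and consumer roles described in the summary.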

Callback API to data flow

ChannelFlowBuilder, another implementation of ChannelFlow, provides the ability to convert a callback API into a Flow (a cold flow).

Two public functions are available.

  • channelFlow

    The block parameter is a function type with ProducerScope as the receiver.

    This is the Channel producer coroutine scope mentioned earlier, in which data can be sent with send or trySend.

  • callbackFlow

    Its builder, CallbackFlowBuilder, is a subclass of ChannelFlowBuilder. The only difference is that it requires the block to end by calling close or by suspending in awaitClose, so that resources are released when the coroutine finishes.

    close closes the Channel so that no more elements can be sent.

    To be precise, by the time the callbackFlow lambda finishes executing, the Channel must have been closed to further sends; otherwise an exception is thrown.

    awaitClose, on the other hand, suspends without running any further logic and waits for the Channel to be closed or for the collector's coroutine to be cancelled.

    It is mainly the place to unregister the callback: it keeps the flow alive while the registered callback delivers data and prevents memory leaks. Leaving it out results in an exception.

When a flow is created with channelFlow or callbackFlow, the producer side may switch the coroutine context with withContext. By default, it uses the coroutine dispatcher of the coroutine in which collect is called.

     import kotlinx.coroutines.*
     import kotlinx.coroutines.channels.awaitClose
     import kotlinx.coroutines.flow.*

     fun testChannelFlow() = runBlocking {
         val flow = channelFlow<String> {
             send("11")
             println("send first on ${Thread.currentThread()}")
             // Switching context inside the producer is allowed here
             withContext(Dispatchers.IO) {
                 send("22")
                 println("send second on ${Thread.currentThread()}")
             }
             send("33")
             println("send third on ${Thread.currentThread()}")
             awaitClose {
                 println("awaitClose")
             }
         }
         val job = launch {
             flow.collect {
                 println("result : $it")
             }
         }
         delay(200)
         job.cancel()
     }

     // Output:
     // send first on Thread[Test worker @coroutine#3,5,main]
     // result : 11
     // send second on Thread[DefaultDispatcher-worker-1 @coroutine#3,5,main]
     // result : 22
     // send third on Thread[Test worker @coroutine#3,5,main]
     // result : 33
     // awaitClose

Usage

Suppose you have a callback function that you want to turn into a Flow.

     fun registerCallBack(callBack: (String) -> Unit) {
         for (i in 0..5) {
             // do something
             callBack("data $i")
         }
     }

     suspend fun createCallBackFlow() = callbackFlow<String> {
         registerCallBack { result ->
             // Compile error: send is a suspend function and this lambda is not
             send(result)
         }
         awaitClose {
             // unRegisterCallBack()
         }
     }

However, the callback block is not a suspend function, so send cannot be called there to send data.

Besides trySend, which attempts to send without suspending, SendChannel also has a blocking extension function, trySendBlocking. It blocks the calling thread until the element has been sent and returns a ChannelResult indicating the result of the enqueue operation.

A failed result is also returned when the Channel has already been closed.

Do not call this function from a suspend function or a coroutine; it is meant to be called only from ordinary callback functions.

  • If the thread is interrupted while blocked, InterruptedException is thrown.
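Putting these pieces together, a callback can be bridged with trySend inside callbackFlow. The sketch below is one possible arrangement, not the only one: registerCallback with its onData and onDone parameters is a hypothetical callback API invented for this example, and it delivers its values synchronously so that the result is predictable. trySendBlocking could be used in place of trySend when the callback runs on a thread that is allowed to block.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class)

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback API: delivers three values synchronously, then signals completion.
fun registerCallback(onData: (String) -> Unit, onDone: () -> Unit) {
    for (i in 0..2) onData("data $i")
    onDone()
}

fun callbackAsFlow(): Flow<String> = callbackFlow {
    registerCallback(
        // trySend does not suspend, so it can be called from a plain callback.
        onData = { value -> trySend(value) },
        // Closing the channel completes the flow once buffered values are delivered.
        onDone = { close() }
    )
    // Required at the end of callbackFlow; a real API would unregister its callback here.
    awaitClose { }
}

fun main() = runBlocking {
    println(callbackAsFlow().toList())   // [data 0, data 1, data 2]
}
```

The three values fit into the default buffer, so every trySend succeeds; with a faster producer or a smaller buffer the returned ChannelResult should be checked.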

How it works

So how exactly does ChannelFlowBuilder turn a callback into a Flow?

ChannelFlowBuilder does very little: it implements the suspend function collectTo by invoking the block passed in from outside.

Compared with the flowOn operator analyzed earlier, the only change is that data is no longer collected from an upstream flow; instead, the caller's code drives the Channel and adds data to the buffer queue manually.

And how does the CallbackFlowBuilder created by callbackFlow enforce that the Channel is closed? After the block returns, its collectTo checks isClosedForSend and throws an IllegalStateException if the Channel is still open.

Summary

ChannelFlow is thread-safe and allows the coroutine context to be switched while sending data, and the result can still be used with Flow operators.

If the callback API works by registration, unregister it in a final call to awaitClose, which also keeps the flow alive while the callback delivers data and avoids memory leaks.

For this reason, callbackFlow is recommended: it guards against leaks caused by forgetting to close after sending.

Although channelFlow and callbackFlow themselves have been promoted to stable, they still rely on experimental APIs, so using them directly still produces warnings.

You need to add the @OptIn(ExperimentalCoroutinesApi::class) annotation.

Using experimental APIs

When you use a Kotlin coroutine API that is still experimental or in preview, the IDE warns you that the API is unstable and may change later.

It asks you to add the @ExperimentalCoroutinesApi or @FlowPreview annotation at every call site. That quickly becomes tedious, so is there a way to opt in once and for all?

You can add the following compiler arguments to the build.gradle file of the module that calls these APIs.

     // module build.gradle
     ...
     android {
         ...
         kotlinOptions {
             jvmTarget = '1.8'
             freeCompilerArgs += [
                 "-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
                 "-Xuse-experimental=kotlinx.coroutines.FlowPreview",
                 "-Xopt-in=kotlin.RequiresOptIn"
             ]
         }
     }

After recompiling, the experimental and preview APIs can be used directly.

If a later release marks an API as obsolete with @ObsoleteCoroutinesApi, the IDE will still show a hint; follow it to replace the call with the equivalent API.

Conclusion

Channel appeared as a very convenient tool for communication between coroutines. With the arrival of Kotlin Flow, however, it quickly moved into the background, and the channel operators and BroadcastChannel of older versions have been deprecated.

Many functions in Flow use Channel in their internal implementation. Its own responsibility has become increasingly narrow: it exists only as a concurrency-safe buffer queue for communication between coroutines.

For most scenarios, Flow is recommended over Channel.

References

Kotlin coroutine official document (6)Channel

The select expression

Simplify API design with coroutines and flows