Overview

Recently I wanted to learn how to use flow in Kotlin. When I googled it, I found many articles comparing RxJava with flow. In fact, I have never used RxJava in real business code. That's not because it is bad. RxJava's operators have always confused me: I can't read them fluently, and I never took the time (call it laziness) to learn how they work, so I never dared to use them. This time, with flow, I decided to first understand how a few of its operators work under the hood; otherwise I wouldn't feel comfortable using it.

Note that Flow has to be used inside coroutines, and coroutines make switching threads easy. Analyzing how flow works is therefore inseparable from how coroutines work. For background on Kotlin coroutines, see the following articles:

  • Basic use of Kotlin coroutines
  • In-depth understanding of how Kotlin coroutines work
  • Coroutine cancellation and exception handling of Kotlin coroutines

Let's start with the two core interfaces: Flow, whose collect method collects data, and FlowCollector, whose emit method sends it:

public interface Flow<out T> {
    // A suspend function: collecting suspends the calling coroutine until the flow completes
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

public interface FlowCollector<in T> {
    // Sends (emits) a value to this collector
    public suspend fun emit(value: T)
}
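The following minimal, self-contained sketch makes the relationship between the two interfaces concrete. It does not use kotlinx.coroutines. Flow and FlowCollector below are simplified stand-ins with the same shape as the library interfaces. oneToThree, RecordingCollector and runSync are hypothetical names used only for this illustration. runSync starts the suspend block with the standard library's startCoroutine, and that works here only because nothing in the example actually suspends.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Simplified stand-ins for the library interfaces (not the kotlinx.coroutines types)
interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

// A hand-written Flow: collect() pushes values into whichever collector it receives
val oneToThree: Flow<Int> = object : Flow<Int> {
    override suspend fun collect(collector: FlowCollector<Int>) {
        for (i in 1..3) collector.emit(i)
    }
}

// A collector that records every emitted value
class RecordingCollector<T> : FlowCollector<T> {
    val received = mutableListOf<T>()
    override suspend fun emit(value: T) {
        received.add(value)
    }
}

// Runs a suspend block on the current thread; only valid because nothing here really suspends
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun collectOneToThree(): List<Int> {
    val recorder = RecordingCollector<Int>()
    runSync { oneToThree.collect(recorder) }
    return recorder.received
}
```

collectOneToThree() returns [1, 2, 3]. Calling collect is what drives the flow, and the flow pushes each value into the collector's emit.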

flow {}

Let's use the following code to walk through the basic workflow of flow:

flow { emit(1) }.collect { println(it) }

The flow {} builder looks like this:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

The flow {} builder simply wraps the block in a SafeFlow object. SafeFlow implements the Flow interface through its parent class AbstractFlow, which provides the collect method.
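The builder-plus-SafeFlow arrangement can be sketched as follows. Like the previous sketch, it uses simplified stand-in interfaces rather than the real library. MiniSafeFlow, miniFlow, builderDemo and runSync are hypothetical names. Unlike the real SafeFlow, this sketch skips the SafeCollector wrapping and its checks.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Simplified stand-ins for the library interfaces
interface FlowCollector<in T> { suspend fun emit(value: T) }
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) }

// Same idea as SafeFlow: store the block, run it with the collector as receiver on collect()
class MiniSafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block() // so emit(...) inside the block calls collector.emit(...)
    }
}

// Same idea as the flow {} builder: nothing runs here, the block is only wrapped
fun <T> miniFlow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = MiniSafeFlow(block)

// Runs a suspend block that never actually suspends
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun builderDemo(): List<Int> {
    val seen = mutableListOf<Int>()
    val numbers = miniFlow<Int> {
        emit(1)
        emit(2)
    }
    runSync {
        numbers.collect(object : FlowCollector<Int> {
            override suspend fun emit(value: Int) {
                seen.add(value)
            }
        })
    }
    return seen
}
```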

collect

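Besides the member function collect(collector: FlowCollector<T>), there are extension functions such as collect() and collect {}. Both end up calling collect(collector: FlowCollector<T>):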

public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

Continuing with the example above, let's look at SafeFlow.collect:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

The collect method wraps the collector in a SafeCollector object and calls SafeFlow.collectSafely with it. collectSafely simply runs the block (collector.block()). Because block is an extension function on FlowCollector, emit(1) in the example code actually calls SafeCollector.emit(1). SafeCollector then performs some safety checks. For example, it verifies that emit is not called from a different coroutine context than the one collect was called in. Finally, it calls FlowCollector.emit, which is the emit method of the collector object passed in when the SafeCollector was created. I'm only focusing on the core process here, so I won't post that code.

From the source of the collect {} extension function, we can see that its emit method simply runs the action block passed to collect {}, with the emitted value (1 here) as the argument.
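Here is a sketch of how a collect {}-style extension adapts a lambda into a FlowCollector. It uses hypothetical stand-in interfaces, and oneValue, lambdaCollectDemo and runSync are illustrative names. The extension has the same shape as the library source shown above. The call resolves to the extension rather than the member because this sketch's FlowCollector is not a fun interface, so a lambda cannot be passed to the member collect.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Simplified stand-ins for the library interfaces (FlowCollector is NOT a fun interface)
interface FlowCollector<in T> { suspend fun emit(value: T) }
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) }

// Same shape as the collect {} extension: wrap the lambda in an anonymous FlowCollector
suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value) // emit just runs the lambda
    })

// A flow that emits a single value, like flow { emit(1) }
val oneValue: Flow<Int> = object : Flow<Int> {
    override suspend fun collect(collector: FlowCollector<Int>) = collector.emit(1)
}

// Runs a suspend block that never actually suspends
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun lambdaCollectDemo(): List<Int> {
    val seen = mutableListOf<Int>()
    runSync { oneValue.collect { seen.add(it) } } // resolves to the extension above
    return seen
}
```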

Summary: flow {} (as well as flowOf and asFlow) creates a SafeFlow instance whose parent class is AbstractFlow. Calling its collect(FlowCollector) method runs AbstractFlow.collect. That method wraps the collector in a SafeCollector and runs the flow {} block. Each emit in that block goes through SafeCollector.emit and then calls the emit method of the FlowCollector passed to collect. With collect {}, that emit method simply runs the action block passed to collect. The block runs, and data is sent, only when collect is called, and it runs again on every call. That is why Flow is a cold flow.
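The "cold" property follows directly from the fact that collect is what runs the block. The minimal sketch below counts how often the block executes. MiniSafeFlow is a hypothetical stand-in with no SafeCollector checks, and the interfaces are simplified stand-ins, not the kotlinx types.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface FlowCollector<in T> { suspend fun emit(value: T) }
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) }

// Stand-in for SafeFlow: the block is stored and only run when collect() is called
class MiniSafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Runs a suspend block that never actually suspends
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun coldFlowDemo(): List<Int> {
    var runs = 0
    val counting = MiniSafeFlow<Int> {
        runs++ // side effect happens only when someone collects
        emit(runs)
    }
    check(runs == 0) // creating the flow ran nothing
    val seen = mutableListOf<Int>()
    val sink = object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            seen.add(value)
        }
    }
    runSync { counting.collect(sink) }
    runSync { counting.collect(sink) }
    return seen // the block ran once per collect call
}
```

coldFlowDemo() returns [1, 2]. Each collect call re-runs the whole block, and nothing is produced before the first call.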

The main process is as follows:

flowOn

flowOn is one of the harder flow operators to understand. Flow has to run inside a coroutine. In the example below, emit(1) runs on a Dispatchers.Default thread, while println(it) runs on the thread of the parent coroutine:

flow { emit(1) }.flowOn(Dispatchers.Default).collect { println(it) }

As we saw, flow {} returns a SafeFlow, so flowOn is called on that SafeFlow:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        // An empty context changes nothing, so the flow itself is returned
        // Here we pass Dispatchers.Default, so this branch is not taken
        context == EmptyCoroutineContext -> this
        // SafeFlow is not a FusibleFlow, so this branch is not taken either.
        // A FusibleFlow is what an earlier flowOn returns, so this branch handles flowOn called several times in a row
        this is FusibleFlow -> fuse(context = context)
        // The example takes this branch
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

The branches are explained in the comments above, so the example translates to SafeFlow.flowOn.collect {} -> ChannelFlowOperatorImpl.collect {}. Note the two arguments passed when the ChannelFlowOperatorImpl object is created. The first, this, is the upstream SafeFlow object. The second, context, is the dispatcher we passed in, which is a coroutine context.
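The three branches of flowOn can be modeled without any dispatchers or channels. In this sketch, MiniFlow, SourceFlow, MiniFusibleFlow, MiniOperatorImpl and FakeDispatcher are hypothetical stand-ins for Flow, SafeFlow, FusibleFlow, ChannelFlowOperatorImpl and a dispatcher. Only CoroutineContext, EmptyCoroutineContext and AbstractCoroutineContextElement come from the Kotlin standard library. The sketch models only which object flowOn returns, not how the flow is executed.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for a dispatcher: a context element with its own key
class FakeDispatcher(val name: String) : AbstractCoroutineContextElement(FakeDispatcher) {
    companion object Key : CoroutineContext.Key<FakeDispatcher>
}

// Value types are dropped: only the shape of the operator chain matters here
interface MiniFlow

class SourceFlow : MiniFlow // plays the role of SafeFlow

// Plays the role of FusibleFlow
interface MiniFusibleFlow : MiniFlow {
    fun fuse(context: CoroutineContext): MiniFlow
}

// Plays the role of ChannelFlowOperatorImpl: keeps the upstream flow and the flowOn context
class MiniOperatorImpl(val upstream: MiniFlow, val context: CoroutineContext) : MiniFusibleFlow {
    // Fusing keeps the same upstream and merges contexts; the earlier context is on the right
    override fun fuse(context: CoroutineContext): MiniFlow =
        MiniOperatorImpl(upstream, context + this.context)
}

// Same three branches as flowOn
fun MiniFlow.miniFlowOn(context: CoroutineContext): MiniFlow = when {
    context == EmptyCoroutineContext -> this   // nothing to change
    this is MiniFusibleFlow -> fuse(context)   // consecutive flowOn calls are merged
    else -> MiniOperatorImpl(this, context)    // first flowOn wraps the source
}
```

Calling miniFlowOn twice yields a single MiniOperatorImpl that still wraps the original source instead of nesting two wrappers.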

ChannelFlowOperatorImpl.collect is implemented in its superclass, ChannelFlowOperator.collect. The context passed in may not change the dispatcher, for example when flowOn was called without Dispatchers or any other dispatcher. In that case it calls collect on the upstream SafeFlow directly (code not shown). Otherwise, it calls collect in the parent class ChannelFlow. Let's look directly at the logic when a dispatcher is passed to flowOn:

internal abstract class ChannelFlowOperator<S, T>(
    @JvmField protected val flow: Flow<S>,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    override suspend fun collect(collector: FlowCollector<T>) {
        // Checks on coroutineContext (fast paths that avoid the channel)
        // ...
        super.collect(collector) // Call the method in the parent class ChannelFlow
    }
}

public abstract class ChannelFlow<T>(
    // upstream context
    @JvmField public val context: CoroutineContext,
    // buffer capacity between upstream and downstream context
    @JvmField public val capacity: Int,
    // buffer overflow strategy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>): Unit =
        coroutineScope {
            collector.emitAll(produceImpl(this))
        }

    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
}

So ChannelFlowOperatorImpl.collect ultimately reaches collector.emitAll(produceImpl(this)), which is producer-consumer logic. Let's look at the producing side and the receiving side in turn.

Producing data

First, look at produce, which the produceImpl method above calls:

internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    val channel = Channel<E>(capacity, onBufferOverflow)
    val newContext = newCoroutineContext(context)
    val coroutine = ProducerCoroutine(newContext, channel)
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Does this method look familiar? As explained in the earlier article on how Kotlin coroutines work, produce essentially starts a new coroutine, which runs the block passed in as the collectToFun argument. Tracing collectToFun shows that it ends up in ChannelFlowOperator.collectTo:

// ChannelFlowOperator
protected override suspend fun collectTo(scope: ProducerScope<T>) =
    // flowCollect is implemented in the subclass ChannelFlowOperatorImpl
    flowCollect(SendingCollector(scope))

// ChannelFlowOperatorImpl
internal class ChannelFlowOperatorImpl<T>(
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        // This flow is the SafeFlow object passed in from the upstream layer
        flow.collect(collector)
}

So flowCollect calls SafeFlow.collect, which, as analyzed earlier, runs the flow {} block. That block calls collector.emit(1), and as the code above shows, the collector is now a SendingCollector instance. So let's see what SendingCollector.emit does:

public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    override suspend fun emit(value: T): Unit = channel.send(value)
}

In short, produceImpl starts a new coroutine that runs the upstream flow's block (which calls SendingCollector.emit). This is why flowOn affects the flow upstream of it. Each value is then sent through channel.send.
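The production side reduces to "every upstream emit becomes a channel send". The sketch below models that with hypothetical stand-ins. MiniSendChannel and BufferChannel replace the real SendChannel and Channel; BufferChannel is just an unbounded list that never suspends. MiniSendingCollector mirrors SendingCollector. The sketch runs on a single thread, so it shows the data path but not the thread switch.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface FlowCollector<in T> { suspend fun emit(value: T) }
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) }

// Hypothetical stand-in for SendChannel: only the send side
interface MiniSendChannel<in E> { suspend fun send(element: E) }

// A trivial unbounded buffer standing in for the real Channel
class BufferChannel<E> : MiniSendChannel<E> {
    val buffer = mutableListOf<E>()
    override suspend fun send(element: E) {
        buffer.add(element)
    }
}

// Same shape as SendingCollector: every emit becomes a send
class MiniSendingCollector<T>(private val channel: MiniSendChannel<T>) : FlowCollector<T> {
    override suspend fun emit(value: T): Unit = channel.send(value)
}

// Stand-in for SafeFlow
class MiniSafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Runs a suspend block that never actually suspends
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun produceDemo(): List<Int> {
    val channel = BufferChannel<Int>()
    val upstream = MiniSafeFlow<Int> {
        emit(1)
        emit(2)
    }
    // In the real flowOn, this collect runs inside the coroutine started by produce
    runSync { upstream.collect(MiniSendingCollector(channel)) }
    return channel.buffer.toList()
}
```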

Receiving data

The previous section showed how a coroutine is started and how data is sent through a Channel. Now let's see how the data is received. Going back to the original code, receiving starts with collector.emitAll(channel), where channel is the same channel object that send was called on in the previous section:

public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
    emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        while (true) {
            val result = run { channel.receiveCatching() }
            if (result.isClosed) {
                result.exceptionOrNull()?.let { throw it }
                break // returns normally when result.closeCause == null
            }
            emit(result.getOrThrow())
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause)
    }
}

emitAll receives values in a loop and passes each one to emit. Which FlowCollector is this emit called on? Looking back at flow { emit(1) }.flowOn(Dispatchers.Default).collect { println(it) }, the FlowCollector is the object created from the block passed to collect:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

So after each value is received and emitted, it is passed to the collect {} block, which runs with that value.
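The receive loop in emitAllImpl boils down to "receive until closed, emitting each value downstream". In this sketch, MiniReceiveChannel, ClosedAfterBuffer, emitAllFrom and receiveDemo are hypothetical names. "Closed" is signaled by returning null, and the error propagation and cancelConsumed cleanup of the real code are omitted.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface FlowCollector<in T> { suspend fun emit(value: T) }

// Hypothetical receive side: returns null once the channel is closed and drained
interface MiniReceiveChannel<out E : Any> {
    suspend fun receiveOrClosed(): E?
}

// A channel that already holds some items and is closed after them
class ClosedAfterBuffer<E : Any>(items: List<E>) : MiniReceiveChannel<E> {
    private val pending = ArrayDeque(items)
    override suspend fun receiveOrClosed(): E? = pending.removeFirstOrNull()
}

// Same loop shape as emitAllImpl: receive until closed, emitting each value downstream
suspend fun <T : Any> FlowCollector<T>.emitAllFrom(channel: MiniReceiveChannel<T>) {
    while (true) {
        val value = channel.receiveOrClosed() ?: break // closed: return normally
        emit(value)
    }
}

// Runs a suspend block that never actually suspends
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return outcome!!.getOrThrow()
}

fun receiveDemo(): List<String> {
    val printed = mutableListOf<String>()
    // Plays the role of the collector created by collect { println(it) }
    val downstream = object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            printed.add("got $value")
        }
    }
    runSync { downstream.emitAllFrom(ClosedAfterBuffer(listOf(1, 2))) }
    return printed
}
```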

Multiple flowOn

Take the following code for example:

flow { emit(1) }.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect { println(it) }

From the analysis above, the first flowOn call goes here:

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

The first call to flowOn returns a ChannelFlowOperatorImpl object, which implements the FusibleFlow interface (through ChannelFlow). So the second call to flowOn takes the fuse(context = context) branch:

public abstract class ChannelFlow<T>(
    // upstream context
    @JvmField public val context: CoroutineContext,
    // buffer capacity between upstream and downstream context
    @JvmField public val capacity: Int,
    // buffer overflow strategy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
        // The previous (upstream) context is on the right of +, so it takes precedence
        val newContext = context + this.context
        val newCapacity: Int
        val newOverflow: BufferOverflow
        // Compute newCapacity and newOverflow
        // ...
        if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
            return this
        return create(newContext, newCapacity, newOverflow)
    }
}

The fuse method above returns a Flow object. Note the line val newContext = context + this.context: the new context is the incoming context plus the existing this.context. If you're familiar with how CoroutineContext is structured, it behaves somewhat like a key-value map. When both sides of + contain an element with the same key, the element on the right replaces the one on the left. For the dispatcher key, this.context is on the right, so it overrides the incoming context. this.context is the context from the earlier flowOn call, Dispatchers.IO in this example, so the second flowOn has no effect. If you're interested, try verifying it yourself.
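The override rule is easy to check with the standard library's CoroutineContext alone. FakeDispatcher and Label below are hypothetical context elements. Each class is its own key, much as every real dispatcher is stored under the ContinuationInterceptor key. fusedContext reproduces the context + this.context order from fuse for flowOn(IO).flowOn(Main).

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical element standing in for a dispatcher
class FakeDispatcher(val name: String) : AbstractCoroutineContextElement(FakeDispatcher) {
    companion object Key : CoroutineContext.Key<FakeDispatcher>
}

// Hypothetical element with a different key, to show non-clashing elements are kept
class Label(val text: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

// Mirrors `val newContext = context + this.context` in fuse for flowOn(IO).flowOn(Main)
fun fusedContext(): CoroutineContext {
    val earlier = FakeDispatcher("IO") + Label("from first flowOn") // this.context
    val incoming = FakeDispatcher("Main")                            // context
    return incoming + earlier // right-hand side wins for the shared dispatcher key
}
```

In fusedContext() the dispatcher element is "IO" and the Label survives. Swapping the operands (FakeDispatcher("IO") + FakeDispatcher("Main")) would keep "Main" instead.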

Summary

  • The produceImpl method starts a new coroutine that runs the upstream flow's block (which calls SendingCollector.emit). This is why flowOn affects the flow upstream of it. Each value is then sent through channel.send. This part runs on the thread specified by flowOn.
  • The collector.emitAll method receives the data from that channel and calls FlowCollector.emit on each value. For SafeFlow.flowOn.collect {}, that runs the logic of the collect {} block. This part runs on the thread of the parent coroutine.
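As a rough analogy for the two points above, here is a thread-based sketch. A worker thread plays the coroutine started by produce, a LinkedBlockingQueue plays the channel, and a sentinel object plays channel closing. This is not how kotlinx.coroutines implements flowOn, since there are no coroutines or dispatchers here. flowOnAnalogy, Done and threadDemo are hypothetical names. Values must be non-null because LinkedBlockingQueue rejects null.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Sentinel marking "channel closed"
private object Done

// Upstream runs on a separate thread (like the flowOn dispatcher);
// downstream drains the queue on the caller's thread (like the parent coroutine)
@Suppress("UNCHECKED_CAST")
fun <T : Any> flowOnAnalogy(upstream: ((T) -> Unit) -> Unit, downstream: (T) -> Unit) {
    val queue = LinkedBlockingQueue<Any>()
    val producer = thread(name = "upstream-worker") {
        try {
            upstream { value -> queue.put(value) } // like SendingCollector.emit -> send
        } finally {
            queue.put(Done) // like closing the channel, so the receiver stops
        }
    }
    while (true) {
        val item = queue.take() // like receiveCatching in emitAllImpl
        if (item === Done) break
        downstream(item as T)
    }
    producer.join()
}

// Records (thread that produced the value, thread that consumed it)
fun threadDemo(): List<Pair<String, String>> {
    val seen = mutableListOf<Pair<String, String>>()
    flowOnAnalogy<String>(
        upstream = { emit -> emit(Thread.currentThread().name) },
        downstream = { value -> seen.add(value to Thread.currentThread().name) }
    )
    return seen
}
```

threadDemo() returns one pair. Its first element is "upstream-worker", the thread where the value was produced, and its second is the caller's thread name, where the value was consumed.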

The flowOn workflow is shown as follows:

If the source code feels tedious, you can just focus on the bolded workflow descriptions and the related summaries. If you find any mistakes in this article, please point them out so we can improve together! Leave a “like” if you found it useful.
