1. Introduction

Google has been rolling out the Flow API, commonly known as data flow, for a long time.

A data stream is built on the basis of a coroutine and can provide multiple values. Conceptually, a data stream is a sequence of data that can be computed and processed asynchronously. For example, Flow

is a data stream that emits integer values.

A data stream is very similar to an Iterator that generates a sequence of values, but it uses suspend functions to generate and consume values asynchronously. This means, for example, that the data flow can safely make a network request to generate the next value without blocking the main thread. A flow is like a flow in which values are generated and can be used in coroutines.

Based on the feature of Flow API, we can use it in many data communication scenarios. Meanwhile, since flow is used in coroutine scenarios, it can be used to encapsulate many interesting things. This paper brings a bus library encapsulated by this API.

2. Preview

Now that we’ve covered a lot of concepts, it’s time to look at how the code behaves, starting with Flow

public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}
Copy the code

Collect is an interface to collect data from a coroutine. It is also a suspend method. It is intended to be used in coroutines. In other words, collect runs inside a coroutine, and OK is done. Notice the parameter FlowCollector, what is it?

Don’t worry, it’s down there

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}
Copy the code

The FlowCollector interface defines the sending specification for the Flow API. Note that it is also a suspend method, which belongs to the sender of the message. We can see why ordinary flow is a cold flow (note that ordinary flow can realize heat flow, such as shareFlow below). You can see that collect in flow interface, This is done by accepting a FlowCollector and calling the emit inside it to send data. There is also a small detail that the generics in the flow are out modifier, while the FlowCollector is in modifier. It is left to the reader to consider why this is the case.

3. Different from existing data types

StateFlow, Flow, and LiveData

Since we are going to use SharedFlow, let’s go over the differences

StateFlow and LiveData have similarities. Both are observable data container classes, and both follow similar patterns when used in application architectures.

Note, however, that StateFlow and LiveData do behave differently:

  • StateFlowThe initial state needs to be passed to the constructor, andLiveDataDon’t need.
  • When the View into theSTOPPEDState,LiveData.observe()Will automatically unregister the user while fromStateFlowOr any other data stream that collects data does not automatically stop.

So what’s the difference between StateFlow and SharedFlow

public interface StateFlow<out T> : SharedFlow<T> {
   
    public val value: T
}
Copy the code

SharedFlow is a higher abstraction of StateFlow and SharedFlow is a higher abstraction of StateFlow

public interface SharedFlow<out T> : Flow<T> {
    /**
     * A snapshot of the replay cache.
     */
    public val replayCache: List<T>
}
Copy the code

You can see that there is a list object in there, which tells you where the data flow comes from, but the most essential data structure is a list. If you’re thinking about the internal core of our bus framework, let’s start encapsulating our bus data flow

4. Start messing around

What we need to implement: 1. Bus data flow, internal can use flow API implementation, here can use SharedFlow. 2. Compared to the fact that EventBus requires registration and unregistration, I believe you must be very annoyed, so we need to automatically register and unregister this function. 3. Can send sticky events and non-sticky events 4. 6. Realize the core functions of sending and receiving

Based on the above questions, we started to implement step by step:

Q1: According to the above source code analysis, SharedFlow maintains a list, which meets the data structure of our message storage, so no problem

Q2: Automatic registration unregister this, hey hey, you don’t need to see, will think of LifecycleEventObserver interface, after all, listen to the life cycle, the old routine, so where do we implement this interface? The flow? In the event? Nonono, we have been missing one important point, is the coroutine, the above said a lot of things, actually flow in the coroutine face wrong, we can work on the coroutine. In other words, we control the life cycle of the coroutine, so we control the life cycle of the flow.

class LifeCycleJob(private val job: Job) : Job by job, LifecycleEventObserver { override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { if (event == Lifecycle.Event.ON_DESTROY) { this.cancel() } } override fun cancel(cause: CancellationException?) { if (! job.isCancelled) { job.cancel() } } }Copy the code

Q3: How to distinguish sticky events from non-sticky events, which is also easy to handle. For sticky data, let the receiver accept the data, send the data in the list again, and non-sticky subscription is not sent. Since we want to subscribe to a single event, we can use MutableSharedFlow to achieve this

@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    require(replay >= 0) { "replay cannot be negative, but was $replay" }
    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
    }
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
Copy the code

The MutableSharedFlow interface implements SharedFlow and FlowCollector, so it can act as sender and collector.

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T>
Copy the code

See the MutableSharedFlow parameter, so we can take advantage of its properties directly: For the event set, we can define the message data set as follows:

Var events = ConcurrentHashMap<Any, MutableSharedFlow<Any>>() private set stickyEvents = ConcurrentHashMap<Any, MutableSharedFlow<Any>>() private setCopy the code

Q4: Thread switching, which is what coroutines are good at, uses Dispatch to switch the response thread

Q5: If it is simple enough, the extension function is adopted here, which can be very convenient to expand the original function.

Q6: MutableSharedFlow implements the interface for sending and receiving flows, so we can use this feature

inline fun <reified T> post(event: T, isStick: Boolean) { val cls = T::class.java if (! isStick) { events[cls]? .tryEmit(event as Any) ? : run { events[cls] = MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST) events[cls]!! .tryEmit(event as Any) } } else { stickyEvents[cls]? .tryEmit(event as Any) ? : run { stickyEvents[cls] = MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST) stickyEvents[cls]!! .tryEmit(event as Any) } } }Copy the code

So what about receiving, receiving is a little bit tricky, because we’re all using POST, so how do we tell the difference between a sticky message and a non-sticky message? Here we use both listening, so is it listening in the same coroutine field or different listening? Here involves a small problem, collect function suspends the current coroutines, so if you use the same collaborators Cheng Yu listening within, is clearly not enough, because the same collaborators within Cheng Yu (regardless of the child association Cheng Yu) cases, actually run is serial, so we need to open two collaborators Cheng Yu, inside collect function called respectively, Listen for sticky and non-sticky events

Graph TD coroutine field 1 --> listen for non-sticky events --> Stop

Graph TD coroutine field 2 --> Listen for sticky events --> Stop
inline fun <reified T> onEvent( event: Class<T>, crossinline dos: (T) -> Unit, owner: LifecycleOwner, env: SubscribeEnv ) { if (! events.containsKey(event)) { events[event] = MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST) } if (! stickyEvents.containsKey(event)) { stickyEvents[event] = MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST) } val coroutineScope: CoroutineScope = when (env) { SubscribeEnv.IO -> CoroutineScope(Dispatchers.IO) SubscribeEnv.DEFAULT -> CoroutineScope(Dispatchers.Default) else -> CoroutineScope(Dispatchers.Main) } coroutineScope.launch { events[event]? .collect { if (it is T) { dos.invoke(it) } } }.setLifeCycle(owner.lifecycle) coroutineScope.launch { stickyEvents[event]? .collect { if (it is T) { dos.invoke(it) } } }.setLifeCycle(owner.lifecycle) }Copy the code

The final summary

At this point, we can basically implement a bus data stream library, so use it. In fact, this is the core idea of Pigeon, finally attached to github address, another humble day to find star github.com/TestPlanB/p…