When developing a project, we often introduce an event bus to simplify communication between components and to keep the code concise and loosely coupled. The usual choices are EventBus or an RxBus built on RxJava, and since LiveData arrived in Jetpack there are also LiveData-based event bus libraries. Are there other ways to implement an event bus? Kotlin coroutines provide Channel, which follows the producer-consumer pattern. Can a Channel be used to implement an event bus? Let's try it.

Channel

Channel is part of the Kotlin coroutines library, so to use it you need to add the coroutine libraries to your project:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"

A Channel is similar to BlockingQueue in Java, except that its operations suspend the coroutine instead of blocking the thread: a producer creates events and sends them to the Channel, and a consumer takes events from the Channel and consumes them.

The official Kotlin documentation gives the following example of using a Channel:

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")


Create a Channel, then use channel.send to send an event and channel.receive to receive one.
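The documentation snippet has to run inside a coroutine scope. Here is a minimal runnable sketch, assuming it is wrapped in runBlocking; the helper name squares is only for illustration. It also closes the channel, so the consumer can iterate with a for loop instead of counting elements.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends the first n squares through a channel and collects them.
fun squares(n: Int): List<Int> = runBlocking {
    val channel = Channel<Int>()   // rendezvous channel: send suspends until a receiver is ready
    launch {
        for (x in 1..n) channel.send(x * x)
        channel.close()            // signals that no more elements will arrive
    }
    val received = mutableListOf<Int>()
    for (value in channel) received += value   // the loop ends once the channel is closed
    received
}

fun main() {
    println(squares(5))   // prints [1, 4, 9, 16, 25]
}
```

Iterating over the channel with a for loop is the same mechanism the event bus below relies on to wait for new events.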

Implement the event bus using channels

  1. Start by creating the ChannelBus singleton class
	class ChannelBus private constructor() {
	
	    private var channel: Channel<Events> = Channel(Channel.BUFFERED)
	    
	    companion object {
	        val instance: ChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
	            ChannelBus()
	        }
	    }
	}
  1. Then create a data class that wraps a consumer. context is the coroutine context in which the consumer runs; event is the suspending function that consumes events; jobList stores the jobs launched to consume events.
	data class ChannelConsumer(
	    val context: CoroutineContext,
	    val event: suspend (event: Events) -> Unit,
	    var jobList: MutableList<Job> = mutableListOf()
	)
  1. The consumer. We create a Map to hold the consumers and fill it in the receive method. key is the key under which the consumer is stored; onEvent is a suspending lambda that defines what to do when an event arrives; context is the coroutine context that determines the thread on which the lambda runs, and it defaults to Dispatchers.Main, the main thread. The arguments are wrapped in a ChannelConsumer object, which is then saved in the Map.
private val consumerMap = ConcurrentHashMap<String, ChannelConsumer>()

	fun receive(
	    key: String,
	    context: CoroutineContext = Dispatchers.Main,
	    onEvent: suspend (event: Events) -> Unit
	) {
	    consumerMap[key] = ChannelConsumer(context, onEvent)
	}
  1. Send events. This method launches a coroutine and sends the given event through the Channel.
	fun send(event: Events) {
	     GlobalScope.launch {
	         channel.send(event)
	     }
	}
  1. Consume events. This is the producer-consumer model: iterating over the Channel suspends while it is empty and resumes when a new event is available, and that event is then dispatched here. We iterate over the Map to get each ChannelConsumer and start a coroutine for it with launch. The coroutine's context, it.value.context, is the context passed to receive; it.value.event(e) invokes the lambda passed to receive; and e is the event passed to send. launch returns a Job, which we add to the ChannelConsumer's jobList.
	init {
	    GlobalScope.launch {
	        for (e in channel) {
	            consumerMap.entries.forEach {
	                it.value.jobList.add(launch(it.value.context) {
	                    it.value.event(e)
	                })
	            }
	        }
	    }
	}
  1. Finally, remove the consumer when unsubscribing. The remove method looks up the ChannelConsumer in the Map by key, cancels every job in its jobList to avoid memory leaks, and then removes the consumer.
    fun remove(key: String) {
        consumerMap[key]?.jobList?.forEach { it.cancel() }
        consumerMap.remove(key)
    }
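Putting the steps above together, the bus can be sketched as a single file that runs on a plain JVM. This is only a sketch under stated assumptions: the article never shows the Events type, so an enum is assumed here; the default context is Dispatchers.Default rather than Dispatchers.Main, because the main dispatcher is only available on Android; the job list is a CopyOnWriteArrayList so that it can be touched from several threads; and demo is a hypothetical helper.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Assumption: Events is not defined in the article; an enum is used for illustration.
enum class Events { EVENT_1, EVENT_2 }

data class ChannelConsumer(
    val context: CoroutineContext,
    val event: suspend (event: Events) -> Unit,
    val jobList: MutableList<Job> = CopyOnWriteArrayList()
)

class ChannelBus private constructor() {
    private val channel: Channel<Events> = Channel(Channel.BUFFERED)
    private val consumerMap = ConcurrentHashMap<String, ChannelConsumer>()

    init {
        // One dispatcher coroutine takes events off the channel and fans them out.
        // GlobalScope is a delicate API in newer library versions; it is kept to mirror the article.
        GlobalScope.launch {
            for (e in channel) {
                consumerMap.values.forEach { consumer ->
                    consumer.jobList.add(launch(consumer.context) { consumer.event(e) })
                }
            }
        }
    }

    fun receive(
        key: String,
        context: CoroutineContext = Dispatchers.Default, // assumption: Main is unavailable off Android
        onEvent: suspend (event: Events) -> Unit
    ) {
        consumerMap[key] = ChannelConsumer(context, onEvent)
    }

    fun send(event: Events) {
        GlobalScope.launch { channel.send(event) }
    }

    fun remove(key: String) {
        consumerMap.remove(key)?.jobList?.forEach { it.cancel() }
    }

    companion object {
        val instance: ChannelBus by lazy { ChannelBus() }
    }
}

// Hypothetical demo: registers two consumers, sends one event, and waits for both to see it.
fun demo(): List<Events> = runBlocking {
    val a = CompletableDeferred<Events>()
    val b = CompletableDeferred<Events>()
    ChannelBus.instance.receive("a") { a.complete(it) }
    ChannelBus.instance.receive("b") { b.complete(it) }
    ChannelBus.instance.send(Events.EVENT_1)
    val result = withTimeout(5_000) { listOf(a.await(), b.await()) }
    ChannelBus.instance.remove("a")
    ChannelBus.instance.remove("b")
    result
}

fun main() {
    println(demo())   // prints [EVENT_1, EVENT_1]
}
```

Both consumers receive the same event because the dispatcher coroutine launches one job per registered consumer for every event it takes from the Channel.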
Usage

We can now use the bus in a project as follows.

  1. Register an event consumer. Because the consumer runs on the main thread by default, it can update the UI directly.
    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        ChannelBus.instance.receive("key", Dispatchers.Main, {
            activity_main_text.text = it.name
        })
        ......
    }

This can be shortened by relying on the default context and the trailing-lambda syntax:

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        ChannelBus.instance.receive("key") {
            activity_main_text.text = it.name
        }
        ......
    }

You can also pass Dispatchers.IO as the context to run the consumer on an IO thread, and then use withContext to switch back to the main thread for UI operations.

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        ChannelBus.instance.receive("key", Dispatchers.IO) {
            val s = httpRequest() // IO thread, time-consuming operation
            withContext(Dispatchers.Main) { // switch back to the UI thread
                activity_sticky_text.text = s // update the UI
            }
        }
    }

    // Network request
    private fun httpRequest(): String {
        val url = URL("https://api.github.com/users/LGD2009")
        val urlConnection = url.openConnection() as HttpURLConnection
        urlConnection.let {
            it.connectTimeout = 5000
            it.requestMethod = "GET"
        }
        urlConnection.connect()
        if (urlConnection.responseCode != 200) {
            return "Failed to request URL"
        } else {
            val inputStream: InputStream = urlConnection.inputStream
            return inputStream.bufferedReader().use { it.readText() }
        }
    }
  1. Send the event
 ChannelBus.instance.send(Events.EVENT_1)
  1. Finally, unsubscribe
    override fun onDestroy() {
        ...
        ChannelBus.instance.remove("key")
    }

Automatic cancellation

With the approach above, every registered consumer has to be cancelled manually. Can this be done automatically? This is where Lifecycle comes in. We make ChannelBus implement LifecycleObserver and overload the receive and remove methods. The receive overload takes a LifecycleOwner instead of a key and calls lifecycleOwner.lifecycle.addObserver(this) to register the ChannelBus as an observer of that owner.

class ChannelBus private constructor() : LifecycleObserver {

    private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()
    
	......
	
    fun receive(
        lifecycleOwner: LifecycleOwner,
        context: CoroutineContext = Dispatchers.Main,
        onEvent: suspend (event: Events) -> Unit
    ) {
        lifecycleOwner.lifecycle.addObserver(this)
        lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
    }

}

Then overload the remove method so that it takes a LifecycleOwner instead of a key, and annotate it with @OnLifecycleEvent. Because Activity and Fragment both implement LifecycleOwner, the observer is notified when an Activity or Fragment is destroyed, and this method is called.

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun remove(lifecycleOwner: LifecycleOwner) {
        lifecycleOwnerMap[lifecycleOwner]?.jobList?.forEach { it.cancel() }
        lifecycleOwnerMap.remove(lifecycleOwner)
    }

Now we only need to register the consumer in an Activity or Fragment:

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        ChannelBus.instance.receive(this) {
            activity_main_text.text = it.name
        }
        ......
    }

The consumer is unregistered automatically when the Activity or Fragment is destroyed.

Sticky events

Sometimes a consumer needs to receive events that were sent before it subscribed; this is what sticky events are for. A simple implementation is to save the event and deliver it when a consumer registers. Create a List to hold sticky events and add a method to remove a sticky event.

 	private val stickyEventsList = mutableListOf<Events>()
 
    fun removeStickEvent(event: Events) {
        stickyEventsList.remove(event)
    }

Modify the send method by adding a Boolean parameter that specifies whether the event is sticky; it defaults to false. If it is true, the event is also stored in the List.

    fun send(event: Events, isSticky: Boolean = false) {
        GlobalScope.launch {
            if (isSticky) {
                stickyEventsList.add(event)
            }
            channel.send(event)
        }
    }

Add a method for registering a consumer that receives sticky events.

    fun receiveSticky(
        lifecycleOwner: LifecycleOwner,
        context: CoroutineContext = Dispatchers.Main,
        onEvent: suspend (event: Events) -> Unit
    ) {
        lifecycleOwner.lifecycle.addObserver(this)
        lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
        stickyEventsList.forEach { e ->
            lifecycleOwnerMap[lifecycleOwner]?.jobList?.add(GlobalScope.launch(context) {
                onEvent(e)
            })
        }
    }
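The sticky mechanism can be tried in isolation, without Android or the Channel. The sketch below is a stripped-down model, not the article's bus: StickyStore and demoSticky are hypothetical names, Events is assumed to be an enum, consumers are launched in a scope passed by the caller, and the list is a CopyOnWriteArrayList on the assumption that events may be sent and replayed from different threads.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*

// Assumption: Events is not defined in the article; an enum is used for illustration.
enum class Events { EVENT_1, EVENT_2 }

// Hypothetical, stripped-down store that models only the sticky part of the bus.
class StickyStore {
    private val stickyEventsList = CopyOnWriteArrayList<Events>()

    fun send(event: Events, isSticky: Boolean = false) {
        if (isSticky) stickyEventsList.add(event)
        // Delivery to already registered consumers is omitted in this sketch.
    }

    fun removeStickEvent(event: Events) {
        stickyEventsList.remove(event)
    }

    // Replays every stored sticky event to a newly registered consumer.
    fun receiveSticky(scope: CoroutineScope, onEvent: suspend (Events) -> Unit): List<Job> =
        stickyEventsList.map { e -> scope.launch { onEvent(e) } }
}

// Hypothetical demo: a consumer that registers late only sees the sticky event.
fun demoSticky(): List<Events> = runBlocking {
    val store = StickyStore()
    store.send(Events.EVENT_1, isSticky = true)
    store.send(Events.EVENT_2)   // not sticky, so a late consumer misses it
    val seen = CopyOnWriteArrayList<Events>()
    store.receiveSticky(this) { seen.add(it) }.joinAll()
    seen.toList()
}

fun main() {
    println(demoSticky())   // prints [EVENT_1]
}
```

A sticky event stays in the list until removeStickEvent is called, so every later consumer will receive it again.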

BroadcastChannel

With a plain Channel, each event can be received only once, so to deliver it to multiple consumers the implementation above keeps the consumers in a Map and dispatches to each of them in turn. A BroadcastChannel, by contrast, can have multiple receivers, each of which receives every event.
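The once-only delivery of a plain Channel is easy to observe. In this minimal sketch (fanOut is a hypothetical helper), two consumers read from the same Channel; which consumer gets which value is not deterministic, but together they receive each value exactly once.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: two consumers share one Channel; returns everything they received, sorted.
fun fanOut(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    val received = CopyOnWriteArrayList<Int>()
    val consumers = List(2) {
        launch(Dispatchers.Default) {
            for (x in channel) received.add(x)   // each element goes to only one consumer
        }
    }
    for (x in 1..10) channel.send(x)
    channel.close()        // lets both for loops finish
    consumers.joinAll()
    received.sorted()
}

fun main() {
    println(fanOut())   // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```

If every consumer had received every value, the list would contain twenty entries instead of ten.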

With BroadcastChannel the bus becomes much simpler. Create a BroadcastChannel object:

@ExperimentalCoroutinesApi
class BroadcastChannelBus private constructor() : LifecycleObserver {

    private val broadcastChannel: BroadcastChannel<Events> = BroadcastChannel(Channel.BUFFERED)
    private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()

    companion object {
        val instance: BroadcastChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
            BroadcastChannelBus()
        }
    }

In the data class, replace jobList with a single Job and add a receiveChannel:

    data class ChannelConsumer(
        val context: CoroutineContext,
        val event: suspend (event: Events) -> Unit,
        val job: Job?,
        val receiveChannel: ReceiveChannel<Events>
    )

The send method stays the same, but the receive method has to change. It subscribes with val receiveChannel = broadcastChannel.openSubscription() and saves the job and the receiveChannel in the data class.

    fun receive(
        lifecycleOwner: LifecycleOwner,
        context: CoroutineContext = Dispatchers.Main,
        onEvent: suspend (event: Events) -> Unit
    ) {
        lifecycleOwner.lifecycle.addObserver(this)
        val receiveChannel = broadcastChannel.openSubscription()
        val job = GlobalScope.launch(context) {
            for (e in receiveChannel) {
                onEvent(e)
            }
        }
        lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent, job, receiveChannel)
    }

When unsubscribing, cancel the receiveChannel and then cancel the job.

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun remove(lifecycleOwner: LifecycleOwner) {
        lifecycleOwnerMap[lifecycleOwner]?.receiveChannel?.cancel()
        lifecycleOwnerMap[lifecycleOwner]?.job?.cancel()
        lifecycleOwnerMap.remove(lifecycleOwner)
    }

Note, however, that the BroadcastChannel API is experimental in the coroutines version used here (1.3.3) and may change in future releases.
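In later kotlinx.coroutines releases BroadcastChannel was deprecated in favour of SharedFlow, which was introduced in version 1.4.0. The sketch below swaps in SharedFlow for the same broadcast idea; it is not the article's implementation. FlowBus and demoFlowBus are hypothetical names, Events is assumed to be an enum, and the buffer size of 64 is an arbitrary choice.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Assumption: Events is not defined in the article; an enum is used for illustration.
enum class Events { EVENT_1, EVENT_2 }

// Hypothetical bus built on SharedFlow (needs kotlinx.coroutines 1.4.0 or later).
object FlowBus {
    // replay = 0: late subscribers miss earlier events.
    // replay = 1 would replay the latest event to new subscribers, similar to a sticky event.
    private val events = MutableSharedFlow<Events>(replay = 0, extraBufferCapacity = 64)

    suspend fun send(event: Events) = events.emit(event)

    // Every collector receives every event emitted while it is subscribed.
    fun receive(scope: CoroutineScope, onEvent: suspend (Events) -> Unit): Job =
        scope.launch { events.collect { onEvent(it) } }
}

// Hypothetical demo: two subscribers both receive the same event.
fun demoFlowBus(): List<Events> = runBlocking {
    val a = CompletableDeferred<Events>()
    val b = CompletableDeferred<Events>()
    val jobA = FlowBus.receive(this) { a.complete(it) }
    val jobB = FlowBus.receive(this) { b.complete(it) }
    yield()   // runBlocking is single-threaded, so this lets both collectors subscribe first
    FlowBus.send(Events.EVENT_1)
    val result = withTimeout(5_000) { listOf(a.await(), b.await()) }
    jobA.cancel()   // collecting a SharedFlow never completes on its own
    jobB.cancel()
    result
}

fun main() {
    println(demoFlowBus())   // prints [EVENT_1, EVENT_1]
}
```

Cancelling the Job returned by receive plays the role of the remove method: on Android it could be launched in a lifecycle-bound scope so that cancellation happens automatically.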

Conclusion

This article introduced the idea of implementing an event bus with Channel. The demo has been uploaded to GitHub under the project name ChannelBus; you can adapt it to your own needs.
