When we were developing the project, we needed to introduce the event bus in order to facilitate communication between components and make the code more concise and less coupled. We usually choose EventBus or RxBus based on Rxjava. Now with the launch of LiveData in jetpack, there are also LiveData based EventBus libraries. So, are there any other ways to implement an event bus? In the process of using coroutines, it is found that the Channel in coroutines uses the producer consumer pattern. Can we use the Channel to implement the event bus? So let’s try it.
Channel
Channel is a kotlin coroutine, and to use it you need to introduce a coroutine library into your project
implementation "Org. Jetbrains. Kotlinx: kotlinx coroutines -- core: 1.3.3." "
implementation "Org. Jetbrains. Kotlinx: kotlinx coroutines - android: 1.3.3." "
Copy the code
A Channel is similar to BlockingQueue in Java, where a producer produces events and sends them to a Channel, and a consumer takes events from the Channel and consumes them.
The kotlin official documentation provides the use of Channel
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1.. 5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
Copy the code
Create a Channel, use channel. send to send an event, and use channel. receive to receive an event.
Implement the event bus using channels
- Start by creating the ChannelBus singleton class
class ChannelBus private constructor() {
private var channel: Channel<Events> = Channel(Channel.BUFFERED)
companion object {
val instance: ChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
ChannelBus()
}
}
}
Copy the code
- Then create a data class that wraps the consumer. Among them
context
Represents the context in which the coroutine is executed;event
Is a suspended method for consuming events;jobList
The job used to save the consumption event.
data class ChannelConsumer(
val context: CoroutineContext,
val event: suspend (event: Events) -> Unit.var jobList: MutableList<Job> = mutableListOf()
)
Copy the code
- The consumer. We create a Map to save the consumer in this
receive
Method, where key represents the key in which the consumer is stored, onEvent is a suspended function that represents what we do when we receive the event, is a lambda function, and context is a coroutine context that represents the thread in which the lambda function is executed, where the default isDispatchers.Main
Main thread execution. We construct the passed argument into oneChannelConsumer
The object is then saved in the Map.
private val consumerMap = ConcurrentHashMap<String, ChannelConsumer>()
fun receive(
key: String,
context: CoroutineContext = Dispatchers.Main,
onEvent: suspend (event: Events) -> Unit
) {
consumerMap[key] = ChannelConsumer(context, onEvent)
}
Copy the code
- Send events. Calling this method sends the event, passing the incoming event through the coroutine
Channel
Send it out.
fun send(event: Events) {
GlobalScope.launch {
channel.send(event)
}
}
Copy the code
- Consumption events. It’s a producer-consumer model when
Channel
Hangs when it is null, and when a new event is available, the event is suspended fromChannel
And we distribute it here. We go through the Map, we get eachChannelConsumer
“, so you can handle eventse
It goes straight through herelaunch
Method starts the coroutine, the context of the coroutineit.value.context
isreceive
Method passed incontext
.it.value.event(e)
isreceive
Method passes in the lambda function,e
issend
Method passed inevent
.launch
Method returns ajob
, let’s add the job toChannelConsumer
thejobList
In the water.
init {
GlobalScope.launch {
for (e in channel) {
consumerMap.entries.forEach {
it.value.jobList.add(launch(it.value.context) {
it.value.event(e)
})
}
}
}
}
Copy the code
- Remove the consumer when you finally unsubscribe. In the remove method, we get it in the Map by passing in the key
ChannelConsumer
And then loopjobList
Cancel every job, avoid memory leaks, and finally remove consumers.
fun remove(key: String){ consumerMap[key]? .jobList? .forEach { it.cancel() } consumerMap.remove(key) }Copy the code
Method of use
So we can use that in our project
- Register event consumers. Because it’s in the main thread by default, you can do UI operations directly.
override fun onCreate(savedInstanceState: Bundle?).{... ChannelBus.instance.receive("key",Dispatchers.Main,{
activity_main_text.text = it.name
})
......
}
Copy the code
It can be shortened as follows
override fun onCreate(savedInstanceState: Bundle?).{... ChannelBus.instance.receive("key") {
activity_main_text.text = it.name
}
......
}
Copy the code
You pass context to the IO thread dispatchers. IO, and then use the withContext function to cut back to the main thread and perform UI operations.
override fun onCreate(savedInstanceState: Bundle?).{... ChannelBus.instance.receive("key", Dispatchers.IO) {
val s = httpRequest() //IO thread, time consuming operation
withContext(Dispatchers.Main) { // Cut back to the UI thread
activity_sticky_text.text = s / / change the UI}}}// Network request
private fun httpRequest(a): String {
val url = URL("https://api.github.com/users/LGD2009")
val urlConnection = url.openConnection() as HttpURLConnection
urlConnection.let {
it.connectTimeout = 5000
it.requestMethod = "GET"
}
urlConnection.connect()
if(urlConnection.responseCode ! =200) {
return "Failed to request URL"
} else {
val inputStream: InputStream = urlConnection.inputStream
return inputStream.bufferedReader().use { it.readText() }
}
}
Copy the code
- Send the event
ChannelBus.instance.send(Events.EVENT_1)
Copy the code
- Unsubscribe at last
override fun onDestroy(a){... ChannelBus.instance.remove("key")}Copy the code
Automatically cancelled
In the above method, each time after the registration of consumers need to manually cancel, so can automatically cancel? Lifecycle is needed here. We made ChannelBus inherit LifecycleObserver and override the receive and remove methods. Overloading the receive method, of which the key switch to LifecycleOwner, then call LifecycleOwner. Lifecycle. The addObserver (this) will be added to the current ChannelBus as observer.
class ChannelBus private constructor() : LifecycleObserver {
private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()
......
fun receive(
lifecycleOwner: LifecycleOwner,
context: CoroutineContext = Dispatchers.Main,
onEvent: suspend (event: Events) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(this)
lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent)
}
}
Copy the code
Then override the remove method, change the key to LifecycleOwner, and add annotations. Because both the Activity and Fragment now inherit LifecycleOwner, when the Activity and Fragment run Destroy destruction, the current observer will observe and call this method.
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun remove(lifecycleOwner: LifecycleOwner){ lifecycleOwnerMap[lifecycleOwner]? .jobList? .forEach { it.cancel() } lifecycleOwnerMap.remove(lifecycleOwner) }Copy the code
So, we only need to register consumers in an Activity or Fragment
override fun onCreate(savedInstanceState: Bundle?).{... ChannelBus.instance.receive(this) {
activity_main_text.text = it.name
}
......
}
Copy the code
Automatically unregister when an Activity or Fragment is destroyed.
Viscous event
Sometimes we need to be able to receive previously sent events when the consumer subscribes, and that’s where stickiness comes in. The simple implementation idea is to save the event and send it when registering the consumer. Create a List to hold sticky events and add methods to remove sticky events.
private val stickyEventsList = mutableListOf<Events>()
fun removeStickEvent(event: Events) {
stickyEventsList.remove(event)
}
Copy the code
Modify the send method to add a Boolean parameter that specifies whether it is sticky or not. Of course, the default is false. If true, the event is stored in the List.
fun send(event: Events, isSticky: Boolean = false) {
GlobalScope.launch {
if (isSticky) {
stickyEventsList.add(event)
}
channel.send(event)
}
}
Copy the code
Add consumer methods that receive sticky events.
fun receiveSticky(
lifecycleOwner: LifecycleOwner,
context: CoroutineContext = Dispatchers.Main,
onEvent: suspend (event: Events) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(this) lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent) stickyEventsList.forEach { e -> lifecycleOwnerMap[lifecycleOwner]? .jobList? .add(GlobalScope.launch(context) { onEvent(e) }) } }Copy the code
BroadcastChannel
In the above article, the same event can only be fetched once, so in order to send it to multiple consumers, Map is used to save it and then send it in sequence. A BroadcastChannel can have multiple receivers.
Therefore, it is much simpler to use BroadcastChannel. Create a BroadcastChannel object
@ExperimentalCoroutinesApi
class BroadcastChannelBus private constructor() : LifecycleObserver {
private val broadcastChannel: BroadcastChannel<Events> = BroadcastChannel(Channel.BUFFERED)
private val lifecycleOwnerMap = ConcurrentHashMap<LifecycleOwner, ChannelConsumer>()
companion object {
val instance: BroadcastChannelBus by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
BroadcastChannelBus()
}
}
Copy the code
Change the data class jobList to Job and add a receiveChannel
data class ChannelConsumer(
val context: CoroutineContext,
val event: suspend (event: Events) -> Unit.valjob: Job? .val receiveChannel: ReceiveChannel<Events>
)
Copy the code
The send method does not need to change, the receive method needs to change. Through the val receiveChannel = broadcastChannel. OpenSubscription () subscriptions, job and receiveChannel data saved to the class.
fun receive(
lifecycleOwner: LifecycleOwner,
context: CoroutineContext = Dispatchers.Main,
onEvent: suspend (event: Events) -> Unit
) {
lifecycleOwner.lifecycle.addObserver(this)
val receiveChannel = broadcastChannel.openSubscription()
val job = GlobalScope.launch(context) {
for (e in receiveChannel) {
onEvent(e)
}
}
lifecycleOwnerMap[lifecycleOwner] = ChannelConsumer(context, onEvent,job,receiveChannel)
}
Copy the code
So, when you finally unsubscribe, close receiveChannel and cancel the task.
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun remove(lifecycleOwner: LifecycleOwner){ lifecycleOwnerMap[lifecycleOwner]? .receiveChannel? .cancel() lifecycleOwnerMap[lifecycleOwner]? .job? .cancel() lifecycleOwnerMap.remove(lifecycleOwner) }Copy the code
However, it is important to note that this API is currently experimental and may change in future releases.
conclusion
This article mainly introduces the idea of using Channel to realize event bus. The Demo has been uploaded to Github. The project name is ChannelBus. You can modify it according to your specific needs when using it.
kotlin Channel
Crack Kotlin coroutine (9) -channel
Implementing an Event Bus With RxJava – RxBus