The introduction
PubSub(Publish/Subscribe) mode, which means “Publish/Subscribe” mode, is designed to solve the one-to-many dependency relationship, so that multiple consumers listen to a topic at the same time, which can decouple not only producers and consumers, but also different consumers from each other (note: Antipatterns that rely on the order in which the subscriber executes, using shared data to pass state, need to be avoided because consumers are coupled and cannot change independently. The key to this is the need for an intermediary to maintain the subscription and deliver the produced messages to the subscribers.
In Golang, channel is a natural medium. Let’s implement EventBus, a tool class, step by step based on PubSub pattern.
Define the type of
First, let’s define some basic types and core operations.
//EventID is the unique identifier of an Event
type EventID int64
//Event
type Event interface {
ID() EventID
}
//EventHandler
type EventHandler interface {
OnEvent(ctx context.Context, event Event) error
CanAutoRetry(err error) bool
}
// JobStatus holds information related to a job status
type JobStatus struct {
RunAt time.Time
FinishedAt time.Time
Err error
}
//EventBus ...
type EventBus struct {}
func (eb *EventBus) Subscribe(eventID EventID, handlers ... EventHandler){}func (eb *EventBus) Unsubscribe(eventID EventID, handlers ... EventHandler){}func (eb *EventBus) Publish(evt Event) <-chan JobStatus{}Copy the code
The key and dismantling
The handlers map[EventID][]EventHandler is used to maintain the service. The handlers map[EventID][]EventHandler is used to maintain the service. I still need to add a lock.
//Subscribe ...
func (eb *EventBus) Subscribe(eventID EventID, handlers ... EventHandler) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.handlers[eventID] = append(eb.handlers[eventID], handlers...)
}
Copy the code
The implementation here is relatively simple, without considering a consumer, the problem of repeated subscription, left to the user to deal with their own. (But why would the same consumer call subcribe multiple times and subscribe to the same topic? It feels like writing a bug.)
Here comes the core Publish function, which must have a channel(preferably a buffer) to pass the Event data, and some resident coroutines to listen for messages and start relevant consumers for performance purposes. Here is the relevant code (in the full version of the code, added logging, error handling, etc., but left out for emphasis)
func (eb *EventBus) Start(a) {
if eb.started {
return
}
for i := 0; i < eb.eventWorkers; i++ {
eb.wg.Add(1)
go eb.eventWorker(eb.eventJobQueue)
}
eb.started = true
}
func (eb *EventBus) eventWorker(jobQueue <-chan EventJob) {
loop:
for {
select {
case job := <-jobQueue:
jobStatus := JobStatus{
RunAt: time.Now(),
}
ctx, cancel := context.WithTimeout(context.Background(), eb.timeout)
g, _ := errgroup.WithContext(ctx)
for index := range job.handlers {
handler := job.handlers[index]
g.Go(func(a) error {
return eb.runHandler(ctx, handler, job.event)
})
}
jobStatus.Err = g.Wait()
jobStatus.FinishedAt = time.Now()
select {
case job.resultChan <- jobStatus:
default:
}
cancel()
}
}
}
Copy the code
Now that you’ve done the above, here’s the actual Publish code.
// EventJob ...
type EventJob struct {
event Event
handlers []EventHandler
resultChan chan JobStatus
}
//Publish ...
func (eb *EventBus) Publish(evt Event) <-chan JobStatus {
eb.mu.RLock()
defer eb.mu.RUnlock()
if ehs, ok := eb.handlers[evt.ID()]; ok {
handlers := make([]EventHandler, len(ehs))
copy(handlers, ehs) // Snapshot a copy of the consumer at that time
job := EventJob{
event: evt,
handlers: handlers,
resultChan: make(chan JobStatus, 1),}var jobQueue = eb.eventJobQueue
select {
case jobQueue <- job:
default:}return job.resultChan
} else {
err := fmt.Errorf("no handlers for event(%d)", evt.ID())
resultChan := make(chan JobStatus, 1)
resultChan <- JobStatus{
Err: err,
}
return resultChan
}
}
Copy the code
The relevant consumers are not fetched directly from the Handlers according to their IDS in the eventWorker, partly to make the eventWorker more generic and partly to reduce the blocking caused by locking operations.
At this point, we’ve broken down the core code one by one. For the complete code, see event_bus.go in the ChannelX project
Use the sample
No utility class is complete without an example, and here’s one.
Define an event, where the ID is defined as private, and then enforce it in the constructor.
const ExampleEventID channelx.EventID = 1
type ExampleEvent struct {
id channelx.EventID
}
func NewExampleEvent(a) ExampleEvent {
return ExampleEvent{id:ExampleEventID}
}
func (evt ExampleEvent) ID(a) channelx.EventID {
return evt.id
}
Copy the code
Next comes the Event Handler, which checks in OnEvent to see if the received event is a subscribed event and if the received event structure can be converted to a specific type, as required. After defense programming, you can handle the event logic.
type ExampleHandler struct {
logger channelx.Logger
}
func NewExampleHandler(logger channelx.Logger) *ExampleHandler {
return &ExampleHandler{
logger: logger,
}
}
func (h ExampleHandler) Logger(a) channelx.Logger{
return h.logger
}
func (h ExampleHandler) CanAutoRetry(err error) bool {
return false
}
func (h ExampleHandler) OnEvent(ctx context.Context, event channelx.Event) error {
ifevent.ID() ! = ExampleEventID {return fmt.Errorf("subscribe wrong event(%d)", event.ID())
}
_, ok := event.(ExampleEvent)
if! ok {return fmt.Errorf("failed to convert received event to ExampleEvent")}// handle the event here
h.Logger().Infof("event handled")
return nil
}
Copy the code
Finally, it’s time to start EventBus, subscribe to events, and publish.
eventBus := channelx.NewEventBus(logger, "test".4.4.2, time.Second, 5 * time.Second)
eventBus.Start()
handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
eventBus.Publish(NewExampleEvent())
Copy the code
Write in the last
I’ve written before about the use of channels,
- How to make Golang’s channel as silky as NodeJS’s stream
- How to use Golang channel to achieve message batch processing
- How to play Golang Channel out of async and await feel
- Next time you want to write concurrent processing in Golang, use this template.
The lightweight util implementation is open source in channelx, welcome to review, if you like to use the tool, please click a like or star 🙂