This example is a Twitter-like Web application that uses Server-Sent Events to support real-time refreshing.

run

docker-compose up
Copy the code

Then, visit http://localhost:8080

You can add your own posts or click the button to get randomly generated posts.

Either way, the list of feeds and posts in the feed should be up to date. Try using a second browser window to view the update.

How does it work

  • Posts can be created and updated.
  • Posts can contain tags.
  • Each TAB has its own feed that contains all the posts from that TAB.
  • All posts are stored in MySQL. This is the write model.
  • All feeds are updated asynchronously and stored in MongoDB. This is the read model.

Why use separate write and read models?

For this example application, using multilingual persistence (two database engines) is certainly overkill. We did this to demonstrate the technology and how easily it can be applied to the Watermill.

A dedicated read model is a useful pattern for applications with high read/write ratios. All writes are applied atomically to the write model (in our case, MySQL). Event handlers update the read model asynchronously (we used Mongo).

The data in the read model can be used as-is. You can also extend it independently of the write model.

Keep in mind that to use this pattern, final consistency must be accepted in the application. And, in most use cases, you probably don’t need to use it. Pragmatic!

SSE Router

SSERouter comes from watermill-HTTP. When creating a new router, you need to pass an upstream subscriber. A message from the subscriber triggers an update push over HTTP.

In this case, we use NATS as Pub/Sub, but this can be any Pub/Sub Watermill supports.

sseRouter, err := watermillHTTP.NewSSERouter(
    watermillHTTP.SSERouterConfig{
        UpstreamSubscriber: router.Subscriber,
        ErrorHandler:       watermillHTTP.DefaultErrorHandler,
    },
    router.Logger,
)
Copy the code

Stream Adapters

To use SSERouter, you need to prepare a StreamAdapter with two methods.

GetResponse is similar to a standard HTTP handler. Modifying an existing handler to match this signature should be very easy.

Validate is an additional method that tells us whether updates should be pushed for a particular Message.

type StreamAdapter interface {
	// GetResponse returns the response to be sent back to client.
	// Any errors that occur should be handled and written to `w`, returning false as `ok`.
	GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
	// Validate validates if the incoming message should be handled by this handler.
	// Typically this involves checking some kind of model ID.
	Validate(r *http.Request, msg *message.Message) (ok bool)}Copy the code

The Validate example is shown below. It checks whether the message came from the same POST ID as the user sent through the HTTP request.

func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
	postUpdated := PostUpdated{}

	err := json.Unmarshal(msg.Payload, &postUpdated)
	iferr ! =nil {
		return false
	}

	postID := chi.URLParam(r, "id")

	return postUpdated.OriginalPost.ID == postID
}
Copy the code

If you want to trigger an update for each message, you can simply return true.

func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
	return true
}
Copy the code

Before you can start SSERouter, you need to add handlers with specific themes. AddHandler returns a standard HTTP handler that can be used in any routing library.

postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)

// ...

r.Get("/posts/{id}", postHandler)
Copy the code

Event Handlers

This example uses Watermill for all asynchronous communication, including SSE.

The following events were posted:

  • PostCreated
    • Add POST to all feeds that contain labels in the post.
  • FeedUpdated
    • Updates are pushed to all clients currently accessing the feed page.
  • PostUpdated
    • Updates are pushed to all clients currently accessing the POST page.
    • Update posts in all feeds with the tags that exist in the posts
      • A) For existing tags, the post content will be updated in the tag.
      • B) If a new tag is added, the article will be added to the feed of the tag.
      • C) If the tag has been deleted, the post will be deleted from the feed of the tag.

The front-end app

The front-end application is built using vue.js and Bootstrap.

The most interesting part is the use of EventSource.

this.es = new EventSource('/api/feeds/' + this.feed)

this.es.addEventListener('data'.event= > {
    let data = JSON.parse(event.data);
    this.posts_stream = data.posts;
}, false);
Copy the code

Refs

  • watermill.io

I am for less. Wechat: uuhells123. Public account: Hacker afternoon tea.

Thank you for your support 👍👍👍!