Implement high-performance message callback middleware with 500 lines of Golang code

This article describes how to implement a message callback middleware. Thanks to Go's pipeline and goroutine programming model and some careful design, only about 500 lines of code are needed to achieve high performance, graceful shutdown, automatic reconnection, and more. All of the code is open source on GitHub at FishTrip/Watchman.

The problem

As service complexity grows and the number of services multiplies after a system is split into services, asynchronous message queues become necessary. When there are only a few services, for example in the early days of a business with one large monolithic application or a handful of services, the message queue (hereafter MQ) is typically used as follows:

  • The producer side connects directly to MQ and sends messages as the business requires;
  • The consumer side runs a background process that holds a long-lived connection to MQ, consumes messages in real time, and processes them accordingly.

The producer side is relatively simple; the consumer side is more complex and has more logic to handle. For example, Sneakers, which we currently use at our company, handles the following:

  1. The consumer side holds a long-lived connection, which requires a separate process (or, in some languages, a separate thread) to consume messages in real time;
  2. After a message is consumed, a business framework needs to be loaded (for example, Sneakers needs to load the Rails environment) so that the relevant business code can be called to process the message;
  3. When the MQ connection fails, it needs to reconnect automatically, and the application needs to restart gracefully without losing messages;
  4. Message processing may fail. In that case a reliable mechanism is required to ensure that messages are not lost, and failed messages must be retried after a period of time.

The system architecture at this time is generally as follows:

As the number of services increases, deploying a background process like this for every service that needs to consume messages does not scale well:

  1. Adding one such process per service increases deployment and O&M costs;
  2. If every service manages its own queues (creation, destruction, binding) and its own retry mechanism, the implementations easily drift apart and lose standardization;
  3. If the services are written in different languages and frameworks, the same logic has to be reimplemented for each language, wasting a lot of development effort.

Is there a better way?

A common approach is to build a global, high-performance message callback middleware. The middleware is responsible for queue management, message sending and receiving, retries, and error handling, so that individual services no longer have to worry about problems such as message loss and message retries. This essentially solves the shortcomings above. What exactly should this message callback center do?

  1. Centrally manage the creation of all MQ queues and the listening on them;
  2. When a message is received, invoke the callback address of the relevant service. Because the callback center serves all services, it must be high-performance and highly concurrent;
  3. Support message retries, and guarantee that messages are not lost while they are being retried;
  4. Provide basic capabilities such as automatic reconnection and graceful shutdown so that messages are not lost.

The framework is as follows:

In this way, the work of each individual service becomes much lighter. The goal of this article is to implement a production-ready version of such a message callback middleware. Of course, our first version of the callback center does not need many features, and it has the following limitations:

  1. The retry process relies on RabbitMQ's built-in functionality, so only RabbitMQ is supported for now;
  2. Currently, only HTTP callback is supported.

With the basic requirements in place, how do you implement such a message callback middleware?

Solution

Development language selection

Go, as a "system-level development language," is well suited to developing this type of middleware. Its built-in goroutine/channel mechanism makes high concurrency easy to achieve. And since I am fairly new to Go and the project is not complicated, it is also a good opportunity to practice and learn.

Message reliability

What about retry and error handling? We took a page from Sneakers' implementation: RabbitMQ's built-in dead-letter (x-dead-letter) mechanism is used to ensure that messages can be retried reliably, as I described in an earlier article. A brief summary of the idea:

  1. When a message is processed successfully, simply ack it;
  2. When an error occurs and a retry is needed, reject the message; the message is then placed in a separate retry queue;
  3. The retry queue is configured with a TTL; when the TTL expires, the message is routed to a requeue exchange;
  4. From there the message re-enters the work queue and is retried;
  5. If the number of retries exceeds a certain threshold, the message is placed in an error queue for further handling.

There are two places that take advantage of RabbitMQ’s dead-letter mechanism:

  1. When a message is rejected, it is sent to the queue's dead-letter exchange, which routes it into the retry queue;
  2. When a message sitting in the retry queue times out (the queue has a TTL configured), it enters that queue's dead-letter exchange, which routes it back into the work queue.

Through this mechanism, messages are guaranteed not to be lost during processing, whether they succeed or fail. Refer to the article mentioned above for further details.
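
To make the routing concrete, here is a minimal sketch of how such a work/retry queue pair could be declared, assuming the streadway/amqp client. The exchange and queue names and the TTL are illustrative assumptions, not necessarily the ones Watchman uses:

```go
package broker // illustrative package name

import "github.com/streadway/amqp"

// declareRetryTopology sketches the work/retry queue pair that lets RabbitMQ's
// dead-letter mechanism drive retries. All names and the TTL are illustrative.
func declareRetryTopology(ch *amqp.Channel) error {
	// Rejected messages on the work queue are dead-lettered into the retry exchange.
	if _, err := ch.QueueDeclare("orders", true, false, false, false, amqp.Table{
		"x-dead-letter-exchange": "orders-retry-exchange",
	}); err != nil {
		return err
	}
	// Messages that sit in the retry queue past the TTL are dead-lettered into the
	// requeue exchange, which routes them back into the work queue.
	_, err := ch.QueueDeclare("orders-retry", true, false, false, false, amqp.Table{
		"x-dead-letter-exchange": "orders-requeue-exchange",
		"x-message-ttl":          int32(300 * 1000), // retry_duration, in milliseconds
	})
	return err
}
```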

High concurrency

The middleware must be high-performance, which has two aspects: low latency and high concurrency. Low latency is not something we can solve in this scenario, because the latency of a callback is determined by the downstream business service. So what we are really after is high concurrency.

How do we get high concurrency? The first factor is the choice of development language, and this kind of low-level middleware is a good fit for Go. Why? The main job of the callback center is to continuously call back services whose latency the middleware cannot control, so an asynchronous, event-driven model is the way to achieve high concurrency. Go's built-in channels, together with goroutines, deliver the performance of asynchronous events while keeping development simple and efficient, which makes it a suitable choice.

What about the implementation? For a callback center, there are several steps:

  1. Get messages: connect to the message queue (for now we only need to support RabbitMQ) and consume messages;
  2. Call back the business interface: after a message is consumed, different queues may need to invoke different callback addresses according to the configuration (for now we only need to support HTTP);
  3. Process the message according to the result of the callback: if the call to the business interface succeeds, ack the message directly; if the call fails, reject the message; if the maximum number of retries has been exceeded, enter the error-handling logic;
  4. Error-handling logic: ack the original message and forward it into the error queue, where it waits for further processing (possibly an alert followed by manual intervention).

A message is the "entity" that links all of the above steps together, much like a pipeline. The pipeline design pattern is a well-established Go idiom for high concurrency: each of the above steps can be viewed as a set of goroutines that communicate with each other through channels, so there is no contention over shared resources, which greatly reduces development cost.

Each of the above steps can be achieved by designing a different Goroutine model:

  1. Get messages: long-lived connections to RabbitMQ are required. A good approach is to give each queue its own set of goroutines, so that queues receive messages without interfering with each other and a busy queue never delays an idle one;
  2. Call back the business interface: every message invokes the business interface, and how long that call takes is outside the middleware's control. The best model here is therefore one goroutine per message: if a slow interface shows up, Go's internal scheduling keeps it from hurting the system's throughput, and goroutines can support an enormous number of concurrent requests, so this model fits best;
  3. Process the message based on the result of the callback: this step connects to RabbitMQ and sends the ack/reject. We assume RabbitMQ itself is reliable, so a single shared set of goroutines suffices;
  4. Error-handling logic: the message volume here should be much smaller, since a message only ends up here after failing more times than the retry limit, so a single shared set of goroutines also suffices.

Across the four steps above, we use three goroutine models, refining the earlier diagram. A sketch of the second model (one goroutine per message) follows.
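
Here is a minimal sketch of what such a callback stage could look like. The stage name workMessage, the Notify method, and the ChannelBufferLength constant come from the code discussed later; the body itself is an assumption, not Watchman's exact implementation (assumed imports: net/http, sync, time):

```go
// workMessage fans each incoming message out to its own goroutine, which performs
// the HTTP callback and forwards the result downstream. A sketch of the
// "one goroutine per message" model, not the project's exact code.
func workMessage(in <-chan Message) <-chan Message {
	out := make(chan Message, ChannelBufferLength)
	var wg sync.WaitGroup
	client := &http.Client{Timeout: 10 * time.Second} // timeout value is illustrative

	go func() {
		for message := range in {
			wg.Add(1)
			go func(m Message) {
				defer wg.Done()
				out <- *m.Notify(client) // a slow callback blocks only this goroutine
			}(message)
		}
		// All upstream messages consumed: wait for in-flight callbacks, then close
		// the pipe from the writing side to notify the downstream stage.
		wg.Wait()
		close(out)
	}()
	return out
}
```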

Implementation

With the design above, the code is not complex. It is roughly divided into several parts: configuration management, the main flow, the Message object, the retry logic, and graceful shutdown. The full code is available on GitHub: FishTrip/Watchman

Configuration management

The configuration management in this version is kept simple: it just reads a YAML configuration file. The configuration file contains three kinds of information:

  • Message queue definitions. The RabbitMQ API is called to create the queues (work queues, retry queues, error queues, and so on) according to this configuration;
  • Callback address configuration. Different message queues require different callback addresses;
  • Other configurations. For example, retry times and timeout.

```yaml
# config/queues.example.yml
projects:
  - name: test
    queues_default:
      notify_base: "http://localhost:8080"
      notify_timeout: 5
      retry_times: 40
      retry_duration: 300
      binding_exchange: fishtrip
    queues:
      - queue_name: "order_processor"
        notify_path: "/orders/notify" 
        routing_key:
          - "order.state.created"
          - "house.state.#"
```

The yaml.v2 package makes it easy to parse YAML configuration files into structs. For example, the queue definition struct is implemented as follows:

```go
// config.go 28-38

type QueueConfig struct {
    QueueName       string   `yaml:"queue_name"`
    RoutingKey      []string `yaml:"routing_key"`
    NotifyPath      string   `yaml:"notify_path"`
    NotifyTimeout   int      `yaml:"notify_timeout"`
    RetryTimes      int      `yaml:"retry_times"`
    RetryDuration   int      `yaml:"retry_duration"`
    BindingExchange string   `yaml:"binding_exchange"`

    project *ProjectConfig
}
```
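
For completeness, here is a minimal sketch of how such a file could be parsed with yaml.v2 (assumed imports: io/ioutil and gopkg.in/yaml.v2). The wrapper struct is illustrative; the real definitions live in config.go:

```go
// Illustrative wrapper matching the top-level "projects" key of the YAML file.
type projectsConfigFile struct {
	Projects []ProjectConfig `yaml:"projects"`
}

func parseConfigFile(fileName string) (projectsConfigFile, error) {
	var cfg projectsConfigFile

	data, err := ioutil.ReadFile(fileName)
	if err != nil {
		return cfg, err
	}
	// Unmarshal matches the `yaml:"..."` tags on struct fields to the file's keys.
	err = yaml.Unmarshal(data, &cfg)
	return cfg, err
}
```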

The pointer to ProjectConfig above exists mainly to make it easy to reach the project-level configuration from a queue, so when loading we need to point each queue back at its project:

```go
// config.go
func loadQueuesConfig(configFileName string, allQueues []*QueueConfig) []*QueueConfig {
	// ......
	projects := projectsConfig.Projects
	for i, project := range projects {
		log.Printf("find project: %s", project.Name)
		queues := projects[i].Queues
		for j, queue := range queues {
			log.Printf("find queue: %v", queue)
			// point each queue back at its project; note &projects[i] and &queues[j],
			// not the loop variables
			queues[j].project = &projects[i]
			allQueues = append(allQueues, &queues[j])
		}
	}
	// ......
}
```

One error-prone part of the code above is that you cannot take a pointer to the queue loop variable when setting the pointers inside the for loop, because queue is a copy of the slice element, not the element itself.
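
A tiny, self-contained illustration of this pitfall (unrelated to the Watchman code itself):

```go
package main

import "fmt"

type queueConfig struct{ project *string }

func main() {
	name := "test"
	queues := make([]queueConfig, 2)

	for _, q := range queues {
		q.project = &name // modifies the loop variable, a copy; queues is untouched
	}
	fmt.Println(queues[0].project) // <nil>

	for j := range queues {
		queues[j].project = &name // modifies the actual slice element
	}
	fmt.Println(queues[0].project != nil) // true
}
```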

In addition, much of the logic in config.go is written in an object-oriented style, for example:

```go
// config.go
func (qc QueueConfig) ErrorQueueName() string {
    return fmt.Sprintf("%s-error", qc.QueueName)
}
func (qc QueueConfig) WorkerExchangeName() string {
    if qc.BindingExchange == "" {
        return qc.project.QueuesDefaultConfig.BindingExchange
    }
    return qc.BindingExchange
}
```

This way, you can write cleaner, maintainable code.
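
For example, these helpers can be used wherever queue or exchange names are needed. A hypothetical call site might look like the sketch below, using the amqp package already used elsewhere in the code; the function name, the declare arguments, and the "#" binding key are illustrative assumptions:

```go
// Sketch of a call site using the QueueConfig helpers above; the function name
// and the exact declare/bind arguments are illustrative.
func declareQueues(ch *amqp.Channel, qc QueueConfig) error {
	// The error queue name is derived from the work queue name.
	if _, err := ch.QueueDeclare(qc.ErrorQueueName(), true, false, false, false, nil); err != nil {
		return err
	}
	// WorkerExchangeName falls back to the project-level exchange when the
	// queue does not configure one of its own.
	return ch.QueueBind(qc.QueueName, "#", qc.WorkerExchangeName(), false, nil)
}
```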

Message object encapsulation

Throughout the program, the data passed through the channels is the Message object. This encapsulation makes it very easy to pass data between the various kinds of goroutines.

The Message struct is defined as follows:

```go
type Message struct {
	queueConfig    QueueConfig    // queue information
	amqpDelivery   *amqp.Delivery // the native RabbitMQ delivery
	notifyResponse NotifyResponse // the result of the message callback
}
```

By wrapping the native RabbitMQ delivery together with the queue information and the callback result, we can pass Message objects through the pipes. Message also exposes a number of methods that the various goroutines can call conveniently:

```go
func (m Message) CurrentMessageRetries() int {}
func (m *Message) Notify(client *http.Client) *Message {}
func (m Message) IsMaxRetry() bool {}
func (m Message) IsNotifySuccess() bool {}
func (m Message) Ack() error {}
func (m Message) Reject() error {}
func (m Message) Republish(out chan<- Message) error {}
func (m Message) CloneAndPublish(channel *amqp.Channel) error {}
```

Note the receivers of the methods above: a method with a pointer receiver modifies the Message's state, while a method with a value receiver does not.
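
A tiny standalone example of the difference between the two kinds of receivers, unrelated to the project code:

```go
package main

import "fmt"

type counter struct{ n int }

func (c counter) bumpValue()    { c.n++ } // operates on a copy; the caller sees no change
func (c *counter) bumpPointer() { c.n++ } // operates on the original value

func main() {
	c := counter{}
	c.bumpValue()
	c.bumpPointer()
	fmt.Println(c.n) // 1: only the pointer-receiver method modified c
}
```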

Main process

The main flow is what we described above: the pipeline pattern chains the whole life cycle of a message together. The core code is here:

```go
// main.go
<-resendMessage(ackMessage(workMessage(receiveMessage(allQueues, done))))
```

Each of the above functions accepts and returns the same channel type, so they can be chained together. The implementations of the functions are all broadly similar, although the different goroutine models call for slightly different code.

receiveMessage is shown and commented in detail below. receiveMessage spawns N goroutines for each message queue and writes every message it reads into the output pipe.

```go
func receiveMessage(queues []*QueueConfig, done <-chan struct{}) <-chan Message {
	// the output pipe that all received messages are written to
	out := make(chan Message, ChannelBufferLength)

	// WaitGroup used to wait for all receiver goroutines to exit
	var wg sync.WaitGroup

	// the input parameter is a queue configuration; each call runs as one goroutine
	receiver := func(qc QueueConfig) {
		defer wg.Done()

	RECONNECT:
		for {
			_, channel, err := setupChannel()
			if err != nil {
				PanicOnError(err)
			}

			msgs, err := channel.Consume(
				qc.WorkerQueueName(), // queue
				"",                   // consumer
				false,                // auto-ack
				false,                // exclusive
				false,                // no-local
				false,                // no-wait
				nil,                  // args
			)
			PanicOnError(err)

			for {
				select {
				case msg, ok := <-msgs:
					if !ok {
						log.Printf("receiver: channel is closed, maybe lost connection")
						time.Sleep(5 * time.Second)
						continue RECONNECT
					}
					msg.MessageId = fmt.Sprintf("%s", uuid.NewV4())
					message := Message{qc, &msg, 0}
					out <- message

					message.Printf("receiver: received msg")
				case <-done:
					// the main goroutine asked us to exit
					log.Printf("receiver: received a done signal")
					return
				}
			}
		}
	}

	for _, queue := range queues {
		wg.Add(ReceiverNum)
		for i := 0; i < ReceiverNum; i++ {
			// each queue gets N goroutines consuming it concurrently
			go receiver(*queue)
		}
	}

	// control goroutine: when all receiver goroutines exit, the output pipe must be
	// closed as well, to notify the downstream goroutines
	go func() {
		wg.Wait()
		log.Printf("all receiver is done, closing channel")
		close(out)
	}()

	return out
}
```

There are a few key points to note.

  1. Each function has a similar structure: a set of worker goroutines plus a coordinating goroutine, and when all the worker goroutines have exited, the output pipe is closed to notify the downstream goroutines. Note that in Go a channel should be closed from the writing side; otherwise it is easy to cause a panic.
  2. For each message we record a unique UUID, which is used in the logs to trace the message through the entire flow.
  3. Because network problems can occur, we check for connection failures; when one happens we simply sleep for a while and then reconnect.
  4. The done pipe is controlled by the main goroutine and is used primarily for graceful shutdown. Graceful shutdown ensures that no messages are lost while the configuration or the program itself is being upgraded: a goroutine is not terminated until the message it is handling has been fully processed, and only then does the whole program exit. A sketch of how the main goroutine drives this is shown below.
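
A minimal sketch of that shutdown flow, reusing the pipeline functions from main.go above; the signal-handling details and the config file path are illustrative assumptions rather than Watchman's exact code:

```go
// main.go (sketch). Assumed imports: log, os, os/signal, syscall.
func main() {
	allQueues := loadQueuesConfig("config/queues.yml", nil) // path is illustrative

	// Closing done tells every receiver goroutine to stop consuming; downstream
	// stages then drain their input channels and close their own outputs.
	done := make(chan struct{})
	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
		<-sigs
		log.Printf("main: received shutdown signal, closing done channel")
		close(done)
	}()

	// Returns only after the last stage's channel is closed, i.e. after every
	// in-flight message has been fully processed.
	<-resendMessage(ackMessage(workMessage(receiveMessage(allQueues, done))))
	log.Printf("main: pipeline drained, exiting")
}
```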

Conclusion

Thanks to Go's expressiveness, a stable message callback middleware is implemented in about 500 lines of code, with the following features:

  • High performance. A simple test on a MacBook Pro 15 shows that each queue can easily process 3,000 messages per second or more; multiple queues scale roughly linearly, so the application as a whole can easily reach tens of thousands of messages per second. And thanks to Go's goroutine design, a slow downstream call does not reduce concurrency.
  • Graceful shutdown. By listening for signals, the entire program can shut down gracefully without losing messages, which makes configuration changes and program restarts painless. This is very important in a production environment.
  • Automatic reconnection. When the connection to RabbitMQ fails, the application reconnects automatically.
