Do you care about throughput when using message queues?

Have you considered the factors that affect throughput?

Have you thought about how to improve?

Have you summarized best practices?

This article explores how a Go framework achieves high throughput on the consuming side of message queues. Let’s go!

Some thoughts on throughput

  • Throughput when writing to a message queue depends on two things

    • Network bandwidth
    • Write speed of message queues (such as Kafka)

    Optimal throughput means saturating one of the two. In general, intranet bandwidth is very high and unlikely to be saturated, so in practice the goal is to saturate the write speed of the message queue. There are two factors to balance:

    • The batch size, in message count or bytes
    • How long to delay before flushing a batch

    Go-zero’s PeriodicalExecutor and ChunkExecutor are designed for this situation

  • The throughput of consuming messages from message queues depends on two aspects

    • The read speed of the message queue; in general, the queue’s own read speed is more than fast enough compared to the speed of processing the messages
    • Processing speed, which depends on the business

    The core problem here is reading too many messages into memory without regard for the business processing speed, which causes two problems:

    • High memory usage, possibly even OOM, since the pod has a memory limit
    • Messages that have accumulated but are not yet processed when the pod stops are lost

Solutions and implementations

To borrow a picture from Rob Pike, this is similar to queued consumption. The four gophers on the left fetch tasks from the queue, and the four gophers on the right pick them up and process them. Ideally the rates on the left and right are basically equal: no effort is wasted, no one waits, and nothing piles up in the exchange in the middle.

Let’s look at how go-zero implements this:

  • Producer
	for {
		select {
		case <-q.quit:
			logx.Info("Quitting producer")
			return
		default:
			if v, ok := q.produceOne(producer); ok {
				q.channel <- v
			}
		}
	}

When there is no exit event, a message is read via produceOne and, on success, written to channel. The chan neatly connects the reading stage and the consuming stage.

  • Consumer
	for {
		select {
		case message, ok := <-q.channel:
			if ok {
				q.consumeOne(consumer, message)
			} else {
				logx.Info("Task channel was closed, quitting consumer...")
				return
			}
		case event := <-eventChan:
			consumer.OnEvent(event)
		}
	}

If ok is false, the channel has been closed and we can exit the whole processing loop. We also support pause/resume on the Redis queue, which we use a lot in social scenarios to tell consumers to pause and resume.

  • Start the queue. With these pieces we can control the number of producers/consumers to tune throughput
func (q *Queue) Start() {
	q.startProducers(q.producerCount)
	q.startConsumers(q.consumerCount)

	q.producerRoutineGroup.Wait()
	close(q.channel)
	q.consumerRoutineGroup.Wait()
}

The important point here is that the producers must be stopped first, and only then do we wait for the consumers to finish processing the remaining messages.

That covers the core control code, and it really is quite simple. You can also go to github.com/tal-tech/go… to see the full implementation.

Usage

Basic usage process:

  1. Create the producer and consumer
  2. Start the queue
  3. Produce messages / consume messages

In terms of queue, it looks roughly as follows:

Create a queue

// producer creation factory
producer := newMockedProducer()
// consumer creation factory
consumer := newMockedConsumer()
// pass the producer and consumer factory functions to NewQueue()
q := queue.NewQueue(func() (Producer, error) {
	return producer, nil
}, func() (Consumer, error) {
	return consumer, nil
})

Let’s see what parameters NewQueue takes:

  1. The producer factory method
  2. The consumer factory method

We pass the producer and consumer factory functions to the queue, which creates them internally. The framework provides the Producer and Consumer interfaces along with the factory method definitions, and queue takes care of controlling the overall flow.

Producing messages

We use a custom mockedProducer to simulate:

type mockedProducer struct {
	total int32
	count int32
	// use a waitGroup to simulate task completion
	wait  sync.WaitGroup
}

// Produce()
func (p *mockedProducer) Produce() (string, bool) {
	if atomic.AddInt32(&p.count, 1) <= p.total {
		p.wait.Done()
		return "item", true
	}
	time.Sleep(time.Second)
	return "", false
}

A producer written for queue must implement:

  • Produce(): The developer writes the logic that produces the message
  • AddListener(): adds an event listener

Consuming messages

We simulated this by defining a mockedConsumer:

type mockedConsumer struct {
	count  int32
}

func (c *mockedConsumer) Consume(string) error {
	atomic.AddInt32(&c.count, 1)
	return nil
}

Starting the queue

Start it, then verify that the data flow between the producer and consumer described above works:

func main() {
	// create the queue
	q := NewQueue(func() (Producer, error) {
		return newMockedProducer(), nil
	}, func() (Consumer, error) {
		return newMockedConsumer(), nil
	})
	// even if Start panics, defer ensures Stop runs to clean up resources
	defer q.Stop()
	// start
	q.Start()
}

This is the simplest queue implementation. On top of this core/queue framework we have built message queue services based on Redis, Kafka, and others, fully tested in different business scenarios. You can also implement your own message queue service based on your business needs.

The overall design

The overall process is as shown in the figure above:

  1. All communication is carried by channel
  2. The numbers of producers and consumers can be set to match different business requirements
  3. The Produce and Consume implementations are defined by the developer; queue is responsible for the overall flow

conclusion

This article showed how to balance the speed of reading messages from a queue against the speed of processing them through a channel, how to implement a generic message queue processing framework, and used mocked examples to briefly show how to implement a message queue service based on core/queue. You can implement a message queue service based on RocketMQ and others in a similar way.

For more articles on the design and implementation of go-zero, you can follow the “Microservices Practice” WeChat official account.

Project address

Github.com/tal-tech/go…

You are welcome to use go-zero and star it to support us!

WeChat group

Follow the “Microservices Practice” official account and click “join group” to get the QR code for the community group.