Do you care about throughput when using message queues?
Have you considered the factors that affect throughput?
Have you thought about how to improve?
Have you summarized best practices?
This article takes you to explore the implementation of the Go framework for high throughput on the consuming side of message queues. Let’s go!
Some thoughts on throughput
-
Write message queue throughput depends on two things
- Network bandwidth
- Write speed of message queues (such as Kafka)
The optimal throughput is for one of them to be full, and in general the Intranet bandwidth is very high and unlikely to be full, so naturally the message queue write speed is full, and there are two points to balance
- The size or number of bytes of messages written in batches
- How long to delay writing
Go-zero’s PeriodicalExecutor and ChunkExecutor are designed for this situation
-
The throughput of consuming messages from message queues depends on two aspects
- The read speed of the message queue, and generally the read speed of the message queue itself, is fast enough compared to the speed at which messages are processed
- Processing speed, which depends on the business
The core problem here is reading too many messages into memory without considering business processing speed, which can cause two problems:
- High memory usage, even OOM,
pod
There is amemory limit
的 - stop
pod
When the accumulated messages are not processed enough, resulting in message loss
Solutions and implementations
To borrow a picture from Rob Pike, this is similar to queuing consumption. The four gopher on the left are fetched from the queue, and the four gopher on the right are fetched and processed. The ideal result is that the rates on the left and right are basically the same, no one wastes, no one waits, and there’s no stacking in the middle exchange.
Let’s look at how Go-Zero is implemented:
Producer
端
for {
select {
case <-q.quit:
logx.Info("Quitting producer")
return
default:
if v, ok := q.produceOne(producer); ok {
q.channel <- v
}
}
}
Copy the code
There is no exit event and a message is read through produceOne and written to channel on success. Chan can be used to solve the problem of connection between reading and consumption.
Consumer
端
for {
select {
case message, ok := <-q.channel:
if ok {
q.consumeOne(consumer, message)
} else {
logx.Info("Task channel was closed, quitting consumer...")
return
}
case event := <-eventChan:
consumer.OnEvent(event)
}
}
Copy the code
If ok is false, the channel is closed and you can exit the whole processing loop. We also support pause/ Resume on the Redis Queue, which we use a lot in social scenarios to tell consumers to pause and resume.
- Start the
queue
With these we can controlproducer/consumer
The number of times to achieve throughput tuning
func (q *Queue) Start(a) {
q.startProducers(q.producerCount)
q.startConsumers(q.consumerCount)
q.producerRoutineGroup.Wait()
close(q.channel)
q.consumerRoutineGroup.Wait()
}
Copy the code
The important thing to note here is that the producer should be stopped first and then wait for the consumer to finish processing the producer.
Here the core control code is basically finished, in fact, it looks very simple, you can also go to github.com/tal-tech/go… To see the full implementation.
use
Basic usage process:
- create
producer
或consumer
- Start the
queue
- Production messages/consumption messages
Corresponds to queue, roughly as follows:
Create a queue
// The producer creates the factory
producer := newMockedProducer()
// Consumers create factories
consumer := newMockedConsumer()
// Pass the producer and consumer create factory function to NewQueue()
q := queue.NewQueue(func(a) (Producer, error) {
return producer, nil
}, func(a) (Consumer, error) {
return consumer, nil
})
Copy the code
Let’s see what parameters NewQueue takes:
producer
The factory methodconsumer
The factory method
Pass the Producer & Consumer factory functions to the Queue, which creates them. The framework provides the Producer and Consumer interfaces and factory method definitions, and the control queue implementation of the entire process is automated.
productionmessage
We use a custom mockedProducer to simulate:
type mockedProducer struct {
total int32
count int32
// Use waitGroup to simulate task completion
wait sync.WaitGroup
}
// Produce()
func (p *mockedProducer) Produce(a) (string.bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return "item".true
}
time.Sleep(time.Second)
return "".false
}
Copy the code
Producer writing in a queue must implement:
Produce()
: The developer writes the logic that produces the messageAddListener()
: Add eventslistener
consumptionmessage
We simulated this by defining a mockedConsumer:
type mockedConsumer struct {
count int32
}
func (c *mockedConsumer) Consume(string) error {
atomic.AddInt32(&c.count, 1)
return nil
}
Copy the code
Start thequeue
Start and then verify that the data transfer between the producer and consumer we described above is successful:
func main(a) {
/ / create the queue
q := NewQueue(func(a) (Producer, error) {
return newMockedProducer(), nil
}, func(a) (Consumer, error) {
return newMockedConsumer(), nil
})
// Starting panic can also ensure that stop is executed to clean up resources
defer q.Stop()
/ / start
q.Start()
}
Copy the code
This is the simplest implementation of queue. We implemented message queue services based on Redis and Kafka etc. through this core/ Queue framework, which has been fully tested in different business scenarios. You can also implement your own message queue services based on your business realities.
The overall design
The overall process is as shown in the figure above:
- All communications are made up of
channel
for Producer
和Consumer
The number can be set to match different business requirementsProduce
和Consume
The implementation is defined by the developer,queue
Responsible for the overall process
conclusion
This article showed you how to balance the speed of reading and processing messages from a queue through a channel, how to implement a generic message queue processing framework, and used mock examples to briefly show how to implement a message queue processing service based on core/ Queue. You can implement a message queue processing service based on RocketMQ et al in a similar way.
For more articles on the design and implementation of Go-Zero, you can follow the “Microservices Practice” public account.
The project address
Github.com/tal-tech/go…
Welcome to Go-Zero and star support us!
Wechat communication group
Pay attention to the “micro service practice” public account and click into the group to obtain the qr code of the community group.