Preface
Delay queues are useful whenever work has to happen some time after an event, for example sending a delayed notification or closing an unpaid order.
This article shows how to implement delayed messages with Go and Kafka, using the Sarama client.
Principle
Delayed messages are implemented on top of Kafka in three steps:
- The producer sends the message to the delay queue.
- The delay service consumes the delay queue and writes messages whose delay has elapsed to the real queue.
- The consumer consumes messages from the real queue.
Simple implementation
Producer
The producer simply sends the message to the delay queue:
msg := &sarama.ProducerMessage{
	Topic: kafka_delay_queue_test.DelayTopic,
	Value: sarama.ByteEncoder("test" + strconv.Itoa(i)),
}
if _, _, err := producer.SendMessage(msg); err != nil {
	log.Println(err)
}
Delay service
The delay service subscribes to the delay queue and forwards messages whose delay has elapsed to the real queue:
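// This runs inside a for loop: Consume returns whenever the session ends,
// including when ConsumeClaim returns, and must then be called again.
if err = consumerGroup.Consume(context.Background(),
	[]string{kafka_delay_queue_test.DelayTopic}, consumer); err != nil {
	break
}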
// Consumer is the consumer group handler of the delay service.
type Consumer struct {
	producer sarama.SyncProducer
	delay    time.Duration
}

func NewConsumer(producer sarama.SyncProducer, delay time.Duration) *Consumer {
	return &Consumer{
		producer: producer,
		delay:    delay,
	}
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// If the message's delay has elapsed, forward it to the real queue.
		now := time.Now()
		if now.Sub(message.Timestamp) >= c.delay {
			_, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
				Topic: kafka_delay_queue_test.RealTopic,
				Key:   sarama.ByteEncoder(message.Key),
				Value: sarama.ByteEncoder(message.Value),
			})
			if err != nil {
				// Stop here so the message is fetched again; marking a
				// later message would commit past this one and lose it.
				return err
			}
			session.MarkMessage(message, "")
			continue
		}
		// Otherwise sleep for one second and return without marking the
		// message, so that it is fetched again on the next Consume call.
		time.Sleep(time.Second)
		return nil
	}
	return nil
}
Consumer
The consumer simply subscribes to the real queue and consumes messages:
if err = consumerGroup.Consume(context.Background(),
	[]string{kafka_delay_queue_test.RealTopic}, consumer); err != nil {
	break
}
type Consumer struct{}

func NewConsumer() *Consumer {
	return &Consumer{}
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// message.Value is a []byte, so convert it for readable output.
		fmt.Println("Received a message:", string(message.Value), message.Timestamp)
		session.MarkMessage(message, "")
	}
	return nil
}
Possible improvements
Generic delay service
The delay service can be packaged as a generic service: producers send messages straight to it, and it takes care of the rest of the logic.
Such a service can offer several fixed delay levels, such as 5s, 10s, 30s, 1m, 5m, 10m, 1h, and 2h, similar to RocketMQ.
Producer-hosted delay service
The producer can also host the delay service itself, so that the producer process moves messages from the delay queue to the real queue.
Here is a simple implementation:
// KafkaDelayQueueProducer is a delay queue producer that bundles a producer with the delay service.
type KafkaDelayQueueProducer struct {
	producer   sarama.SyncProducer // underlying producer
	delayTopic string              // delay queue topic
}

// NewKafkaDelayQueueProducer creates a delay queue producer and starts the delay service.
// producer: the producer used for both the delay queue and the real queue
// delayServiceConsumerGroup: the delay service's consumer group
// delayTime: how long messages are delayed
// delayTopic: the delay queue topic
// realTopic: the real queue topic
func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
	delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer {
	// Start the delay service in the background.
	consumer := NewDelayServiceConsumer(producer, delayTime, realTopic)
	go func() {
		for {
			if err := delayServiceConsumerGroup.Consume(context.Background(),
				[]string{delayTopic}, consumer); err != nil {
				break
			}
		}
	}()
	return &KafkaDelayQueueProducer{
		producer:   producer,
		delayTopic: delayTopic,
	}
}

// SendMessage sends msg to the delay queue, overriding msg.Topic.
func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
	msg.Topic = q.delayTopic
	return q.producer.SendMessage(msg)
}
// DelayServiceConsumer is the consumer group handler of the delay service.
type DelayServiceConsumer struct {
	producer  sarama.SyncProducer
	delay     time.Duration
	realTopic string
}

func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
	realTopic string) *DelayServiceConsumer {
	return &DelayServiceConsumer{
		producer:  producer,
		delay:     delay,
		realTopic: realTopic,
	}
}

func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// If the message's delay has elapsed, forward it to the real queue.
		now := time.Now()
		if now.Sub(message.Timestamp) >= c.delay {
			_, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
				Topic: c.realTopic,
				Key:   sarama.ByteEncoder(message.Key),
				Value: sarama.ByteEncoder(message.Value),
			})
			if err != nil {
				// Stop here so the message is fetched again; marking a
				// later message would commit past this one and lose it.
				return err
			}
			session.MarkMessage(message, "")
			continue
		}
		// Otherwise sleep for one second and return without marking the
		// message, so that it is fetched again on the next Consume call.
		time.Sleep(time.Second)
		return nil
	}
	return nil
}

func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
Conclusion
Delayed messages are easy to implement in Kafka with an intermediate queue and polling. If a general-purpose delay queue is needed, the delay service can be built as a generic service, or the producer can take on the delay service's role.
Complete code:
- Simple implementation example: github.com/jiaxwu/dq/t…
- Producers that include delayed services: github.com/jiaxwu/dq/b…