Preface

Delay queues are a useful tool that comes up often, for example for delayed notifications and automatic order closure.

This article focuses on implementing delayed messages with Go and Kafka, using the Sarama client.

Principle

Delayed messages can be implemented on Kafka in three steps:

  1. The producer sends the message to the delay queue.
  2. The delay service reads the delay queue and writes messages whose delay has elapsed to the real queue.
  3. The consumer consumes the messages in the real queue.
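
The three steps above can be sketched without Kafka at all. The following is a minimal in-memory simulation, not the article's implementation: the `message` type, `forwardDue`, and `simulate` are hypothetical names introduced here, and plain slices stand in for the two topics. It assumes the delay queue is ordered by timestamp, so the delay service can stop at the first message that is not yet due.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a Kafka record.
type message struct {
	value     string
	timestamp time.Time // time at which the producer sent the message
}

// forwardDue moves every message at the head of delayQueue whose delay has
// elapsed at time now onto realQueue and returns the updated queues.
// It stops at the first message that is not yet due, which assumes the
// delay queue is ordered by timestamp.
func forwardDue(delayQueue, realQueue []message, delay time.Duration, now time.Time) ([]message, []message) {
	i := 0
	for i < len(delayQueue) && now.Sub(delayQueue[i].timestamp) >= delay {
		realQueue = append(realQueue, delayQueue[i])
		i++
	}
	return delayQueue[i:], realQueue
}

// simulate runs the three steps once with a 5s delay and reports how many
// messages are still delayed and how many reached the real queue.
func simulate() (pending, delivered int) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	// Step 1: the producer writes to the delay queue.
	delayQueue := []message{
		{value: "a", timestamp: start},
		{value: "b", timestamp: start.Add(3 * time.Second)},
	}
	var realQueue []message
	// Step 2: six seconds later, the delay service forwards what is due.
	delayQueue, realQueue = forwardDue(delayQueue, realQueue, 5*time.Second, start.Add(6*time.Second))
	// Step 3: the consumer would now read from realQueue.
	return len(delayQueue), len(realQueue)
}

func main() {
	pending, delivered := simulate()
	fmt.Println(pending, delivered) // prints "1 1"
}
```

Message "a" is six seconds old and is forwarded, while "b" is only three seconds old and stays in the delay queue until a later pass.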

Simple implementation

Producer

The producer simply sends the message to the delay queue:

msg := &sarama.ProducerMessage{
   Topic: kafka_delay_queue_test.DelayTopic,
   Value: sarama.ByteEncoder("test" + strconv.Itoa(i)),
}
if _, _, err := producer.SendMessage(msg); err != nil {
   log.Println(err)
}

Delay service

The delay service subscribes to the delay queue and forwards messages whose delay has elapsed to the real queue:

if err = consumerGroup.Consume(context.Background(),
   []string{kafka_delay_queue_test.DelayTopic}, consumer); err != nil {
   break
}

type Consumer struct {
   producer sarama.SyncProducer
   delay    time.Duration
}

func NewConsumer(producer sarama.SyncProducer, delay time.Duration) *Consumer {
   return &Consumer{
      producer: producer,
      delay:    delay,
   }
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for message := range claim.Messages() {
      // If the message has timed out, it is sent to the real queue
      now := time.Now()
      if now.Sub(message.Timestamp) >= c.delay {
         _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
            Topic: kafka_delay_queue_test.RealTopic,
            Key:   sarama.ByteEncoder(message.Key),
            Value: sarama.ByteEncoder(message.Value),
         })
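         if err != nil {
            // Leave the message unmarked so that it is redelivered later
            return err
         }
         session.MarkMessage(message, "")
         continue
      }
      // Otherwise sleep for one second, then return so that the message is
      // fetched again on the next Consume call
      time.Sleep(time.Second)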
      return nil
   }
   return nil
}
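
The delay service above sleeps for a fixed second whenever the head message is not yet due. One possible refinement, shown here only as a sketch, is to sleep for exactly the time that remains. The helper `waitFor` is a hypothetical name that is not part of the original code; its `age` argument corresponds to `now.Sub(message.Timestamp)` in `ConsumeClaim`.

```go
package main

import (
	"fmt"
	"time"
)

// waitFor returns how long a message of the given age must still wait
// before the delay has elapsed. Zero means the message is already due.
func waitFor(age, delay time.Duration) time.Duration {
	if age >= delay {
		return 0
	}
	return delay - age
}

func main() {
	fmt.Println(waitFor(2*time.Second, 5*time.Second)) // prints "3s"
	fmt.Println(waitFor(7*time.Second, 5*time.Second)) // prints "0s"
}
```

In the handler, `time.Sleep(waitFor(now.Sub(message.Timestamp), c.delay))` could take the place of the fixed sleep. Whether a long sleep inside the handler is acceptable depends on the consumer group configuration, so capping the wait may be the safer choice.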

Consumer

The consumer simply subscribes to the real queue and consumes its messages:

if err = consumerGroup.Consume(context.Background(),
   []string{kafka_delay_queue_test.RealTopic}, consumer); err != nil {
   break
}

type Consumer struct{}

func NewConsumer() *Consumer {
   return &Consumer{}
}

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for message := range claim.Messages() {
      // message.Value is a []byte, so convert it to a string for printing
      fmt.Println("Received a message:", string(message.Value), message.Timestamp)
      session.MarkMessage(message, "")
   }
   return nil
}

Improvements

Generic delay service

The delay service can be packaged as a generic service, so that the producer sends messages directly to the delay service and lets it handle the rest of the logic.

The delay service provides multiple delay levels, such as 5s, 10s, 30s, 1m, 5m, 10m, 1h, and 2h, similar to RocketMQ.
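
One way to picture the delay levels is one delay topic per level, with each requested delay rounded up to the nearest level. The sketch below is an assumption for illustration rather than the article's code or RocketMQ's behavior: the `level` type, the `topicForDelay` function, and the `delay_` topic prefix are all hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// level pairs a delay level with the name used in its topic.
type level struct {
	name  string
	delay time.Duration
}

// levels lists the delay levels from the text in ascending order.
var levels = []level{
	{"5s", 5 * time.Second},
	{"10s", 10 * time.Second},
	{"30s", 30 * time.Second},
	{"1m", time.Minute},
	{"5m", 5 * time.Minute},
	{"10m", 10 * time.Minute},
	{"1h", time.Hour},
	{"2h", 2 * time.Hour},
}

// topicForDelay returns the hypothetical delay topic of the smallest level
// that is at least as long as d. It reports false if d exceeds every level.
func topicForDelay(d time.Duration) (string, bool) {
	for _, l := range levels {
		if d <= l.delay {
			return "delay_" + l.name, true
		}
	}
	return "", false
}

func main() {
	topic, ok := topicForDelay(7 * time.Second)
	fmt.Println(topic, ok) // prints "delay_10s true"

	topic, ok = topicForDelay(45 * time.Minute)
	fmt.Println(topic, ok) // prints "delay_1h true"

	_, ok = topicForDelay(3 * time.Hour)
	fmt.Println(ok) // prints "false"
}
```

Rounding up means a message is never delivered early, at the cost of waiting longer than requested when the delay falls between two levels.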

Producer-hosted delay service

It is also possible to make the producer responsible for the delay service, so that the producer side moves messages from the delay queue to the real queue.

Here is a simple implementation:

// KafkaDelayQueueProducer is a delay queue producer that bundles the producer with the delay service
type KafkaDelayQueueProducer struct {
   producer   sarama.SyncProducer // producer
   delayTopic string              // Delay service topic
}

// NewKafkaDelayQueueProducer creates a delay queue producer
// producer: the producer
// delayServiceConsumerGroup: the delay service consumer group
// delayTime: the delay time
// delayTopic: the delay service topic
// realTopic: the real queue topic
func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
   delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer {
   // Start the delay service
   consumer := NewDelayServiceConsumer(producer, delayTime, realTopic)
   go func() {
      for {
         if err := delayServiceConsumerGroup.Consume(context.Background(),
            []string{delayTopic}, consumer); err != nil {
            break
         }
      }
   }()
   return &KafkaDelayQueueProducer{
      producer:   producer,
      delayTopic: delayTopic,
   }
}

// SendMessage sends a message to the delay topic, overriding msg.Topic
func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
   msg.Topic = q.delayTopic
   return q.producer.SendMessage(msg)
}

// DelayServiceConsumer is the delay service consumer
type DelayServiceConsumer struct {
   producer  sarama.SyncProducer
   delay     time.Duration
   realTopic string
}

func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
   realTopic string) *DelayServiceConsumer {
   return &DelayServiceConsumer{
      producer:  producer,
      delay:     delay,
      realTopic: realTopic,
   }
}

func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for message := range claim.Messages() {
      // If the message has timed out, it is sent to the real queue
      now := time.Now()
      if now.Sub(message.Timestamp) >= c.delay {
         _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
            Topic: c.realTopic,
            Key:   sarama.ByteEncoder(message.Key),
            Value: sarama.ByteEncoder(message.Value),
         })
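         if err != nil {
            // Leave the message unmarked so that it is redelivered later
            return err
         }
         session.MarkMessage(message, "")
         continue
      }
      // Otherwise sleep for one second, then return so that the message is
      // fetched again on the next Consume call
      time.Sleep(time.Second)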
      return nil
   }
   return nil
}

func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {
   return nil
}

func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {
   return nil
}

Conclusion

Delayed messages are easy to implement on Kafka with an intermediate queue and polling. If a general-purpose delay queue is needed, the delay service can be built as a generic service, or the producer can take on the delay service itself.

Complete code:

  • Simple implementation example: github.com/jiaxwu/dq/t…
  • Producers that include delayed services: github.com/jiaxwu/dq/b…