
1. Background

In some business systems, Kafka is introduced to decouple modules. Take an IM (instant messaging) system as an example:

User A sends a message to user B. After receiving the message, msg_gateway delivers it to Kafka and returns a success response to user A. At that point the message has not actually been persisted in MySQL, although it will eventually become consistent. So, is it a big problem if Kafka loses the message? User A believes the message was sent to user B successfully, but it is lost inside the server and user B never receives it.

As a result, some businesses are very concerned about message loss when using Kafka.

Also, common questions are:

  • Repeated consumption.
  • Out-of-order messages.

Let’s take a look at using the Sarama package to solve these problems.

2. The Kafka message loss problem

When does Kafka lose messages

The concern above requires a clearer definition of a “lost message”. A failure of some or all brokers in a Kafka cluster that keeps a consumer from receiving messages in time is not message loss: as long as the messages have been persisted to disk, restarting the broker cluster lets consumers resume pulling them, so nothing is lost and everything is eventually consumed. By lost messages, as I understand it, we mean messages that go unconsumed without the developer being aware of it.

I would summarize the ways messages get lost as follows:

  1. The producer sends a message to the broker, but the message never reaches the broker because of network jitter, and the developer is unaware of it.

Solution: the producer sets the acks parameter so that an ACK is returned once the message has been written to the leader (master); otherwise an error is raised, the application detects it and resends. This guarantees reliability to a certain extent, and the producer's latency while waiting for the broker's acknowledgment stays low.

  2. The producer sends the message to the broker-master. The master receives it but crashes before synchronizing it to the followers, without the developer being aware.

Solution: the producer sets acks so that an ACK is returned only after the message has been synchronized to the master and all followers; otherwise the application detects the error and resends. This guarantees greater reliability, at the cost of higher latency while the producer waits for the broker's acknowledgment.

  3. The producer sends the message to the broker-master. The master receives it, but fails to synchronize it to every follower, so there is a risk of losing the message.

Solution: same as above.

  4. The broker crashes before the message has been persisted from the memory buffer to disk, which the ACK mechanism cannot detect.

Solution: tuning parameters to flush messages to disk more frequently reduces the probability of this to some extent, but a higher flush frequency naturally hurts performance too.

  5. The consumer successfully pulls the message, then dies before processing it.

Solution: commit the offset manually, and only after the message has been consumed successfully.

To sum up, Kafka does not lose messages when the cluster and the application are running normally. In the event of a cluster failure, message reliability cannot be fully guaranteed. To make messages as reliable as possible, the only option is to resend whenever a message may not have been consumed. Therefore, business logic must tolerate repeated consumption, and key paths need an idempotency mechanism.

A few suggestions:

  1. If a business is critical, consider the cost of losing messages and the solution when using Kafka.
  2. The producer end checks whether the message reaches the cluster. If any exception occurs, the producer resends the message.
  3. The consumer side guarantees idempotency of consumption.
  4. Operations: ensure the cluster runs normally, is highly available, and that the network is in good condition.

3. Solving message loss on the producer side

On the producer side, we only need to set acks so that Kafka does not return until all in-sync followers have the message. In Sarama, this is a single setting:

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // -1

The acks parameter can take the following values:

const (
    // NoResponse doesn't send any response, the TCP ACK is all you get.
    NoResponse RequiredAcks = 0
    // WaitForLocal waits for only the local commit to succeed before responding.
    WaitForLocal RequiredAcks = 1
    // WaitForAll waits for all in-sync replicas to commit before responding.
    // The minimum number of in-sync replicas is configured on the broker via
    // the `min.insync.replicas` configuration key.
    WaitForAll RequiredAcks = -1
)
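
To make producer-side failures visible to the application, we can combine WaitForAll with Sarama's built-in retries and check the error returned by SendMessage. Here is a minimal sketch (the broker address and topic are placeholders, not taken from the article's code):

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
config.Producer.Retry.Max = 3                    // let Sarama retry transient failures a few times
config.Producer.Return.Successes = true          // required by the sync producer

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) // placeholder broker address
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{Topic: "testAutoSyncOffset", Value: sarama.StringEncoder("hello")}
if _, _, err := producer.SendMessage(msg); err != nil {
    // No ack was received: the application now knows and can resend, queue, or alert.
    fmt.Println("send failed, resend at the application layer:", err)
}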

4. Message loss on the consumer end

Usually, a consumer loses messages because the offset is committed automatically but the data never makes it into MySQL (for example, because of a bug or a process crash). When the consumer restarts, that message is skipped, and of course it is nowhere to be found in the database. We can solve this with manual commits, or even a two-phase commit in some complex scenarios.

Message loss problem in automatic commit mode

By default, Sarama commits offsets automatically, at a 1-second interval.

// NewConfig returns a new configuration instance with sane defaults.
func NewConfig() *Config {
   // ...
   c.Consumer.Offsets.AutoCommit.Enable = true              // auto-commit enabled
   c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // commit interval
   c.Consumer.Offsets.Initial = OffsetNewest
   c.Consumer.Offsets.Retry.Max = 3
   // ...
}

The auto-commit here only covers messages that have been marked (sess.MarkMessage(msg, "")):

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
      // Mark the message as processed; Sarama will then commit it automatically
      sess.MarkMessage(msg, "")
   }
   return nil
}

If sess.MarkMessage(msg, "") is never called, then even with auto-commit enabled, the consumer will start again from the previous offset the next time it starts. You can verify this by commenting out sess.MarkMessage(msg, "") and then checking in Offset Explorer:

So the principle behind Sarama's auto-commit is: mark first, commit later. As long as we mark the message only after the MySQL insert succeeds, we will not lose messages.

Correct call order:

func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      // Insert into MySQL first
      insertToMysql(msg)

      // Correct: if the program crashes right after the insert succeeds but before the mark,
      // the next run will re-consume this message at most once (no loss)
      sess.MarkMessage(msg, "")
   }
   return nil
}

Wrong order:

func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      // Error 1: if we mark (and commit) before the insert and the program crashes here,
      // the message will not be consumed next time, so it is lost
      // Error 2: simply forgetting to call sess.MarkMessage(msg, "") results in repeated consumption
      sess.MarkMessage(msg, "")

      // Insert into MySQL
      insertToMysql(msg)
   }
   return nil
}

Sarama manual commit mode

Of course, message loss can also be handled with manual commits, but I do not recommend it, because the problem can already be solved in auto-commit mode.

consumerConfig := sarama.NewConfig()
consumerConfig.Version = sarama.V2_8_0_0
consumerConfig.Consumer.Return.Errors = false
consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // disable auto-commit; commit manually instead
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
 
var consumerCount int // counts processed messages so that commits can be batched

func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
 
      / / insert the mysql
      insertToMysql(msg)
 
      // In manual submission mode, you also need to mark it first
      sess.MarkMessage(msg, "")
 
      consumerCount++
      if consumerCount%3 == 0 {
         // Manual commit; should not be called too frequently, takes about 9ms (macOS, i7, 16 GB)
         t1 := time.Now().Nanosecond()
         sess.Commit()
         t2 := time.Now().Nanosecond()
         fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")}}return nil
}

5. Kafka message order problem

Before publishing to Kafka we already solve message sequence-number generation with a gRPC call, but there is still an ordering problem on the consumption side: how can the consumers subscribed to Kafka write messages to MySQL in message order, rather than in a random order?

We know that Kafka messages are ordered within a partition, so we just need to make sure that all messages sent to a given user go to the same partition.

1. Send everything to a single partition

In Kafka, one partition is consumed by a single thread, so with only one partition throughput becomes a problem in this model.

2. Specify multiple partitions manually

MSG := &sarama.producerMessage {Topic: "msgc2s"", Value: sarama.stringencoder (" hello "), Partition: toUserId % 10,} Partition, offset, err := producer.SendMessage(MSG)Copy the code

When producing a message, besides Topic and Value we can also set the partition manually. For example, with 10 partitions in total, we can take the user ID modulo 10, so that messages sent to the same user always land in the same partition.
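
One caveat, based on my reading of Sarama rather than the original article: the Partition field set above only takes effect if the producer is configured with the manual partitioner; otherwise the default partitioner picks the partition itself.

config.Producer.Partitioner = sarama.NewManualPartitioner // honor msg.Partition as set on the message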

However, the total number of partitions is hard-coded here. What if the number of Kafka partitions needs to change? Recompile the code? That is not elegant.

3. Compute the partition automatically

The Kafka client already supports this for us. First, set the partition-selection strategy to hash during initialization:

p.config.Producer.Partitioner = sarama.NewHashPartitioner

Then set the message Key before producing the message:

msg := &sarama.ProducerMessage{
   Topic: "testAutoSyncOffset",
   Value: sarama.StringEncoder("hello"),
   Key: sarama.StringEncoder(strconv.Itoa(RecvID)),
}

The Kafka client hashes the Key; by using the receiving user's ID as the Key, all messages sent to that user fall into the same partition and therefore stay in order.

4. Extra: out-of-order processing when one partition is handled by multiple threads

As mentioned above, the Kafka client consumes a partition with a single thread. If processing is slow, say tens of milliseconds per message, then only a few dozen messages can be handled per second, which is far too low a throughput. At that point we may hand the processing off to other threads, and then ordering can be lost.

We can create N in-memory queues and route all data with the same key to the same queue; then N threads each consume one queue, which preserves ordering. For example, with 10 queues, a key whose hash mod 10 equals 4 always goes to queue 4 and is processed there in order. A minimal sketch follows.
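
A minimal sketch of this idea in Go with Sarama (the queue count, hash function, and handler are illustrative choices, not from the original code): every message with the same key lands in the same channel, and each channel is drained by exactly one goroutine, so per-key order is preserved while different keys are processed in parallel.

import (
    "hash/fnv"

    "github.com/Shopify/sarama"
)

const queueCount = 10 // assumed number of worker queues/goroutines

// startWorkers creates N in-memory queues, each drained by exactly one goroutine,
// so ordering is preserved inside a queue while queues run in parallel.
func startWorkers(handle func(*sarama.ConsumerMessage)) []chan *sarama.ConsumerMessage {
    queues := make([]chan *sarama.ConsumerMessage, queueCount)
    for i := range queues {
        queues[i] = make(chan *sarama.ConsumerMessage, 1024)
        go func(q chan *sarama.ConsumerMessage) {
            for msg := range q {
                handle(msg) // e.g. insertToMysql(msg)
            }
        }(queues[i])
    }
    return queues
}

// dispatch routes a message by hashing its key: the same key always maps to the
// same queue, so messages for one user are still processed in order.
func dispatch(queues []chan *sarama.ConsumerMessage, msg *sarama.ConsumerMessage) {
    h := fnv.New32a()
    h.Write(msg.Key)
    queues[h.Sum32()%queueCount] <- msg
}

Inside ConsumeClaim we would call dispatch(queues, msg) instead of processing the message inline. Note the trade-off: if the offset is marked before a worker finishes, a crash can lose in-flight messages, so stricter setups track per-queue progress before committing.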

6. Repeated consumption and message idempotency

We use RequiredAcks = sarama.WaitForAll (-1) to solve message loss on the producer side. In Section 4 we also covered message loss on the consumer side: just make sure sess.MarkMessage(msg, "") is called after the database insert.

However, auto-commit has a 1-second interval. If the MySQL insert succeeds and the process then crashes before the offset is committed, the consumer is bound to re-consume up to 1 second's worth of data on its next startup, so we have to handle this at the application layer.

There are two common ideas:

  1. If the data lives in Redis and does not need to be persisted, for example a string, SET is naturally idempotent and needs no special handling.
  2. For data that has to go into MySQL, perform a query before inserting. For each message the client sends, either generate a globally unique ID (such as a GUID) for it, or put a unique index on the message ID.

The difficulty with the second scheme is generating a globally unique ID: in theory a GUID can collide, and if it is generated by the client, how do we let the client know when the insert fails? So I would argue for composing the ID ourselves, for example user ID + current time + a 32-bit GUID; a collision then becomes practically impossible (just imagine how many years it would take one person to send 100 million messages…). A minimal sketch of an idempotent insert follows.
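
As a minimal sketch of the unique-index approach (the table t_msg, its columns, and the helper name are assumptions for illustration, not the article's actual schema): put a UNIQUE index on msg_id and use INSERT IGNORE, so re-consuming the same Kafka message becomes a harmless no-op.

import (
    "database/sql"
    "fmt"

    _ "github.com/go-sql-driver/mysql" // assumed MySQL driver
)

// insertMsgIdempotent inserts a message keyed by a globally unique msg_id.
// With a UNIQUE index on msg_id, INSERT IGNORE turns a duplicate insert into a no-op,
// so consuming the same message twice does not create duplicate rows.
func insertMsgIdempotent(db *sql.DB, msgID, content string) error {
    res, err := db.Exec(
        "INSERT IGNORE INTO t_msg (msg_id, content) VALUES (?, ?)", // t_msg is an assumed table
        msgID, content,
    )
    if err != nil {
        return err
    }
    if n, _ := res.RowsAffected(); n == 0 {
        fmt.Printf("msg %s already consumed, skipping\n", msgID) // duplicate delivery, safe to ignore
    }
    return nil
}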

7. Complete code examples

consumer.go

type msgConsumerGroup struct{ name string } // name is only used as a label in log output
 
func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
 
      // check the mysql database
      if check(msg) {
          // insert into MySQL
          insertToMysql(msg)
      }
 
      // Mark the message; Sarama will commit it automatically (default interval: 1 second)
      sess.MarkMessage(msg, "")
   }
   return nil
}
 
func main() {
    consumerConfig := sarama.NewConfig()
    consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version
    consumerConfig.Consumer.Return.Errors = false
    // consumerConfig.Consumer.Offsets.AutoCommit.Enable = false               // disable auto-commit and commit manually instead
    // consumerConfig.Consumer.Offsets.AutoCommit.Interval = 3 * time.Second   // test: auto-commit every 3 seconds
    consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
 
    cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092", "10.0.56.153:9093", "10.0.56.153:9094"}, "testgroup", consumerConfig)
    if err != nil {
       panic(err)
    }
 
   handler := msgConsumerGroup{name: "testgroup-consumer"} // the name is just a label used in log output

   for {
      err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, handler)
      if err != nil {
         fmt.Println(err.Error())
         break
      }
   }
 
   _ = cGroup.Close()
}

producer.go

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all followers to reply to ack to ensure that Kafka does not lose messages
    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewHashPartitioner // Hash the Key, the same Key falls into a partition each time, so that the message is ordered
 
    // The async producer has higher throughput but is more complex to handle;
    // it is simpler to start with the synchronous producer.
    producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)
    if err != nil {
       panic(err.Error())
    }
    defer func() {
       _ = producer.Close()
    }()
 
    msgCount := 4
    // Simulate four messages
    for i := 0; i < msgCount; i++ {
        rand.Seed(int64(time.Now().Nanosecond()))
        msg := &sarama.ProducerMessage{
          Topic: "testAutoSyncOffset",
          Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),
          Key:   sarama.StringEncoder(strconv.Itoa(i)), // key for hash partitioning; in real code this would be the receiver's user ID
        }
        t1 := time.Now().Nanosecond()
        partition, offset, err := producer.SendMessage(msg)
        t2 := time.Now().Nanosecond()
        if err == nil {
           fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), " ms")
        } else {
           fmt.Println(err.Error())
        }
    }
}

8. References

  • Kafka data loss and repeated consumption: zhuanlan.zhihu.com/p/54287819
  • When will Kafka lose messages
  • The meaning of the CAP theorem: www.ruanyifeng.com/blog/2018/0…
  • Kafka introduction (3): how Sarama producers work: www.cnblogs.com/hongjijun/p…
  • Offset Explorer, a handy Kafka client management tool: www.ibloger.net/article/349…
  • Checking a Kafka cluster's version: blog.csdn.net/Damonhaus/a…
  • How Kafka guarantees message ordering: blog.csdn.net/qianshangdi…