How to operate Kafka from Go without losing messages

Background

Quite a few Internet companies nowadays run core business on a message queue. Because it is core business, it is sensitive to the eventual consistency of the data: lose messages along the way and you attract user complaints, and your year-end performance rating comes out at 3.25 (a failing grade). I was recently chatting with a couple of friends whose companies all use Kafka for their message queues. Can Kafka lose messages? And how do you compensate for lost messages? In this article we take a look at Kafka and show how to operate it from Go without losing data.

This article operates Kafka through the client library github.com/Shopify/sar…

Getting to know the Kafka architecture

Here is how Wikipedia describes Kafka:

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. The goal of the project is to provide a unified, high-throughput, low-latency platform for processing real-time data. Its persistence layer is essentially a “massive publish/subscribe message queue based on a distributed transaction-log architecture,” which makes it valuable as enterprise-level infrastructure for processing streaming data. In addition, Kafka can connect to external systems (for data import/export) through Kafka Connect, and it provides Kafka Streams, a Java stream-processing library. The design is heavily influenced by transaction logs.

Kafka's overall architecture is simple, consisting of producers, brokers, and consumers:

Taking the architecture diagram module by module:

  • Producer: the producer of the data, which publishes messages to a topic of its choosing.
  • Consumer: the consumer of the data, identified by a Consumer Group. Each record in a topic is delivered to one consumer instance within the subscribing consumer group; consumer instances can be spread across multiple processes or machines.
  • Broker: a message-middleware processing node (server). One node is one broker; a Kafka cluster consists of one or more brokers.

There are also some concepts that we will introduce:

  • Topic: a topic can have multiple partitions, multiple producers pushing messages to it, and multiple consumers pulling messages from it; a topic can live on one or more brokers.
  • Partition: a subset of a topic. Different partitions are placed on different brokers, which lets Kafka scale horizontally and increases its parallel processing capability. Messages in different partitions of the same topic differ, while messages within a single partition are ordered. Each partition has one or more replicas, one of which is elected leader; the followers pull data from the leader to update their own logs (each partition logically corresponds to a log folder), and consumers also pull messages from the leader (see the sketch after this list).
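
To make topics, partitions, and replicas concrete, here is a minimal sketch that creates a topic with three partitions and two replicas per partition via sarama's ClusterAdmin. The broker address and topic name are placeholders for this example:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0 // topic administration needs a reasonably modern protocol version

	// "localhost:9092" is a placeholder broker address.
	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal("NewClusterAdmin failed: ", err)
	}
	defer admin.Close()

	// Three partitions for parallelism, two replicas per partition for redundancy;
	// for each partition one replica is elected leader, the other acts as follower.
	err = admin.CreateTopic("demo-topic", &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 2,
	}, false)
	if err != nil {
		log.Fatal("CreateTopic failed: ", err)
	}
}
```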

The three nodes where Kafka can drop messages

Node 1: the producer pushes the message

Let's take a look at the general flow of a producer write:

  • The producer first finds the partition leader from the Kafka cluster
  • The producer sends the message to the leader, and the leader writes the message to its local log
  • The followers pull the message from the leader, write it to their own local logs, and send the leader an ACK
  • After receiving ACKs from all the replicas in the ISR, the leader advances the high watermark (HW) and sends an ACK to the producer

As you can see from this flow, Kafka finally returns an ACK to confirm the result of the push, and it offers three modes for this:

```go
NoResponse   RequiredAcks = 0
WaitForLocal RequiredAcks = 1
WaitForAll   RequiredAcks = -1
```
  • NoResponse RequiredAcks = 0: fire and forget; whether the push succeeded is of no concern to the producer
  • WaitForLocal RequiredAcks = 1: return as soon as the local node (the leader) confirms that it has received the message successfully
  • WaitForAll RequiredAcks = -1: return only after the leader and all the followers have received the message successfully

From these three modes we can infer that the producer loses pushed messages with some probability. The analysis is as follows:

  • If we choose mode 1 (RequiredAcks = 0), there is a high probability of data loss, and lost data cannot be retried
  • If we choose mode 2 (RequiredAcks = 1), data is safe as long as the leader does not crash; but if the leader crashes before the followers have synchronized the data, there is some probability of loss
  • If we choose mode 3 (RequiredAcks = -1), no data is lost, but it may lead to data duplication: if a network problem occurs while the leader and the followers are synchronizing data, the same message can end up delivered twice

Therefore, in a production environment we should choose mode 2 or mode 3 to guarantee message reliability. Which one depends on the business scenario: mode 2 if you care more about throughput, mode 3 if you care more about never losing data.

Node 2: the Kafka cluster persists the message

After the Kafka cluster receives data it persists it, and the data is eventually written to disk. This disk-write step can also lose data: when writing to disk, the operating system first places the data in its page cache, and the moment at which the OS flushes the cache to disk is not deterministic. So if the Kafka machine crashes suddenly in that window, data loss can occur. This is very rare, however; Kafka clusters normally replicate data internally across machines, so it is an extreme case that can usually be ignored.

Node 3: the consumer pulls the message

A pushed message is appended to a partition and assigned an offset, which represents the position the consumer has consumed up to; message ordering can also be guaranteed through the partition. After pulling a message, the consumer can commit either automatically or manually; if the commit succeeds, the offset advances.

Auto-commit can therefore bring data loss, and manual commit can bring duplicate consumption. The analysis is as follows:

  • With auto-commit, the offset may already be committed the moment we pull a message; if we then fail while running the consumption logic, that message is lost
  • With manual commit, if we commit after processing the message, a failure at the commit step results in the message being consumed twice

For the business, duplicate consumption is preferable to data loss, and it can be sidestepped with an idempotent design, as sketched below.
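
As one possible idempotent design (a minimal in-memory sketch only; a real system would record the keys in a durable store such as a database unique index), we can remember which messages have already been handled, keyed by topic/partition/offset:

```go
package consumer

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

// OnceGuard drops messages that were already processed. The in-memory map
// is only for illustration; production code should persist the keys.
type OnceGuard struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewOnceGuard() *OnceGuard {
	return &OnceGuard{seen: make(map[string]struct{})}
}

// Process runs handle at most once per topic/partition/offset.
func (g *OnceGuard) Process(msg *sarama.ConsumerMessage, handle func(*sarama.ConsumerMessage) error) error {
	key := fmt.Sprintf("%s/%d/%d", msg.Topic, msg.Partition, msg.Offset)

	g.mu.Lock()
	_, dup := g.seen[key]
	g.mu.Unlock()
	if dup {
		return nil // duplicate delivery: already handled, drop it safely
	}

	if err := handle(msg); err != nil {
		return err // not marked as seen, so a redelivery will be retried
	}

	g.mu.Lock()
	g.seen[key] = struct{}{}
	g.mu.Unlock()
	return nil
}
```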

In practice

The full code has been uploaded to github: github.com/asong2020/G…

Solving the push-message loss problem

It is mainly solved through two points:

  • Set the RequiredAcks mode: choosing WaitForAll guarantees that the push succeeded, at the cost of some latency
  • Introduce a retry mechanism, setting the retry count and the retry interval

So we write the following code (only the client-creation part is shown):

```go
// NewAsyncProducer builds an async producer client; VERSION and ADDR are
// package-level constants holding the Kafka version and the broker address.
func NewAsyncProducer() sarama.AsyncProducer {
	cfg := sarama.NewConfig()
	version, err := sarama.ParseKafkaVersion(VERSION)
	if err != nil {
		log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error())
		return nil
	}
	cfg.Version = version
	cfg.Producer.RequiredAcks = sarama.WaitForAll // one of the three ack modes
	cfg.Producer.Partitioner = sarama.NewHashPartitioner
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	cfg.Producer.Retry.Max = 3 // retry up to three times
	cfg.Producer.Retry.Backoff = 100 * time.Millisecond
	cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg)
	if err != nil {
		log.Fatal("NewAsyncProducer failed", err.Error())
		return nil
	}
	return cli
}
```
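
One caveat worth spelling out: because Return.Successes and Return.Errors are both enabled above, the producer's Successes() and Errors() channels must be drained, otherwise sends will eventually block. A minimal usage sketch (the topic name is a placeholder):

```go
// StartResultLoop drains the producer's result channels. Errors arriving
// here have already exhausted the configured retries (Retry.Max), so this
// is the place to log and compensate (e.g. park the message for replay).
func StartResultLoop(producer sarama.AsyncProducer) {
	go func() {
		for range producer.Successes() {
			// message acknowledged according to RequiredAcks; nothing to do
		}
	}()
	go func() {
		for err := range producer.Errors() {
			log.Println("push failed after retries:", err)
		}
	}()
}

// Send pushes one message; "demo-topic" is a placeholder topic name.
func Send(producer sarama.AsyncProducer, payload []byte) {
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "demo-topic",
		Value: sarama.ByteEncoder(payload),
	}
}
```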

Solving the pull-message loss problem

The solution here is blunt: keep auto-commit mode, but mark the offset only after each message has actually been consumed. This still allows duplicate consumption, but that is easy to handle: an idempotent operation, as shown above, solves it.

Code examples:

```go
func NewConsumerGroup(group string) sarama.ConsumerGroup {
	cfg := sarama.NewConfig()
	version, err := sarama.ParseKafkaVersion(VERSION)
	if err != nil {
		log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error())
		return nil
	}
	cfg.Version = version
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	cfg.Consumer.Offsets.Retry.Max = 3
	// Enable auto-commit; MarkMessage must still be called for offsets to be committed.
	cfg.Consumer.Offsets.AutoCommit.Enable = true
	cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // auto-commit interval
	client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg)
	if err != nil {
		log.Fatal("NewConsumerGroup failed", err.Error())
		return nil
	}
	return client
}
```

The section above creates the ConsumerGroup. Careful readers will have noticed that we use auto-commit, so where does the manual part come in? That is a peculiarity of this Kafka library: auto-commit must be used together with the MarkMessage() method, otherwise the commit fails. So we write the consumption logic like this:

```go
func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		var data common.KafkaMsg
		if err := json.Unmarshal(msg.Value, &data); err != nil {
			return errors.New("failed to unmarshal message err is " + err.Error())
		}
		// Stand-in for real processing: just print the data
		log.Print("consumerClaim data is ", data)

		// Mark the message as processed only after it has been handled
		// successfully; the auto-commit loop then commits the marked offset.
		session.MarkMessage(msg, "")
	}
	return nil
}
```
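
For completeness, here is a minimal sketch of wiring this handler into the consumer group. The topic name is a placeholder, and EventHandler is assumed to also implement the Setup and Cleanup methods that the sarama.ConsumerGroupHandler interface requires:

```go
func RunConsumer(ctx context.Context, client sarama.ConsumerGroup) {
	handler := EventHandler{}
	for {
		// Consume blocks until the session ends (e.g. on a rebalance),
		// so it has to be re-invoked in a loop.
		if err := client.Consume(ctx, []string{"demo-topic"}, handler); err != nil {
			log.Println("consume failed:", err)
		}
		if ctx.Err() != nil {
			return // context cancelled: shut down cleanly
		}
	}
}
```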

Alternatively, use the manual-commit approach, which takes two steps:

Step 1: Turn off autocommit:

```go
consumerConfig.Consumer.Offsets.AutoCommit.Enable = false // disable auto-commit; commit manually instead
```

Step 2: add the following to the consumption logic. In manual-commit mode, the message still has to be marked before committing:

```go
session.MarkMessage(msg, "")
session.Commit()
```
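
Putting those two lines in context, the earlier ConsumeClaim would look roughly like this in manual-commit mode (a sketch; handleMessage is a hypothetical stand-in for the business processing):

```go
func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		if err := handleMessage(msg); err != nil { // hypothetical business handler
			return err
		}
		session.MarkMessage(msg, "") // mark first...
		session.Commit()             // ...then synchronously flush the marked offsets
	}
	return nil
}
```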

The full code can be downloaded and verified on Github!

Conclusion

This article mainly covered two points:

  • Where Kafka can lose messages
  • How to configure Kafka from Go so that data is not lost

In daily business development, many companies like to decouple with message queues. You should be aware that using Kafka as a message queue does not by itself guarantee that data will not be lost; we need to configure the compensations ourselves. Don't forget it, or there will be another P0 incident.

Welcome to the public account: Golang Dream Factory

Recommended previous articles:

  • Learning channel design: From getting started to giving up
  • Detail memory alignment
  • Do not abuse Goroutine
  • Source analysis panic and recover, do not understand you hit me!
  • Interviewer: Matsuko is here to talk about memory escape
  • Interviewer: What is the result of two nil comparisons?
  • Errgroup for concurrent programming packages
