The problem started with a Kafka outage. I work for a fintech company that uses Kafka, which was originally designed for log processing, rather than RabbitMQ, which is more popular in financial payments, so I’ve been curious about Kafka’s highly available implementation and security. Kafka for internal use has been running steadily since Kafka was deployed and has not become unusable.
Recently, however, system testers have reported occasional instances where Kafka consumers do not receive messages and log on to the admin screen to find that one of the three nodes is down and down. But with the concept of high availability, how can three nodes with two nodes available cause the entire cluster of consumers to not receive messages? To solve this problem, start with Kafka’s highly available implementation.
Whether the traditional relational database based design system, or distributed such as ZooKeeper, Redis, Kafka, HDFS and so on, the way to achieve high availability is usually the use of redundancy design, through redundancy to solve the problem of node downtime unavailable. Let’s start with a brief overview of Kafka’s concepts: the physical model
The logical model
Broker: A Kafka service node. Simply put, a Broker is a Kafka server, a physical node.
Topic: In Kafka, messages are grouped by Topic. Each Topic has a Topic Name. Producers send messages to specific topics based on the Topic Name, and consumers consume messages from the corresponding topics based on the Topic Name.
Partition: Topic is a unit of message categorization, but each Topic can be subdivided into one or more partitions. A Partition can belong to only one Topic. Topics and partitions are logical concepts. For example, message 1 and message 2 are both sent to topic 1. They may go to the same partition or to different partitions (so different partitions under the same topic contain different messages), and then are sent to the corresponding Broker node of the partition.
Offset (Offset) : partition can be thought of as one that not only into the queue (Kafka only guarantee within a partition messages is ordered), news will to append to the end of the queue, each message after entering partition can have an Offset, identify the message in the position of the partition, the consumer to consume the message is identified by the Offset.
Kafka’s multiple copy redundancy design is implemented based on the above concepts. Don’t worry. Let’s keep reading. Prior to Kafka 0.8, there was no multi-copy redundancy. Once a node failed, data from all partitions on that node could no longer be consumed. This means that some of the data sent to the Topic is lost.
The introduction of replica journalists after version 0.8 solves the problem of data loss after downtime. Replicas are the data of each Partition in a Topic. The data of each Partition is synchronized to other physical nodes to form multiple replicas. Each copy of a Partition contains one Leader copy and multiple Follower copies. The Leader copy is elected by all the copies, and the other copies are all Follower copies. When the producer or consumer writes data, the Follower only interacts with the Leader. After writing data, the Follower pulls data for data synchronization.
Is it that simple? Yes, Kafka is highly available based on the multi-copy architecture diagram above. When a Broker fails, do not worry that partitions on this Broker have copies on other Broker nodes. What do you think happens if the Leader fails? In that case, a Leader can be elected among followers, and producers and consumers can play happily with the new Leader, which is high availability.
You might wonder, how many copies is enough? What if there is not complete synchronization between the followers and the Leader? What are the election rules for the Leader when a node goes down?
Immediate conclusion: How many copies is enough? The more copies you have, the more highly available Kafka will be. However, the more copies you have, the more network and disk resources you consume, and the worse the performance will be. In general, the more copies you have, the more available Kafka will be. What if there is not complete synchronization between followers and Leads?
Follower and Leader are not completely synchronized, but not completely asynchronous either. Instead, an ISR mechanism (in-sync Replica) is used. Each Leader dynamically maintains an ISR list of followers that are basically synchronized with the Leader. If a Follower does not send a data pull request to the Leader due to network or GC, the Follower is out of sync with the Leader and will be removed from the ISR list. Therefore, the followers in the ISR list are all copies that can keep up with the Leader.
What are the election rules for the Leader when a node goes down? There are many distributed election rules like ZooKeeper’s Zab, Raft, Viewstamped Replication, Microsoft’s PacificA, and more. In Kafka, the Leader election is very simple. Based on the ISR list we mentioned above, when a downtime occurs, all replicas are searched in sequence. If the replicas found are in the ISR list, the Leader is elected. In addition, it is necessary to ensure that the previous Leader has already abdicated, otherwise there will be a split brain (there are two leaders). How? Kafka ensures that there is only one Leader by setting up a Controller.
The Ack parameter determines the reliability of Kafka high availability and asks for tasks.
Asks is an important configuration for the producer client and can be set when sending messages. This parameter has three values: 0, 1, and All.
The first one is set to 0, which means that after the producer sends the message, we don’t care whether the message is dead or alive. It’s a little bit forgotten after sending the message, so we’re not responsible for the message. Without accountability, the message could be lost, and the availability could be lost.
The second is set to 1, which means that after the producer sends the message, it does not matter whether other followers synchronize as long as the message is successfully transmitted to the Leader. There is a case where the Leader receives a message and the Follower crashes before the Broker can be synchronized, but the producer thinks the message has been sent and the message is lost. Note that this is the default configuration for Kafka. Kafka’s default configuration is not high availability, but a tradeoff between high availability and high throughput.
The third option is set to All (or -1), which means that after the producer sends the message, not only the Leader must receive it, but also the followers in the ISR list must be synchronized so that the producer can send the task message successfully. Asks=All does not appear to lose messages? The answer is no. When only Leader is left in the ISR list, Asks=All equals to Asks=1. In this case, if the node goes down, can the data not be lost? Therefore, data loss can only be guaranteed when Asks=All and there are two copies in ISR.
We have come full circle to understand Kafka’s high availability mechanism, and finally come back to our original question: why is Kafka unavailable when a node is down? In the development test environment, THE number of Broker nodes I configured is 3, the number of Topic copies is 3, the number of partitions is 6, and Asks parameter is 1.
What does the cluster do first when one of the three nodes goes down? Yes, as we mentioned above, the cluster finds that the Leader with Partition has failed, and the Leader needs to be reelected from the ISR list. Is it unavailable if the ISR list is empty? No, one of the surviving copies of the Partition is selected as the Leader, but this has the potential of data loss.
So, as long as the number of Topic replicas is set to the same as the number of brokers, Kafka’s multi-replica redundancy design is highly available and does not become unusable once a downtime occurs (note that Kafka has a protection policy that Kafka stops when more than half of its nodes become unavailable). If you think about it, Kafka has a Topic that has 1 replica. The problem is with consumer_offset, a Topic Kafka automatically creates to store information about consumer offsets (partitions by default are 50). And that’s this Topic, whose default number of copies is 1. If all partitions exist on the same machine, this is an obvious single point of failure! When the Broker storing the Partition __consumer_offset is killed, all consumers stop consuming.
How to solve this problem? I need to delete __consumer_offset from Kafka. Note that this Topic is a built-in Topic in Kafka. I cannot delete this Topic by deleting logs.
Need by setting the offsets. The topic. The replication. The factor of 3 to __consumer_offset replicas instead of 3.
By making __consumer_offset copies redundant, the problem of consumer consumption after a node goes down is solved.
Finally, why does __consumer_offset Partition appear to be stored on only one Broker instead of across all brokers