7. Consumer group
A Consumer Group is the scalable, fault-tolerant consumption mechanism provided by Kafka.
- A Consumer Group can have one or more Consumer instances. An instance can be a separate process or a thread within the same process; in practice, separate processes are more common.
- The Group ID is a string that uniquely identifies a Consumer Group within a Kafka cluster.
- Each partition of the topics a Consumer Group subscribes to can be consumed by only one Consumer instance within that Group. The same partition can, however, also be consumed by instances of other Groups.
The disadvantage of the traditional message queue model is that once a message is consumed, it is removed from the queue, so it can only be consumed by a single downstream Consumer. Strictly speaking, this is not a bug but a feature. The model scales poorly, however, because downstream Consumers have to compete for messages on the shared queue. The publish/subscribe model does allow a message to be consumed by multiple consumers, but it does not scale well either, because every subscriber must subscribe to all partitions of the topic. This all-or-nothing subscription is inflexible and affects how messages are actually delivered.
It would be nice if there were a mechanism that avoided the flaws of both models while combining their strengths. Fortunately, Kafka's Consumer Group is exactly such a mechanism. When a Consumer Group subscribes to multiple topics, each instance in the Group is not required to subscribe to every partition of those topics; it consumes messages from only some of the partitions.
Consumer Groups are independent of each other and can subscribe to the same set of topics without interfering with one another. Combined with the message retention mechanism on the Broker side, Kafka's Consumer Group neatly sidesteps the scalability problems above. With this single mechanism, Kafka implements both models of a traditional messaging engine: if all instances belong to the same Group, it behaves as a message queue; if every instance belongs to a different Group, it behaves as publish/subscribe.
In a real-world scenario, how do you know how many Consumer instances a Group should have? Ideally, the number of Consumer instances should equal the total number of partitions the Group subscribes to. Suppose, for example, a Group subscribes to topics with 6 partitions in total; ideally it would have 6 Consumer instances, each consuming one partition.
You might ask: can I have fewer or more than 6 instances? Of course you can! With 3 instances, each instance consumes on average 2 partitions (6 / 3 = 2); with 8 instances, unfortunately, 2 of them (8 - 6 = 2) will never be assigned a partition and will sit idle forever. Therefore, in practice it is generally not recommended to run more Consumer instances than the total number of partitions; the redundant instances simply waste resources without adding any benefit.
Consumer offsets
For a Consumer Group, the offsets are a set of KV pairs: the Key is a partition and the Value is the latest offset the Group has consumed in that partition. In Java, you might picture it as a data structure like Map<TopicPartition, Long>, where TopicPartition identifies a partition and the Long is the offset.
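As a rough sketch only (the broker does not actually keep a Java map; the topic name and offsets below are made up), the idea looks like this:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class OffsetMapSketch {
    public static void main(String[] args) {
        // One entry per partition: the Value is the latest offset the Group has consumed.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("orders", 0), 42L); // hypothetical topic and offsets
        offsets.put(new TopicPartition("orders", 1), 17L);
        offsets.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
    }
}
```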
Older versions of the Consumer saved offsets in ZooKeeper. The most obvious benefit of keeping offsets in an external system like ZooKeeper is that it reduces the state-keeping overhead on the Kafka Broker. A popular idea at the time was to make server nodes stateless so they could scale freely and achieve extreme scalability. That is why Kafka originally kept Consumer Group offsets in a framework separate from the Kafka cluster.
However, it gradually became clear that coordination frameworks like ZooKeeper are not well suited to frequent writes, while offset updates from a Consumer Group are an extremely frequent operation. This kind of high-throughput write traffic can significantly slow a ZooKeeper cluster, so a consensus grew in the Kafka community that storing Consumer offsets in ZooKeeper was inappropriate.
As a result, for the new version of the Consumer, the Kafka community redesigned offset management by storing offsets inside a Kafka topic. That internal topic is the love-it-or-hate-it __consumer_offsets. The new Consumer saves offsets in this internal topic on the Broker side.
Rebalance
Finally, there is the Consumer Group's well-known rebalancing process, better known as Rebalance. I call it "famous," though in a way it is "notorious," because it has produced an endless stream of bugs. I'll keep you in suspense for now and explain later why it is so "hated." Let's start with what a Rebalance actually is.
Rebalance is essentially a protocol that specifies how all Consumers in a Consumer Group agree on allocating every partition of the subscribed topics. For example, suppose a Group with 20 Consumer instances subscribes to a topic with 100 partitions. Normally, Kafka assigns each Consumer 5 partitions on average. This allocation process is called a Rebalance.
So when does a Consumer Group Rebalance? There are three conditions that trigger a Rebalance.
- The number of group members changes. For example, a new Consumer instance joins the group, an instance leaves, or an instance crashes and is "kicked" out of the group.
- The number of subscribed topics changes. A Consumer Group can subscribe to topics with a regular expression, for example consumer.subscribe(Pattern.compile("t.*c")), which means the Group subscribes to every topic whose name starts with the letter t and ends with the letter c (see the sketch after this list). If a new topic matching the pattern is created while the Consumer Group is running, a Rebalance occurs for that Group.
- The number of partitions of a subscribed topic changes. Kafka currently only allows increasing a topic's partition count. When the partition count increases, every Group that subscribes to that topic Rebalances.
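As a minimal sketch of regex-based subscription (assuming a recent Java client that supports subscribe(Pattern) without a rebalance listener; the bootstrap address and group.id are placeholders):

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PatternSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");              // hypothetical group.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to every topic whose name starts with "t" and ends with "c".
            // A matching topic created later triggers a Rebalance for this Group.
            consumer.subscribe(Pattern.compile("t.*c"));
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(r -> System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
        }
    }
}
```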
When a Rebalance occurs, all Consumer instances in the Group coordinate together. You might ask: how does each Consumer instance know which partitions of the subscribed topics it should consume? This is where allocation strategies come in.
Kafka currently ships three allocation strategies by default. Each has its pros and cons, but the common goal is the fairest possible allocation, i.e. each Consumer instance gets roughly the same number of partitions. For example, if a Group with 10 Consumer instances consumes 100 partitions, the ideal outcome is 10 partitions per instance on average. This is what a fair allocation strategy aims for. If the allocation is severely skewed, some instances will inevitably be "idle to death" while others are "busy to death."
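Which strategy a Group uses is controlled by the Consumer-side partition.assignment.strategy parameter. A minimal sketch (the class names come from the Java client; which strategy suits you is workload-dependent):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignmentStrategySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // RangeAssignor is the default; RoundRobinAssignor spreads partitions evenly across
        // instances; StickyAssignor (0.11+) additionally tries to keep the previous assignment.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        System.out.println(props);
    }
}
```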
Let's use a simple example to illustrate how a Consumer Group Rebalances. Suppose a Consumer Group has two Consumers, A and B. When a third member, C, joins, Kafka triggers a Rebalance and re-allocates the partitions among A, B, and C according to the default allocation policy.
After the Rebalance, it’s clear that the allocation is still fair, with each Consumer instance getting three partitions. That’s what we want to happen.
Now that we have covered what a Rebalance is, let me explain why it is so "hated."
- Rebalance has a huge impact on Consumer throughput. If you are familiar with the JVM's garbage collection mechanism, you have probably heard of stop-the-world collection, known as STW. During an STW pause, all application threads stop working and the entire application freezes. The Rebalance process is similar: while a Rebalance is in progress, all Consumer instances stop consuming and wait for it to complete. This is one of the worst aspects of Rebalance.
- In the current design, every Rebalance involves all Consumer instances and reassigns all partitions. It would be more efficient to minimize changes to the existing assignment. If instance A was consuming partitions 1, 2, and 3 before the Rebalance, it would ideally keep consuming them afterwards rather than being reassigned to other partitions. That way, instance A's TCP connections to the Brokers hosting those partitions can continue to be used, with no need to create new Socket connections to other Brokers.
- Rebalance is too slow. One overseas user had hundreds of Consumer instances in a Group, and a single Rebalance took hours! That is completely intolerable. The tragedy is that the community can do little about it, at least for now there is no particularly good solution. Perhaps the best "solution" is to avoid Rebalance altogether.
8. The offsets topic
In the Kafka source code, __consumer_offsets has the more formal name Offsets Topic. For today's discussion I will use "offsets topic" to refer to __consumer_offsets. Note that its name starts with two underscores.
The offset management mechanism of the new Consumer is conceptually simple: the Consumer's offset data is submitted to __consumer_offsets as ordinary Kafka messages. In short, the main purpose of __consumer_offsets is to store the offset information of Kafka Consumers.
The offsets topic is a normal Kafka topic. You can manually create it, modify it, or even delete it. But it is also an internal topic, and for the most part you don't need to manage it; just leave it to Kafka.
Although the offsets topic is a normal Kafka topic, its message format is defined by Kafka itself and cannot be modified by the user. That means you cannot write arbitrary messages to this topic: if a message does not conform to Kafka's expected format, Kafka cannot parse it, which can crash the Broker. In fact, the Kafka Consumer API provides methods for committing offsets, i.e. for writing messages to the offsets topic. You should never write your own Producer that sends arbitrary messages to it.
You can think of the message format simply as a KV pair. Key and Value are the message key and message body, which in Kafka are just byte arrays.
Let's start with the Key. A Kafka cluster has many Consumers, and since this topic stores offset data for Consumers, the message format must contain a field identifying which Consumer the offset data belongs to. Where should this field go? Obviously, the Key is the better place.
Now we know the Key should contain a field that identifies the Consumer. So which field in Kafka identifies a Consumer? Remember the Group ID we mentioned when discussing Consumer Groups? Yes, that is the field that uniquely identifies a Consumer Group.
With that said, let me add a few more words. Besides Consumer Groups, Kafka also supports the Standalone Consumer, i.e. standalone consumption. It operates quite differently from the Consumer Group, but its offset management mechanism is the same. Since a Standalone Consumer also has its own Group ID to identify itself, this message format applies to it as well.
OK, so the Group ID goes in the Key, but is the Group ID enough by itself? Remember that the Consumer commits offsets at the partition level, i.e. it commits offsets for one or more partitions, so the Key must obviously also identify the partition the offset is being committed for.
So the Key of an offsets-topic message holds three parts: <Group ID, topic name, partition number>.
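Conceptually, and only conceptually (the real key is a versioned binary schema defined in the Broker code, not a Java class), you can picture the key as:

```java
// Illustrative only: models the three fields carried in an offsets-topic key.
public final class OffsetKeySketch {
    final String groupId;   // identifies the Consumer Group (or Standalone Consumer)
    final String topic;     // topic the offset was committed for
    final int partition;    // partition the offset was committed for

    OffsetKeySketch(String groupId, String topic, int partition) {
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
    }
}
```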
Of course, this is not the only message format in the offsets topic. In fact, there are three. Besides the format just described, there are two others:
- A message used to store Consumer Group information.
- A message used to delete expired Group offsets or even to delete the Group itself.
The first format is so obscure that you can hardly find it with a search engine. You only need to remember that it is used to register a Consumer Group.
The second format is better known. It even has its own name: tombstone message, also called a delete mark. Next time you see these terms on Google or Baidu, don't be surprised; they refer to the same thing. These messages appear only in the source code and are not exposed to you. Their defining characteristic is that the message body is null, i.e. an empty message body.
So when are such messages written? Once all Consumer instances of a Consumer Group have stopped and all of its offset data has been deleted, Kafka writes a tombstone message to the corresponding partition of the offsets topic to signal that the Group should be completely removed.
OK, that's enough about message formats; let's talk about how the offsets topic gets created. Typically, Kafka automatically creates the offsets topic when the first Consumer in the cluster starts. As we said, the offsets topic is a normal Kafka topic, so it has a partition count. If Kafka creates it automatically, how is the partition count determined? It is controlled by the Broker-side parameter offsets.topic.num.partitions, whose default value is 50, so Kafka automatically creates an offsets topic with 50 partitions. If you have ever been surprised by a crowd of __consumer_offsets-xxx directories appearing under your Kafka log paths, now you know: that is the offsets topic Kafka automatically created for you.
You may ask: besides the partition count, how is the number of replicas, i.e. the replication factor, controlled? The answer is simple: another Broker-side parameter, offsets.topic.replication.factor, whose default value is 3.
To summarize, if the offsets topic is created automatically by Kafka, it has 50 partitions and a replication factor of 3.
Of course, you can also create the offsets topic manually, using the Kafka API, before any Consumer has started in the cluster. The advantage of manual creation is that you can tailor the offsets topic to your actual needs. Many people feel, for example, that 50 partitions is too many for them; in that case you can create the topic yourself and ignore the value of offsets.topic.num.partitions.
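A rough sketch of manual creation using the Java AdminClient. Whether a broker accepts client-side creation of this internal topic depends on your Kafka version and configuration, and the partition count here is purely illustrative, so treat this as an outline rather than a recipe:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateOffsetsTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Must run before the first Consumer starts, otherwise Kafka has already
            // auto-created the topic with offsets.topic.num.partitions partitions.
            NewTopic offsetsTopic = new NewTopic("__consumer_offsets", 10, (short) 3);
            admin.createTopics(Collections.singleton(offsetsTopic)).all().get();
        }
    }
}
```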
My advice, however, is to let Kafka create it automatically. Parts of the current Kafka source code hard-code 50 partitions, so if you create an offsets topic with a different partition count you can run into all sorts of strange problems. This is a bug in the community; the fix has been written but is still under review.
The offsets topic is created to be used, of course, but where is it used? We have been saying that the Kafka Consumer commits offsets to this topic. So how does a Consumer commit offsets? Currently the Kafka Consumer commits offsets in one of two ways: automatically or manually.
The Consumer side has a parameter called enable.auto.commit. If it is true, the Consumer silently commits offsets for you on a regular schedule, with the interval controlled by a dedicated parameter, auto.commit.interval.ms. One obvious advantage of automatic commits is that you don't have to worry about committing offsets yourself to make sure consumption progress isn't lost. But that is also its disadvantage: because it is so hands-off, you lose a lot of flexibility and control, and you cannot manage offsets yourself on the Consumer side.
In fact, many big data frameworks that integrate with Kafka, such as Spark and Flink, disable automatic commits. This leads to the other commit mode: manual commits, i.e. setting enable.auto.commit = false. Once it is false, you, the Consumer developer, are responsible for committing offsets. The Kafka Consumer API provides methods for this, such as consumer.commitSync(). When these methods are called, Kafka writes the corresponding messages to the offsets topic.
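A minimal sketch of manual commits with the Java Consumer (the bootstrap address, group.id, and topic name are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");              // hypothetical group.id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // take over offset management
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("some-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // ... process records here ...
                // Synchronously write the offsets of this poll to __consumer_offsets;
                // commitAsync() is the non-blocking variant.
                consumer.commitSync();
            }
        }
    }
}
```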
If you choose automatic commits, there can be a problem: as long as the Consumer keeps running, it keeps writing messages to the offsets topic indefinitely.
Take an extreme example. Suppose the Consumer is currently at the last message of a topic, with offset 100. Since the topic receives no new messages, the Consumer has nothing to consume, so its offset stays at 100 forever. Because offsets are committed automatically, messages with offset = 100 are written to the offsets topic over and over. Obviously Kafka only needs to keep the latest of these messages; all the earlier ones can be deleted. This requires Kafka to have a deletion strategy tailored to the characteristics of offsets-topic messages, otherwise the messages will pile up and eventually fill the entire disk.
How does Kafka delete expired messages from the offsets topic? The answer is Compaction. Much of the Chinese-language literature translates it as "compression," about which I have reservations: the proper term for compression is Compression, a mechanism that works very differently. I prefer to call it compaction, the same term used in JVM garbage collection.
However you translate it, Kafka uses the Compact policy to delete expired messages from the offsets topic, preventing it from growing without bound. So how does the Compact policy define "expired"? For two messages M1 and M2 with the same Key, if M1 was sent earlier than M2, then M1 is the expired message. The Compact process scans all messages in the log, weeds out the expired ones, and compacts the survivors together. The figure from the official website, described below, illustrates the Compact process.
In that figure, the messages at offsets 0, 2 and 3 all have Key K1. After compaction, the partition only needs to keep the message at offset 3, because it is the most recently sent.
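To make the rule concrete, here is a tiny sketch of the same idea in plain Java; it only models the "keep the latest value per key" semantics and has nothing to do with Kafka's actual LogCleaner code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    public static void main(String[] args) {
        // A partition's log as (key, value) pairs; the comment stands in for the offset.
        List<String[]> log = List.of(
                new String[]{"K1", "v0"},  // offset 0
                new String[]{"K2", "v1"},  // offset 1
                new String[]{"K1", "v2"},  // offset 2
                new String[]{"K1", "v3"}); // offset 3

        // Later writes overwrite earlier ones with the same key, so only the
        // most recent value per key survives, mirroring the figure above.
        Map<String, String> compacted = new LinkedHashMap<>();
        for (String[] record : log) {
            compacted.put(record[0], record[1]);
        }
        System.out.println(compacted); // {K1=v3, K2=v1}
    }
}
```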
Kafka provides a dedicated background thread that periodically inspects topics to be compacted to see whether there is data eligible for deletion. This background thread is called the Log Cleaner. Many real production environments have suffered from the offsets topic growing without bound and eating too much disk space. If your environment has this problem, I recommend checking the state of the Log Cleaner thread; a dead Log Cleaner is usually the culprit.
9. Consumer group rebalancing
The Coordinator is a component that runs on the Broker side and serves Consumer Groups; it performs Rebalances for the Group as well as offset management and group membership management. Specifically, when a Consumer application commits an offset, it actually commits it to the Broker where its Coordinator resides. Similarly, when a Consumer application starts, it sends requests to that same Broker, and the Coordinator performs metadata management operations such as Consumer Group registration and member management.
Coordinator components are created and started when the Brokers start; in other words, every Broker has its own Coordinator component. So how does a Consumer Group determine which Broker hosts the Coordinator that serves it? The answer lies in the internal offsets topic __consumer_offsets that we discussed earlier.
Currently, Kafka uses a two-step algorithm to determine the Broker hosting a Consumer Group's Coordinator.
- Step 1: determine which partition of the offsets topic stores the Group's data: partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount).
- Step 2: find the Broker hosting the leader replica of that partition. That Broker is the Coordinator.
A quick explanation of the algorithm. First, Kafka computes the hash code of the Group's group.id. For example, for a Group whose group.id is "test-group", the hashCode is 627841412. Next, Kafka takes the partition count of __consumer_offsets, usually 50, and computes the modulo of the absolute value: abs(627841412 % 50) = 12. We now know that partition 12 of the offsets topic stores this Group's data. With the partition number in hand, step 2 is easy: we just find out which Broker hosts the leader replica of partition 12 of the offsets topic. That Broker is the Coordinator we are looking for.
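A tiny sketch of the same two-step calculation (the group.id is the example from the text; 50 is the default offsets.topic.num.partitions):

```java
public class CoordinatorLookupSketch {
    public static void main(String[] args) {
        String groupId = "test-group";        // example group.id from the text
        int offsetsTopicPartitionCount = 50;  // default offsets.topic.num.partitions

        // Step 1: which __consumer_offsets partition stores this Group's data?
        int partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount);
        System.out.println("__consumer_offsets partition: " + partitionId);

        // Step 2 (not shown): the Broker hosting the leader replica of that
        // partition is this Group's Coordinator.
    }
}
```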
In practice, Consumer applications, and the Java Consumer API in particular, automatically discover and connect to the correct Coordinator, so we don't have to worry about this. The real value of knowing the algorithm is troubleshooting: when a problem occurs in a Consumer Group and you need to inspect Broker logs quickly, it lets you pinpoint the Broker hosting the Coordinator instead of blindly checking Broker after Broker.
What’s the downside of Rebalance? To sum up, there are the following three points:
- Rebalance hurts Consumer TPS. The bottom line is that during a Rebalance, a Consumer stops doing anything.
- Rebalance is slow. If your Group has many members, you will certainly feel this pain point. Remember the earlier example: a Group with hundreds of Consumer instances whose Rebalance took hours at a time. In that scenario, the Consumer Group's Rebalance was completely out of control.
- Rebalance is inefficient. Kafka's current design requires every member of the Group to participate in every Rebalance and ignores locality, even though locality is particularly important for system performance.
For the third point, consider a simple example. Suppose a Group has 10 members and each member consumes 5 partitions on average. Now one member leaves, so a new Rebalance is needed to "move" the 5 partitions that member was responsible for to other members. Obviously, a better approach would be to keep the existing assignments of the remaining 9 Consumers unchanged and distribute only those 5 partitions among them, minimizing the impact of the Rebalance on everyone else.
Unfortunately, that is not how Kafka currently works. By default, every Rebalance discards the previous assignment. In this example, when the Rebalance starts, the Group throws away all 50 partitions (10 members * 5 partitions each) and reassigns them from scratch among the 9 surviving members. Clearly that is not very efficient. For this reason, in 0.11.0.0 the community introduced StickyAssignor, a sticky partition allocation strategy: "sticky" means each Rebalance changes the existing assignment as little as possible. Unfortunately, this strategy has been somewhat buggy and requires an upgrade to 0.11.0.0 to use, so it is not widely used in real production environments.
To sum up, Rebalance has these three disadvantages. You may ask, particularly about slow Rebalances and the TPS hit, whether the community has a solution. On these two points I can tell you very responsibly: "No solution!" In particular, the Kafka community can do little about slow Rebalances.
In my experience, in real business scenarios many Rebalances are unplanned or unnecessary. Most of our TPS is dragged down by these Rebalances, so avoiding them matters. Here is how to avoid Rebalance.
To avoid Rebalance, start with the timing of the Rebalance. As we said earlier, there are three times when Rebalance occurs:
- The number of group members has changed
- The number of subscribed topics has changed
- The number of partitions subscribed to the topic changed
The latter two are usually deliberate operational actions, so the Rebalances they cause are mostly unavoidable. Let's focus instead on how to avoid Rebalances caused by changes in group membership.
If the number of Consumer instances in a Consumer Group changes, a Rebalance is inevitable. This is the most common cause of Rebalance; it accounts for 99% of the Rebalances I encounter.
An increase in Consumer instances is easy to understand: when we start a Consumer program configured with the same group.id, we are in fact adding a new Consumer instance to the group. The Coordinator accepts the new instance, adds it to the group, and reassigns partitions. An increase is usually planned, perhaps for higher TPS or better scalability, so it is not one of the "unnecessary" Rebalances we want to avoid.
We are more concerned with instances leaving the Group. Deliberately stopping some Consumer instances is one thing; the key point is that in some cases a Consumer instance can be mistakenly judged as "stopped" by the Coordinator and "kicked" out of the Group. If that is what causes the Rebalance, we cannot just let it happen.
When does the Coordinator decide that a Consumer instance has died and must be removed from the group? That is definitely worth discussing in detail, so let's dig in.
After a Consumer Group completes a Rebalance, each Consumer instance periodically sends heartbeat requests to the Coordinator to show that it is still alive. If a Consumer instance fails to send these heartbeats in time, the Coordinator considers the Consumer "dead," removes it from the Group, and starts a new Rebalance. The Consumer-side parameter session.timeout.ms controls this behavior; its default value is 10 seconds, meaning that if the Coordinator does not receive a heartbeat from a Consumer instance within 10 seconds, it considers that instance dead. In other words, session.timeout.ms determines how Consumer liveness is judged.
Besides this parameter, the Consumer provides another parameter that controls how frequently heartbeat requests are sent: heartbeat.interval.ms. The smaller the value, the more frequently the Consumer instance sends heartbeats. Frequent heartbeats also let each Consumer instance learn more quickly that a Rebalance has started, because the Coordinator notifies Consumers of a Rebalance by encapsulating the REBALANCE_NEEDED flag in the response to their heartbeat requests.
In addition to these two parameters, there is another Consumer-side parameter that controls how the Consumer's actual processing capacity affects Rebalance: max.poll.interval.ms. It limits the maximum interval between two consecutive calls to the poll method by the Consumer application. The default value is 5 minutes, which means that if your Consumer cannot finish processing the polled messages within 5 minutes, it initiates a "leave group" request and the Coordinator starts a new round of Rebalance.
Now that you know what these parameters mean, let's clarify which Rebalances are "unnecessary."
The first type of unnecessary Rebalance is caused by a Consumer being "kicked" out of the Group because it failed to send heartbeats in time. You therefore need to set session.timeout.ms and heartbeat.interval.ms carefully. Here are some recommended values that you can apply "mindlessly" to your production environment.
- Set session.timeout.ms to 6s.
- Set heartbeat.interval.ms to 2s.
- Ensure that the Consumer instance can send at least 3 heartbeat requests before it is judged dead, that is, session.timeout.ms >= 3 * heartbeat.interval.ms.
Setting session.timeout.ms to 6s mainly lets the Coordinator detect failed Consumers more quickly. After all, we do want to catch the Consumers that are truly dead and kick them out of the Group as soon as possible. Hopefully this configuration helps you avoid the first type of "unnecessary" Rebalance.
The second type of unnecessary Rebalance is caused by Consumers taking too long to consume. I once had a client whose Consumer, when consuming data, had to process each message and write it to MongoDB. Obviously, that is heavy consumption logic: even slight instability in MongoDB lengthens the Consumer's processing time. In this case the value of max.poll.interval.ms is crucial. To avoid unexpected Rebalances, it is best to set this parameter to a value larger than your maximum downstream processing time. In the MongoDB example, if the longest write takes 7 minutes, you could set the parameter to about 8 minutes, as in the sketch below.
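Putting the recommendations for both types together, a minimal sketch of the Consumer-side settings (the 8-minute value comes from the MongoDB example above; tune all three to your own workload):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");     // 6s: detect dead consumers quickly
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");  // 2s: at least 3 heartbeats per session timeout
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "480000"); // 8 min: longer than worst-case processing
        System.out.println(props);
    }
}
```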
In short, give your business logic plenty of headroom so the Consumer does not trigger a Rebalance simply because it takes too long to process messages.
If you have set these parameters appropriately and still see Rebalances, I suggest checking GC behavior on the Consumer side for frequent Full GC pauses, which can also cause Rebalances. Why single out GC? Because in real life I have seen many unexpected Rebalances caused by misconfigured GC settings that made Full GCs happen too often.