Learn more about Java basics


This series of notes is based on the geek time course Kafka Core Technology and Combat

This directory

  • Consumer groups
  • Avoid consumer Rebalance
  • Consumer group rebalance the whole process analysis

Consumer groups

Consumer Group: Extensible and fault-tolerant message maker mechanism provided by Kafka.

Important features

  • There can be multiple Consumer instances within a group.
  • The unique identification of a consumer Group is called a Group ID, and the consumers within the Group share this common ID.
  • Consumer groups subscribe to topics, and each partition of a topic can only be consumed by one consumer within the group
  • The consumer group mechanism implements both the message queue model and the publish/subscribe model.
  • The number of instances in the consumer group had better be the same as the number of partitions in the subscribed topic, otherwise the extra instances would just be idle. A partition can only be subscribed to by one consumer instance.

Shift management approach for consumer groups

  • For the Consumer Group, the displacement is a Group of KV pairs, Key is the partition, and V corresponds to the latest displacement of Consumer consumption in the partition.
  • The shift of the consumer group in older versions of Kafka is stored in Zookeeper. The benefit is that Kafka reduces the overhead of saving the Kafka Broker side state. However, ZK is a distributed coordination framework, which is not suitable for frequent write updates. Such high-throughput write operations greatly slow down the performance of the Zookeeper cluster.
  • The new version of Kafka uses the theme of saving shifts inside Kafka__consumer_offsetsMethods.

Rebalancing of consumer groups

Rebalance: Essentially a protocol that specifies how each consumer in a consumer group agrees to allocate each partition under a subscription topic.

Triggering conditions:

  • The number of group members has changed. Procedure
  • The number of subscribed topics changed. Procedure
  • The number of ordered subject zones has changed

Rebalance is designed to require all consumer instances to participate and all partitions to be reassigned. And the Rebalance is slow. All Consumer instances stop consuming while the Rebalance is complete.

Avoid consumer Rebalance

What is rebalancing

  • The process of getting all Consumer instances under a Consumer Group to agree on how to consume all partitions of a subscribed topic.
  • During the rebalancing process, all Consumer instances work together to allocate subscription partitions with the help of the coordinator component.
  • Throughout the process, none of the instances can consume any messages, which has a significant impact on the Consumer’s TPS

Why do you want to avoid rebalancing

  • Rebalance affects the TPS on the Consumer side because consumers cannot consume messages during rebalancing
  • Rebalance is slow, and the whole process can take hours if there are hundreds of consumer instances
  • Rebalance is inefficient. This is an all-hands-on-all process that usually doesn’t take into account locality, which is especially important for improving system performance.
  • In real business situations, a lot of Rebalance is unplanned or unnecessary.

When rebalancing is triggered

  • The number of group members has changed
  • The number of subscribed topics has changed
  • The number of subscribed topic partitions changed.

What rebalancing to avoid

The most common is rebalancing triggered by a change in the number of consumers. Other rebalancing is inevitable, but a change in the number of consumers is avoidable

  • When a Consumer program with the same group. Id value is started, a Consumer instance is added to the group. This situation in Mid-Autumn Festival is generally planned to improve TPS on the Consumer side, so there is no need to avoid it.
  • Consumer instance reduction
    • Planned reductions in consumer instances are also unavoidable
    • Unplanned rebalancing to reduce triggering is what we need to focus on.

How to avoid rebalancing

In some cases, a Consumer instance can be mistakenly considered “stopped” by Coordinateor and kicked out of the Group. The resulting rebalancing needs to be avoided.

The Consumer instance cannot send heartbeat requests in a timely manner

After the Consumer group is rebalanced, each Consumer instance periodically sends a heartbeat request to a Coordinator. If the heartbeat request is not sent in time, the Coordinator considers the Consumer to be offline, removes it from the group, and starts a new round of rebalancing.

Solve the setting of Consumer terminal:

  • Session.timeout.msThe default value is 10 seconds. If a Coordinator does not receive a heartbeat from a Consumer instance in the Group within 10 seconds, the instance is considered offline. This can be scaled up appropriately
  • heartbeat.interval.ms: Controls the frequency of sending heartbeat requests. Sending heartbeat requests frequently consumes tape library resources.
  • max.poll.interval.ms: sets the maximum interval between calls to the poll method by the Consumer application. The default is 5 minutes, which means that if the Consumer fails to consume the messages returned by the poll method within 5 minutes, the Consumer will initiate an “out of the group” request, resulting in rebalancing.

Advice:

session.timeout.ms=6s  
Heartbeat.interval.ms=2s  
Copy the code

Ensure that the Consumer instance can send at least three heartbeat requests before it is deemed dead, that is, session.timeout.ms >=3 * heartbeat.interval.ms.

The consumption time is too long

The Consumer side processed a heavy consumption logic and took a long time, causing the Consumer application to call the poll method twice more than the maximum interval set.

Solution:

  • willmax.poll.interval.msParameter Settings are larger
  • Optimize the business logic on the consumer side and reduce consumption time

Iii. GC influence

GC performance on the Consumer side also leads to frequent rebalancing, and frequent Ful GC leads to long outages.

Solution: JVM tuning. (See the parameter tuning section.)

Expand knowledge – Coordinator

A Coordinator in Kafka is a Consumer Group that performs Rebalance, shift management, and Group membership management. Specifically, when a Consumer application commits a shift, it actually commits a shift to the Broker where the Coordinator resides. Similarly, when a Consumer application is started, it sends requests to the Broker where a Coordinator resides. The Coordinator then performs metadata management operations such as registration of Consumer groups and member management records. Coordinator components are created and started when all brokers are started. That is, all brokers have their own Coordinator components. How does a Consumer Group determine which Broker its Coordinator is serving? The answer lies in the internal displacement theme __consumer_offsets in Kafka we talked about earlier. Currently, Kafka’s algorithm for determining the Broker where a Coordinator resides for a Consumer Group has two steps.

  1. Determined by the displacement of the theme which partition to save the Group data: partitionId = math.h abs (groupId. HashCode () % offsetsTopicPartitionCount).
  2. Locate the Broker where the Leader copy of the partition resides. The Broker is the Coordinator.

Just a quick explanation of the above algorithm.

  1. First, Kafka evaluates the hash of the Group’s group.id parameter. If you have a Group whose group.id is set to “test-group”, its hashCode value should be 627841412.
  2. Second, Kafka counts the number of partitions __consumer_offsets, usually 50,
  3. Then take the modulus of the partition number and calculate the absolute value of the hash value, namely ABS (627841412% 50) = 12.

At this point, we know that partition 12 of the shift topic is responsible for holding the Group’s data. With the partition number, step 2 of the algorithm becomes easy, we just need to find out which Broker the Leader copy of the shift topic partition 12 is on. The Broker is the Coordinator we are looking for.

Consumer group rebalance the whole process analysis

Notification mechanism for rebalancing

  • The rebalancing process is notified to other consumer instances through the Heartbeat Thread on the message side.
  • The Kafka Java consumer needs to periodically send heartbeat requests to the Broker side coordinator to indicate that it is still alive.
    • Prior to kafka version 0.10.1.0, sending a heartbeat request was done on the consumer main thread, the thread that calls the kafkaconsumer.poll method in the code. In this way, the message processing logic is also done in this thread, so if message processing takes too long, the heartbeat request will not be delivered to the coordinator in time, causing the coordinator to erroneously assume that the consumer is dead.
    • After this release, the Kafka community introduced a separate heartbeat thread to perform heartbeat request sending exclusively to avoid this problem.
  • The notification mechanism for rebalancing is done through the heartbeat thread, and when the coordinator decides to start a new round of rebalancing, he wraps “REBALANCE_IN_PROGRESS” into the response to the heartbeat request and sends it back to the consumer instance. When a consumer instance detects that the heartbeat response contains “REBALANCE_IN_PROGRESS,” it immediately knows that rebalancing has begun.
  • Consumer-side parametersheartbeat.interval.msThe real use of rebalance is to control the frequency of rebalance notifications.

Consumer group state machine

Kafka has designed a set of consumer group State machines to help the coordinator complete the rebalancing process.

Kafka consumer group status

  • Empty: There are no members in the group, but the consumer group may have submitted shifts that have not yet expired.
  • Dead: There is no member in the group, but the metadata of the group has been removed on the coordinator side. The coordinator keeps information about all groups that are currently registered with it, and metadata is similar to this registration information.
  • PreparingRebalance: The consumer group is ready to start rebalancing, at which point all members rerequest to add the consumer group
  • CompletingRebalance: All members of the consumer group have joined, and each member is waiting for the assignment scheme.
  • Stable: The stable state of the consumer group. This status indicates that the rebalancing is complete and that members of the group are able to consume data normally.

Transition of consumer group state

Kafka periodically automatically removes expired shifts as long as the group is in the Empty state. If the consumer group is off for a long time (more than 7 days), Kafka will most likely delete the shift data for that group.

Consumer side rebalancing process

  1. The complete process of rebalancing requires the participation of both the consumer and coordinator components.
  2. On the consumer side, rebalancing is divided into two steps:
    1. Join a group. The request is JoinGroup request
    2. Waiting for leader consumer assignment scenario: SyncGroup request
  3. When a member of a group joins a group, it sends a JoinGroup request to the coordinator. In this request, each member reports their subscribed topics so that the coordinator can collect subscription information for all members. Once the JoinGroup requests from all members have been collected, the coordinator selects one of these members to be the leader of the consumer group.
  4. Normally, the first member to send a JoinGroup request automatically becomes the leader. The leader here is the concrete consumer instance, which is neither a copy nor a coordinator. The task of the leader consumer is to collect subscription information from all members, and then, based on this information, formulate a specific partition consumption allocation scheme.
  5. After the leader is selected, the coordinator encapsulates the consumer group subscription information into the JoinGroup request response and sends it to the leader. After the leader makes a unified allocation plan, the next step is to send the SyncGroup request.
  6. The leader sends a SyncGroup request to the coordinator to send the allocation scheme just made to the coordinator. It is worth noting that other members also send SyncGroup requests to the coordinator, but without the actual content in the request body. The goal of this step is for the coordinator to receive the allocation plan and then send it to all members in a SyncGroup response so that all members of the group know which partitions they should consume.

Analysis of the rebalance scenario

A. Add A new member to the group

When the coordinator receives a new JoinGroup request, it notifies all existing members of the group through a heartbeat request response, forcing them to start a new round of rebalancing.(The content of the SyncGroup request sent by the leader in the following figure should be wrong, which should be an allocation scheme. The same is true in the next four figures.)

B. Group members voluntarily leave the group

The thread or process on which the consumer instance is located calls the close() method to actively notify the coordinator that it wants to exit. This scenario involves a third type of request: the LeaveGroup request. After receiving the LeaveGroup request, the coordinator will still notify other members in the form of heartbeat response.

C. A group member breaks down and leaves the group

Crash disgroup is the disgroup caused by the sudden outage of a serious failure of a consumer instance. Crash ungroup is passive, and the coordinator usually has to wait for a period of time to feel it, which is usually a consumer-side parametersession.timeout.msControl.

D. The coordinator’s processing of the displacement submitted by members in the group during rebalancing

Normally, each group member will report the displacement to the coordinator on a regular basis. When rebalancing is enabled, the coordinator gives members a buffer period during which each member must quickly report his/her displacement. Then the JoinGroup/SyncGroup request is sent when normal JoinGroup/SyncGroup requests are enabled.