In Kafka, a topic can have multiple partitions, and each partition can be consumed by only one consumer within the same consumer group. To enforce this constraint, Kafka supports three partition allocation strategies for assigning partitions to consumers. In addition, the system is always changing: consumers go down, new consumers join, and so on. To cope with these changes, Kafka introduces partition rebalancing, which keeps the partition allocation reasonable in all of these scenarios.

Partition allocation policy

A partition allocation policy is an implementation that assigns partitions to consumers. Kafka provides the consumer client parameter partition.assignment.strategy to set the partition allocation strategy between consumers and the topics they subscribe to. The default is RangeAssignor; RoundRobinAssignor and StickyAssignor are also supported.

RangeAssignor allocation policy

The principle of the RangeAssignor allocation strategy is to obtain a span by dividing the total number of partitions by the total number of consumers, and then to allocate the partitions span by span so that they are distributed as evenly as possible across all consumers. For each topic, the RangeAssignor strategy sorts all consumers in the consumer group that subscribe to the topic in lexicographical order by name, then gives each consumer a contiguous range of partitions. If the partitions cannot be divided equally, the consumers that come first in lexicographical order are each assigned one additional partition.

Assuming n = number of partitions / number of consumers (integer division) and m = number of partitions % number of consumers, the first m consumers are allocated n + 1 partitions each, and the remaining consumers are allocated n partitions each.

Suppose two consumers (C0, C1) subscribe to a topic that has three partitions (P0, P1, P2). Then n = 1 and m = 1, so the first consumer is assigned 2 partitions and the second consumer is assigned 1 partition.

Consumer C0: P0, P1
Consumer C1: P2

With only one topic, consumer C1 has just one partition fewer than C0. If there are many such topics, however, the number of partitions allocated to C1 becomes significantly smaller than the number allocated to C0. The disadvantage of the RangeAssignor allocation strategy is therefore that the allocation is uneven whenever the number of partitions is not an integer multiple of the number of consumers, and it becomes more uneven as the number of topics increases.

It is therefore recommended that the number of partitions be an integer multiple of the number of consumers.
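The n and m rule above can be sketched in a few lines of Python. This is a minimal illustration, not Kafka's implementation: the function name range_assign and the "T0P0" labels are made up for the example, and it assumes every consumer subscribes to every topic.

```python
def range_assign(consumers, partitions_per_topic):
    """Sketch of range assignment (assumes all consumers subscribe to all topics)."""
    ordered = sorted(consumers)  # lexicographical order by consumer name
    assignment = {c: [] for c in ordered}
    for topic in sorted(partitions_per_topic):
        count = partitions_per_topic[topic]
        # n partitions per consumer, m partitions left over
        n, m = divmod(count, len(ordered))
        start = 0
        for i, consumer in enumerate(ordered):
            size = n + 1 if i < m else n  # the first m consumers get n + 1
            assignment[consumer] += [f"{topic}P{p}" for p in range(start, start + size)]
            start += size
    return assignment


one_topic = range_assign(["C0", "C1"], {"T0": 3})
two_topics = range_assign(["C0", "C1"], {"T0": 3, "T1": 3})
print(one_topic)
print(two_topics)
```

Because the split is repeated independently for each topic, the extra partition always goes to the same consumers, which is why the skew grows with the number of topics.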

RoundRobinAssignor allocation policy

The principle of RoundRobinAssignor is to sort all consumers in the consumer group, and all partitions of the topics they subscribe to, in lexicographical order, and then to assign the partitions to the consumers one by one in round-robin fashion.

For example, suppose there are again two consumers, C0 and C1, both subscribed to two topics, T0 and T1, and each topic has three partitions.

If allocated according to RangeAssignor, the result is:

Consumer C0: T0P0, T0P1, T1P0, T1P1
Consumer C1: T0P2, T1P2

If RoundRobinAssignor is used, the result is:

Consumer C0: T0P0, T0P2, T1P1
Consumer C1: T0P1, T1P0, T1P2

RoundRobinAssignor distributes partitions evenly when all consumers in the consumer group subscribe to the same topics. However, if consumers in the same group subscribe to different topics, the distribution can still be uneven.

Suppose there are three consumers (C0,C1,C2) with three topics (T0, T1, T2), which have 1, 2, and 3 partitions respectively. Consumer C0 subscribes to T0, consumer C1 subscribes to T0 and T1, and consumer C2 subscribes to T0, T1 and T2. Then the final distribution result is:

Consumer C0: T0P0
Consumer C1: T1P0
Consumer C2: T1P1, T2P0, T2P1, T2P2

As you can see, this allocation is not optimal: T1P1 could have been assigned to consumer C1, which would leave C1 with two partitions and C2 with three instead of one and four.
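Both round-robin examples above can be reproduced with a short Python sketch. It is an illustration under assumptions rather than Kafka's code: round_robin_assign is a hypothetical name, subscriptions are passed as a plain dict of sets, and a consumer that does not subscribe to a partition's topic is simply skipped when its turn comes.

```python
def round_robin_assign(subscriptions, partitions_per_topic):
    """Sketch of round-robin assignment. subscriptions maps consumer -> set of topics."""
    consumers = sorted(subscriptions)  # lexicographical order
    subscribed_topics = set().union(*subscriptions.values())
    # All partitions of all subscribed topics, sorted by topic then partition number
    all_partitions = [(topic, p)
                      for topic in sorted(subscribed_topics)
                      for p in range(partitions_per_topic[topic])]
    assignment = {c: [] for c in consumers}
    cursor = 0  # position in the circular list of consumers
    for topic, p in all_partitions:
        # Skip consumers that do not subscribe to this partition's topic
        while topic not in subscriptions[consumers[cursor % len(consumers)]]:
            cursor += 1
        assignment[consumers[cursor % len(consumers)]].append(f"{topic}P{p}")
        cursor += 1
    return assignment


partition_counts = {"T0": 1, "T1": 2, "T2": 3}
same_subscriptions = round_robin_assign(
    {"C0": {"T0", "T1"}, "C1": {"T0", "T1"}}, {"T0": 3, "T1": 3})
mixed_subscriptions = round_robin_assign(
    {"C0": {"T0"}, "C1": {"T0", "T1"}, "C2": {"T0", "T1", "T2"}}, partition_counts)
print(same_subscriptions)
print(mixed_subscriptions)
```

In the mixed case the cursor keeps skipping past C0 and C1 for every T2 partition, so C2 ends up with four partitions.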

StickyAssignor allocation policy

As the name suggests, StickyAssignor tries to make assignments “stick”. It has two main goals:

  1. Partitions should be distributed as evenly as possible.
  2. Each new assignment should stay as close as possible to the previous assignment.

Consider the example above again. Suppose there are three consumers (C0, C1, C2) with three topics (T0, T1, T2), which have 1, 2, and 3 partitions respectively. Consumer C0 subscribes to T0, consumer C1 subscribes to T0 and T1, and consumer C2 subscribes to T0, T1 and T2.

The results of RoundRobinAssignor are as follows:

Consumer C0: T0P0
Consumer C1: T1P0
Consumer C2: T1P1, T2P0, T2P1, T2P2

The result of StickyAssignor is:

Consumer C0: T0P0
Consumer C1: T1P0, T1P1
Consumer C2: T2P0, T2P1, T2P2

As you can see, the StickyAssignor assignment is more even.

If consumer C0 now leaves the consumer group, the result of the RoundRobinAssignor strategy is:

Consumer C1: T0P0, T1P1
Consumer C2: T1P0, T2P0, T2P1, T2P2

The result of StickyAssignor is:

Consumer C1: T1P0, T1P1, T0P0
Consumer C2: T2P0, T2P1, T2P2

When rebalancing occurs, StickyAssignor keeps the reassignment “sticky”: the remaining consumers keep the partitions they already own, and only the partitions that belonged to the offline consumer are distributed evenly among them, which avoids unnecessary partition movement.
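The "consumer leaves" step above can be approximated with a deliberately simplified Python sketch. This is not the real StickyAssignor algorithm, which also moves partitions between surviving consumers when that improves balance. The sketch only keeps every still-valid previous assignment and hands each orphaned partition to the least-loaded consumer that subscribes to its topic. The name sticky_reassign and the tuple representation of partitions are assumptions made for the example.

```python
def sticky_reassign(previous, subscriptions, partitions_per_topic):
    """Simplified sticky step: keep valid ownership, re-home orphaned partitions."""
    consumers = sorted(subscriptions)  # consumers still in the group
    assignment = {c: [] for c in consumers}
    owned = set()
    # 1. Surviving consumers keep the partitions they already own
    for consumer in consumers:
        for topic, p in previous.get(consumer, []):
            if topic in subscriptions[consumer] and p < partitions_per_topic.get(topic, 0):
                assignment[consumer].append((topic, p))
                owned.add((topic, p))
    # 2. Orphaned partitions go to the least-loaded eligible consumer
    for topic in sorted(partitions_per_topic):
        for p in range(partitions_per_topic[topic]):
            if (topic, p) in owned:
                continue
            eligible = [c for c in consumers if topic in subscriptions[c]]
            if eligible:
                target = min(eligible, key=lambda c: (len(assignment[c]), c))
                assignment[target].append((topic, p))
    return assignment


previous = {
    "C0": [("T0", 0)],
    "C1": [("T1", 0), ("T1", 1)],
    "C2": [("T2", 0), ("T2", 1), ("T2", 2)],
}
# C0 has left the group; C1 and C2 keep their subscriptions
remaining = {"C1": {"T0", "T1"}, "C2": {"T0", "T1", "T2"}}
result = sticky_reassign(previous, remaining, {"T0": 1, "T1": 2, "T2": 3})
print(result)
```

Only T0P0 changes owner here, whereas the round-robin result after C0 leaves also swaps T1P0 and T1P1 between C1 and C2.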

When rebalancing is triggered

Partition allocation is essentially allocating partitions to consumers, so both a change in the number of partitions and a change in the number of consumers trigger rebalancing. Specifically, there are the following situations:

  1. A new consumer joins the consumer group.
  2. A consumer goes offline. A consumer that fails to send heartbeats for a long time, for whatever reason, is considered offline.
  3. A consumer actively leaves the consumer group.
  4. The number of partitions of any topic subscribed to by the consumer group changes.

The coordinator

Partition assignment gives consumers ownership of partitions and therefore requires communication with all consumers. Kafka uses a coordinator to do this. Each consumer client has a consumer coordinator, and each consumer group has a corresponding group coordinator on the server side. A Kafka cluster consists of multiple broker nodes, and every broker runs a coordinator; the coordinator responsible for a given consumer group lives on one particular broker. The following two steps determine the coordinator for a consumer group:

  1. Calculate the partition number in the offsets topic

Besides partition assignment and rebalancing, the coordinator also handles offset commits. Offsets are committed to the internal topic __consumer_offsets, whose number of partitions is offsetsTopicPartitionCount. A consumer group calculates its partition number with the following formula:

partition = Math.abs(groupID.hashCode() % offsetsTopicPartitionCount)

All offsets of this consumer group are committed to this partition.

  2. Find the broker where the partition leader resides

The coordinator on the broker where the partition leader resides is what we are looking for.
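The two steps can be sketched in Python as follows. Since the formula relies on Java's String.hashCode(), the sketch emulates that hash (for characters in the Basic Multilingual Plane). The partition count of 50 and the partition-to-leader map are assumptions for illustration; in a real cluster both come from the broker configuration and cluster metadata.

```python
def java_string_hash(s):
    """Emulate Java's String.hashCode() as a signed 32-bit integer (BMP characters only)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep the low 32 bits, like Java int overflow
    return h - 0x100000000 if h >= 0x80000000 else h  # reinterpret as signed


def offsets_partition(group_id, offsets_topic_partition_count=50):
    """Step 1: partition of __consumer_offsets used by this group."""
    # abs(h) % n equals Java's Math.abs(h % n), because Java's % truncates toward zero
    return abs(java_string_hash(group_id)) % offsets_topic_partition_count


def find_coordinator(group_id, partition_leaders):
    """Step 2: the broker hosting that partition's leader is the group's coordinator.

    partition_leaders is a hypothetical map: partition number -> leader broker id.
    """
    return partition_leaders[offsets_partition(group_id, len(partition_leaders))]


# Hypothetical cluster: 50 offsets-topic partitions whose leaders are spread over 3 brokers
leaders = {p: p % 3 for p in range(50)}
partition = offsets_partition("my-group")
broker = find_coordinator("my-group", leaders)
print(partition, broker)
```

Because the mapping depends only on the group ID and the partition count, every consumer in the same group arrives at the same coordinator.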

Interaction

Consumers and coordinators interact through heartbeat mechanisms. There is a dedicated heartbeat thread in the consumer that sends heartbeats to the coordinator at heartbeat.interval.ms intervals. The coordinator also sends back a response that tells the consumer when rebalancing is needed.

As mentioned earlier, changes in the number of partitions and in the number of consumers trigger rebalancing. In general, changes in the number of partitions and increases in the number of consumers are made deliberately by operators; these are planned rebalances and need little attention. A drop in the number of consumers, however, is usually caused by system or network problems. It is an unplanned rebalance and is therefore what we should focus on. Rebalancing is expensive, because all consumers stop working while it is in progress, so unnecessary rebalances should be avoided. Let’s take a look at the parameters that influence whether a consumer is considered to have dropped out:

  1. session.timeout.ms: consumer-side parameter enforced by the coordinator on the broker, the consumer session timeout, default 10 seconds (45 seconds since Kafka 3.0). If the coordinator does not receive any heartbeat from a consumer within this time, the consumer is considered to have left the group;
  2. heartbeat.interval.ms: consumer-side parameter, the interval at which heartbeats are sent, default 3 seconds;
  3. max.poll.interval.ms: consumer-side parameter, the maximum interval between two poll calls, default 5 minutes. If the consumer does not call poll again within this interval, it is removed from the group.

session.timeout.ms and heartbeat.interval.ms are related. A recommended rule of thumb is:

session.timeout.ms ≥ 3 * heartbeat.interval.ms

max.poll.interval.ms is mainly related to the downstream processing time. For example, if downstream processing takes 6 minutes, the default value of 5 minutes is too small, and consumers will frequently drop out of the group. The value should therefore be set longer than the downstream processing time to avoid unnecessary rebalancing.
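The two rules of thumb can be written as a small sanity check. This is only a sketch: check_consumer_timeouts is a hypothetical helper, not part of any Kafka client, and the example values are the 10 second, 3 second and 5 minute figures quoted in the list above.

```python
def check_consumer_timeouts(config, max_processing_ms):
    """Return warnings for timeout settings that are likely to cause needless rebalances."""
    warnings = []
    session = config["session.timeout.ms"]
    heartbeat = config["heartbeat.interval.ms"]
    max_poll = config["max.poll.interval.ms"]
    # Rule 1: allow at least three heartbeats per session timeout
    if session < 3 * heartbeat:
        warnings.append("session.timeout.ms should be at least 3 * heartbeat.interval.ms")
    # Rule 2: the poll interval must exceed the time spent processing one batch
    if max_poll <= max_processing_ms:
        warnings.append("max.poll.interval.ms should exceed the downstream processing time")
    return warnings


quoted_values = {
    "session.timeout.ms": 10_000,
    "heartbeat.interval.ms": 3_000,
    "max.poll.interval.ms": 300_000,
}
six_minutes = 6 * 60 * 1000
slow_downstream = check_consumer_timeouts(quoted_values, max_processing_ms=six_minutes)
tuned = check_consumer_timeouts(
    {**quoted_values, "max.poll.interval.ms": 420_000}, max_processing_ms=six_minutes)
print(slow_downstream)
print(tuned)
```

With 6 minutes of downstream processing, the quoted values trip only the second rule; raising max.poll.interval.ms to 7 minutes clears it.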

Rebalancing process

  1. When consumers receive notification from the coordinator that a rebalance has started, they need to commit their offsets immediately;
  2. After receiving a successful response to the offset commit, each consumer sends a JoinGroup request to rejoin the group. The request contains the topics the consumer subscribes to;
  3. When the coordinator receives the first JoinGroup request, it designates the consumer that sent it as the leader consumer. It then waits up to rebalance.timeout.ms to collect the subscription information from the other consumers’ JoinGroup requests. Afterwards, it puts the subscription information into the JoinGroup response sent to the leader consumer, informing it that it has become the leader, and sends JoinGroup responses confirming a successful join to the other consumers;
  4. After receiving the JoinGroup response, the leader consumer makes an allocation plan based on the consumers’ subscription information and sends the plan to the coordinator in a SyncGroup request. Ordinary consumers, after receiving their response, send a SyncGroup request directly and wait for the leader’s allocation plan;
  5. After the coordinator receives the allocation plan, it sends the plan to all consumers in the SyncGroup responses;
  6. Once all consumers have received the allocation plan, the rebalance is complete and normal consumption can resume.
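The JoinGroup and SyncGroup exchange in the steps above can be modelled with a toy, in-memory Python sketch. Everything here is hypothetical: ToyCoordinator and leader_plan are not Kafka APIs, there is no network or rebalance.timeout.ms wait, and the leader's plan simply deals partitions out in turn while assuming every member subscribes to every topic.

```python
class ToyCoordinator:
    """In-memory stand-in for the group coordinator (illustration only)."""

    def __init__(self):
        self.subscriptions = {}
        self.leader = None
        self.plan = None

    def join_group(self, member, topics):
        if self.leader is None:
            self.leader = member  # the first JoinGroup sender becomes the leader
        self.subscriptions[member] = set(topics)

    def join_response(self, member):
        # Only the leader receives everyone's subscription information
        members = dict(self.subscriptions) if member == self.leader else {}
        return {"leader": self.leader, "members": members}

    def sync_group(self, member, plan=None):
        if member == self.leader and plan is not None:
            self.plan = plan  # only the leader's SyncGroup carries a plan

    def sync_response(self, member):
        return self.plan[member]


def leader_plan(members, partitions_per_topic):
    """Hypothetical leader-side strategy: deal partitions out to sorted members in turn."""
    ordered = sorted(members)
    plan = {m: [] for m in ordered}
    partitions = [f"{t}P{p}" for t in sorted(partitions_per_topic)
                  for p in range(partitions_per_topic[t])]
    for i, tp in enumerate(partitions):
        plan[ordered[i % len(ordered)]].append(tp)
    return plan


coordinator = ToyCoordinator()
for member in ["C1", "C0"]:  # steps 2 and 3: C1 joins first and becomes the leader
    coordinator.join_group(member, ["T0"])
response = coordinator.join_response("C1")
plan = leader_plan(response["members"], {"T0": 3})  # step 4: the leader builds the plan
coordinator.sync_group("C1", plan)
coordinator.sync_group("C0")  # ordinary member: SyncGroup without a plan
final = {m: coordinator.sync_response(m) for m in ["C0", "C1"]}  # steps 5 and 6
print(coordinator.leader, final)
```

The point of the split is that the assignment strategy runs on the client side, in the leader consumer, while the coordinator only collects subscriptions and relays the plan.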
