Problem description


WARN  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Synchronous auto-commit of offsets {am_performance_topic-0=OffsetAndMetadata{offset=27914, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Revoking previously assigned partitions [am_performance_topic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [am_performance_topic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] (Re-)joining group
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Successfully joined group with generation 474
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Setting newly assigned partitions [am_performance_topic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerCont

As a Kafka beginner, the only thing I could do was search the Internet. These are the posts that helped me solve the problem:

  • Kafka session.timeout.ms and heartbeat.interval.ms
  • Kafka consumer group auto.offset.reset
  • Spring-Kafka consumer group coordinator and rebalance

Analysis

Let's analyze the key warning message:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Several key parameters appear here: max.poll.interval.ms, max.poll.records, and session.timeout.ms. A rough translation of the message:

The commit could not be completed because the group had already rebalanced and assigned the partitions to another member. This usually means the poll loop is spending more time processing messages than the configured max.poll.interval.ms. You can address this either by increasing the session timeout or by decreasing the maximum batch size returned by poll() (max.poll.records).

session.timeout.ms and heartbeat.interval.ms

A topic usually has multiple partitions, and we create multiple consumers in a consumer group to consume it, so the question arises: which partition's messages go to which consumer? Three concepts are involved: the consumer group, the consumers within the group, and the group coordinator that every consumer group has. Partition assignment among consumers is implemented by the group management protocol.

Each consumer in the consumer group sends a JoinGroup request to the group coordinator, so the coordinator knows about all consumers. It then selects one consumer as the leader and tells it: take this member information and the topic partition information I give you, and work out which consumers are responsible for consuming which partitions.

Next, the leader consumer computes each consumer's partitions according to the configured assignment strategy (specified by the partition.assignment.strategy parameter). Every consumer then sends a SyncGroup request to the group coordinator, but only the leader's request carries the assignment plan. The coordinator receives the plan from the leader and sends each consumer its share of it. Sketched as a picture, the flow looks something like this:
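The JoinGroup/SyncGroup flow above can also be sketched as a toy simulation. This is not the real wire protocol, just the shape of it: the coordinator collects members and picks a leader, the leader computes the assignment (here a simple round-robin-style strategy, as one possible partition.assignment.strategy), and the coordinator hands each member its partitions in the SyncGroup response. All names are illustrative.

```python
def join_group(members):
    """Coordinator side: collect members; here the first joiner becomes the leader."""
    leader = members[0]
    return leader, list(members)

def leader_assign(members, partitions):
    """Leader side: round-robin-style assignment of partitions to members."""
    assignment = {m: [] for m in members}
    for i, p in enumerate(partitions):
        assignment[members[i % len(members)]].append(p)
    return assignment

def sync_group(assignment, member):
    """Coordinator side: return each member its slice of the leader's plan."""
    return assignment[member]

members = ["consumer-1", "consumer-2"]
partitions = ["am_performance_topic-0", "am_performance_topic-1", "am_performance_topic-2"]
leader, roster = join_group(members)
plan = leader_assign(roster, partitions)
print(sync_group(plan, "consumer-1"))  # consumer-1 gets partitions 0 and 2
```

Only the leader ever sees the whole plan; each follower only learns its own partitions from the coordinator.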

Normally, when a consumer joins or leaves a consumer group, a rebalance is triggered: the partitions are reassigned, and consumers need to be notified of the new assignment plan. This is where the heartbeat.interval.ms parameter comes in. Specifically, each consumer periodically sends a heartbeat to the group coordinator at the interval specified by heartbeat.interval.ms, and the coordinator responds to each heartbeat. If a rebalance is in progress, the coordinator puts a REBALANCE_IN_PROGRESS flag in the heartbeat response, which is how consumers learn that a rebalance has started. At the same time, the heartbeats tell the group coordinator which consumers are still alive.

So how do heartbeat.interval.ms and session.timeout.ms relate? session.timeout.ms determines how long it takes the group coordinator to detect that a consumer has crashed: if a consumer in the group goes down, it takes up to session.timeout.ms to notice. For example, take session.timeout.ms = 10 seconds and heartbeat.interval.ms = 3 seconds.

session.timeout.ms is a "logical" setting: it specifies a threshold, here 10 seconds, after which the coordinator considers the consumer dead if it has received no heartbeat from it. heartbeat.interval.ms is a "physical" setting: it tells the consumer to send a heartbeat packet to the coordinator every 3 seconds. The smaller heartbeat.interval.ms is, the more frequently heartbeats are actually sent over the network, which is why I call it a "physical" setting.
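The arithmetic behind the example above (converting the 10 s / 3 s figures to milliseconds) shows why these two values are chosen together: the session window should be several heartbeat intervals wide, so that a single delayed heartbeat does not evict the consumer.

```python
session_timeout_ms = 10_000    # session.timeout.ms from the example
heartbeat_interval_ms = 3_000  # heartbeat.interval.ms from the example

# Heartbeats expected inside one session window (at 3 s, 6 s, 9 s):
expected_heartbeats = session_timeout_ms // heartbeat_interval_ms
print(expected_heartbeats)  # 3 chances to prove liveness before eviction
```

With three chances per window, the coordinator only declares the consumer dead after several consecutive heartbeats go missing, not after one.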

If the group coordinator removed a consumer from the group as soon as one heartbeat.interval.ms cycle passed without a heartbeat, that would make little sense; it would be like condemning the consumer for a single slip. In reality there may be network latency, or a long GC pause on the consumer delaying the heartbeat packet, and the next heartbeat may arrive just fine.

As long as a consumer heartbeats within session.timeout.ms, the coordinator notifies it of a rebalance via the REBALANCE_IN_PROGRESS flag in the heartbeat response, so the consumer knows a rebalance is happening and can update the partitions it consumes. If session.timeout.ms is exceeded, the group coordinator considers the consumer dead and no longer needs to tell it about the rebalance.

In versions after Kafka 0.10.1, session.timeout.ms and max.poll.interval.ms were decoupled. A KafkaConsumer instance is backed by two threads: a heartbeat thread and a processing thread. The processing thread is the one that calls consumer.poll() in the while(true) loop and runs the message-processing logic; the heartbeat thread is a background thread "hidden" from the programmer. If the processing logic is heavy, say 5 minutes per batch, then max.poll.interval.ms can be set to a value greater than 5 minutes. The heartbeat thread is governed by heartbeat.interval.ms: it sends a heartbeat packet to the coordinator every heartbeat.interval.ms to prove the consumer is still alive. As long as the heartbeat thread has sent a heartbeat within session.timeout.ms, the group coordinator considers the consumer alive.
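The two-thread model can be sketched with a toy simulation (not Kafka's actual implementation; the timeouts are shrunk to fractions of a second for illustration): a background heartbeat thread keeps updating a "last heartbeat" timestamp while the main thread spends longer than the session window processing one message, yet the consumer still counts as alive.

```python
import threading
import time

HEARTBEAT_INTERVAL = 0.1  # stands in for heartbeat.interval.ms
SESSION_TIMEOUT = 1.0     # stands in for session.timeout.ms

last_heartbeat = time.monotonic()
stop = threading.Event()

def heartbeat_loop():
    """Background heartbeat thread, 'hidden' from the programmer."""
    global last_heartbeat
    while not stop.is_set():
        last_heartbeat = time.monotonic()  # "send" a heartbeat to the coordinator
        time.sleep(HEARTBEAT_INTERVAL)

hb = threading.Thread(target=heartbeat_loop, daemon=True)
hb.start()

# Processing thread: one "message" takes longer than SESSION_TIMEOUT to handle.
time.sleep(1.5)
stop.set()
hb.join()

# Coordinator's view: a heartbeat arrived within the session window, so the
# consumer is still considered alive despite the slow processing.
alive = (time.monotonic() - last_heartbeat) < SESSION_TIMEOUT
print(alive)
```

In the pre-0.10.1 single-thread model, the `time.sleep(1.5)` would have blocked the heartbeats too, and `alive` would come out false.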

Before Kafka 0.10.1, sending heartbeat packets and message-processing logic were coupled in the same thread. If a message took 5 minutes to process and session.timeout.ms = 3000 ms, no heartbeat could be sent during processing, so after 3000 ms the group coordinator would remove the consumer from the group before it finished. With the processing thread running the message logic and the heartbeat thread sending heartbeats, even a message that takes 5 minutes to process is safe: as long as the background heartbeat thread keeps sending heartbeats within session.timeout.ms, the consumer can continue processing without being removed from the group. Another benefit is that if something really does go wrong with a consumer, it is detected within session.timeout.ms instead of only after max.poll.interval.ms.

A small summary

Therefore, there are two possible causes for the rebalance and the resulting repeated consumption:

  • The heartbeat thread sends a heartbeat every heartbeat.interval.ms, but the group coordinator still receives no heartbeat within session.timeout.ms.
  • The batch of up to max.poll.records messages fetched by one poll() is not fully processed within max.poll.interval.ms, so poll() is not called again in time and the offset information is not updated.
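A back-of-the-envelope check makes the second cause concrete. The per-record processing time below is a made-up figure for illustration; the defaults are the real Kafka defaults mentioned later in this post.

```python
max_poll_interval_ms = 300_000  # default max.poll.interval.ms: 5 minutes
max_poll_records = 500          # default max.poll.records
per_record_ms = 800             # hypothetical processing cost per record

# Does one full poll() batch fit inside max.poll.interval.ms?
batch_ms = max_poll_records * per_record_ms
print(batch_ms > max_poll_interval_ms)  # True: 400 s batch overruns the 300 s window

# The two knobs from the warning message: shrinking the batch fixes it...
print(50 * per_record_ms > max_poll_interval_ms)  # False with max.poll.records=50
```

...and so does raising max.poll.interval.ms, which is exactly the combination applied in the fix below.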

Therefore, this problem turns out to be simple: it is the second case above. The solution is to increase the max.poll.interval.ms timeout and reduce max.poll.records (the number of records per poll). If you are using the Spring-Kafka framework, see the Spring Boot Reference Guide and the Spring-Kafka producer/consumer configuration post listed above for how to write these properties. The final change is as follows:

kafka:
  producer:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    enable-auto-commit: true
    group-id: boot_kafka
    auto-commit-interval: 100
    auto-offset-reset: earliest
    max-poll-records: 50
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    properties:
      max.poll.interval.ms: 1200000

The default value of max.poll.interval.ms is 300000 (5 minutes), and the default value of max.poll.records is 500. Refer to the Apache Kafka documentation for more parameter descriptions.

Let’s talk about the auto-offset-reset field

The possible auto.offset.reset values:

  • earliest: if a partition has a committed offset, consumption starts from the committed offset; if there is no committed offset, consumption starts from the beginning
  • latest: if a partition has a committed offset, consumption starts from the committed offset; if there is no committed offset, only data newly produced to the partition is consumed
  • none: if every partition has a committed offset, consumption starts from the committed offsets; if any partition lacks a committed offset, an exception is thrown
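The three policies above can be condensed into one decision function. This is a toy model of the semantics, not Kafka's implementation; `log_end` stands for the partition's log end offset.

```python
def starting_offset(policy, committed, log_end):
    """Where does a consumer start on one partition, per auto.offset.reset?"""
    if committed is not None:
        return committed   # all three policies resume from a committed offset
    if policy == "earliest":
        return 0           # no commit: start from the beginning
    if policy == "latest":
        return log_end     # no commit: consume only newly produced data
    if policy == "none":
        raise RuntimeError("no committed offset for partition")
    raise ValueError(f"unknown policy: {policy}")

print(starting_offset("earliest", None, 100))  # 0
print(starting_offset("latest", None, 100))    # 100
print(starting_offset("latest", 42, 100))      # 42
```

Note that the policy only matters when no committed offset exists; this is exactly what the test runs below demonstrate.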

Test process:

  • earliest mode: the Kafka source is named a1

    1. In a1, the topic is test1 and the groupId is 0001. Since 0001 has never consumed, sql1 (select * from a1) consumes from the beginning

    2. Stop sql1, keep the same groupId, and start sql1 (select * from a1) again; since 0001 now has committed offsets, consumption resumes from the committed offset rather than from the beginning

  • latest mode: the Kafka source is named a2

    1. In a2, the topic is b and the groupId is 0002, which has never consumed. Data is sent in advance, then sql2 (select * from a2) is started: no result appears in JMeter, and the Flink metrics show no data read. Without killing sql2, a batch of 8 records is sent, and only those last 8 records are consumed

    2. Stop the sql2 from step 1, keep the same groupId in a2, send 7 records to b, then start sql2 again: only the last 7 records are displayed

  • none mode: the Kafka source is named a3

    1. In a3, the topic is c and the groupId is 0001 (which has not consumed this topic). Data is sent in advance; when sql3 (select * from a3) is started, the SQL execution fails and an error is reported in the log

    2. In a3, with topic c, set the groupId to 0002 (which has consumed). Start sql3 (select * from a3), send 8 records to c, and JMeter displays the 8 records