This article walks through the detailed journey of a message: produced and delivered by the producer to Kafka, then pulled and consumed by the consumer. Due to length constraints, Kafka's basic concepts are not introduced in detail here. The examples use a local cluster with three Kafka services.
If you are already comfortable with Kafka, you can skip the startup phase; just know that the Kafka cluster has three brokers.
Startup phase
Start the service
The server.properties configuration for the three Kafka services is as follows:
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=D:\kafka_2.12-2.3.0\kafka-logs
zookeeper.connect=localhost:2181

broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=D:\kafka_2.12-2.3.0-1\kafka-logs
zookeeper.connect=localhost:2181

broker.id=2
listeners=PLAINTEXT://localhost:9094
log.dirs=D:\kafka_2.12-2.3.0-2\kafka-logs
zookeeper.connect=localhost:2181
After ZooKeeper is started on localhost:2181, start the three Kafka services:
PS D:\kafka_2.12-2.3.0-1> bin/windows/kafka-server-start.bat config/server.properties
You can run the zkCli command to open the ZK client and check whether the corresponding broker is started:
[zk: localhost:2181(CONNECTED) 9] ls /brokers/ids
[0, 1, 2]
Create a topic:
After the services are started, we create the topic topic-partition with the --create option of the kafka-topics.bat script (Windows environment); the topic has three partitions, and each partition has three replicas.
PS D:\kafka_2.12-2.3.0> bin/windows/kafka-topics.bat --zookeeper localhost:2181 --create --topic topic-partition --partitions 3 --replication-factor 3
Created topic topic-partition.
We then view the current topic information using the describe option of kafka-topics.bat:
PS D:\kafka_2.12-2.3.0> bin/windows/kafka-topics.bat --zookeeper localhost:2181 --describe --topic topic-partition
Topic:topic-partition   PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic-partition  Partition: 1    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: topic-partition  Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
The AR (Assigned Replicas) of a partition is the set of all brokers in the cluster that hold a replica of that partition. For example, for partition 0 of the topic topic-partition, the broker ids in the AR are [0, 2, 1].
The ISR (In-Sync Replicas) of a partition is the set of brokers whose replicas are in sync with the partition's leader replica; the leader node itself is also in the ISR.
We can view topic information under zk's /brokers/topics directory with the zk client commands:
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics
[__consumer_offsets, test, topic-partition]
The big picture
The figure below shows how the partitions and replicas of topic-partition are laid out across the three brokers:
The producer produces and sends messages
Producing the message
Producing a message means constructing a ProducerRecord object:
public class ProducerRecord<K, V> {
    private final String topic;        // topic
    private final Integer partition;   // partition number
    private final Headers headers;     // message headers
    private final K key;               // message key
    private final V value;             // message value
    private final Long timestamp;      // message timestamp
    // constructors omitted
}
The ProducerRecord object lives in the JVM process's memory; for network transmission it must be serialized into a specific byte sequence, so the message is serialized before the next step of sending.
Serialization generally falls into three categories (a producer serializer configuration sketch follows this list):
- Language-specific formats, such as Java's built-in Serializable;
- Text formats, such as XML and JSON;
- Binary encoding formats, such as Thrift and Protocol Buffers;
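As a rough sketch of how serialization is plugged into the producer (assuming the standard Java client org.apache.kafka.clients.producer; the class name and the choice of StringSerializer are illustrative, not from the original text):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSerializationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        // the serializers turn the in-memory ProducerRecord key/value into byte sequences
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic-partition", "key-1", "Hello, Nice to meet you"));
        }
    }
}

A custom Serializer<T> implementation can plug JSON, Thrift, or Protocol Buffers encoding into the same two properties.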
Send a message
Determine the broker to which the message is sent
The kafka-console-producer.bat script connects to the broker and delivers messages:
PS D:\kafka_2.12-2.3.0-1> bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic topic-partition
>Hello, Nice to meet you
Therefore, a producer only needs to know the topic and the broker-list (bootstrap.servers) to send a message.
But we know that the topic-partition topic has three partitions, and the leader replica of each partition sits on a different broker node; in addition, we can attach a key to a message to influence which partition it is delivered to, and the leader replica of that partition does not necessarily live on the node specified in broker-list.
So there are two possible solutions:
- Let the broker in broker-list handle it: that broker is responsible for forwarding the message to the broker node hosting the leader replica of the target partition;
- The producer client first requests the Kafka cluster metadata (Metadata) for the topic - the metadata includes the number of partitions of the topic, the number of replicas per partition, and the Ip:Port of the broker node hosting the leader replica of each partition - and then delivers the message directly to the broker node where the leader replica of the target partition resides;
Kafka chooses the second option, where the client bears most of the burden of obtaining the specific broker.
The client obtains the broker hosting the leader replica of the target partition by sending a MetadataRequest to the known broker node with the lowest load, then fills the ProducerRecord into a ProducerRequest of the following format and sends it to that broker. The blue box marks the topic, partition, and message content (record_set, i.e. the ProducerRecord); the other fields are described below.
PS: Kafka defines different Request protocols for different scenarios, such as MetadataRequest for metadata, ProducerRequest for producers to deliver messages;
Pooled message sending
In the previous section we walked through how a single producer message is sent to the broker: find the address of the corresponding broker and wrap the message in a Request object.
However, within the same producer client there may be multiple threads producing messages for different topics and partitions at the same time. There are two possible designs:
- Each thread sends its own messages immediately after producing them;
- Abstract the problem as a producer-consumer problem and organize the code through cooperation between threads;
The first design has good timeliness but wastes resources: it sends immediately regardless of data volume and cannot effectively reuse a connection to carry more data per transfer; and because producing and sending are coupled, it hurts the program's scalability.
The second design makes better use of network resources: multiple producing threads put messages destined for their respective topics and partitions into different queues, and once enough messages accumulate they are handed to the sending thread together, reducing the cost of establishing connections.
Kafka chose the second design: the Produce thread produces each message as a ProducerRecord, and the message is classified by partition (the partition is computed by the partitioner) before being placed into the message accumulator pool. The Send thread can then send messages for the same partition over one connection to the broker, reducing the cost of frequent connections.
After the partition is computed, the produce thread puts the message into the Deque of the corresponding partition in the message accumulator. A ProducerBatch is a fixed-size message set composed of multiple messages and is also the basic unit of sending; its size is controlled by batch.size. If a single ProducerRecord exceeds this value, the ProducerBatch holding that message is simply the size of the message:
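A minimal configuration sketch of the accumulator-related knobs; batch.size comes from the text above, while linger.ms and buffer.memory are related standard client parameters added here for illustration:

import java.util.Properties;

public class AccumulatorConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 16384);       // a ProducerBatch is considered full at this many bytes
        props.put("linger.ms", 5);            // how long to wait for a batch to fill before sending anyway
        props.put("buffer.memory", 33554432); // total memory available to the message accumulator
        return props;
    }
}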
The sending thread then takes messages for different topics and partitions from the message accumulator, determines from each ProducerBatch's partition and the Kafka cluster metadata which broker it should be sent to, wraps each ProducerBatch into the corresponding ProducerRequest, puts it into a queue keyed by destination broker, and hands the requests to the selector to be sent in turn:
When sending, the producer has three options: fire and forget; asynchronous send with a callback; synchronous send that waits for the result.
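A minimal sketch of the three send modes with the standard Java client (topic name and values are illustrative):

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendModesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic-partition", "hello");

            // 1. fire and forget: ignore the result
            producer.send(record);

            // 2. asynchronous send: the callback runs when the broker responds
            producer.send(record, (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("partition=" + metadata.partition() + " offset=" + metadata.offset());
                }
            });

            // 3. synchronous send: block until the result (or an exception) is available
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
            System.out.println("sync offset=" + metadata.offset());
        }
    }
}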
In the latter two cases, the sending thread maintains, per broker, a queue of InFlightRequests that have been sent but not yet acknowledged. Once the queue is full, no further requests can be sent to the corresponding broker node. The queue size is controlled by the max.in.flight.requests.per.connection parameter.
In fact, this mechanism resembles a simplified TCP sliding window: messages in the accumulator are not yet sent, and the inFlight queue holds messages that are sent but unacknowledged; once a message is sent and acknowledged it no longer needs to be stored.
Once a certain number of unacknowledged messages are in flight, no more can be sent; this resembles a congestion-control mechanism, but without shrinking the transmission window.
This indirectly reflects how network transmission is controlled, which is generally considered from two angles: reliability and congestion (speed).
However, because it is a simplified sliding window, for packets that fail to send (time out without a response) there is nothing like continuous ARQ or SACK (fast retransmit) to guarantee packet ordering: there is no notion of a send window, and a message leaves the inFlight queue as soon as it is acknowledged. So the only way to guarantee ordering is to set max.in.flight.requests.per.connection to 1, implementing stop-and-wait ARQ retransmission: only one message is in flight to a node at a time, and the send thread continues only after the previous message is acknowledged and removed from the queue.
Even if the sliding window were fully implemented, continuous-ARQ retransmission would create a repeated-consumption problem, because the send() method used to deliver messages is not idempotent (that is, if a message is retransmitted, the broker does not overwrite the original message).
Broker receiving messages:
Deciding when to reply to the client
When a ProducerRequest reaches the leader replica of the specified partition on the broker, the broker decides when to respond to the client based on the acks value in the request.
- acks=1: once the leader replica has written the message to its log, the broker responds to the client indicating that the message was processed successfully;
- acks=0: the producer does not wait for any response from the broker; the message is treated as successfully sent as soon as it has been handed to the network;
- acks=-1 (all): the broker responds to the client only after every replica in the ISR (In-Sync Replicas) has successfully written the message to its log.
The acks parameter defines at which point in the message's lifecycle the broker responds: the message is written to the leader's log, then the follower replicas in the ISR set pull it via FetchRequest; once all followers in the ISR have synchronized it, the message is marked as committed.
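A minimal sketch of how acks is set on the producer side; retries and max.in.flight.requests.per.connection are the related parameters discussed above, and the concrete values are illustrative:

import java.util.Properties;

public class AcksConfigSketch {
    static Properties reliableProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // wait until every replica in the ISR has written the message ("-1" and "all" are equivalent)
        props.put("acks", "all");
        // retry failed requests; combined with the next setting this preserves ordering
        props.put("retries", 3);
        // at most one unacknowledged request per connection: a simplified stop-and-wait ARQ
        props.put("max.in.flight.requests.per.connection", 1);
        return props;
    }
}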
Before you understand the acks parameter, you need to introduce the primary/secondary mechanism of the Kafka partition.
Partition primary/secondary mechanism
Leader node crashes and elections
Leader Node election method
In Kafka, every partition has a leader replica and follower replicas. Both reads and writes go to the leader; the follower replicas exist only for failover.
The leader replica is the first replica in AR order that is also in the ISR. For example, given the following AR and ISR sets:
AR: [1, 3, 4]; ISR: [3, 4]. Traverse the AR: 1 is not in the ISR, so it is skipped; 3 is in the ISR, so it is elected as the leader node.
When a partition is first created, all replicas in its AR are in the ISR, so the leader replica is the partition's preferred replica (the first replica in the AR), and the leader node is the first node in the AR. The same rule is used to determine a new leader replica from the ISR after the current leader replica's node crashes.
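A tiny sketch of the election rule described above - pick the first replica in AR order that is still in the ISR (illustrative code, not Kafka's internal implementation):

import java.util.List;
import java.util.Set;

public class LeaderElectionSketch {
    // Returns the broker id of the new leader: the first replica in AR order
    // that is still in the ISR, or -1 if none can be elected.
    static int electLeader(List<Integer> ar, Set<Integer> isr) {
        for (int brokerId : ar) {
            if (isr.contains(brokerId)) {
                return brokerId;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The example above: AR = [1, 3, 4], ISR = {3, 4} -> the new leader is 3
        System.out.println(electLeader(List.of(1, 3, 4), Set.of(3, 4)));
    }
}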
After the leader node crashes, a new leader is elected
Kafka faces two key problems with a read-write master policy:
- The leader node applies the write to its state machine and replies to the client late, which can trigger the Read After Write problem (with acks=1/-1, Kafka's leader node writes the message to its log before replying to the client, so this problem does not exist);
- The split-brain problem: because of a network partition, or because the original leader node crashes and then restarts, two leader nodes exist in the cluster and both serve requests, causing data inconsistency in the cluster.
Therefore Kafka needs a mechanism to solve the split-brain problem; the core is to guarantee that only one valid leader node in the cluster serves requests. For this distributed consensus problem, Kafka relies on ZooKeeper, a third-party component that implements the ZAB consensus algorithm. Kafka adds a /brokers/topics/{topicname}/partitions/{partitionName}/state node in zk that holds the state of the partition's current leader:
The next core question is: after the leader node crashes, how is the crash detected, and who triggers the next round of leader election? Kafka does not do this the way traditional distributed consensus algorithms do, where followers detect the leader's heartbeat timeout, trigger an election for the next epoch, and the follower that gains majority support becomes the leader for the new term.
Instead, Kafka introduces the concept of a controller on the server side. The controller manages the state of all partitions and replicas in the cluster, including detecting the crash of a leader replica and deciding the next leader replica. At cluster startup, each broker tries to read the /controller node in zk; if the /controller node does not exist or its data is abnormal, the broker tries to create the ephemeral /controller node itself.
- The broker that creates the node successfully becomes the controller. The persistent zk node /controller_epoch stores the controller_epoch value, which counts how many times the controller has changed; it is incremented by 1 each time the controller changes;
- Brokers that fail to create the node register a Watcher on the zk /controller node, so that when the current controller crashes, the other brokers are notified and a new controller election is triggered.
Each broker node, including the controller node, stores the brokerId value for the current controller in its own memory.
The controller selects the next leader replica from the ISR replica
- The controller registers Watcher listeners on the zk /brokers/ids directory so that it can handle the event when a broker goes down;
- The controller determines the set set_p, which contains all partitions hosted on the brokers that went down;
- For each such partition it reads the current ISR from /brokers/topics/{topicname}/partitions/{partitionname}/state and checks whether the replica on the failed broker was the leader replica; if so, the next follower replica in AR order that is in the ISR becomes the partition's new leader replica;
- The new leader replica, the new ISR, the new leader_epoch, and the controller's controller_epoch are written to /brokers/topics/{topicname}/partitions/{partitionname}/state;
- The controller sends a LeaderAndIsrRequest via RPC to the brokers involved in set_p to notify them of the leader and ISR change;
At this point, the leader switchover of kafka partition is complete.
For more information on data consistency in a master-slave architecture, please refer to this article on data consistency in a distributed master-slave architecture.
From the discussion above, maintaining the ISR set is what guarantees that data is not lost during a leader switch. The following sections therefore describe how the ISR set is managed.
Followers join and move out of the ISR collection
Move out of ISR collection
A follower is removed from the ISR set if either of the following two conditions occurs:
- Failure: the node hosting the partition's follower replica goes down and can no longer stay connected to the leader node;
- Synchronization failure: the follower replica lags behind the leader replica for a period of time (specified by replica.lag.time.max.ms, default 10000). Lagging here means the follower replica's LEO has not caught up with the leader replica's LEO (Log End Offset), i.e. it has stayed behind for the entire specified period;
As shown in the figure below, all three nodes are in the partition's ISR set: the partition leader has stored five log entries and its LEO is 6 (the LEO identifies the position where the next message will be written to the log file); the two followers have stored four and three entries, with LEOs of 5 and 4 respectively.
The HW is the smallest LEO in the ISR replica set minus 1 - in the figure above, 3. The HW marks the committed message offset in the partition.
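A tiny sketch of the HW rule using this article's convention (HW = smallest LEO in the ISR minus 1), applied to the LEOs from the figure:

import java.util.Collections;
import java.util.List;

public class HighWatermarkSketch {
    // The partition HW is bounded by the slowest in-sync replica.
    static long highWatermark(List<Long> isrLeos) {
        return Collections.min(isrLeos) - 1;
    }

    public static void main(String[] args) {
        // LEOs from the figure: leader = 6, followers = 5 and 4
        System.out.println(highWatermark(List.of(6L, 5L, 4L))); // prints 3
    }
}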
Join the ISR set
If a follower that was removed from the ISR (but is still in the partition's AR) wants to rejoin the ISR, it must satisfy this condition: the follower replica's LEO catches up with the current ISR's HW. Note that the criterion is catching up with the HW, not the criterion used above for removal due to synchronization failure: if the replica only had to catch up within replica.lag.time.max.ms, it could be removed from the ISR again right after rejoining.
The purpose of this is to keep a certain number of replicas in the partition's ISR: if producer ProducerRequest QPS spikes over a period of time and all follower nodes are removed from the ISR, then if the leader node crashes and unclean.leader.election.enable keeps its default value of false, no leader can be elected from non-ISR nodes and the partition temporarily has no leader.
At this point the leader-election part on the Kafka broker side is done; the next step is writing the log:
Log maintenance
Log Writing (Application state machine)
Unlike RabbitMQ, which keeps messages in memory, Kafka stores its logs on disk. For example, the three partitions of topic-partition are stored under the log.dirs directory:
Writing the log means appending the message to the end of the corresponding partition file; sequential disk writes can even be more efficient than random memory writes on some SSDs.
Kafka’s use of disks is based on the following considerations:
- Disk is more stable than memory;
- The operating system already has many optimizations for disk writes (such as read-ahead and asynchronous flushing). Relying on these guarantees, Kafka only needs to call the operating system's file-writing interfaces and does not need to worry about how written data might be lost.
However, when a consumer reads the corresponding partition file, the file has to be scanned on disk, and disk scans are very inefficient, so an index is added for the file. For example, if the consumer wants to read the 6th record, it reads the index entry with recordIndex == 6 to obtain the record's position in the file.
As we all know, Kafka's throughput is relatively high; if a dense (clustered) index were used, the index space would explode. Kafka therefore uses a sparse index, writing an index entry only once every several records.
Because reading and writing one huge file is slow - all requests for the partition would hit the same file, and if the number of log entries exceeded Integer.MAX_VALUE (2^31 - 1) the recordIndex could no longer be recorded - Kafka splits log files into segments according to log.segment.bytes.
Because an index maps one-to-one to a log file, multiple log files mean multiple indexes. As a result, if the log offset a consumer wants is near the end, it would have to traverse several index files to find the corresponding index record. Therefore an upper-level index over the indexes is maintained, recording the boundary offset of each index, so that the log file containing a given offset can be located faster through this upper level.
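A rough sketch of the two-level lookup described above: an upper-level map from base offset to segment, then a searchable sparse index inside the segment. The names and numbers are illustrative, not Kafka's internal classes:

import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetLookupSketch {
    // sparse index of one segment: message offset -> physical position in the log file
    static long positionFor(long targetOffset, NavigableMap<Long, Long> sparseIndex) {
        // the greatest indexed offset <= targetOffset; scan forward from that position in the file
        return sparseIndex.floorEntry(targetOffset).getValue();
    }

    public static void main(String[] args) {
        // upper level: base offset of each segment -> segment file name (illustrative)
        NavigableMap<Long, String> segments = new TreeMap<>();
        segments.put(0L, "00000000000000000000.log");
        segments.put(1000L, "00000000000000001000.log");
        System.out.println(segments.floorEntry(1503L).getValue()); // segment holding offset 1503

        // sparse index inside that segment: one entry every few records
        NavigableMap<Long, Long> index = new TreeMap<>();
        index.put(1000L, 0L);
        index.put(1500L, 4096L);
        System.out.println(positionFor(1503L, index)); // 4096, then scan forward to offset 1503
    }
}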
This is how Kafka writes logs and indexes. However, disk space is limited, so Kafka needs to provide compact and delete functions to control the size of log and index files.
Maintain indexes and logs
Log compaction (compact)
Kafka logs are stored based on key-value, which is similar to Redis;
In Redis there are two persistence modes, RDB and AOF; Kafka's compact is similar to RDB in that only the latest message for each key is kept in the log file. Kafka makes two passes over the compactible log files: the first pass builds a map from the hash of each message's key to that key's latest offset; the second pass removes, for each key, the messages whose offset is smaller than the offset recorded in the map.
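A toy sketch of the two-pass compaction idea (the real cleaner stores a hash of the key rather than the key itself; plain keys are used here for readability):

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionSketch {
    // Toy two-pass compaction over offset -> {key, value} records.
    // Assumes the map iterates in ascending offset order (insertion order here).
    static Map<Long, String[]> compact(LinkedHashMap<Long, String[]> log) {
        // pass 1: remember the latest offset seen for every key
        Map<String, Long> latestOffsetByKey = new HashMap<>();
        log.forEach((offset, kv) -> latestOffsetByKey.put(kv[0], offset));

        // pass 2: keep only the record whose offset equals the latest offset for its key
        LinkedHashMap<Long, String[]> compacted = new LinkedHashMap<>();
        log.forEach((offset, kv) -> {
            if (latestOffsetByKey.get(kv[0]).equals(offset)) {
                compacted.put(offset, kv);
            }
        });
        return compacted;
    }

    public static void main(String[] args) {
        LinkedHashMap<Long, String[]> log = new LinkedHashMap<>();
        log.put(0L, new String[]{"k1", "v1"});
        log.put(1L, new String[]{"k2", "v2"});
        log.put(2L, new String[]{"k1", "v3"});
        System.out.println(compact(log).keySet()); // [1, 2]: offset 0 is dropped, k1 keeps only v3
    }
}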
Kafka's situation is more complex than Redis's, which only needs the latest value of a key for reads and writes: a Kafka consumer can consume from an arbitrary offset, and if a message between that offset and the LEO shared a key with a newer message and has been compacted away, consuming at the current offset would fail. So we need an indicator that tells us whether a given offset range has already been cleaned, which is the cleanerCheckPoint pointer shown below:
If the offset currently being consumed is to the right of the pointer, that range has not been cleaned and can be pulled directly; if it is to the left of the pointer, the offset may already have been compacted.
Because compaction can produce many small files, Kafka compacts in groups: a group contains multiple log segments, and a new file with the .clean suffix stores the compacted log. If the compacted size of the group is smaller than log.segment.bytes, only one log segment file remains after compaction.
Compaction is generally triggered based on the log file's dirty ratio, i.e. (firstUncleanableOffset - cleanerCheckPoint) / cleanerCheckPoint; when this value exceeds log.cleaner.min.cleanable.ratio, log compaction is triggered.
Log deletion
Logs can be deleted periodically based on time, log size, and log start offset.
- Time-based: if the difference between the timestamp in the message and the current timestamp is greater than log.retention.hours, the log is deleted;
- Size-based: if the log exceeds log.retention.bytes, files of the corresponding size are deleted; the default value is -1 (no limit);
- Based on the log start offset: the LogStartOffset pointer is moved, and logs before the offset the pointer references are deleted; this can be set manually via kafka-delete-records.sh;
The core question is how Kafka actually deletes a log entry. In MySQL, when we delete a data row with the delete command, MySQL marks it as a tombstone record in order to preserve the repeatable-read isolation level; once no active transaction references the row any more, the tombstone is cleaned up by a background thread.
Similar to mysql, Kafka identifies a message as a tombstone record by setting its value to NULL.
Consumer consumption message phase
Consumer consumption generally goes through several stages (a minimal client sketch follows the list):
- Open the consumer client, define its consumer group, and subscribe to topics;
- Pull partition messages via FetchRequest for business processing (the poll() method);
- Commit the consumption offset;
- Close the consumer client.
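A minimal consumer client sketch covering the four stages above (assuming the standard Java client; the group name and manual offset commit are illustrative choices):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-a");          // the consumer group this consumer belongs to
        props.put("enable.auto.commit", "false");  // offsets are committed manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-partition")); // subscribe to the topic
            while (true) {
                // poll() pulls partition messages (FetchRequest under the hood)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync(); // commit the consumption offset
            }
        }
    }
}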
Of these stages, two points will be emphasized below: synchronizing the partition assignment and committing the consumption offset.
Consumer initialization
PS D:\kafka_2.12-2.3.0> bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-partition
Hello, Nice to meet you
The first things to clarify are whether Kafka consumers get messages by push or by pull, and whether the delivery model is P2P or publish/subscribe (one-to-many broadcast).
Kafka consumers actively request the broker to pull data.
Kafka supports both P2P and publish/subscribe due to the addition of the consumer group concept:
- When there is only one consumer in a consumer group, then all messages in the topic can be consumed by that consumer, i.e. P2P;
- When there are multiple consumers in a consumer group, all messages in the topic will be consumed by the consumers in the consumer group according to the corresponding partitioning principle, namely publish/subscribe.
However, regardless of the message delivery mode, consumers must first define their own consumer group and subscribe to the corresponding topic. Once the definition is complete, you can start asking the broker to pull data.
Consumer partition assignment
Unlike some other message-oriented middleware, Kafka also has the concept of consumer groups. Each consumer belongs to a consumer group, and each message of a topic is delivered to only one consumer in each consumer group that subscribes to it.
For example, topic-partition has three partitions, and there are consumer group A (two consumers C1 and C2), consumer group B (three consumers C1, C2, and C3), and consumer group C (four consumers C1, C2, C3, and C4).
- For consumers in group A, C1 is assigned two partitions and C2 is assigned one partition;
- For consumers in group B, each consumer is assigned a partition;
- For the consumers in group C, C1, C2, and C3 are each assigned one partition, while C4 has no partition to consume.
RangeAssignor is the default partition assignment strategy in Kafka. Each consumer in the group is assigned (number of partitions / number of consumers) partitions, and the remaining (number of partitions % number of consumers) partitions go, one each, to the consumers that come first in dictionary order (C1 before C2).
All partitioning strategies aim to evenly distribute partitions among consumer groups.
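A back-of-the-envelope sketch of the RangeAssignor arithmetic applied to group A above (3 partitions, consumers C1 and C2 in dictionary order); this illustrates the rule, not Kafka's internal implementation:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignSketch {
    // consumers must already be sorted in dictionary order
    static Map<String, List<Integer>> assign(int partitions, List<String> consumers) {
        int quota = partitions / consumers.size(); // every consumer gets at least this many
        int extra = partitions % consumers.size(); // the first `extra` consumers get one more
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                assigned.add(next++);
            }
            result.put(consumers.get(i), assigned);
        }
        return result;
    }

    public static void main(String[] args) {
        // group A: prints {C1=[0, 1], C2=[2]}
        System.out.println(assign(3, List.of("C1", "C2")));
    }
}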
Consumers start consuming
Before starting to request messages for a topic, the consumer needs to answer: how does a consumer know which partitions of the topic it should pull from, and how is the partition assignment synchronized across multiple consumers?
Synchronization of partition policies
This is a multi-node synchronization problem. One approach is to have the Kafka server, after the partition assignment is determined, write the assignment information into ZooKeeper for coordination (corresponding to the /consumers/group1/owners/topic1 node in the figure below):
As shown in the figure above, for partition 1 of the topic1 topic in group1, a consumer can obtain its assigned partition node by traversing the topic1 node the first time.
When the consumers in a consumer group change - that is, when the number of ephemeral nodes under zk's /consumers/{group}/ids directory changes - a partition rebalance must be triggered to reassign the topic's partitions among the consumers. Therefore all consumers in the group register Watcher listeners on the group's /consumers/{group}/ids directory (a Watcher fires an event to notify the listener when the watched node changes).
But this over-reliance on ZK leads to two problems:
- If the newly added consumers subscribe only to topic1 and not to other topics, only the consumers subscribing to topic1 need to be notified, yet all consumer nodes in the group are triggered. In large groups this coarse-grained trigger mechanism sends a large number of Watcher notifications to clients, causing ZooKeeper response delays during notification - the herd effect;
- Writes to zk must go through the consensus algorithm to synchronize the state machine, and this synchronization is not atomic, so multiple consumer nodes reading the owner nodes that store the topic's partition assignment may see inconsistent state, which eventually causes anomalies.
Therefore we need a more fine-grained notification mechanism that notifies only the nodes that need notifying, so a new mechanism is introduced: a Coordinator, similar to the coordinator role in 2PC, used to reach consensus on the partition assignment among multiple distributed nodes.
We cover it together with committing the consumption offset below, because both rely on the internal topic __consumer_offsets.
Committing the consumption offset
Before Kafka 0.9, as shown in the figure above, zk's /consumers/{groupname}/offsets/{topicname}/{partitionname} directory was used to store each consumer group's consumption offset for the corresponding topic partition.
As mentioned above, zk is not suitable for latency-sensitive, write-heavy scenarios, so since version 0.9 Kafka stores the consumption offsets of partitions for different consumer groups in the built-in topic __consumer_offsets.
The number of partitions of the __consumer_offsets topic is configured by offsets.topic.num.partitions and defaults to 50;
If group.id is test_group_id, Kafka calculates the partition of __consumer_offsets to which the consumer group commits its offsets with the following formula:
// groupMetadataTopicPartitionCount is offsets.topic.num.partitions
Math.abs(groupId.hashCode()) % groupMetadataTopicPartitionCount
Therefore, if the hash function is not uniform, some partitions of the __consumer_offsets topic may receive most of the offset commits, so that some brokers' disks fill up while others sit idle.
GroupCoordinator
For each consumer group, the GroupCoordinator runs on the broker hosting the leader replica of the __consumer_offsets partition to which that group commits its offsets; the GroupCoordinator is used to resolve the herd effect described earlier.
Take adding a new consumer to a consumer group as an example. Because the number of nodes in the group changes, a partition rebalance must be triggered; instead of notifying through Watchers, this scheme has the new consumer announce itself.
1. Find the GroupCoordinator node
First, the newly added consumer sends a FindCoordinatorRequest, containing its own groupId, to one of the brokers in its bootstrap list.
After receiving the request, the broker computes the consumer group's __consumer_offsets partition with the formula Math.abs(groupId.hashCode()) % groupMetadataTopicPartitionCount, then reads zk's /brokers/topics/__consumer_offsets/partitions/{partition id}/state to obtain the IP of the broker hosting the leader replica of that partition and returns it in the response (that broker is the GroupCoordinator).
2. Join the consumer group (Rebalance)
After the consumer obtains the corresponding IP address, it sends a JoinGroupRequest to the GroupCoordinator to join the consumer group. The request contains the list of topics the consumer wants to subscribe to and the partition assignment strategies it supports; the GroupCoordinator eventually replies with a JoinGroupResponse containing the chosen partition assignment strategy.
After the GroupCoordinator receives the request, the group's state changes from stable -> preparingRebalance. While the group is in the preparingRebalance state, when another consumer in the group calls poll() to pull messages, the method needs to contact the GroupCoordinator to obtain the committed offsets of its partitions; at that point the consumer learns that its group is in preparingRebalance and triggers the ReJoin mechanism, i.e. it also sends a JoinGroupRequest to the GroupCoordinator carrying its subscribed topics and supported partition assignment strategies.
After collecting the JoinGroupRequests submitted by all consumers in the group, the group's state changes from preparingRebalance -> AwaitingSync. The server decides which partition assignment strategy to use, but it does not compute the assignment of the subscribed topics' partitions on the server side. Instead, it randomly selects one consumer from the group as the Leader and sends it a JoinGroupResponse that contains the topics subscribed by every consumer in the group and the chosen partition assignment strategy.
That node computes the assignment according to the strategy and sends the result to the GroupCoordinator in a SyncGroupRequest; the GroupCoordinator then synchronizes the assignment to the consumers in the group.
During this state flow you may wonder why such a detour is necessary instead of computing the assignment directly in the GroupCoordinator. This keeps the details of partition assignment out of the broker: even if the assignment strategy changes later, only the consumer side needs to be restarted, not the server side.
After the synchronization, the Group status changes from AwaitingSync -> stable.
At this point, we have walked through the complete process of how a topic's partitions are assigned among consumers and how consumers commit offsets after pulling and consuming messages.
Conclusion:
This post contains too many details to cover them all, but even so, the story of a message and the related details should now be fairly clear; working through the logic from start to finish may still take quite some time.
Reference:
Deep Understanding of Kafka; From Paxos to ZooKeeper; Designing Data-Intensive Applications
Kafka High Availability (Part 1)
Kafka Group state change analysis and Rebalance process