Coordinator

  • In Kafka, the Coordinator is the broker-side component that serves Consumer Groups: it handles Group Rebalance, offset management, and Group member management.
  • When a Consumer application commits an offset, it actually commits it to the Broker where the Coordinator resides. Similarly, when a Consumer application starts, it sends its requests to the Broker where the Coordinator resides. The Coordinator then performs metadata management operations such as registering the Consumer group and recording its members.

Coordinator

  • Coordinator components are created and started whenever a broker starts. That is, every broker has its own Coordinator component. So how does a Consumer Group determine which Broker's Coordinator is serving it?
  • Each group id is hashed to a partition of the offsets topic, so different brokers can act as coordinators for different groups.

Locating the Coordinator and assigning partitions

Within a group, a partition can be consumed by only one consumer, so some component has to be responsible for partition assignment. The earlier study of the consumer group mechanism also mentioned three partition assignment strategies.

For each consumer group, the Kafka cluster selects one broker as its coordinator, so each consumer group corresponds to exactly one Coordinator. Step 1 is therefore to find that coordinator.

Below we have a group with 3 consumers: C0, C1 and C2

GroupCoordinatorRequest request
  • Step 1: determine which partition of the offsets topic stores the Group's data: partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount).
  • Step 2: find the Broker that hosts the leader replica of that partition. That Broker is the Coordinator.

The process is to send a GroupCoordinatorRequest to locate a coordinator

First, Kafka computes the hash of the Group's group.id. For example, for a Group whose group.id is "test-group", the hashCode value is 627841412. Second, Kafka takes the number of partitions of __consumer_offsets, 50 by default, and computes the absolute value of the hash modulo that count: abs(627841412 % 50) = 12. We now know that partition 12 of the offsets topic holds this Group's data. With the partition number in hand, step 2 is easy: we only need to find which Broker hosts the leader replica of partition 12 of the offsets topic. That Broker is the Coordinator we are looking for.
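The arithmetic above can be checked in a few lines of Java. This is a minimal sketch of step 1 only; the class name OffsetsPartitionExample is made up for illustration, and the partition count of 50 assumes the default setting of the offsets topic.

```java
// Worked example of the partition computation described above.
// OffsetsPartitionExample is a hypothetical name, not a Kafka class.
final class OffsetsPartitionExample {

    // Mirrors the formula from step 1: abs(groupId.hashCode() % partitionCount).
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return Math.abs(groupId.hashCode() % offsetsTopicPartitionCount);
    }

    public static void main(String[] args) {
        String groupId = "test-group";
        int partitions = 50; // assumed: default partition count of __consumer_offsets

        System.out.println(groupId.hashCode());                // prints 627841412
        System.out.println(partitionFor(groupId, partitions)); // prints 12
    }
}
```

If your cluster uses a different partition count for the offsets topic, pass that value instead of 50; the result changes accordingly.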
JoinGroup

After all consumers have sent a JoinGroup request to the Coordinator, the Coordinator designates one of them as the leader and sends the group membership and subscription information to that leader.

The other consumers act as followers and later receive their partitions from the plan the leader draws up.
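The JoinGroup step can be pictured with a small in-memory model. This is only a sketch, not Kafka's implementation: JoinGroupSketch, JoinResponse, and their fields are hypothetical names, and the rule "the first member to join becomes the leader" is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the JoinGroup step (hypothetical, not Kafka code).
final class JoinGroupSketch {

    // Every member learns who the leader is, but only the leader
    // receives the full member list with subscriptions.
    static final class JoinResponse {
        final String memberId;
        final String leaderId;
        final Map<String, List<String>> members; // memberId -> subscribed topics

        JoinResponse(String memberId, String leaderId, Map<String, List<String>> members) {
            this.memberId = memberId;
            this.leaderId = leaderId;
            this.members = members;
        }

        boolean isLeader() {
            return memberId.equals(leaderId);
        }
    }

    // Members in the order their JoinGroup requests arrived.
    private final Map<String, List<String>> joined = new LinkedHashMap<>();

    void join(String memberId, List<String> topics) {
        joined.put(memberId, new ArrayList<>(topics));
    }

    // Called once all members have joined.
    Map<String, JoinResponse> completeJoin() {
        Map<String, JoinResponse> responses = new LinkedHashMap<>();
        if (joined.isEmpty()) {
            return responses;
        }
        // Assumption for this sketch: the first joiner is the leader.
        String leaderId = joined.keySet().iterator().next();
        for (String memberId : joined.keySet()) {
            Map<String, List<String>> view = memberId.equals(leaderId)
                    ? Collections.unmodifiableMap(joined)
                    : Collections.<String, List<String>>emptyMap();
            responses.put(memberId, new JoinResponse(memberId, leaderId, view));
        }
        return responses;
    }
}
```

With C0, C1 and C2 joining in that order, C0 comes back as leader with all three members in its response, while C1 and C2 receive an empty member list.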

SyncGroup

The leader works out the assignment plan, i.e. which consumer is responsible for which partitions of which topics.

Once the plan is ready, the leader wraps it in a SyncGroup request and sends it to the Coordinator. The Coordinator stores the plan and answers the leader with the leader's own assignment.

Each follower sends an empty SyncGroupRequest to the Coordinator, which returns that follower's partition assignment.
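The SyncGroup exchange can be sketched the same way. The code below is an illustrative model under stated assumptions, not Kafka's implementation: SyncGroupSketch and its methods are hypothetical, round-robin stands in for whichever assignment strategy is configured, and the leader is assumed to sync before the followers. In this sketch every member, the leader included, gets back its own slice of the plan.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the SyncGroup step (hypothetical, not Kafka code).
final class SyncGroupSketch {

    // Leader side: spread partitions 0..partitionCount-1 over the members.
    // Round-robin is an assumption; the real strategy is configurable.
    static Map<String, List<Integer>> assign(List<String> memberIds, int partitionCount) {
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String memberId : memberIds) {
            plan.put(memberId, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            plan.get(memberIds.get(p % memberIds.size())).add(p);
        }
        return plan;
    }

    // Coordinator side: the plan received from the leader.
    private Map<String, List<Integer>> stored = Collections.emptyMap();

    // The leader passes its plan; a follower passes an empty map.
    // The caller gets back only its own partitions.
    List<Integer> syncGroup(String memberId, Map<String, List<Integer>> plan) {
        if (!plan.isEmpty()) {
            stored = plan;
        }
        return stored.getOrDefault(memberId, Collections.emptyList());
    }

    public static void main(String[] args) {
        List<String> members = List.of("C0", "C1", "C2");
        SyncGroupSketch coordinator = new SyncGroupSketch();
        // The leader (C0) syncs first and carries the plan for 7 partitions.
        System.out.println(coordinator.syncGroup("C0", assign(members, 7)));
        // Followers send an empty request and still receive their share.
        System.out.println(coordinator.syncGroup("C1", Collections.emptyMap()));
        System.out.println(coordinator.syncGroup("C2", Collections.emptyMap()));
    }
}
```

Keeping the assignment logic on the leader consumer rather than on the broker is what lets clients plug in their own strategy without changing the Coordinator.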

Practical significance

  • Consumer applications, especially those using the Java Consumer API, find and connect to the right Coordinator automatically, so we don’t have to worry about that. The main practical value of knowing this algorithm is troubleshooting. When a Consumer Group misbehaves and the Broker logs need to be checked quickly, the algorithm pinpoints the Broker that hosts the Coordinator, so there is no need to check Broker after Broker blindly.

  • For example, if an offset commit fails, the Broker hosting the Coordinator may be faulty. In this case, the algorithm can be used to quickly locate the Coordinator and inspect its log.

  • If the group rebalances too frequently, the Coordinator may have a log like “(Re)join group”.

Coordinator source code walkthrough

The following diagram shows the overall structure of the Kafka source code. We have looked at clients previously.

clients directory: stores Kafka client code, such as the producer and consumer code.

config directory: stores Kafka configuration files. The most important one is server.properties.

connect directory: holds the source code of the Connect component. As I mentioned in the opening paragraph, Kafka Connect is used for real-time data transfer between Kafka and external systems.

core directory: holds the broker-side code. All the Kafka server-side code is stored in this directory.

streams directory: holds the source code of the Streams component. Kafka Streams is the component that implements real-time stream processing on Kafka.

Now let’s look at the code in the core directory, which contains almost all of the Kafka server-side code. The part we are already familiar with is the controller.

The coordinator code is split into two packages by function: group, which serves consumer groups, and transaction, which serves distributed transactions.

We can see that there are only a few classes under the group package.

Let’s take a look at the request flow. As we learned earlier when studying the idempotent producer, the server-side entry point for a client request is the KafkaApis class, which has a separate handler method for each type of request.

That’s what we’re going to look at today.
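The dispatch idea can be sketched as a simple switch. This is a simplified illustration and not the KafkaApis source: RequestDispatchSketch and its ApiKey enum are hypothetical and cover only the four requests discussed in this article, although the constant names follow Kafka's request types and the handler names are the ones examined below.

```java
// Simplified picture of request dispatch (hypothetical, not the KafkaApis source).
final class RequestDispatchSketch {

    // Only the request types discussed in this article.
    enum ApiKey { FIND_COORDINATOR, JOIN_GROUP, SYNC_GROUP, OFFSET_COMMIT }

    // Returns the name of the handler method that processes the request.
    static String handlerFor(ApiKey key) {
        switch (key) {
            case FIND_COORDINATOR:
                return "handleFindCoordinatorRequest";
            case JOIN_GROUP:
                return "handleJoinGroupRequest";
            case SYNC_GROUP:
                return "handleSyncGroupRequest";
            case OFFSET_COMMIT:
                return "handleOffsetCommitRequest";
            default:
                throw new IllegalArgumentException("unhandled request type: " + key);
        }
    }
}
```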

handleFindCoordinatorRequest

One thing to note in this code is that it first determines whether the request is for a transaction coordinator or a consumer group coordinator.

The rest is as we described earlier: for a consumer group, groupCoordinator.partitionFor(findCoordinatorRequest.data.key) returns the partition of the GROUP_METADATA_TOPIC_NAME topic, computed as

Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

Once the partition is known, the code looks up that partition’s leader, which is the second piece of code in the figure below. The broker hosting the leader is our Coordinator node.
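Putting both steps together, the lookup can be modelled roughly as follows. This is a sketch under several assumptions: FindCoordinatorSketch, coordinatorBroker, and the leaders map are hypothetical stand-ins for the broker's metadata, the transaction branch is assumed to hash its key the same way as the group branch, and the broker id in the example is made up.

```java
import java.util.HashMap;
import java.util.Map;

// Rough model of the coordinator lookup (hypothetical, not the KafkaApis source).
final class FindCoordinatorSketch {

    enum CoordinatorType { GROUP, TRANSACTION }

    static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
    static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";

    // The request type decides which internal topic is consulted.
    static String internalTopicFor(CoordinatorType type) {
        return type == CoordinatorType.GROUP
                ? GROUP_METADATA_TOPIC_NAME
                : TRANSACTION_STATE_TOPIC_NAME;
    }

    // Math.abs(Integer.MIN_VALUE) is still negative, so that one value maps to 0.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Step 1: hash the key onto a partition of the internal topic.
    static int partitionFor(String key, int partitionCount) {
        return abs(key.hashCode()) % partitionCount;
    }

    // Step 2: the broker leading that partition is the coordinator.
    // leaders is a hypothetical metadata snapshot: "topic-partition" -> broker id.
    static int coordinatorBroker(CoordinatorType type, String key,
                                 int partitionCount, Map<String, Integer> leaders) {
        String topicPartition = internalTopicFor(type) + "-" + partitionFor(key, partitionCount);
        Integer leader = leaders.get(topicPartition);
        if (leader == null) {
            // A real broker would answer with an error response instead of throwing.
            throw new IllegalStateException("no leader known for " + topicPartition);
        }
        return leader;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("__consumer_offsets-12", 3); // made-up broker id
        System.out.println(
                coordinatorBroker(CoordinatorType.GROUP, "test-group", 50, leaders)); // prints 3
    }
}
```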

handleJoinGroupRequest

The following code is relatively simple. It is the registration step for a consumer: it calls the groupCoordinator.handleJoinGroup() method, which adds the consumer to the group.

The response, sent through our sendResponseCallback method, also tells the client which consumer is the leader of the Consumer group.

handleSyncGroupRequest

The following code synchronizes the partition assignment to the consumers; the assignment information (assignmentMap) is what gets returned to the clients.

handleOffsetCommitRequest

This handles offset commits from the consumer client. The code is a bit long, so I’ll cover only a few important points.

Here we get the offset information submitted in the request.

Based on the request, the code decides where the offsets are stored; older versions store them in ZooKeeper.

Finally, the handleCommitOffsets method of the GroupCoordinator is called to commit the offsets.
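The storage decision can be pictured as a version check. This is only a sketch: OffsetCommitRoutingSketch and OffsetStore are hypothetical names, and treating request version 0 as the ZooKeeper-backed "old version" is an assumption about the code being described, so check it against the source you are reading.

```java
// Sketch of the storage decision for committed offsets (hypothetical, not Kafka code).
final class OffsetCommitRoutingSketch {

    enum OffsetStore { ZOOKEEPER, GROUP_COORDINATOR }

    // Assumption: request version 0 is the legacy path that writes to ZooKeeper;
    // every later version is handed to the GroupCoordinator.
    static OffsetStore storeFor(int apiVersion) {
        return apiVersion == 0 ? OffsetStore.ZOOKEEPER : OffsetStore.GROUP_COORDINATOR;
    }
}
```

Offsets routed to the GroupCoordinator end up in the offsets topic, which is the path taken by handleCommitOffsets.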

Conclusion

  1. The Coordinator’s role in Kafka’s consumer infrastructure: it drives Group Rebalance and provides offset management and Group membership management.
  2. How the Coordinator interacts with clients to perform these functions.