1. Foreword

In Kafka, every Consumer belongs to a Consumer Group, whereas a Producer has no such concept. This makes the Consumer side more complicated than the Producer side: besides Consumer Group membership, consumers also have to maintain and manage offsets and deal with repeated consumption.

Two components are involved in consumer groups: the ConsumerCoordinator on the consumer client and the GroupCoordinator on the Kafka broker. The ConsumerCoordinator is responsible for communicating with the GroupCoordinator, and every broker starts its own GroupCoordinator instance when it boots. A cluster contains multiple brokers, so how does a new Consumer that joins a Consumer Group determine which broker's GroupCoordinator it should interact with?

Sharp readers may already have an idea. Let's start with the GroupCoordinator.

2. The GroupCoordinator

As you might expect, there is an algorithm and a strategy behind this. The mechanism that lets a Consumer locate the correct GroupCoordinator relies on Kafka's internal topic __consumer_offsets.

2.1 __consumer_offsets

The internal topic __consumer_offsets stores the consumption progress (committed offsets) of Consumer Groups. By default it has 50 partitions (offsets.topic.num.partitions), and each partition has three replicas (offsets.topic.replication.factor), as shown below:

2.2 How does a Consumer Find a GroupCoordinator?

Each Consumer Group has its own GroupCoordinator. To locate it, the GroupId is hashed and the hash is taken modulo the number of partitions of __consumer_offsets, which yields a partition number. The broker hosting that partition's leader is the node whose GroupCoordinator the Consumer Group interacts with. The partition is obtained with the following formula:

abs(GroupId.hashCode()) % NumPartitions

For example, if a GroupId's hashCode is 8, then 8 modulo 50 is 8, so the broker hosting the leader of partition 8 of __consumer_offsets is the node we are looking for. The Consumer Group then interacts directly with the GroupCoordinator on that broker.
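The lookup fits in a few lines of Java. The class and method names below are hypothetical, and the formula is the one given above. One detail is an assumption modeled on Kafka's Utils.abs helper: a hashCode of Integer.MIN_VALUE is mapped to 0. Without that special case, Math.abs would overflow and produce a negative partition index.

```java
/**
 * Sketch of the group-id-to-partition mapping described above.
 * Assumption: mirrors abs(groupId.hashCode()) % numPartitions, with
 * Integer.MIN_VALUE treated as 0 (modeled on Kafka's Utils.abs).
 */
public class CoordinatorPartition {

    // Math.abs(Integer.MIN_VALUE) is still negative, so map that case to 0.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    /** Index of the __consumer_offsets partition whose leader hosts the group's GroupCoordinator. */
    static int partitionFor(String groupId, int numPartitions) {
        return abs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50; // default offsets.topic.num.partitions
        // "a".hashCode() is 97, and 97 % 50 = 47
        System.out.println(partitionFor("a", numPartitions)); // 47
        // ":".hashCode() is 58, and 58 % 50 = 8, the same partition as in the example above
        System.out.println(partitionFor(":", numPartitions)); // 8
    }
}
```

Any client can reproduce this computation, but the broker is the one that answers it. Once it has the partition, it returns the broker that currently leads that partition.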

3. Group status changes

Before walking through how a consumer joins a Consumer Group, Lao Zhou thinks it is worth covering how the group's state changes.

3.1 The consumer side

The inner class MemberState of AbstractCoordinator defines four consumer-side states: UNJOINED (not yet part of a group), PREPARING_REBALANCE (JoinGroup request sent, no response received yet), COMPLETING_REBALANCE (JoinGroup response received, but no partition assignment yet), and STABLE (joined and assigned). The transitions between these four states are shown in the figure below:
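Here is a rough textual companion to that figure. The state names come from AbstractCoordinator.MemberState. The transition table is a simplified assumption written for illustration and is not copied from Kafka. In particular, the "failure goes back to UNJOINED" edges are a simplification.

```java
import java.util.EnumSet;
import java.util.Set;

/**
 * Simplified model of the consumer-side MemberState transitions.
 * State names follow AbstractCoordinator.MemberState; the transition
 * table is an illustrative assumption, not Kafka's code.
 */
public class ConsumerMemberStates {

    enum MemberState {
        UNJOINED,             // not part of any group
        PREPARING_REBALANCE,  // JoinGroup sent, waiting for the response
        COMPLETING_REBALANCE, // JoinGroup response received, waiting for the assignment
        STABLE;               // assignment received, sending heartbeats

        /** States reachable from this one in the simplified model. */
        Set<MemberState> next() {
            switch (this) {
                case UNJOINED:
                    return EnumSet.of(PREPARING_REBALANCE);
                case PREPARING_REBALANCE:
                    return EnumSet.of(COMPLETING_REBALANCE, UNJOINED); // join failed -> UNJOINED
                case COMPLETING_REBALANCE:
                    return EnumSet.of(STABLE, UNJOINED);               // sync failed -> UNJOINED
                case STABLE:
                    return EnumSet.of(PREPARING_REBALANCE, UNJOINED); // rebalance, or leave group
                default:
                    throw new AssertionError(this);
            }
        }
    }

    private MemberState state = MemberState.UNJOINED;

    MemberState state() {
        return state;
    }

    /** Moves to the target state, rejecting transitions the model does not allow. */
    void moveTo(MemberState target) {
        if (!state.next().contains(target)) {
            throw new IllegalStateException(state + " -> " + target);
        }
        state = target;
    }

    public static void main(String[] args) {
        ConsumerMemberStates member = new ConsumerMemberStates();
        member.moveTo(MemberState.PREPARING_REBALANCE);  // initiateJoinGroup()
        member.moveTo(MemberState.COMPLETING_REBALANCE); // JoinGroup response handled
        member.moveTo(MemberState.STABLE);               // SyncGroup response handled
        System.out.println(member.state());              // STABLE
    }
}
```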

3.2 The server side

On the Kafka server side, a group has five states: Empty, PreparingRebalance, CompletingRebalance, Stable, and Dead. Their state transitions are shown below:
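As far as I know, the broker encodes these transitions in GroupMetadata.scala as a set of valid previous states for each target state. The Java sketch below reproduces that table under this assumption. The class and method names are hypothetical.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/**
 * Sketch of the server-side group state machine.
 * Assumption: the table mirrors the validPreviousStates sets in Kafka's GroupMetadata.scala.
 */
public class GroupStateMachine {

    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // For each target state: the states it may be entered from.
    static final Map<GroupState, Set<GroupState>> VALID_PREVIOUS = new EnumMap<>(GroupState.class);

    static {
        VALID_PREVIOUS.put(GroupState.PREPARING_REBALANCE,
                EnumSet.of(GroupState.STABLE, GroupState.COMPLETING_REBALANCE, GroupState.EMPTY));
        VALID_PREVIOUS.put(GroupState.COMPLETING_REBALANCE, EnumSet.of(GroupState.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(GroupState.STABLE, EnumSet.of(GroupState.COMPLETING_REBALANCE));
        VALID_PREVIOUS.put(GroupState.DEAD, EnumSet.allOf(GroupState.class));
        VALID_PREVIOUS.put(GroupState.EMPTY, EnumSet.of(GroupState.PREPARING_REBALANCE));
    }

    GroupState state = GroupState.EMPTY; // a newly created group starts out Empty

    static boolean canTransition(GroupState from, GroupState to) {
        return VALID_PREVIOUS.get(to).contains(from);
    }

    void transitionTo(GroupState target) {
        if (!canTransition(state, target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }

    public static void main(String[] args) {
        GroupStateMachine group = new GroupStateMachine();
        group.transitionTo(GroupState.PREPARING_REBALANCE);  // first member sends JoinGroup
        group.transitionTo(GroupState.COMPLETING_REBALANCE); // join phase finished
        group.transitionTo(GroupState.STABLE);               // leader's SyncGroup processed
        System.out.println(group.state);                     // STABLE
    }
}
```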

4. The process of joining a Consumer Group

Before we talk about how consumers join the Consumer Group, let’s review our previous test case for message consumption.

The core method is poll(). We only touch on it briefly here; the Consumer's poll-based network model will be covered in detail later. To understand how a Consumer joins a Consumer Group, we need to find where it communicates with the GroupCoordinator. That place is not hard to spot: updateAssignmentMetadataIfNeeded(), which runs while the consumer prepares its fetch request.

From there, the ConsumerCoordinator's processing is concentrated in the coordinator.poll() method.

Let’s follow these two methods:

The method org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean) can be divided into the following steps:

  • Check whether the heartbeat thread is healthy. (Once a connection is established, heartbeats must be sent to the GroupCoordinator periodically; before the connection exists, the thread does nothing.)
  • If the coordinator is unknown, locate it and connect to it through ensureCoordinatorReady(). If that fails, return false and end this poll.
  • Determine whether a rebalance needs to be triggered. A rebalance redistributes the partitions of the subscribed topics among all consumers in the group.
  • Send the join-group and sync-group requests through ensureActiveGroup() to join the group and obtain the TopicPartition list assigned to this consumer.
  • If the metadata needs to be updated and no broker node is ready, block until the metadata has been updated.
  • If automatic offset commits are enabled and the next commit deadline has arrived, commit the consumption progress.
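The order of these steps can be shown as a toy control-flow model. Every field here is a hypothetical stand-in, not a Kafka field. The step labels reuse the method names from the list above, and the metadata-refresh step is omitted for brevity. The 5000 ms interval is only an example value for auto.commit.interval.ms.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the step order in ConsumerCoordinator#poll listed above.
 * All fields are hypothetical; the strings only label which step ran.
 */
public class CoordinatorPollSketch {
    boolean coordinatorKnown = false;     // has the coordinator been located yet?
    boolean coordinatorReachable = true;  // would a lookup succeed right now?
    boolean rejoinNeeded = true;          // stands in for rejoinNeededOrPending()
    boolean autoCommitEnabled = true;     // like enable.auto.commit
    long autoCommitIntervalMs = 5_000;    // like auto.commit.interval.ms
    long nextAutoCommitDeadlineMs = 0;
    final List<String> steps = new ArrayList<>();

    boolean poll(long nowMs) {
        steps.add("heartbeatCheck");                // make sure the heartbeat thread is healthy
        if (!coordinatorKnown) {
            steps.add("ensureCoordinatorReady");    // FindCoordinator + connect
            if (!coordinatorReachable) {
                return false;                       // give up on this poll
            }
            coordinatorKnown = true;
        }
        if (rejoinNeeded) {
            steps.add("ensureActiveGroup");         // JoinGroup + SyncGroup
            rejoinNeeded = false;
        }
        if (autoCommitEnabled && nowMs >= nextAutoCommitDeadlineMs) {
            steps.add("autoCommit");                // commit consumption progress
            nextAutoCommitDeadlineMs = nowMs + autoCommitIntervalMs;
        }
        return true;
    }

    public static void main(String[] args) {
        CoordinatorPollSketch c = new CoordinatorPollSketch();
        // true [heartbeatCheck, ensureCoordinatorReady, ensureActiveGroup, autoCommit]
        System.out.println(c.poll(0) + " " + c.steps);
        c.steps.clear();
        // true [heartbeatCheck]  (coordinator known, no rejoin, commit not yet due)
        System.out.println(c.poll(1_000) + " " + c.steps);
    }
}
```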

A few of these need to be covered in detail, and they are the ensureCoordinatorReady() method, the rejoinNeededOrPending() method, and the ensureActiveGroup() method.

5. ensureCoordinatorReady()

This method selects the least-loaded broker (the one with the fewest in-flight requests), sends it a FindCoordinator request, and then establishes a TCP connection to the GroupCoordinator returned in the response.

  • The call chain is: ensureCoordinatorReady() -> lookupCoordinator() -> sendFindCoordinatorRequest().
  • Once the client receives the server's response, it connects to the GroupCoordinator.

5.1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady

5.2 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

5.3 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendFindCoordinatorRequest

5.4 Summary

  • Select the least-loaded node (fewest in-flight requests), send it a FindCoordinator request, and handle the response.
  • The FindCoordinatorRequest carries the group id and is sent through ConsumerNetworkClient.send() to find the corresponding GroupCoordinator node. (ConsumerNetworkClient.send() is built on Java NIO, as discussed in previous articles.)
  • If a GroupCoordinator is obtained successfully (its node ID, host, and port are returned), the client establishes a connection to it and resets the heartbeat timer.
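Choosing which broker receives the FindCoordinator request can be sketched as follows. This assumes "load" means the number of in-flight requests to a node. The Node class is hypothetical, and the real NetworkClient.leastLoadedNode also randomizes its starting node and checks connection state. Both of those details are omitted here.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of least-loaded node selection before sending FindCoordinator.
 * Assumption: load = in-flight request count; randomization and
 * connection-state checks of the real client are omitted.
 */
public class LeastLoadedNode {

    static final class Node {
        final int id;
        final int inFlightRequests;

        Node(int id, int inFlightRequests) {
            this.id = id;
            this.inFlightRequests = inFlightRequests;
        }
    }

    /** Returns the node with the fewest in-flight requests, or null if the list is empty. */
    static Node leastLoaded(List<Node> nodes) {
        Node best = null;
        for (Node node : nodes) {
            if (best == null || node.inFlightRequests < best.inFlightRequests) {
                best = node;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Node> brokers = Arrays.asList(new Node(1, 3), new Node(2, 0), new Node(3, 1));
        // Any broker can answer FindCoordinator, so the idlest one is asked.
        System.out.println("send FindCoordinator to broker " + leastLoaded(brokers).id); // broker 2
    }
}
```

Any broker can answer this request, because each broker can compute the partition from the group id and knows that partition's leader. That is why the client is free to pick whichever node is least busy.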

6. rejoinNeededOrPending()

A rejoin, and therefore a rebalance, can be triggered in the following situations:

  • A new consumer joins the group (including the very first consumer)
  • A consumer crashes or goes offline (it has not sent a heartbeat for too long)
  • A consumer voluntarily leaves the group, for example by calling unsubscribe() to unsubscribe from its topics
  • The GroupCoordinator node of the group changes
  • The set of topics the group subscribes to, or the number of partitions of any subscribed topic, changes
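The trigger list above can be encoded as a comparison between two snapshots of a group. The GroupView class and reasons() method are hypothetical and do not correspond to any Kafka API. They simply map each difference between the snapshots to the matching trigger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative mapping from group changes to the rebalance triggers listed above.
 * GroupView and reasons() are hypothetical; Kafka exposes no such API.
 */
public class RebalanceTriggers {

    static final class GroupView {
        final Set<String> members;                     // live member ids
        final int coordinatorId;                       // broker hosting the GroupCoordinator
        final Map<String, Integer> partitionsPerTopic; // subscribed topic -> partition count

        GroupView(Set<String> members, int coordinatorId, Map<String, Integer> partitionsPerTopic) {
            this.members = members;
            this.coordinatorId = coordinatorId;
            this.partitionsPerTopic = partitionsPerTopic;
        }
    }

    /** Returns every trigger that fires between two snapshots; empty means no rebalance. */
    static List<String> reasons(GroupView before, GroupView after) {
        List<String> out = new ArrayList<>();
        if (!before.members.containsAll(after.members)) out.add("member joined");
        if (!after.members.containsAll(before.members)) out.add("member left or timed out");
        if (before.coordinatorId != after.coordinatorId) out.add("coordinator changed");
        if (!before.partitionsPerTopic.equals(after.partitionsPerTopic)) {
            out.add("subscription or partition count changed");
        }
        return out;
    }

    public static void main(String[] args) {
        GroupView before = new GroupView(new HashSet<>(Arrays.asList("c1", "c2")), 1,
                Collections.singletonMap("orders", 3));
        GroupView after = new GroupView(new HashSet<>(Arrays.asList("c1", "c2", "c3")), 1,
                Collections.singletonMap("orders", 6));
        // [member joined, subscription or partition count changed]
        System.out.println(reasons(before, after));
    }
}
```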

7. ensureActiveGroup()

At this point the GroupCoordinator node is known and a connection to it has been established. The ensureActiveGroup() method sends the join-group and sync-group requests to the GroupCoordinator to obtain the TopicPartition list assigned to this consumer.

  • The call chain is: ensureActiveGroup() -> ensureCoordinatorReady() -> startHeartbeatThreadIfNeeded() -> joinGroupIfNeeded()
  • The most important method called from joinGroupIfNeeded() is initiateJoinGroup(). Its call flow is sendJoinGroupRequest() -> JoinGroupResponseHandler.handle() -> onJoinLeader() / onJoinFollower() -> sendSyncGroupRequest()
/**
 * Make sure the group is active and join the group.
 * Ensure the group is active (i.e., joined and synced)
 *
 * @param timer Timer bounding how long this method can block
 * @throws KafkaException if the callback throws exception
 * @return true iff the group is active
 */
boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    // Ensure that the GroupCoordinator has been connected
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    // Start the heartbeat thread (heartbeats are not sent until the conditions are met)
    startHeartbeatThreadIfNeeded();
    // Send a JoinGroup request and process the response
    return joinGroupIfNeeded(timer);
}

7.1 joinGroupIfNeeded()

The join-group request is implemented in joinGroupIfNeeded().

7.2 initiateJoinGroup()

Inside joinGroupIfNeeded(), the most important call is initiateJoinGroup():

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // Mark the state as preparing to rebalance
        state = MemberState.PREPARING_REBALANCE;
        // a rebalance can be triggered consecutively if the previous one failed,
        // in this case we would not update the start time.
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        // Send a JoinGroup request
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // do nothing since all the handler logic are in SyncGroupResponseHandler already
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin;
                // this can be triggered when either join or sync request failed
                synchronized (AbstractCoordinator.this) {
                    sensors.failedRebalanceSensor.record();
                }
            }
        });
    }
    return joinFuture;
}
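The comment at the top of initiateJoinGroup() describes a "reuse the pending future" pattern. A call made while a join is already in flight gets the same future back instead of sending a second JoinGroup request. Below is a minimal sketch of that pattern using the JDK's CompletableFuture instead of Kafka's RequestFuture. The class and method names are hypothetical. Clearing the slot when the response arrives simplifies what Kafka does after the join completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the pending-join guard described in initiateJoinGroup()'s comment.
 * Uses CompletableFuture in place of Kafka's RequestFuture; names are hypothetical.
 */
public class JoinFutureGuard {
    private CompletableFuture<String> joinFuture;           // the in-flight join, if any
    final AtomicInteger requestsSent = new AtomicInteger(); // counts simulated JoinGroup sends

    /** Returns the pending join if there is one; otherwise "sends" a new JoinGroup request. */
    synchronized CompletableFuture<String> initiateJoin() {
        if (joinFuture == null) {
            requestsSent.incrementAndGet();
            joinFuture = new CompletableFuture<>();
        }
        return joinFuture;
    }

    /** Simulates the response arriving; clears the slot so a later rebalance starts a fresh join. */
    synchronized void onResponse(String assignment) {
        CompletableFuture<String> pending = joinFuture;
        joinFuture = null;
        if (pending != null) {
            pending.complete(assignment);
        }
    }

    public static void main(String[] args) {
        JoinFutureGuard guard = new JoinFutureGuard();
        CompletableFuture<String> first = guard.initiateJoin();
        CompletableFuture<String> again = guard.initiateJoin(); // e.g. poll() called again after a wakeup
        System.out.println(first == again);                     // true: no duplicate JoinGroup
        guard.onResponse("partitions [0, 1]");
        System.out.println(first.join());                       // partitions [0, 1]
        System.out.println(guard.requestsSent.get());           // 1
    }
}
```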

7.3 sendJoinGroupRequest(): the join-group request

Let's continue with the sendJoinGroupRequest() method.

sendJoinGroupRequest() (1): sends a join-group request to the GroupCoordinator

  • If the group.id is new, the group's initial state is Empty.
  • Each consumer instance is called a member of the group. When the GroupCoordinator receives a consumer's join-group request while the group's member list is empty (the group has just been created), that first member is elected leader.
  • When the GroupCoordinator receives the leader's join-group request, it triggers a rebalance and moves the group to PreparingRebalance.
  • The GroupCoordinator then waits for a period of time. Consumers whose join-group requests arrive within that window are considered alive. The group then moves to CompletingRebalance (called AwaitingSync in older Kafka versions), and the GroupCoordinator sends its join-group response to all members of the group.
  • When a consumer receives the GroupCoordinator's response and it is the group leader, it computes the partition assignment for the whole group (range by default; RoundRobin is also available). The leader then sends this assignment to the GroupCoordinator via sendSyncGroupRequest(), while followers send an empty list.
  • After receiving the leader's sync-group request, the GroupCoordinator returns the assignment to every consumer that has sent a sync-group request, and the group moves to Stable. If a sync-group request arrives while the group is already Stable, its assignment is returned directly.
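The join/sync exchange above can be walked through with a toy coordinator and a range assignor for a single topic. Everything here is hypothetical and heavily simplified. In particular, the real GroupCoordinator holds follower SyncGroup requests until the leader's arrives, whereas this toy assumes the leader syncs first. The assignor follows the range idea: sort the members and give each one a contiguous block, with the first (partitions % members) members getting one extra partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy walk-through of the join-group / sync-group exchange.
 * Hypothetical classes; range assignment for a single topic.
 */
public class JoinSyncSketch {

    /** Range assignment: sorted members get contiguous blocks; the first (n % m) get one extra. */
    static Map<String, List<Integer>> rangeAssign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perMember * i + Math.min(i, extra);
            int length = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            plan.put(sorted.get(i), partitions);
        }
        return plan;
    }

    /** Minimal coordinator: first joiner is leader; the leader's SyncGroup carries the whole plan. */
    static final class ToyCoordinator {
        final List<String> members = new ArrayList<>();
        String leader;
        String state = "Empty";
        Map<String, List<Integer>> assignment = Collections.emptyMap();

        void join(String memberId) {
            if (members.isEmpty()) {
                leader = memberId;            // the first member becomes leader
            }
            members.add(memberId);
            state = "PreparingRebalance";
        }

        void endJoinPhase() {
            state = "CompletingRebalance";    // JoinGroup responses go out to all members
        }

        /** Leader passes the plan, followers an empty map. Assumes the leader syncs first. */
        List<Integer> sync(String memberId, Map<String, List<Integer>> proposed) {
            if (memberId.equals(leader)) {
                assignment = proposed;
                state = "Stable";
            }
            List<Integer> mine = assignment.get(memberId);
            return mine == null ? Collections.<Integer>emptyList() : mine;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        for (String id : Arrays.asList("consumer-a", "consumer-b", "consumer-c")) {
            coordinator.join(id);
        }
        coordinator.endJoinPhase();
        Map<String, List<Integer>> plan = rangeAssign(coordinator.members, 7); // only the leader does this
        System.out.println(coordinator.sync("consumer-a", plan));              // [0, 1, 2]
        System.out.println(coordinator.sync("consumer-b",
                Collections.<String, List<Integer>>emptyMap()));                // [3, 4]
        System.out.println(coordinator.state);                                 // Stable
    }
}
```

Running the assignor only on the leader keeps the broker agnostic of assignment strategies. The coordinator only relays whatever plan the leader submits.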

7.4 sendSyncGroupRequest(): the sync-group request

The core code that sends the sync-group request is as follows:

7.5 onJoinComplete()

After the previous steps, the consumer instance has successfully joined the group. This triggers ConsumerCoordinator's onJoinComplete() method, which updates the subscribed TopicPartition list and the corresponding metadata.

At this point, the consumer instance has truly joined the group and starts working. It maintains its membership in the group and its ownership of the assigned partitions by sending heartbeats to the GroupCoordinator. As long as heartbeats arrive at regular intervals, the member is considered alive. If the GroupCoordinator receives no heartbeat within session.timeout.ms, it removes the member from the group and triggers a rebalance. A consumer that leaves voluntarily (for example, when it is closed) sends a LeaveGroup request instead.
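The liveness rule can be sketched as bookkeeping on the coordinator side. session.timeout.ms is a real consumer setting. The class below is hypothetical and the 10-second timeout is only an example value. The sketch drops any member whose last heartbeat is older than the session timeout, which in Kafka would then lead to a rebalance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Sketch of session-timeout bookkeeping on the coordinator side.
 * Hypothetical class; only models "no heartbeat within the timeout -> member removed".
 */
public class SessionTimeoutSketch {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    SessionTimeoutSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Records a heartbeat; for an unknown member this also registers it. */
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    /** Removes members silent for longer than the session timeout and returns them, sorted. */
    List<String> expireMembers(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(expired);
        return expired;
    }

    boolean isMember(String memberId) {
        return lastHeartbeatMs.containsKey(memberId);
    }

    public static void main(String[] args) {
        SessionTimeoutSketch group = new SessionTimeoutSketch(10_000); // example 10 s timeout
        group.heartbeat("consumer-a", 0);
        group.heartbeat("consumer-b", 0);
        group.heartbeat("consumer-a", 8_000);              // consumer-b stays silent
        System.out.println(group.expireMembers(12_000));   // [consumer-b]; a rebalance would follow
    }
}
```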