
Kafka uses ZooKeeper (ZK) to provide functionality similar to RocketMQ’s NameServer.

1 What does Kafka’s ZK do?

Let’s take a look at what Kafka saves in ZooKeeper:

  • Compared with the old 0.8.x releases, the latest versions of Kafka have replaced some functions that used to rely on ZooKeeper, such as consumer offset (consumption position) management, with other implementations.

[Figure: the tree of nodes Kafka creates in ZooKeeper. Green boxes are temporary (ephemeral) nodes; the others are persistent nodes.]

1.1 The ids subtree (temporary nodes)

Under /brokers/ids/[0…N], each temporary node corresponds to one online Broker. When a Broker starts, it creates such a temporary node to announce that it has joined the cluster and is ready to serve. The node is named after the BrokerID, and it holds the Broker’s address, version number, and start time. If the Broker goes down or loses its connection to the ZK cluster, the temporary node disappears with it.
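As a quick illustration, here is a minimal Scala sketch (using the plain ZooKeeper client; the connection string is a placeholder) that lists the registered Brokers and dumps one registration node:

    import org.apache.zookeeper.{WatchedEvent, ZooKeeper}
    import scala.jdk.CollectionConverters._

    object ListBrokers {
      def main(args: Array[String]): Unit = {
        // Placeholder connection string; point it at your ZK ensemble
        val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())

        // Each child of /brokers/ids is a temporary node named after a BrokerID
        val ids = zk.getChildren("/brokers/ids", false).asScala
        println(s"online brokers: ${ids.mkString(", ")}")

        // A node's payload is a small JSON document holding the Broker's
        // endpoints (host:port), version number, and start timestamp
        ids.headOption.foreach { id =>
          val data = zk.getData(s"/brokers/ids/$id", false, null)
          println(new String(data, "UTF-8"))
        }
        zk.close()
      }
    }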

1.2 The topics subtree

topic

Saves topic and partition information. Each child of the /brokers/topics/ node is a topic, and the node’s name is the topic name.

partitions

Each topic node contains a fixed partitions child node. The children of the partitions node are the partitions under that topic, and the node names are the partition numbers.

state (temporary node)

Located under each partition node, it holds the partition’s current Leader and the BrokerIDs of all replicas in the ISR (in-sync replica set). The state temporary node is created by the partition’s current Leader Broker. If the partition’s Leader Broker goes down, the corresponding state node disappears as well, until a new Leader is elected and the state node is created again.
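For example, the Leader and ISR of a partition can be read straight out of this node; a minimal sketch (topic name, partition number, and connection string are placeholders):

    import org.apache.zookeeper.{WatchedEvent, ZooKeeper}

    object ReadPartitionState {
      def main(args: Array[String]): Unit = {
        val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())
        // The payload is JSON with fields such as "leader", "leader_epoch" and "isr"
        val path = "/brokers/topics/my-topic/partitions/0/state"
        println(new String(zk.getData(path, false, null), "UTF-8"))
        zk.close()
      }
    }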

Every Broker also caches this metadata locally. In short, Kafka uses ZK to store its metadata, to monitor the health of Brokers and partitions, and to run elections.

2 How does a Kafka client locate a Broker?

Given a topic and a partition, find the partition’s state temporary node in the topics subtree and extract the Leader’s BrokerID from it; then look up that BrokerID’s temporary node in the ids subtree to obtain the Broker’s actual physical address.
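Putting the two subtrees together, the lookup can be sketched as follows (topic, partition, and connection string are placeholders, and a crude regex stands in for real JSON parsing):

    import org.apache.zookeeper.{WatchedEvent, ZooKeeper}

    object LocateLeader {
      def main(args: Array[String]): Unit = {
        val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())

        // Step 1: read the partition's state node and extract the Leader's BrokerID
        val state = new String(
          zk.getData("/brokers/topics/my-topic/partitions/0/state", false, null), "UTF-8")
        val leaderId = """"leader"\s*:\s*(\d+)""".r.findFirstMatchIn(state).get.group(1)

        // Step 2: read the Leader's temporary node in the ids subtree
        // to obtain its actual physical address
        val broker = new String(zk.getData(s"/brokers/ids/$leaderId", false, null), "UTF-8")
        println(s"leader $leaderId -> $broker")
        zk.close()
      }
    }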

A Kafka client does not connect to ZK directly. Instead, when needed, it pulls the metadata for the topics it cares about from a Broker via RPC requests and keeps it in a client-side metadata cache, which it then uses for producing and consuming. In other words, the metadata in ZK reaches Kafka clients by way of the Brokers.
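This is also why applications configure bootstrap Brokers rather than a ZK address. A minimal sketch of fetching topic metadata through a Broker, here with the Java AdminClient driven from Scala (bootstrap address and topic name are placeholders):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
    import scala.jdk.CollectionConverters._

    object FetchMetadata {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // Only Broker addresses are needed; the client never talks to ZK
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        val admin = AdminClient.create(props)

        // Under the hood this sends a MetadataRequest to one of the Brokers
        val topics = admin.describeTopics(List("my-topic").asJava).all().get()
        topics.asScala.foreach { case (name, description) =>
          description.partitions().asScala.foreach { p =>
            println(s"$name-${p.partition()} leader=${p.leader()}")
          }
        }
        admin.close()
      }
    }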

The actual communication between the Kafka client and the Brokers is driven by NetworkClient#poll. Inside this poll loop, the client’s metadata updater builds a metadata update request when one is due.

What gets created at this point is not an actual MetadataRequest, but a builder for one, MetadataRequest.Builder. Only when the request is about to be sent does Kafka call builder.build() to construct the MetadataRequest and send it. In fact, not just metadata requests: all requests are handled this way.

Even after sendInternalMetadataRequest() is called, the request still has not actually gone out on the wire; it sits in a pending-send queue, waiting to be sent asynchronously in batches.
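To make the shape of this concrete, here is a hypothetical miniature of the pattern. These are not Kafka’s real classes, only an illustration of why a builder is enqueued instead of a finished request: the wire format depends on the API version the destination Broker supports, which is only known at send time.

    import scala.collection.mutable

    // Hypothetical miniature, not Kafka's real classes
    trait RequestBuilder {
      // Build the wire format for the given API version
      def build(apiVersion: Short): Array[Byte]
    }

    class SendQueue {
      private val pending = mutable.Queue.empty[RequestBuilder]

      // A sendInternalMetadataRequest-style call: it only enqueues,
      // nothing goes out on the wire yet
      def enqueue(builder: RequestBuilder): Unit = pending.enqueue(builder)

      // Later, the poll loop drains the queue and builds each request
      // for the version negotiated with the destination Broker
      def drain(apiVersion: Short): Seq[Array[Byte]] = {
        val builders = pending.toSeq
        pending.clear()
        builders.map(_.build(apiVersion))
      }
    }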

The concrete payload of the request is encapsulated in the MetadataRequestData class.

3 How does the Broker handle metadata requests from clients?

The Broker’s entry method for processing all RPC requests is KafkaApis#handle, which dispatches each request by its API key.

  • The method that handles metadata requests is KafkaApis#handleTopicMetadataRequest(request: RequestChannel.Request):

    def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
      ...
      // Which topics' metadata needs to be fetched
      val topics = if (metadataRequest.isAllTopics)
        metadataCache.getAllTopics()
      else
        ...

      // Topics the client is not authorized to Describe must not have their
      // existence revealed, so they are not even checked for existence
      val unauthorizedForDescribeTopicMetadata =
        // For an "all topics" request, unauthorized topics are simply left out.
        // In version 0 of the protocol, all topics' metadata was fetched every time
        if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
          Set.empty[MetadataResponseTopic]
        else
          unauthorizedForDescribeTopics.map(topic =>
            metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))

      // Filter the metadata of the requested topics out of the metadata cache
      val topicMetadata = getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics,
        request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)

      var clusterAuthorizedOperations = Int.MinValue
      if (request.header.apiVersion >= 8) {
        // Obtain the cluster-level authorized operations
        if (metadataRequest.data.includeClusterAuthorizedOperations) {
          ...
        }
        // Obtain the topic-level authorized operations
        if (metadataRequest.data.includeTopicAuthorizedOperations) {
          ...
        }
      }

      val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata

      // Get the list of live Brokers
      val brokers = metadataCache.getAliveBrokers

      // Build the response and send it
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        ...)
    }
  1. Start from the list of topics in the request.
  2. From the Broker’s local metadata cache (the MetadataCache), pick out the metadata of those topics; this part corresponds to the topics subtree.
  3. From the same local cache, fetch the set of all live Brokers; this part corresponds to the ids subtree.
  4. Finally, combine the two parts into a response and return it to the client.

Kafka maintains in every Broker a metadata cache mirroring what is in ZK, so it does not have to read ZK each time a client asks for metadata. Thanks to ZK’s Watcher mechanism, Kafka notices metadata changes in ZK promptly and updates each Broker’s metadata cache accordingly.
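A minimal sketch of that pattern (the path is Kafka’s real ZK path; everything else is illustrative): read /brokers/ids with a watch attached, and re-read, re-registering the watch, whenever ZK reports a change:

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
    import scala.jdk.CollectionConverters._

    object BrokerCacheDemo {
      @volatile private var liveBrokers: Set[String] = Set.empty

      def main(args: Array[String]): Unit = {
        val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())
        refresh(zk)
        Thread.sleep(Long.MaxValue) // keep the session alive for the demo
      }

      // ZK watches fire only once, so the handler must re-register
      // by reading the node again
      private def refresh(zk: ZooKeeper): Unit = {
        val watcher = new Watcher {
          override def process(event: WatchedEvent): Unit = refresh(zk)
        }
        liveBrokers = zk.getChildren("/brokers/ids", watcher).asScala.toSet
        println(s"cached live brokers: $liveBrokers")
      }
    }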

4 Best Practices

The availability of a Kafka cluster is currently tightly coupled to ZK: if the ZK ensemble cannot serve requests, the entire Kafka cluster becomes unavailable. Kafka’s developers are aware of this problem and are discussing building a metadata service of their own to replace ZK.

If you need to deploy a large-scale Kafka cluster, the advice is to split it into several independent small clusters, each served by its own independent ZK ensemble. Each ZK then stores relatively little data, and if one ZK ensemble fails, only one small Kafka cluster is affected, which keeps the impact to a minimum.
