0. Introduction

CMAK is the current name of the kafka-manager project, which provides web-based control over common Kafka cluster operations. The purpose of this investigation is to find out what information kafka-manager keeps in its local ZooKeeper, and how topic operations are carried out. Repo address: https://github.com/yahoo/CMAK

1. What is saved in ZooKeeper

1.1 Export Script

The following script walks kafka-manager's namespace in ZooKeeper and dumps every znode it stores:

from kazoo.client import KazooClient

kafka_cluster = "x.x.x.x:2181"


def get_kafka_client():
    zk = KazooClient(hosts=kafka_cluster)
    zk.start()
    return zk


def get_all_cluster_clusters():
    zk_client = get_kafka_client()
    path_list = ["/kafka-manager/clusters"]
    result = []
    while len(path_list) > 0:
        next_layer = []
        for path in path_list:
            children = zk_client.get_children(path)
            # Here is the leaf node
            if len(children) == 0:
                if path.split("/")[-1] = ="topics":
                    print("cluster {0} has no topic.".format(path))

            content = zk_client.get(path)[0]
            result.append([path, content])
            for child in children:
                new_path = path + "/" + child
                next_layer.append(new_path)
        path_list = next_layer
    zk_client.stop()
    zk_client.close()

    cluster_topic_map = {}
    for item in result:
        split_path = item[0].split("/")

        if len(split_path) < 4:
            continue

        cluster_name = str(split_path[3])

        if cluster_name not in cluster_topic_map:
            cluster_topic_map[cluster_name] = 0

        if len(item[0].split("/")) != 6 or item[0].split("/")[-2] != "topics":
            continue

        cluster_topic_map[cluster_name] += 1

    for key in cluster_topic_map:
        print("{0} has {1} topics".format(key, cluster_topic_map[key]))

    with open("kafka_clusters"."w") as f:
        f.seek(0)
        for item in result:
            if len(item[0].split("/")) != 6 or item[0].split("/")[-2] != "topics":
                continue
            f.write(item[0] + "\n")
            f.write(item[1] + "\n")

This exports the paths and contents of all znodes under the /kafka-manager namespace.

1.2 Content Analysis

The /kafka-manager path contains /kafka-manager/configs, /kafka-manager/mutex, /kafka-manager/deleteClusters and /kafka-manager/clusters. The meta information of each Kafka cluster registered with kafka-manager is stored under configs, and topic information for some of the clusters is recorded under clusters. Below is an example of the meta information under /kafka-manager/configs/:cluster

{
    "name": "data-ckc1-qcsh4",
    "curatorConfig": {
        "zkConnect": "x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181",
        "zkMaxRetry": 100,
        "baseSleepTimeMs": 100,
        "maxSleepTimeMs": 1000
    },
    "enabled": true,
    "kafkaVersion": "1.1.0",
    "jmxEnabled": true,
    "jmxUser": null,
    "jmxPass": null,
    "jmxSsl": false,
    "pollConsumers": false,
    "filterConsumers": false,
    "logkafkaEnabled": false,
    "activeOffsetCacheEnabled": false,
    "displaySizeEnabled": false,
    "tuning": {
        "brokerViewUpdatePeriodSeconds": 30,
        "clusterManagerThreadPoolSize": 2,
        "clusterManagerThreadPoolQueueSize": 100,
        "kafkaCommandThreadPoolSize": 2,
        "kafkaCommandThreadPoolQueueSize": 100,
        "logkafkaCommandThreadPoolSize": 2,
        "logkafkaCommandThreadPoolQueueSize": 100,
        "logkafkaUpdatePeriodSeconds": 30,
        "partitionOffsetCacheTimeoutSecs": 5,
        "brokerViewThreadPoolSize": 2,
        "brokerViewThreadPoolQueueSize": 1000,
        "offsetCacheThreadPoolSize": 2,
        "offsetCacheThreadPoolQueueSize": 1000,
        "kafkaAdminClientThreadPoolSize": 2,
        "kafkaAdminClientThreadPoolQueueSize": 1000,
        "kafkaManagedOffsetMetadataCheckMillis": 30000,
        "kafkaManagedOffsetGroupCacheSize": 1000000,
        "kafkaManagedOffsetGroupExpireDays": 7
    },
    "securityProtocol": "PLAINTEXT"
}
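As a quick way to inspect this layout, the small sketch below (placeholder address; not part of CMAK) reads every cluster's config znode and extracts the zkConnect string that kafka-manager uses to reach the managed cluster:

import json

from kazoo.client import KazooClient

# the ZooKeeper that backs kafka-manager itself (placeholder address)
CMAK_ZK = "x.x.x.x:2181"

zk = KazooClient(hosts=CMAK_ZK)
zk.start()
try:
    for cluster in zk.get_children("/kafka-manager/configs"):
        raw, _stat = zk.get("/kafka-manager/configs/" + cluster)
        meta = json.loads(raw.decode("utf-8"))
        # zkConnect is the managed cluster's own ZooKeeper quorum
        print(cluster, "->", meta["curatorConfig"]["zkConnect"])
finally:
    zk.stop()
    zk.close()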

The znodes under this path hold the full meta information, and kafka-manager uses it to access the managed Kafka clusters. Under /kafka-manager/clusters, the path /kafka-manager/clusters/:cluster/topics/:topic stores topic information, but contrary to expectations only part of the topics show up there. For an existing topic the content looks like:

{
    "69": [6, 1, 2],
    "0": [7, 5, 6],
    "5": [2, 10, 1]
}

The key corresponds to the partition id and the value to its replica list (broker ids). The preliminary conclusion is that the topic information here is used as a cache; its exact use needs further investigation.
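To make the structure concrete, here is a minimal parsing sketch (sample payload taken from above; in Kafka's convention the first replica of each partition is the preferred leader):

import json

# sample payload of /kafka-manager/clusters/:cluster/topics/:topic
raw = '{"69": [6, 1, 2], "0": [7, 5, 6], "5": [2, 10, 1]}'

# key = partition id, value = list of broker ids holding replicas
assignment = {int(partition): replicas for partition, replicas in json.loads(raw).items()}

for partition in sorted(assignment):
    print("partition {0} -> replicas {1}".format(partition, assignment[partition]))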

1.3 ZooKeeper Information Summary

kafka-manager stores only the basic meta information of each Kafka cluster; it does not store the full details such as topics and partitions.

2 CMAK source code walkthrough

The source code for CMAK is written in Scala using the following frameworks and third-party libraries:

  1. Play Framework: Kafka-Manager is essentially a web application, so it is implemented with the MVC architecture of the Play Framework;
  2. Akka: for building highly concurrent, distributed, and fault-tolerant applications. All requests in Kafka-Manager are processed asynchronously by Akka actors;
  3. Apache Curator Framework: used to access ZooKeeper;
  4. Kafka SDK: used to obtain the latest offset of each topic, and its Admin interfaces implement the various management functions.

2.0 Intro

The business code lives under the app path. Given the frameworks CMAK uses, the entry point of the program is cmak/conf/routes. Due to time constraints, only the flows for listing topics and creating a topic are covered here.

2.1 GET topics

From the route table: GET /clusters/:c/topics maps to controllers.Topic.topics(c: String), implemented in app/controllers/Topic.scala

  def topics(c: String) = Action.async { implicit request: RequestHeader =>
    kafkaManager.getTopicListExtended(c).map { errorOrTopicList =>
      Ok(views.html.topic.topicList(c, errorOrTopicList)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
    }
  }

The business function is getTopicListExtended, and its result is rendered with the HTML template views.html.topic.topicList (located at app/views/topic/topicList.scala.html). The function itself lives in app/kafka/manager/KafkaManager.scala

  def getTopicListExtended(clusterName: String): Future[ApiError \/ TopicListExtended] = {
    val futureTopicIdentities = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicIdentities))(
      identity[Map[String, TopicIdentity]])
    val futureTopicList = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, KSGetTopics))(identity[TopicList])
    val futureTopicToConsumerMap = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicConsumerMap))(
      identity[Map[String, Iterable[(String, ConsumerType)]]])
    val futureTopicsReasgn = getTopicsUnderReassignment(clusterName)
    implicit val ec = apiExecutionContext
    for {
      errOrTi <- futureTopicIdentities
      errOrTl <- futureTopicList
      errOrTCm <- futureTopicToConsumerMap
      errOrRap <- futureTopicsReasgn
    } yield {
      for {
        ti <- errOrTi
        tl <- errOrTl
        tcm <- errOrTCm
        rap <- errOrRap
      } yield {
        TopicListExtended(tl.list.map(t => (t, ti.get(t))).sortBy(_._1), tcm, tl.deleteSet, rap, tl.clusterContext)
      }
    }
  }

The KafkaManagerActor fetches the cluster's topic information. The actual operation is wrapped in a KMClusterQueryRequest, which is dispatched in app/kafka/manager/actor/KafkaManagerActor.scala

      case KMClusterQueryRequest(clusterName, request) =>
        clusterManagerMap.get(clusterName).fold[Unit] {
          sender ! ActorErrorResponse(s"Unknown cluster : $clusterName")
        } {
          clusterManagerPath: ActorPath =>
            context.actorSelection(clusterManagerPath).forward(request)
        }

and handled in app/kafka/manager/actor/cluster/KafkaStateActor.scala

      case KSGetTopics =>
        val deleteSet: Set[String] =
          featureGateFold(KMDeleteTopicFeature)(
            Set.empty,
            {
              val deleteTopicsData: mutable.Buffer[ChildData] = deleteTopicsPathCache.getCurrentData.asScala
              deleteTopicsData.map { cd =>
                nodeFromPath(cd.getPath)
              }.toSet
            })
        withTopicsTreeCache { cache =>
          cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
        }.fold {
          sender ! TopicList(IndexedSeq.empty, deleteSet, config.clusterContext)
        } { data: java.util.Map[String, ChildData] =>
          sender ! TopicList(data.asScala.keySet.toIndexedSeq, deleteSet, config.clusterContext)
        }

where

	val BrokerTopicsPath = "/brokers/topics"

From the code, the topic list appears to be served from a cache (the topicsTreeCache) that is built by using the stored cluster meta information to connect to the corresponding cluster and watch /brokers/topics on the target cluster's ZooKeeper.
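The same list can be reproduced without CMAK by connecting to the managed cluster's ZooKeeper (the zkConnect from section 1.2; the address below is a placeholder) and reading the children of /brokers/topics. A minimal sketch:

from kazoo.client import KazooClient

# zkConnect of the managed Kafka cluster, taken from /kafka-manager/configs/:cluster (placeholder)
TARGET_ZK = "x.x.x.x:2181"

zk = KazooClient(hosts=TARGET_ZK)
zk.start()
try:
    # Kafka registers every topic as a child znode of /brokers/topics
    topics = zk.get_children("/brokers/topics")
    print("{0} topics: {1}".format(len(topics), sorted(topics)))
finally:
    zk.stop()
    zk.close()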

2.2 POST create topic

Checking the route table:

GET    /clusters/:c/createTopic                             controllers.Topic.createTopic(c:String)
POST   /clusters/:c/topics/create                           controllers.Topic.handleCreateTopic(c:String)

The handler function is in app/controllers/Topic.scala

  def handleCreateTopic(clusterName: String) = Action.async { implicit request: Request[AnyContent] =>
    featureGate(KMTopicManagerFeature) {
      defaultCreateForm.bindFromRequest.fold(
        formWithErrors => {
          kafkaManager.getClusterContext(clusterName).map { clusterContext =>
            BadRequest(views.html.topic.createTopic(clusterName, clusterContext.map(c => (formWithErrors, c))))
          }.recover {
            case t =>
              implicit val clusterFeatures = ClusterFeatures.default
              Ok(views.html.common.resultOfCommand(
                views.html.navigation.clusterMenu(clusterName, "Topic", "Create", menus.clusterMenus(clusterName)),
                models.navigation.BreadCrumbs.withNamedViewAndCluster("Topics", clusterName, "Create Topic"),
                -\/(ApiError(s"Unknown error : ${t.getMessage}")),
                "Create Topic",
                FollowLink("Try again.", routes.Topic.createTopic(clusterName).toString()),
                FollowLink("Try again.", routes.Topic.createTopic(clusterName).toString())
              )).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
          }
        },
        ct => {
          val props = new Properties()
          ct.configs.filter(_.value.isDefined).foreach(c => props.setProperty(c.name, c.value.get))
          kafkaManager.createTopic(clusterName, ct.topic, ct.partitions, ct.replication, props).map { errorOrSuccess =>
            implicit val clusterFeatures = errorOrSuccess.toOption.map(_.clusterFeatures).getOrElse(ClusterFeatures.default)
            Ok(views.html.common.resultOfCommand(
              views.html.navigation.clusterMenu(clusterName, "Topic", "Create", menus.clusterMenus(clusterName)),
              models.navigation.BreadCrumbs.withNamedViewAndCluster("Topics", clusterName, "Create Topic"),
              errorOrSuccess,
              "Create Topic",
              FollowLink("Go to topic view.", routes.Topic.topic(clusterName, ct.topic).toString()),
              FollowLink("Try again.", routes.Topic.createTopic(clusterName).toString())
            )).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
          }
        }
      )
    }
  }

The implementation is in app/kafka/manager/KafkaManager.scala

  def createTopic(
                   clusterName: String,
                   topic: String,
                   partitions: Int,
                   replication: Int,
                   config: Properties = new Properties): Future[ApiError \/ ClusterContext] =
  {
    implicit val ec = apiExecutionContext
    withKafkaManagerActor(KMClusterCommandRequest(clusterName, CMCreateTopic(topic, partitions, replication, config))) {
      result: Future[CMCommandResult] =>
        result.map(cmr => toDisjunction(cmr.result))
    }
  }

CMCreateTopic is defined in app/kafka/manager/model/ActorModel.scala; following the chain downwards:

case class CMCreateTopic(topic: String,
                           partitions: Int,
                           replicationFactor: Int,
                           config: Properties = new Properties) extends CommandRequest

app/kafka/manager/actor/cluster/ClusterManagerActor.scala

      case CMCreateTopic(topic, partitions, replication, config) =>
        implicit val ec = longRunningExecutionContext
        val eventualTopicDescription = withKafkaStateActor(KSGetTopicDescription(topic))(identity[Option[TopicDescription]])
        val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
        eventualTopicDescription.map { topicDescriptionOption =>
          topicDescriptionOption.fold {
            eventualBrokerList.flatMap {
              bl => withKafkaCommandActor(KCCreateTopic(topic, bl.list.map(_.id).toSet, partitions, replication, config)) {
                kcResponse: KCCommandResult= >CMCommandResult(kcResponse.result)
              }
            }
          } { td =>
            Future.successful(CMCommandResult(Failure(new IllegalArgumentException(s"Topic already exists : $topic"))))
          }
        } pipeTo sender()

The KCCreateTopic command is modeled as:

  case class KCCreateTopic(topic: String,
                           brokers: Set[Int],
                           partitions: Int,
                           replicationFactor:Int,
                           config: Properties) extends CommandRequest

and handled by the KafkaCommandActor:

      case KCCreateTopic(topic, brokers, partitions, replicationFactor, config) =>
        longRunning {
          Future {
            KCCommandResult(Try {
              kafkaCommandActorConfig.adminUtils.createTopic(kafkaCommandActorConfig.curator, brokers, topic, partitions, replicationFactor, config)
            })
          }
        }

which finally calls adminUtils.createTopic:

  def createTopic(curator: CuratorFramework,
                  brokers: Set[Int],
                  topic: String,
                  partitions: Int,
                  replicationFactor: Int,
                  topicConfig: Properties = new Properties): Unit = {

    val replicaAssignment = assignReplicasToBrokers(brokers,partitions,replicationFactor)
    createOrUpdateTopicPartitionAssignmentPathInZK(curator, topic, replicaAssignment, topicConfig)
  }

The first step computes the replica-to-broker assignment; the second registers the new or updated assignment in the target Kafka cluster's ZooKeeper. The logic of assignReplicasToBrokers is explained in its doc comment:

  /**
   * There are 2 goals of replica assignment:
   * 1. Spread the replicas evenly among brokers.
   * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
   *
   * To achieve this goal, we:
   * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
   * 2. Assign the remaining replicas of each partition with an increasing shift.
   *
   * Here is an example of assigning
   * broker-0  broker-1  broker-2  broker-3  broker-4
   * p0        p1        p2        p3        p4       (1st replica)
   * p5        p6        p7        p8        p9       (1st replica)
   * p4        p0        p1        p2        p3       (2nd replica)
   * p8        p9        p5        p6        p7       (2nd replica)
   * p3        p4        p0        p1        p2       (3nd replica)
   * p7        p8        p9        p5        p6       (3nd replica)
   */
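To make the round-robin-plus-shift scheme concrete, here is a simplified Python re-implementation of the rack-unaware assignment described in the comment (an illustrative sketch, not CMAK's code):

import random

def assign_replicas_to_brokers(brokers, partitions, replication_factor):
    # brokers: list of broker ids; replication_factor must not exceed len(brokers)
    n = len(brokers)
    start_index = random.randrange(n)         # random start position for the 1st replicas
    next_replica_shift = random.randrange(n)
    assignment = {}
    for p in range(partitions):
        # bump the shift each time the round-robin wraps around the broker list
        if p > 0 and p % n == 0:
            next_replica_shift += 1
        first = (p + start_index) % n
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            # spread the remaining replicas over the other brokers with an increasing shift
            shift = 1 + (next_replica_shift + j) % (n - 1)
            replicas.append(brokers[(first + shift) % n])
        assignment[p] = replicas
    return assignment

print(assign_replicas_to_brokers(brokers=[0, 1, 2, 3, 4], partitions=10, replication_factor=3))

createOrUpdateTopicPartitionAssignmentPathInZK then persists the computed assignment in ZooKeeper: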
def createOrUpdateTopicPartitionAssignmentPathInZK(curator: CuratorFramework,
                                                   topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false,
                                                   readVersion: Int = -1) {
  // validate arguments
  Topic.validate(topic)
  TopicConfigs.validate(version, config)
  checkCondition(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, TopicErrors.InconsistentPartitionReplicas)

  val topicPath = ZkUtils.getTopicPath(topic)
  if (!update) {
    checkCondition(curator.checkExists().forPath(topicPath) == null, TopicErrors.TopicAlreadyExists(topic))
  }
  partitionReplicaAssignment.foreach { case (part, reps) =>
    checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicaAssignment(topic, part, reps))
  }

  // write out the config on create, not update, if there is any, this isn't transactional with the partition assignments
  if (!update) {
    writeTopicConfig(curator, topic, config)
  }

  // create the partition assignment
  writeTopicPartitionAssignment(curator, topic, partitionReplicaAssignment, update, readVersion)
}

The create operation is performed directly against the specified ZooKeeper cluster. On first creation, the topic config is registered under /config/topics/:topic and the partition assignment under /brokers/topics/:topic.
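A quick way to verify this against the managed cluster's ZooKeeper (address and topic name below are placeholders) is to read both znodes after creating a topic:

import json

from kazoo.client import KazooClient

TARGET_ZK = "x.x.x.x:2181"  # the managed cluster's ZooKeeper (placeholder)
TOPIC = "my-topic"          # hypothetical topic name

zk = KazooClient(hosts=TARGET_ZK)
zk.start()
try:
    # partition assignment written by writeTopicPartitionAssignment
    assignment, _stat = zk.get("/brokers/topics/" + TOPIC)
    # per-topic config written by writeTopicConfig on first creation
    topic_config, _stat = zk.get("/config/topics/" + TOPIC)
    print("assignment:", json.loads(assignment.decode("utf-8")))
    print("config:", json.loads(topic_config.decode("utf-8")))
finally:
    zk.stop()
    zk.close()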

3 Conclusion

The CMAK service keeps only the meta information of each registered Kafka cluster in its own ZooKeeper, and uses it to connect to each cluster's ZooKeeper and carry out the actual operations. So if the goal is to take over CMAK's cluster display and topic create/delete functions, it is enough to migrate the meta information inside the /kafka-manager/configs/:cluster znodes.
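As an illustration of that takeover path, here is a minimal migration sketch (placeholder addresses; it assumes the destination keeps the same /kafka-manager/configs layout) that copies the config znodes from one ZooKeeper to another:

from kazoo.client import KazooClient

SRC_ZK = "x.x.x.x:2181"  # ZooKeeper currently backing CMAK (placeholder)
DST_ZK = "y.y.y.y:2181"  # ZooKeeper for the replacement service (placeholder)

src = KazooClient(hosts=SRC_ZK)
dst = KazooClient(hosts=DST_ZK)
src.start()
dst.start()
try:
    base = "/kafka-manager/configs"
    dst.ensure_path(base)  # create parent znodes on the destination if missing
    for cluster in src.get_children(base):
        path = base + "/" + cluster
        data, _stat = src.get(path)
        if dst.exists(path) is None:
            dst.create(path, data)
        else:
            dst.set(path, data)
finally:
    src.stop()
    src.close()
    dst.stop()
    dst.close()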

4 TODO / Future work

  1. The call chain has been traced at a high level, but the details of the concrete implementations are still unclear.
  2. CMAK evidently uses the topic data under the topics path in its own ZooKeeper; what exactly is this data for?
  3. CMAK is slow the first time it queries a Kafka cluster after a restart, but fast on subsequent queries, which suggests it has its own caching mechanism.