0. Introduction
CMAK is the current name of the Kafka Manager project, which provides web-based control over various operations on Kafka clusters. The purpose of this research is to investigate what information Kafka Manager keeps in its local ZooKeeper, as well as how topic operations work. Repo address: https://github.com/yahoo/CMAK
1. What is saved in Zookeeper
1.1 Export Script
The following script walks the kafka-manager namespace in CMAK's ZooKeeper and dumps every znode:
from kazoo.client import KazooClient

kafka_cluster = "x.x.x.x:2181"
fileName = "result"

def get_kafka_client():
    zk = KazooClient(hosts=kafka_cluster)
    zk.start()
    return zk

def get_all_cluster_clusters():
    zk_client = get_kafka_client()
    path_list = ["/kafka-manager/clusters"]
    result = []
    # Breadth-first walk of the znode tree under /kafka-manager/clusters
    while len(path_list) > 0:
        next_layer = []
        for path in path_list:
            children = zk_client.get_children(path)
            # Leaf node: record its path and content
            if len(children) == 0:
                if path.split("/")[-1] == "topics":
                    print("cluster {0} has no topic.".format(path))
                content = zk_client.get(path)[0]
                result.append([path, content])
            for child in children:
                new_path = path + "/" + child
                next_layer.append(new_path)
        path_list = next_layer
    zk_client.stop()
    zk_client.close()

    # Count topics per cluster: /kafka-manager/clusters/:cluster/topics/:topic
    cluster_topic_map = {}
    for item in result:
        split_path = item[0].split("/")
        if len(split_path) < 4:
            continue
        cluster_name = str(split_path[3])
        if cluster_name not in cluster_topic_map:
            cluster_topic_map[cluster_name] = 0
        if len(item[0].split("/")) != 6 or item[0].split("/")[-2] != "topics":
            continue
        cluster_topic_map[cluster_name] += 1
    for key in cluster_topic_map:
        print("{0} has {1} topics".format(key, cluster_topic_map[key]))

    # Dump the topic znodes (path + content) to a local file
    with open("kafka_clusters", "w") as f:
        f.seek(0)
        for item in result:
            if len(item[0].split("/")) != 6 or item[0].split("/")[-2] != "topics":
                continue
            f.write(item[0] + "\n")
            f.write(item[1].decode("utf-8", errors="replace") + "\n")

if __name__ == "__main__":
    get_all_cluster_clusters()
It exports the path and content of every znode under the /kafka-manager namespace.
1.2 Content Analysis
The /kafka-manager path contains /kafka-manager/configs, /kafka-manager/mutex, /kafka-manager/deleteClusters, and /kafka-manager/clusters. The meta information of every Kafka cluster registered with Kafka Manager is stored under configs, while clusters records topic information for some of the Kafka clusters. Here is an example of the meta information under /kafka-manager/configs/:cluster:
{
  "name": "data-ckc1-qcsh4",
  "curatorConfig": { "zkConnect": "x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181", "zkMaxRetry": 100, "baseSleepTimeMs": 100, "maxSleepTimeMs": 1000 },
  "enabled": true, "kafkaVersion": "1.1.0", "jmxEnabled": true, "jmxUser": null, "jmxPass": null, "jmxSsl": false,
  "pollConsumers": false, "filterConsumers": false, "logkafkaEnabled": false, "activeOffsetCacheEnabled": false, "displaySizeEnabled": false,
  "tuning": { "brokerViewUpdatePeriodSeconds": 30, "clusterManagerThreadPoolSize": 2, "clusterManagerThreadPoolQueueSize": 100,
    "kafkaCommandThreadPoolSize": 2, "kafkaCommandThreadPoolQueueSize": 100, "logkafkaCommandThreadPoolSize": 2, "logkafkaCommandThreadPoolQueueSize": 100,
    "logkafkaUpdatePeriodSeconds": 30, "partitionOffsetCacheTimeoutSecs": 5, "brokerViewThreadPoolSize": 2, "brokerViewThreadPoolQueueSize": 1000,
    "offsetCacheThreadPoolSize": 2, "offsetCacheThreadPoolQueueSize": 1000, "kafkaAdminClientThreadPoolSize": 2, "kafkaAdminClientThreadPoolQueueSize": 1000,
    "kafkaManagedOffsetMetadataCheckMillis": 30000, "kafkaManagedOffsetGroupCacheSize": 1000000, "kafkaManagedOffsetGroupExpireDays": 7 },
  "securityProtocol": "PLAINTEXT"
}
The znodes under this path hold the full set of meta information, and Kafka Manager uses it to access the managed Kafka clusters. Under /kafka-manager/clusters, the path /kafka-manager/clusters/:cluster/topics/:topic stores topic information, but contrary to expectation, Kafka Manager keeps only partial topic information there. For an existing topic the content looks like this:
{
  "69": [6, 1, 2],
  "0": [7, 5, 6],
  "5": [2, 10, 1]
}
The key corresponds to the partition id and the value to its replica broker list. The preliminary conclusion is that this partial topic information serves as a cache; its exact use needs further investigation.
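As a small reading aid, here is a sketch of how that cached content maps partitions to replicas, using the example JSON above as a literal:

import json

# Example content of /kafka-manager/clusters/:cluster/topics/:topic (from above).
raw = '{"69": [6, 1, 2], "0": [7, 5, 6], "5": [2, 10, 1]}'

assignment = json.loads(raw)
for partition, replicas in sorted(assignment.items(), key=lambda kv: int(kv[0])):
    # key: partition id (as a string), value: list of replica broker ids
    print("partition {0} -> replicas on brokers {1}".format(partition, replicas))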
1.3 Zookeeper Information Summary
Kafka Manager only stores basic meta information about the managed Kafka clusters in its own ZooKeeper; it does not store full details such as partitions or topics.
2. CMAK Source Code Walkthrough
The source code for CMAK is written in Scala using the following frameworks and third-party libraries:
- Play Framework: Kafka Manager is essentially a web application, so it is implemented on the MVC architecture of the Play Framework.
- Akka: used for building highly concurrent, distributed, and fault-tolerant applications. All requests in Kafka Manager are processed asynchronously with Akka.
- Apache Curator Framework: used to access ZooKeeper.
- Kafka SDK: used to obtain the latest offset of each topic, and to implement various management functions through the admin interface.
2.0 Intro
The business code lives under the app directory. Given the frameworks CMAK uses, the entry point of the program is CMAK/conf/routes. Due to time constraints, only the flows for listing topics and creating a topic are covered here.
2.1 GET topic
The route table shows GET /clusters/:c/topics controllers.Topic.topics(c: String), which maps to app/controllers/Topic.scala:
def topics(c: String) = Action.async { implicit request: RequestHeader =>
  kafkaManager.getTopicListExtended(c).map { errorOrTopicList =>
    Ok(views.html.topic.topicList(c, errorOrTopicList)).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
  }
}
The business function here is getTopicListExtended, and the result is rendered with the HTML template views.html.topic.topicList (located at app/views/topic/topicList.scala.html). The business function itself lives in app/kafka/manager/KafkaManager.scala:
def getTopicListExtended(clusterName: String): Future[ApiError \/ TopicListExtended] = {
  val futureTopicIdentities = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicIdentities))(
    identity[Map[String, TopicIdentity]])
  val futureTopicList = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, KSGetTopics))(identity[TopicList])
  val futureTopicToConsumerMap = tryWithKafkaManagerActor(KMClusterQueryRequest(clusterName, BVGetTopicConsumerMap))(
    identity[Map[String, Iterable[(String, ConsumerType)]]])
  val futureTopicsReasgn = getTopicsUnderReassignment(clusterName)
  implicit val ec = apiExecutionContext
  for {
    errOrTi <- futureTopicIdentities
    errOrTl <- futureTopicList
    errOrTCm <- futureTopicToConsumerMap
    errOrRap <- futureTopicsReasgn
  } yield {
    for {
      ti <- errOrTi
      tl <- errOrTl
      tcm <- errOrTCm
      rap <- errOrRap
    } yield {
      TopicListExtended(tl.list.map(t => (t, ti.get(t))).sortBy(_._1), tcm, tl.deleteSet, rap, tl.clusterContext)
    }
  }
}
The KafkaManagerActor obtains the cluster's topic information. The actual operation is wrapped in a KMClusterQueryRequest; the specific handling is in app/kafka/manager/actor/KafkaManagerActor.scala:
case KMClusterQueryRequest(clusterName, request) =>
  clusterManagerMap.get(clusterName).fold[Unit] {
    sender ! ActorErrorResponse(s"Unknown cluster : $clusterName")
  } {
    clusterManagerPath: ActorPath =>
      context.actorSelection(clusterManagerPath).forward(request)
  }
And in app/kafka/manager/actor/KafkaStateActor.scala:
case KSGetTopics =>
  val deleteSet: Set[String] =
    featureGateFold(KMDeleteTopicFeature)(
      Set.empty,
      {
        val deleteTopicsData: mutable.Buffer[ChildData] = deleteTopicsPathCache.getCurrentData.asScala
        deleteTopicsData.map { cd =>
          nodeFromPath(cd.getPath)
        }.toSet
      })
  withTopicsTreeCache { cache =>
    cache.getCurrentChildren(ZkUtils.BrokerTopicsPath)
  }.fold {
    sender ! TopicList(IndexedSeq.empty, deleteSet, config.clusterContext)
  } { data: java.util.Map[String, ChildData] =>
    sender ! TopicList(data.asScala.keySet.toIndexedSeq, deleteSet, config.clusterContext)
  }
val BrokerTopicsPath = "/brokers/topics"
From the code, the topic list appears to be fetched through a cache backed by the target cluster itself: using the known cluster meta information, CMAK connects to the corresponding cluster and reads the children of /brokers/topics in that cluster's ZooKeeper.
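To make that concrete, here is a minimal kazoo sketch of the equivalent one-off read against the target cluster's ZooKeeper. It is only an illustration: CMAK itself keeps a topics tree cache via Curator rather than reading the path on every request, and the address below is a placeholder.

from kazoo.client import KazooClient

# Placeholder address of the *target* Kafka cluster's ZooKeeper,
# taken from the zkConnect field in /kafka-manager/configs/:cluster.
target_zk = KazooClient(hosts="x.x.x.x:2181")
target_zk.start()

# CMAK's KafkaStateActor effectively exposes the children of this path
# (through its topics tree cache); here we read it directly once.
topics = target_zk.get_children("/brokers/topics")
print("{0} topics: {1}".format(len(topics), sorted(topics)))

target_zk.stop()
target_zk.close()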
2.2 POST create topic
Checking the route table:
GET /clusters/:c/createTopic controllers.Topic.createTopic(c:String)
POST /clusters/:c/topics/create controllers.Topic.handleCreateTopic(c:String)
The handler is in app/controllers/Topic.scala:
def handleCreateTopic(clusterName: String) = Action.async { implicit request: Request[AnyContent] =>
  featureGate(KMTopicManagerFeature) {
    defaultCreateForm.bindFromRequest.fold(
      formWithErrors => {
        kafkaManager.getClusterContext(clusterName).map { clusterContext =>
          BadRequest(views.html.topic.createTopic(clusterName, clusterContext.map(c => (formWithErrors, c))))
        }.recover {
          case t =>
            implicit val clusterFeatures = ClusterFeatures.default
            Ok(views.html.common.resultOfCommand(
              views.html.navigation.clusterMenu(clusterName, "Topic", "Create", menus.clusterMenus(clusterName)),
              models.navigation.BreadCrumbs.withNamedViewAndCluster("Topics", clusterName, "Create Topic"),
              -\/(ApiError(s"Unknown error : ${t.getMessage}")),
              "Create Topic",
              FollowLink("Try again.", routes.Topic.createTopic(clusterName).toString()),
              FollowLink("Try again.", routes.Topic.createTopic(clusterName).toString())
            )).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
        }
      },
      ct => {
        val props = new Properties()
        ct.configs.filter(_.value.isDefined).foreach(c => props.setProperty(c.name, c.value.get))
        kafkaManager.createTopic(clusterName, ct.topic, ct.partitions, ct.replication, props).map { errorOrSuccess =>
          implicit val clusterFeatures = errorOrSuccess.toOption.map(_.clusterFeatures).getOrElse(ClusterFeatures.default)
          Ok(views.html.common.resultOfCommand(
            views.html.navigation.clusterMenu(clusterName, "Topic", "Create", menus.clusterMenus(clusterName)),
            models.navigation.BreadCrumbs.withNamedViewAndCluster("Topics", clusterName, "Create Topic"),
            errorOrSuccess,
            "Create Topic",
            FollowLink("Go to topic view.", routes.Topic.topic(clusterName, ct.topic).toString()),
            FollowLink("Try again.", routes.Topic.createTopic(clusterName).toString())
          )).withHeaders("X-Frame-Options" -> "SAMEORIGIN")
        }
      }
    )
  }
}
The implementation is in app/kafka/manager/KafkaManager.scala:
def createTopic(
  clusterName: String,
  topic: String,
  partitions: Int,
  replication: Int,
  config: Properties = new Properties): Future[ApiError \/ ClusterContext] =
{
  implicit val ec = apiExecutionContext
  withKafkaManagerActor(KMClusterCommandRequest(clusterName, CMCreateTopic(topic, partitions, replication, config))) {
    result: Future[CMCommandResult] =>
      result.map(cmr => toDisjunction(cmr.result))
  }
}
The command sent is CMCreateTopic, defined in app/kafka/manager/model/ActorModel.scala. Following this thread downwards:
case class CMCreateTopic(topic: String,
partitions: Int,
replicationFactor: Int,
config: Properties = new Properties) extends CommandRequest
app/kafka/manager/actor/cluster/ClusterManagerActor.scala:
case CMCreateTopic(topic, partitions, replication, config) =>
  implicit val ec = longRunningExecutionContext
  val eventualTopicDescription = withKafkaStateActor(KSGetTopicDescription(topic))(identity[Option[TopicDescription]])
  val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
  eventualTopicDescription.map { topicDescriptionOption =>
    topicDescriptionOption.fold {
      eventualBrokerList.flatMap {
        bl => withKafkaCommandActor(KCCreateTopic(topic, bl.list.map(_.id).toSet, partitions, replication, config)) {
          kcResponse: KCCommandResult =>
            CMCommandResult(kcResponse.result)
        }
      }
    } { td =>
      Future.successful(CMCommandResult(Failure(new IllegalArgumentException(s"Topic already exists : $topic"))))
    }
  } pipeTo sender()
The KCCreateTopic command it sends is defined alongside CMCreateTopic as a CommandRequest:
case class KCCreateTopic(topic: String,
brokers: Set[Int],
partitions: Int,
replicationFactor:Int,
config: Properties) extends CommandRequest
It is handled by the KafkaCommandActor:
case KCCreateTopic(topic, brokers, partitions, replicationFactor, config) =>
  longRunning {
    Future {
      KCCommandResult(Try {
        kafkaCommandActorConfig.adminUtils.createTopic(kafkaCommandActorConfig.curator, brokers, topic, partitions, replicationFactor, config)
      })
    }
  }
which finally calls createTopic in CMAK's AdminUtils:
def createTopic(curator: CuratorFramework,
                brokers: Set[Int],
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties): Unit = {
  val replicaAssignment = assignReplicasToBrokers(brokers, partitions, replicationFactor)
  createOrUpdateTopicPartitionAssignmentPathInZK(curator, topic, replicaAssignment, topicConfig)
}
The first step computes the replica-to-broker assignment; the second step registers the new or modified assignment in the target Kafka cluster's ZooKeeper. The assignReplicasToBrokers calculation is explained by its source comment:
/**
 * There are 2 goals of replica assignment:
 * 1. Spread the replicas evenly among brokers.
 * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
 *
 * To achieve this goal, we:
 * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
 * 2. Assign the remaining replicas of each partition with an increasing shift.
 *
 * Here is an example of assigning
 * broker-0  broker-1  broker-2  broker-3  broker-4
 * p0        p1        p2        p3        p4        (1st replica)
 * p5        p6        p7        p8        p9        (1st replica)
 * p4        p0        p1        p2        p3        (2nd replica)
 * p8        p9        p5        p6        p7        (2nd replica)
 * p3        p4        p0        p1        p2        (3rd replica)
 * p7        p8        p9        p5        p6        (3rd replica)
 */
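A rough Python sketch of that round-robin scheme follows. It is a simplified re-implementation for illustration, not CMAK's Scala code; with start_index and next_replica_shift both fixed at 0 it reproduces the example table in the comment above.

import random

def assign_replicas_to_brokers(brokers, partitions, replication_factor):
    # brokers: sorted list of broker ids, e.g. [0, 1, 2, 3, 4]
    # returns: {partition_id: [replica broker ids]}
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    n = len(brokers)
    start_index = random.randrange(n)         # random start for the 1st replicas
    next_replica_shift = random.randrange(n)  # shift used for the remaining replicas
    assignment = {}
    for p in range(partitions):
        # Each time we wrap around the broker list, increase the shift so that
        # follow-up replicas land on different brokers than in the previous round.
        if p > 0 and p % n == 0:
            next_replica_shift += 1
        first = (p + start_index) % n
        replicas = [brokers[first]]
        for j in range(1, replication_factor):
            shift = 1 + (next_replica_shift + j - 1) % (n - 1)
            replicas.append(brokers[(first + shift) % n])
        assignment[p] = replicas
    return assignment

print(assign_replicas_to_brokers([0, 1, 2, 3, 4], partitions=10, replication_factor=3))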
def createOrUpdateTopicPartitionAssignmentPathInZK(curator: CuratorFramework,
                                                   topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false,
                                                   readVersion: Int = -1) {
  // validate arguments
  Topic.validate(topic)
  TopicConfigs.validate(version, config)
  checkCondition(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, TopicErrors.InconsistentPartitionReplicas)

  val topicPath = ZkUtils.getTopicPath(topic)
  if(!update) {
    checkCondition(curator.checkExists().forPath(topicPath) == null, TopicErrors.TopicAlreadyExists(topic))
  }
  partitionReplicaAssignment.foreach { case (part, reps) =>
    checkCondition(reps.size == reps.toSet.size, TopicErrors.DuplicateReplicaAssignment(topic, part, reps))
  }

  // write out the config on create, not update, if there is any, this isn't transactional with the partition assignments
  if(!update) {
    writeTopicConfig(curator, topic, config)
  }

  // create the partition assignment
  writeTopicPartitionAssignment(curator, topic, partitionReplicaAssignment, update, readVersion)
}
The create operation is thus performed directly against the target cluster's ZooKeeper: on first creation the topic config is written under /config/topics/:topic (writeTopicConfig) and the partition assignment under /brokers/topics/:topic (writeTopicPartitionAssignment, via ZkUtils.getTopicPath).
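For a rough picture of what those writes amount to, here is a hedged kazoo sketch. The znode payload formats are the standard ZooKeeper-based Kafka layouts rather than something lifted from CMAK's writeTopicConfig/writeTopicPartitionAssignment, and the topic name, assignment, and address are all made up; do not point this at a production cluster.

import json
from kazoo.client import KazooClient

# Placeholder connection to the target Kafka cluster's ZooKeeper.
zk = KazooClient(hosts="x.x.x.x:2181")
zk.start()

topic = "demo-topic"                            # hypothetical topic name
assignment = {"0": [0, 1, 2], "1": [1, 2, 3]}   # partition id -> replica broker ids

# Topic-level config, written on create under /config/topics/:topic.
zk.create("/config/topics/" + topic,
          json.dumps({"version": 1, "config": {}}).encode("utf-8"),
          makepath=True)

# Partition assignment, written under /brokers/topics/:topic; the Kafka
# controller picks this up and creates the partitions on the brokers.
zk.create("/brokers/topics/" + topic,
          json.dumps({"version": 1, "partitions": assignment}).encode("utf-8"),
          makepath=True)

zk.stop()
zk.close()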
3. Conclusion
CMAK stores only the meta information of each registered Kafka cluster in its own ZooKeeper, and it uses that meta information to connect to each cluster's ZooKeeper server to complete its operations. Therefore, to take over CMAK's cluster display and topic create/delete functionality, it is enough to scan the meta information inside the /kafka-manager/configs/:cluster znodes.
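A minimal sketch of that scan, under the same assumptions as the export script in section 1.1 (kazoo client, placeholder address for CMAK's own ZooKeeper):

import json
from kazoo.client import KazooClient

# Placeholder address of the ZooKeeper instance that CMAK itself uses.
cmak_zk = KazooClient(hosts="x.x.x.x:2181")
cmak_zk.start()

clusters = {}
for name in cmak_zk.get_children("/kafka-manager/configs"):
    raw, _stat = cmak_zk.get("/kafka-manager/configs/" + name)
    meta = json.loads(raw.decode("utf-8"))
    # zkConnect points at the managed cluster's own ZooKeeper; with it we can
    # read /brokers/topics on that cluster directly, as in section 2.1.
    clusters[name] = meta["curatorConfig"]["zkConnect"]

for name, zk_connect in clusters.items():
    print("{0} -> {1}".format(name, zk_connect))

cmak_zk.stop()
cmak_zk.close()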
4. TODO
Future work:
- The call chain has been roughly mapped out, but the details of the concrete implementation are still unclear.
- The topics path in CMAK's own ZooKeeper is observed to be populated; what exactly is this data used for?
- CMAK is slow the first time it queries a Kafka cluster after a restart, but fast on subsequent queries, which suggests it maintains its own caching mechanism.