preface

When business volume is small, multiple businesses often share a single Kafka cluster. As the business grows, new topics are continuously added, or the partition count of existing topics is expanded. In addition, some large-scale businesses may not deploy an independent cluster during the trial phase; when the business takes off, cluster nodes must be added quickly. However, to avoid sacrificing stability, the size of a single cluster has to be limited, so once a business has grown we often cannot expand the original cluster directly and can only ask the business to create a new cluster for the new traffic. The user then faces the cost of changing their systems, and sometimes, because businesses are interrelated, splitting them across clusters also forces changes to deployment plans that are hard to complete in a short time.

To support business growth quickly, we need to be able to expand the cluster without requiring any changes on the business side. A large cluster usually means more partitions and more broker nodes. The following sections describe the main challenges of scaling a cluster to that size.

challenge

1. Number of ZK nodes

Kafka topics are stored and replicated on brokers as partitions, so the cluster must maintain, for each partition, the current Leader, the broker nodes that hold its replicas, and the replicas that are in sync. To store this metadata, the Kafka cluster creates nodes on the ZK cluster for every partition, so the number of partitions directly determines the number of nodes on ZK.

Assume the cluster has 10,000 topics and each topic has 100 partitions, i.e. 1 million partitions in total. The number of nodes on ZK is then about 2 million, and the snapshot size is about 300 MB. ZK node changes are written to the transaction log for persistence, and when the transaction log reaches a certain number of entries, the full data set is written out to a snapshot file. As the number of partition nodes grows, the snapshot file grows with it; the full snapshot write and the transaction-log writes then interfere with each other and slow down responses to clients.
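A back-of-the-envelope check of these figures (a sketch; the assumption of roughly two znodes per partition, i.e. the partition node plus its state child, is ours, and the real ZK layout has a few more fixed paths):

```java
public class ZkNodeEstimate {
    public static void main(String[] args) {
        long topics = 10_000;                                   // figures from the text above
        long partitionsPerTopic = 100;
        long partitions = topics * partitionsPerTopic;          // 1,000,000 partitions

        // Assumption: ~2 znodes per partition (the partition node and its state
        // child) plus one node per topic; the partition term dominates.
        long znodes = partitions * 2 + topics;                  // ~2 million znodes

        long snapshotBytes = 300L * 1024 * 1024;                // ~300 MB snapshot from the text
        System.out.printf("znodes=%d, ~%d bytes per znode in the snapshot%n",
                znodes, snapshotBytes / znodes);                // roughly 150 bytes per znode
    }
}
```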

2. Partition replication

Kafka replicates partitions with dedicated replication threads, and multiple partitions share each thread. As the number of partitions on a single broker grows, the number of partitions each replication thread is responsible for increases. Each partition corresponds to a log file, so when data is written to a large number of partitions at the same time, disk writes become scattered and write performance deteriorates. Replication may then fail to keep up, causing frequent ISR fluctuations. Increasing the number of replication threads reduces the number of partitions per thread, but it also increases disk contention.
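A rough illustration of that trade-off (a sketch; `num.replica.fetchers` is the broker setting for replica fetcher threads per source broker, and all the counts below are hypothetical):

```java
public class FetcherLoadEstimate {
    public static void main(String[] args) {
        int followerPartitions = 4_000;  // hypothetical partitions this broker follows
        int sourceBrokers      = 20;     // hypothetical brokers it fetches from
        int numReplicaFetchers = 4;      // broker config num.replica.fetchers (per source broker)

        // Assuming partitions spread roughly evenly over the fetcher threads,
        // each thread replicates about this many partitions (and log files):
        int perThread = followerPartitions / (sourceBrokers * numReplicaFetchers);
        System.out.println("approx. partitions per replication thread: " + perThread);
        // Raising num.replica.fetchers lowers this number, but more threads then
        // write to the disks concurrently, which increases contention.
    }
}
```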

3. Controller switchover duration

Due to network or machine faults, a running cluster may experience controller switchover. When the controller switches, it must recover from ZK the broker node information, the replica assignment of each topic partition, and the node where each partition Leader currently resides, and then synchronize the complete partition information to every broker node.

In tests on virtual machines, recovering the metadata of 1 million partitions from ZK to the broker took about 37 s, and the serialized metadata for 1 million partitions was about 80 MB (the size depends on the replica count, topic name length, and so on). After the other brokers receive the metadata, they deserialize it and update their local in-memory state, which took about 40 s (depending on the network environment).

The controller drives Leader switchover and sends metadata to the other broker nodes in the cluster. The longer the controller takes to recover, the greater the risk that the cluster becomes unavailable. If a partition Leader needs to be switched while the controller is switching over, clients may be unable to obtain the new Leader for a long time, interrupting service.

4. Broker offline and recovery duration

During routine maintenance, a broker may need to be restarted. To avoid affecting users, the broker notifies the controller to switch partition Leaders away from it before it stops; Leaders are also switched when a broker fails. Each Leader switch requires updating the partition state node on ZK and synchronizing the new metadata to the other brokers. As the number of partitions grows, switching all the partition Leaders on a single broker node takes longer.

From the factors above, we can see that an increase in the number of partitions directly lengthens the controller's fault-recovery time, that more partitions on a single broker hurt disk performance and replication stability, and that broker restarts and Leader switchovers take longer. Of course, under the existing architecture it is entirely possible to cap the number of partitions per broker and so avoid the per-broker effects, but that means the number of broker nodes in the cluster grows: the controller then manages more brokers, while the number of partitions it must manage does not decrease. So if we want to support a scenario in which a very large number of partitions share one cluster, the core problem is either to improve the processing capability of a single controller or to increase the number of controllers.

The solution

1. Single ZK cluster

The following optimizations can improve the processing performance of a single controller:

Pull ZK nodes in parallel

When the controller pulls metadata from ZK, it already waits for responses asynchronously, so requests and replies are not strictly serialized, but with a single thread the processing still takes about 37 s. We can instead pull the metadata in parallel with multiple threads, each responsible for a subset of the partitions, which shortens the pull time.

In a simple simulation on a virtual machine, a single thread takes about 28 s to fetch the data of 1 million nodes. Spreading the work across five threads, each pulling the data of 200,000 partitions, shortens the total time to about 14 s (these numbers are affected by the performance of the VM itself; on the same VM, a single thread takes about 6 s to pull 200,000 partitions). Pulling partitions in parallel therefore significantly shortens the recovery time during controller failover.
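A minimal sketch of the idea with the plain Apache ZooKeeper client (the connection string and the partition-state paths are placeholders, and the real controller uses its own asynchronous ZK client rather than a thread pool):

```java
import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelStatePuller {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string; partition-state znodes follow this layout.
        ZooKeeper zk = new ZooKeeper("zk1:2181", 30_000, event -> { });
        List<String> statePaths = List.of(
                "/brokers/topics/topic-a/partitions/0/state",
                "/brokers/topics/topic-a/partitions/1/state");   // ...one path per partition

        // Five worker threads, as in the test above; the pool spreads the paths
        // across threads so the getData calls proceed in parallel.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<Future<byte[]>> pulls = new ArrayList<>();
        for (String path : statePaths) {
            pulls.add(pool.submit(() -> zk.getData(path, false, null)));
        }
        for (Future<byte[]> pull : pulls) {
            pull.get();                                          // wait for every pull to finish
        }
        pool.shutdown();
        zk.close();
    }
}
```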

Change how metadata is synchronized

As mentioned above, the metadata for 1 million partitions is about 80 MB. If we cap the number of partitions per broker, we need more broker nodes, and on a controller switch the controller must synchronize the metadata to a large number of brokers in parallel; the controller node is limited by its traffic, so synchronizing 80 MB of metadata takes a long time. We therefore need to change how metadata is synchronized within the cluster. For example, the metadata could be stored in a built-in topic, just like consumer offsets: the controller writes the data it writes to ZK into the built-in topic as messages, and brokers consume the topic and update the metadata in their memory. Such a scheme keeps the metadata fully synchronized across controller switches, but it requires a significant adjustment to the current Kafka architecture. (There are more options, of course, such as not using ZK to manage metadata at all, but that is beyond the scope of this article.)
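A minimal sketch of that data flow with the standard Kafka clients (the topic name `__cluster_metadata_changes`, the string encoding of partition state, and the in-memory cache are assumptions for illustration; this is not how Kafka works today):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.List;
import java.util.Map;

public class MetadataTopicSketch {
    // Hypothetical built-in topic carrying metadata change records.
    static final String METADATA_TOPIC = "__cluster_metadata_changes";

    // Controller side: in addition to updating ZK, publish the change as a message.
    static void publishChange(Producer<String, String> producer,
                              String topicPartitionKey, String newStateJson) {
        producer.send(new ProducerRecord<>(METADATA_TOPIC, topicPartitionKey, newStateJson));
    }

    // Broker side: tail the topic and apply each change to the local metadata cache.
    static void applyChanges(Consumer<String, String> consumer,
                             Map<String, String> metadataCache) {
        consumer.subscribe(List.of(METADATA_TOPIC));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            metadataCache.put(record.key(), record.value());     // e.g. "topic-a-0" -> leader/ISR state
        }
    }

    public static void main(String[] args) {
        // Real wiring (bootstrap servers, serializers, group id) is environment-specific
        // and omitted here; the two methods above show only the intended data flow.
    }
}
```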

Is there another way to support large-scale partitions without changing the Kafka architecture too much? When a Kafka client interacts with brokers, it first pulls topic metadata from the configured address and then produces and consumes against the partition Leaders given by that metadata. By controlling the metadata, we therefore control which machines the client connects to for production and consumption. These machines do not have to be in the same cluster; the client only needs the status information of the partitions. So we can distribute different topics across different clusters and return the combined topic information of those clusters to the client. The client connects to several clusters at the same time, yet from its point of view it sees one large cluster. A single physical cluster then no longer has to support a very large scale; multiple physical clusters are combined to support it, and users do not need to stop or modify their services during capacity expansion. The following describes how to implement this solution.

2. Build a logical cluster from multiple small physical clusters

When we need to build a logical cluster, there are several core issues that need to be addressed:

1. When the client pulls metadata, how do we assemble metadata from multiple small physical clusters and return it to the client?

2. How do we propagate metadata changes in the different clusters in a timely manner?

3. In which clusters do we place the topics that store consumer offsets and transaction state?

These questions are explained one by one below:

The metadata service

To solve the metadata assembly problem, we choose one of the physical clusters in the logical cluster as the main cluster and treat the other clusters as extended clusters. The main cluster provides the metadata, consumer offset, and transaction-related services (it can also serve message production and consumption), while an extended cluster is used only for the production and consumption of business messages. As we know, when a partition Leader switches, the new metadata must be synchronized to the brokers in that cluster by its controller; when a logical cluster is composed of multiple independent physical clusters, the controller of one cluster is not aware of the broker nodes in the other clusters.

We can simply modify the metadata interface in the main cluster: when the client pulls metadata, the main cluster also pulls metadata from the other clusters, merges it with its own, and returns the combined result to the client.

Although this extra hop to pull metadata has some performance cost, metadata requests are normally not on the message production and consumption path, so the impact on the client is small. Reassembling the metadata at the moment the client pulls it also avoids having to propagate metadata updates across physical clusters and keeps the returned information up to date.
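A client-side sketch of the merge idea, using the Kafka AdminClient (in the actual design the merge happens inside the main cluster's metadata handler; the bootstrap addresses and topic names below are placeholders):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MergedMetadataSketch {
    static Admin connect(String bootstrap) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        return Admin.create(props);
    }

    public static void main(String[] args) throws Exception {
        // Placeholder addresses for the main cluster and one extended cluster.
        try (Admin main = connect("main-cluster:9092");
             Admin ext  = connect("extended-cluster:9092")) {

            // Pull topic metadata (partitions, leaders, replicas) from each cluster...
            Map<String, TopicDescription> merged = new HashMap<>(
                    main.describeTopics(List.of("topic-on-main")).all().get());
            // ...and fold the extended cluster's topics into the same view.
            merged.putAll(ext.describeTopics(List.of("topic-on-ext")).all().get());

            // From the client's perspective, the merged map looks like one logical cluster.
            merged.forEach((name, description) -> System.out.println(
                    name + " -> " + description.partitions().size() + " partitions"));
        }
    }
}
```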

Consumer group and transaction coordination

When the members of a consumer group coordinate which partitions each member consumes, the server returns the coordinator node based on the partition information of the topic that stores consumer offsets. We therefore need to fix which cluster in the logical cluster hosts that topic, to avoid problems such as different physical clusters returning different coordinators, or different offsets being read from different clusters. We choose broker nodes in the main cluster to provide consumer-group and transaction coordination, and consumer offsets are kept only on the main cluster.
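For reference, a sketch of how a group is mapped to its coordinator through the offsets topic (Kafka hashes the group id onto a partition of `__consumer_offsets`; the partition count of 50 below is the default `offsets.topic.num.partitions` and an assumption here):

```java
public class CoordinatorLookupSketch {
    // Assumed partition count of __consumer_offsets (50 is the Kafka default).
    static final int OFFSETS_TOPIC_PARTITIONS = 50;

    // The group's offsets live in this partition of __consumer_offsets, and the
    // broker leading that partition acts as the group coordinator. If the offsets
    // topic exists only on the main cluster, every member of the group is routed
    // to the same coordinator on the main cluster.
    static int coordinatorPartition(String groupId) {
        return Math.abs(groupId.hashCode() % OFFSETS_TOPIC_PARTITIONS);
    }

    public static void main(String[] args) {
        System.out.println(coordinatorPartition("my-consumer-group"));
    }
}
```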

With the modifications above, we can support a much larger business scale, and users only need to know the address of the main cluster.

In addition to the core issues above, we also need to pay attention to how topics are distributed. Since cloud CKafka already forwards topic-creation requests received on the broker to the control module, distributing topics across multiple physical clusters is easy to handle there. Note, however, that topics with the same name may exist in different physical clusters of the same logical cluster.

Splitting a physical cluster

We have described how to combine multiple physical clusters into one logical cluster. Sometimes we face the opposite problem: for various reasons, a single physical cluster must keep expanding the partitions of existing topics, and if many topics need to expand at the same time, the physical cluster may become too large. We then need to split one physical cluster into two.

Splitting the cluster involves splitting the ZK cluster and dividing the broker nodes into two groups, each connected to different ZK nodes. For example, we can add Observer nodes to the original ZK cluster; one group of brokers taken from the original cluster is configured with only the Observer addresses. Before the ZK cluster is split, Kafka's built-in migration tool can easily migrate topics onto their respective broker groups, so that the partitions of any given topic are distributed only among the broker nodes of one group. The Observer nodes are then removed from the existing ZK cluster and, together with other new ZK nodes, form a new ZK cluster, which completes the split of the Kafka cluster.
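A sketch of pinning a topic's partitions onto one broker group before the ZK split, using the AdminClient reassignment API (the topic name, partition count, and broker ids are placeholders; the same migration can be done with the kafka-reassign-partitions tool mentioned above):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public class PinTopicToBrokerGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-to-split:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Placeholder: brokers 4 and 5 form the group that keeps this topic after
            // the split; every partition of the topic is moved onto those brokers.
            List<Integer> targetGroup = List.of(4, 5);
            Map<TopicPartition, Optional<NewPartitionReassignment>> moves = new HashMap<>();
            int partitionCount = 3;                                   // placeholder partition count
            for (int p = 0; p < partitionCount; p++) {
                moves.put(new TopicPartition("topic-to-keep", p),
                          Optional.of(new NewPartitionReassignment(targetGroup)));
            }
            admin.alterPartitionReassignments(moves).all().get();     // triggers the data migration
        }
    }
}
```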

conclusion

The number of partitions a single cluster can host can be increased both by improving controller performance and by assembling multiple physical clusters into one logical cluster. Building a logical cluster from multiple physical clusters, however, requires fewer changes to the existing Kafka architecture and offers better recovery time and service stability.

Of course, when using a Kafka service, if the business allows it, it is also good practice to keep the number of partitions in each cluster moderate by splitting businesses across different clusters.