At the heart of Apache Kafka® is the log: a simple data structure that works hand in hand with the underlying hardware through sequential reads and writes. Log-centric designs take advantage of efficient disk buffering, CPU caching, read-ahead, zero-copy, and many other techniques, which is where Kafka's well-known efficiency and throughput come from. For those new to the project, the log and its low-level implementation as a commit log are often the first things they learn about Apache Kafka.
But the log code itself makes up a relatively small part of the overall system. A much larger part of Kafka's code base is responsible for arranging partitions (that is, logs) across the brokers in a cluster, assigning leadership, handling failures, and so on. This is the code that makes Kafka a reliable and trusted distributed system.
Historically, Apache ZooKeeper was a critical part of how this distributed code worked. ZooKeeper acted as a reliable metadata store holding the most important facts about the system: where partitions live, which replica is the leader, and so on. Using ZooKeeper early in the project made sense because it is a powerful and proven tool. But ultimately, ZooKeeper is a specialized filesystem/trigger API built on top of a consistent log, while Kafka is a publish/subscribe API built on top of a consistent log. That leaves operators tuning, configuring, monitoring, securing, and reasoning about communication and performance across two log implementations, two network layers, and two security implementations, each with its own tools and monitoring hooks. It became unnecessarily complex. This inherent and unavoidable complexity prompted a recent initiative to replace ZooKeeper with a quorum service that runs entirely inside Kafka itself.
Of course, replacing ZooKeeper is a sizable job, so last April we kicked off a community initiative to accelerate the work and deliver a working system by the end of the year.
Just sat down with Jason, Colin, and the KIP-500 team and went through the full lifecycle of Kafka servers, producing and consuming, all ZooKeeper-free. Very sweet!
— Ben Stopford (@benstopford)
So we are pleased to say that the early-access KIP-500 code has been committed to trunk and is expected to be included in the upcoming 2.8 release. For the first time, you can run Kafka without ZooKeeper. We call this the Kafka Raft metadata mode, typically abbreviated to KRaft (pronounced like "craft") mode.
Note that some features are not available in this early-access release. We do not yet support ACLs and other security features, or transactions. Partition reassignment and JBOD are also not supported in KRaft mode (both are expected in an Apache Kafka release later this year). Given that the quorum controller is an experimental feature, we do not recommend subjecting it to production workloads. If you do try the software, however, you'll find many new advantages: it is simpler to deploy and operate, you can run Kafka as a single process, and each cluster can hold many more partitions (see the measurements below).
Quorum Controller: event-driven consensus
If you choose to run Kafka with the new quorum controller, all metadata responsibilities previously handled by the Kafka controller and ZooKeeper are merged into this one new service, running inside the Kafka cluster itself. The quorum controller can also run on dedicated hardware if your use case requires it.
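To give a feel for what this looks like in practice, here is a minimal configuration sketch based on the example KRaft properties files shipped with the early-access release; exact property names and defaults may still change before 2.8 is final:

```properties
# Combined mode: this node acts as both broker and controller
# (what a small or single-process deployment would use).
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
log.dirs=/tmp/kraft-combined-logs

# Dedicated mode: a controller-only node on its own hardware would
# instead set something like:
#   process.roles=controller
#   listeners=CONTROLLER://:9093
```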
Internally, though, things get interesting. The quorum controllers use the new KRaft protocol to ensure that metadata is accurately replicated across the quorum. This protocol resembles ZooKeeper's ZAB protocol and Raft in many respects, but it has some important differences, one notable and fitting one being its use of an event-driven architecture.
The quorum controller stores its state using an event-sourced storage model, which ensures that the internal state machines can always be recreated exactly. The event log used to store this state (also known as the metadata topic) is periodically compacted with snapshots to guarantee that it cannot grow indefinitely. The other controllers within the quorum follow the active controller by responding to the events that it creates and stores in its log. Thus, if a node pauses due to a partitioning event, it can quickly catch up on any events it missed by reading the log when it rejoins. This significantly shortens the unavailability window, improving the worst-case recovery time of the system.
Event-driven internal consensus
The event-driven nature of the KRaft protocol means that, unlike the ZooKeeper-based controller, the quorum controller does not need to load state from ZooKeeper before becoming active. When leadership changes, the new active controller already has all committed metadata records in memory. What's more, the same event-driven mechanism used in the KRaft protocol is used to track metadata across the cluster. Tasks that were previously handled with RPCs now benefit from being event-driven as well as using an actual log for communication. A pleasant consequence of these changes is that Kafka can now support many more partitions than before. Let's discuss this in more detail.
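To make the event-sourced model described above concrete, here is a small illustrative Java sketch (not Kafka's actual code; all class and method names are hypothetical) of a state machine that applies committed metadata records, catches up after a pause by replaying the log, and takes snapshots so the log prefix can be discarded:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch of an event-sourced metadata store, in the spirit of
 * the quorum controller described above. Requires Java 16+ for records.
 */
public class MetadataStateMachine {

    /** A committed metadata record, e.g. "partition foo-0 leader is broker 2". */
    record MetadataRecord(long offset, String key, String value) {}

    /** A point-in-time snapshot plus the offset it covers up to. */
    record Snapshot(long lastIncludedOffset, Map<String, String> state) {}

    private final Map<String, String> state = new HashMap<>();
    private long lastAppliedOffset = -1;

    /** Apply a single committed record to the in-memory state. */
    public void apply(MetadataRecord rec) {
        state.put(rec.key(), rec.value());
        lastAppliedOffset = rec.offset();
    }

    /** Catch up after a pause by replaying every record committed since we stopped. */
    public void catchUp(List<MetadataRecord> committedLog) {
        for (MetadataRecord rec : committedLog) {
            if (rec.offset() > lastAppliedOffset) {
                apply(rec);
            }
        }
    }

    /** Take a snapshot so the log up to lastAppliedOffset can be truncated. */
    public Snapshot snapshot() {
        return new Snapshot(lastAppliedOffset, new HashMap<>(state));
    }

    /** Restore from a snapshot, then replay only the records after it. */
    public void restore(Snapshot snapshot, List<MetadataRecord> recordsAfterSnapshot) {
        state.clear();
        state.putAll(snapshot.state());
        lastAppliedOffset = snapshot.lastIncludedOffset();
        catchUp(recordsAfterSnapshot);
    }
}
```

A real implementation also has to handle quorum membership, leader election, and fencing, which is what the KRaft protocol itself provides; the point of this sketch is only the replay-plus-snapshot shape of the state.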
Scaling Kafka: support for millions of partitions
The number of partitions that a Kafka cluster can support is determined by two properties: the per-node partition limit and the cluster-wide partition limit. Both are interesting, but to date metadata management has been the main bottleneck for the cluster-wide limit. Previous Kafka Improvement Proposals (KIPs) have raised the per-node limit, although there is always more that can be done. Kafka's scalability, however, depends primarily on adding nodes to get more capacity. This is where the cluster-wide limit matters, as it defines the upper bound of scalability within the system.
The new quorum controller is designed to handle a much larger number of partitions per cluster. To evaluate this, we ran tests similar to those we ran in 2018 to expose Kafka's inherent partition limits. These tests measure the time taken to shut down and recover, which are both O(#partitions) operations for the old controller. It is this operation that places an upper bound on the number of partitions Kafka can support in a single cluster.
The previous implementation, as described by Jun Rao in the 2018 post mentioned above, could reach around 200K partitions, with the limiting factor being the time it took to move critical metadata between the external consensus (ZooKeeper) and internal leader management (the Kafka controller). With the new quorum controller, both roles are served by the same component. The event-driven approach means that controller failover is now near-instantaneous. Here are the numbers from a cluster running 2 million partitions (ten times the previous limit) in our lab:
|                                                            | With ZooKeeper-Based Controller | With Quorum Controller |
|------------------------------------------------------------|---------------------------------|------------------------|
| Controlled Shutdown Time (2 million partitions)            | 135 sec.                        | 32 sec.                |
| Recovery from Uncontrolled Shutdown (2 million partitions) | 503 sec.                        | 37 sec.                |
Both controlled and uncontrolled shutdown times matter. Controlled shutdowns affect common operational scenarios such as a rolling restart: the standard procedure for deploying software changes while keeping the service available throughout. Recovery from an uncontrolled shutdown is arguably more important, since it sets the recovery time objective (RTO) for the system after an unexpected failure, say a VM or pod crash or a data center becoming unavailable. While these measures are only indicators of broader system performance, they directly measure the well-known bottleneck that ZooKeeper usage imposes.
Note that the controlled and uncontrolled measurements are not directly comparable. The uncontrolled case includes the time it takes to elect a new leader, while the controlled case does not. This difference is intentional, so that the controlled case stays consistent with Jun Rao's original measurements.
Shrinking your deployment: Kafka as a single process
Kafka is often perceived as heavyweight infrastructure, and the complexity of managing ZooKeeper (because it is a second, entirely separate distributed system) is a big part of that perception. As a result, projects often start with a lighter-weight message queue, such as a traditional broker like ActiveMQ or RabbitMQ, and move to Kafka only when scale demands it.
This is unfortunate because the abstraction Kafka provides, a commit log, is just as applicable to the small-scale workloads you might see in a startup as it is to the high-throughput ones at Netflix or Instagram. Moreover, if you want to add stream processing, you need Kafka and its commit log abstraction, whether you use Kafka Streams, ksqlDB, or another stream processing framework. But because of the complexity of managing two separate systems, Kafka and ZooKeeper, users often felt they had to choose between scale and ease of getting started.
That is no longer the case. KIP-500 and KRaft mode provide a great, lightweight way to get started with Kafka, or to use it as an alternative to monolithic brokers such as ActiveMQ or RabbitMQ. A lightweight single-process deployment is also better suited to edge scenarios and environments running on modest hardware. The cloud adds an interesting angle to this. Managed services like Confluent Cloud remove the operational burden entirely. So whether you want to run your own cluster or have one run for you, you can start small and grow to (potentially) massive scale as your underlying use case expands, all on the same infrastructure. Let's look at what a single-process deployment looks like.
Take Kafka for a spin without ZooKeeper
The new quorum controller is available today as an experimental feature in trunk and is expected to be included in the upcoming Apache Kafka 2.8 release. So what can you do with it? As mentioned above, one simple but very cool new capability is the ability to create a single-process Kafka cluster, as the short demo below shows.
Demo: asciinema.org/a/403794/em…
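If you'd like to reproduce the demo yourself, the steps in the early-access build look roughly like this (taken from the KRaft quickstart bundled with the release; the details may change before 2.8 ships):

```bash
# Generate a cluster ID
./bin/kafka-storage.sh random-uuid

# Format the storage directory with that ID, using the sample KRaft config
./bin/kafka-storage.sh format -t <cluster-id-from-previous-step> \
    -c ./config/kraft/server.properties

# Start a single process that acts as broker and controller at the same time
./bin/kafka-server-start.sh ./config/kraft/server.properties

# In another terminal: create a topic and start producing and consuming
./bin/kafka-topics.sh --create --topic quickstart --bootstrap-server localhost:9092
```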
Of course, if you want to scale that out to support higher throughput and add replication for fault tolerance, you just add new broker processes (see the configuration sketch after this paragraph). As noted, this is an early-access version of the KRaft quorum controller: do not use it for demanding workloads. Over the coming months we will add the final missing pieces, complete TLA+ modeling of the protocol, and harden the quorum controller in Confluent Cloud.
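For a sense of what that scale-out involves, a small fault-tolerant KRaft quorum is mostly a matter of configuration. The sketch below is illustrative only (hostnames are placeholders): each node gets a unique node.id while sharing the same voter list.

```properties
# On node 1 (nodes 2 and 3 use node.id=2 and 3 and their own hostnames)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
controller.listener.names=CONTROLLER
```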
You can try the new quorum controller for yourself today. Check out the full instructions on GitHub.
The team behind it
This would not have been possible (and the work will continue to be a major effort) without the Apache Kafka community and a team of distributed systems engineers who worked tirelessly through a pandemic to take this from zero to a working system in roughly nine months. We would like to extend special thanks to Colin McCabe, Jason Gustafson, Ron Dagostino, Boyang Chen, David Arthur, Jose Garcia Sancio, Guozhang Wang, Alok Nikhil, Deng Zi Ming, Sagar Rao, Feyman, Chia-Ping Tsai, Jun Rao, Heidi Howard, and everyone in the Apache Kafka community who helped achieve this goal.
www.confluent.io/blog/kafka-…