Controller

The controller is a core component of Apache Kafka. With the help of Apache ZooKeeper, it manages and coordinates the entire Kafka cluster; for example, it elects the leader replica of each partition.

Any broker in the cluster can act as the controller, but only one broker holds the role at a time and performs its administrative and coordinating functions. In other words, a functioning Kafka cluster has exactly one controller at any given moment.

Dependence on ZK

  • The Kafka controller makes extensive use of ZooKeeper’s Watch mechanism to coordinate and manage the cluster.
  • The controller also persists part of its data in ZooKeeper, which serves as its data store.

The figure below shows the role of ZK in a Kafka cluster

Data storage

ZooKeeper stores the following data. Although it is the controller that actually serves this data to the rest of the cluster, the controller initializes its copy from ZooKeeper. The brokers in the cluster therefore do not query ZooKeeper directly for metadata; they always communicate with the controller to get and update the latest cluster data.

Failover

  • Because only one broker can act as the controller, the controller is a potential single point of failure in a Kafka cluster.
  • Failover means that when the running controller goes down or terminates unexpectedly, Kafka detects this quickly and promptly promotes another broker to replace the failed controller. This happens automatically, without manual intervention.

  • For example, suppose Broker 0 is the controller at the beginning. When Broker 0 goes down, its ZooKeeper session ends and ZooKeeper removes the ephemeral /controller node; the other brokers learn of this through the Watch mechanism. All surviving brokers then campaign to become the new controller. Suppose Broker 3 wins the election and recreates the /controller node on ZooKeeper. Broker 3 then reads the cluster metadata from ZooKeeper and loads it into its own cache. At this point the controller failover is complete and the new controller can carry out its normal duties.
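The failover sequence above can be sketched with a tiny in-memory stand-in for ZooKeeper. This is only an illustration under stated assumptions: `FakeZooKeeper`, `Broker`, and their methods are hypothetical names, not Kafka or ZooKeeper APIs, and the watch handling is simplified (real ZooKeeper watches are one-shot and must be re-registered).

```python
class FakeZooKeeper:
    """Hypothetical in-memory stand-in for the part of ZooKeeper used here."""

    def __init__(self):
        self.nodes = {}      # path -> (data, id of the session that owns the node)
        self.watchers = []   # callbacks fired when a node is deleted

    def create_ephemeral(self, path, data, session):
        # Creation is atomic: it fails if the node already exists.
        if path in self.nodes:
            return False
        self.nodes[path] = (data, session)
        return True

    def get(self, path):
        entry = self.nodes.get(path)
        return entry[0] if entry else None

    def expire_session(self, session):
        # Ephemeral nodes disappear together with the session that created them.
        gone = [p for p, (_, owner) in self.nodes.items() if owner == session]
        for path in gone:
            del self.nodes[path]
        for path in gone:
            for callback in list(self.watchers):
                callback(path)


class Broker:
    def __init__(self, broker_id, zk):
        self.broker_id = broker_id
        self.zk = zk
        self.alive = True
        zk.watchers.append(self.on_node_deleted)

    def try_become_controller(self):
        # The broker id doubles as the session id in this sketch.
        return self.zk.create_ephemeral("/controller", self.broker_id, self.broker_id)

    def on_node_deleted(self, path):
        # Surviving brokers campaign again when /controller vanishes.
        if self.alive and path == "/controller":
            self.try_become_controller()


zk = FakeZooKeeper()
brokers = [Broker(i, zk) for i in range(4)]
brokers[0].try_become_controller()
first_controller = zk.get("/controller")

brokers[0].alive = False      # Broker 0 crashes...
zk.expire_session(0)          # ...and its session expires
second_controller = zk.get("/controller")
print(first_controller, second_controller)
```

In this single-threaded sketch the winner is simply the first surviving broker to be notified; in a real cluster it is whichever broker's create request reaches ZooKeeper first.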

Self-Managed Metadata Quorum

Apache Kafka has used Apache ZooKeeper to store metadata such as partition locations and topic configuration. In 2019 the community proposed a plan to remove this dependency and bring metadata management into Kafka itself. This mirrors what happened earlier with offsets, which were originally stored in ZooKeeper; the community later moved them to an internal topic, mainly because ZooKeeper is not suited to frequent updates or to storing large amounts of data.

The main problem with the current design is that running Kafka means installing, deploying, and operating two clustered systems: Kafka itself and a ZooKeeper cluster. Storing metadata externally also limits Kafka’s scalability. When a Kafka cluster starts, or when a new controller is elected, the controller must load the full state of the cluster from ZooKeeper. As the amount of metadata grows, this loading takes longer, which limits the number of partitions Kafka can support.

KIP-500 outlines a better way to handle metadata in Kafka. We can call this “Kafka on Kafka” because it involves storing Kafka’s metadata in Kafka itself, rather than in an external system like ZooKeeper.

On systems that implement KIP-500, metadata is stored in a partition within Kafka rather than in ZooKeeper. The controller becomes the leader of this partition, and apart from Kafka itself there is no external metadata system to configure and manage.

We treat metadata as a log. Brokers that need the latest data only have to read the end of the log, just as a consumer that needs the latest entries reads only the tail of the log rather than the whole log. Brokers can also persist their metadata cache across process restarts.
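A minimal sketch of the "metadata as a log" idea, assuming a simple in-memory list as the log. The names `MetadataLog`, `BrokerCache`, and the record layout are made up for illustration and do not reflect Kafka's actual record formats.

```python
class MetadataLog:
    """Hypothetical append-only log of metadata records."""

    def __init__(self):
        self.records = []

    def append(self, record):
        self.records.append(record)
        return len(self.records) - 1   # offset of the record just written

    def read_from(self, offset):
        return self.records[offset:]


class BrokerCache:
    """Hypothetical broker-side cache that remembers how far it has read."""

    def __init__(self):
        self.topics = {}
        self.next_offset = 0   # first offset not yet applied

    def catch_up(self, log):
        # Only the tail of the log is read, never the whole log again.
        new_records = log.read_from(self.next_offset)
        for kind, name, partitions in new_records:
            if kind == "create":
                self.topics[name] = partitions
            elif kind == "delete":
                self.topics.pop(name, None)
        self.next_offset += len(new_records)
        return len(new_records)


log = MetadataLog()
log.append(("create", "orders", 3))
log.append(("create", "payments", 6))

cache = BrokerCache()
first_read = cache.catch_up(log)     # applies both records

log.append(("delete", "orders", 0))
second_read = cache.catch_up(log)    # applies only the new record
print(first_read, second_read, cache.topics)
```

Persisting `topics` together with `next_offset` is what would let a restarted broker resume from where it left off instead of replaying everything.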

Controller architecture

The Kafka cluster elects a controller node to manage partition leaders and cluster metadata. The more partitions and metadata we have, the more important the scalability of the controller becomes. We want to minimize the number of operations whose running time grows linearly with the number of topics or partitions.

One of these operations is controller failover. Currently, when Kafka selects a new controller, it needs to load the state of the entire cluster before proceeding. As the amount of cluster metadata grows, this process takes longer and longer.

In contrast, a KIP-500 system will have several standby controllers ready to take over if the active controller fails. These standby controllers are simply the other nodes in the Raft quorum of the metadata partition. This design ensures that we never have to go through a lengthy loading process when choosing a new controller.

KIP-500 will speed up the creation and deletion of topics. Currently, when a topic is created or deleted, the controller must reload the complete list of all topic names in the cluster from ZooKeeper. This is necessary because when ZooKeeper notifies us that the set of topics in the cluster has changed, it does not tell us exactly which topics were added or removed. In contrast, once KIP-500 is fully implemented, creating or deleting a topic simply requires writing a new entry to the metadata partition, which is an O(1) operation.

Metadata scalability is a key part of the future expansion of Kafka. We expect that a single Kafka cluster will eventually be able to support a million partitions or more, and that Kafka deployment and configuration will become simpler.

Controller election

  • Kafka currently elects the controller as follows: when a broker starts, it tries to create the /controller node in ZooKeeper, and the first broker that succeeds becomes the controller.
  • On startup, each broker first tries to read the brokerID value stored in the /controller node. If the value is not -1, another broker has already won the election, so the current broker abandons its campaign.
  • If the /controller node does not exist in ZooKeeper, or if the data in the node is invalid, the broker attempts to create the /controller node. Other brokers may attempt to create it at the same time. Only the broker whose creation succeeds becomes the controller; the brokers whose creation fails lose the election.
  • Each broker keeps the brokerID of the current controller in memory as activeControllerId.
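The "first successful create wins" rule can be sketched as follows. `ControllerNode` and `campaign` are hypothetical names, and a lock-protected object stands in for the atomic node creation that ZooKeeper provides; this is not Kafka's implementation.

```python
import threading


class ControllerNode:
    """Hypothetical in-memory stand-in for the /controller node."""

    NO_CONTROLLER = -1

    def __init__(self):
        self._lock = threading.Lock()
        self._broker_id = self.NO_CONTROLLER

    def read(self):
        return self._broker_id

    def create(self, broker_id):
        # Atomic create: succeeds only if no broker holds the node yet.
        with self._lock:
            if self._broker_id != self.NO_CONTROLLER:
                return False
            self._broker_id = broker_id
            return True


def campaign(node, broker_id, outcome):
    """Run one broker's campaign and record whether it won."""
    if node.read() != ControllerNode.NO_CONTROLLER:
        outcome[broker_id] = False              # someone already won: give up
    else:
        outcome[broker_id] = node.create(broker_id)


node = ControllerNode()
outcome = {}
threads = [threading.Thread(target=campaign, args=(node, i, outcome)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

active_controller_id = node.read()   # the value every broker would cache
winners = [b for b, won in outcome.items() if won]
print(winners, active_controller_id)
```

Which broker wins depends on thread scheduling, but exactly one `create` call can succeed, which is the property the election relies on.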

controller_epoch

  • ZooKeeper also holds a controller-related /controller_epoch node, which is persistent and stores an integer controller_epoch value.
  • controller_epoch records how many times the controller has changed. In other words, it identifies the generation of the current controller, known as the controller’s epoch.
  • controller_epoch starts at 1 for the first controller in the cluster and is incremented by 1 each time a new controller is elected. Every request that interacts with the controller carries a controller_epoch field. If the controller_epoch value in the request is less than the value held in memory, the request is considered to involve an expired controller and is deemed invalid.
  • If the controller_epoch value in the request is greater than the value held in memory, a new controller has been elected. Kafka uses controller_epoch to guarantee that the controller is unique, and therefore that the cluster stays consistent.
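The epoch comparison described above amounts to a small fencing check. The sketch below is an illustration only: `EpochGuard` and `handle_request` are hypothetical names, and the initial value 0 (meaning "no controller seen yet") is an assumption of the sketch.

```python
class EpochGuard:
    """Hypothetical receiver-side check of the controller_epoch field."""

    def __init__(self):
        self.controller_epoch = 0   # assumption: 0 means no controller seen yet

    def handle_request(self, request_epoch):
        if request_epoch < self.controller_epoch:
            # The sender belongs to an older controller generation.
            return "rejected"
        if request_epoch > self.controller_epoch:
            # A newer controller has been elected; remember its epoch.
            self.controller_epoch = request_epoch
        return "accepted"


guard = EpochGuard()
r1 = guard.handle_request(1)   # first controller
r2 = guard.handle_request(2)   # a new controller took over
r3 = guard.handle_request(1)   # late request from the old controller
print(r1, r2, r3, guard.controller_epoch)
```

The late request from epoch 1 is refused because the receiver has already seen epoch 2, which is how a "zombie" controller is prevented from changing cluster state.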

The main functions of the controller

Topic management (Create, delete, add partitions)

  • Topic management means that the controller handles creating and deleting Kafka topics and adding partitions to them. In other words, when we run the kafka-topics script, most of the background work is done by the controller.

Partition reassignment

  • Partition reassignment refers to the fine-grained reallocation of existing topic partitions offered by the kafka-reassign-partitions script. This functionality is also implemented by the controller.

Preferred Leader Election

  • Preferred leader election is the mechanism Kafka provides to avoid overloading some brokers, by moving partition leadership back to the preferred replicas.

Cluster member management (new Broker, active Broker shutdown, Broker down)

  • This includes automatic detection of new brokers, brokers that shut down deliberately, and brokers that go down unexpectedly. The detection relies on a combination of the Watch mechanism mentioned earlier and ZooKeeper ephemeral nodes.
  • For example, the controller uses the Watch mechanism to monitor changes in the child nodes of ZooKeeper’s /brokers/ids node. When a new broker starts, it creates its own znode under /brokers/ids. Once the znode is created, ZooKeeper notifies the controller through the Watch mechanism, and the controller detects the change and starts the follow-up work for the new broker.
  • Detecting whether a broker is alive relies on the other mechanism just mentioned: ephemeral nodes. The znode that each broker creates under /brokers/ids at startup is ephemeral. When the broker goes down or shuts down deliberately, its session with ZooKeeper ends and the znode is deleted automatically. ZooKeeper’s Watch mechanism likewise pushes this change to the controller, so the controller knows that a broker has shut down or gone down and can clean up after it.
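As a rough sketch of how a change notification on /brokers/ids can be turned into join and leave events, the controller can compare the new child list with the one it saw last. `MembershipTracker` and `on_children_changed` are hypothetical names; no real ZooKeeper client is involved.

```python
class MembershipTracker:
    """Hypothetical controller-side view of the children of /brokers/ids."""

    def __init__(self):
        self.known = set()
        self.events = []

    def on_children_changed(self, children):
        # The notification only says that the child list changed, so the
        # controller works out what changed by comparing with its last view.
        current = set(children)
        for broker_id in sorted(current - self.known):
            self.events.append(("joined", broker_id))   # start new-broker work
        for broker_id in sorted(self.known - current):
            self.events.append(("left", broker_id))     # clean up after it
        self.known = current


tracker = MembershipTracker()
tracker.on_children_changed([0, 1, 2])   # three brokers register
tracker.on_children_changed([0, 2, 3])   # broker 1 disappears, broker 3 joins
print(tracker.events)
```

Whether a broker shut down deliberately or crashed makes no difference here: in both cases its ephemeral znode vanishes and the broker shows up as "left".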

Data services

  • The controller provides data services to the other brokers. It holds the most complete cluster metadata, and all brokers periodically receive metadata update requests from the controller and use them to refresh their in-memory caches. This data mainly includes the following:

  1. All topic information. This includes per-partition details such as which replica is the leader, which replicas are in the ISR set, and so on.

  2. All broker information. This includes which brokers are currently running and which are shutting down.

  3. All partitions involved in O&M tasks. This includes the list of partitions for which preferred leader election or partition reassignment is currently in progress.

  4. ZooKeeper also stores a copy of this metadata. When the controller is initialized, it reads the corresponding metadata from ZooKeeper and populates its cache. With this data, the controller can provide data services externally, mainly to the other brokers, to which it synchronizes the data by sending requests.
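The data service can be pictured as a controller-side cache that is loaded once and then pushed to every broker. The sketch below is illustrative only: the dictionary layout, `Controller`, `BrokerCache`, and `push_updates` are all hypothetical and do not mirror Kafka's internal classes or request formats.

```python
import copy


class BrokerCache:
    """Hypothetical in-memory metadata cache held by an ordinary broker."""

    def __init__(self):
        self.cache = {}


class Controller:
    def __init__(self, zk_snapshot):
        # On initialization the controller fills its cache from ZooKeeper;
        # here a plain dictionary plays the role of the data read from it.
        self.metadata = copy.deepcopy(zk_snapshot)

    def push_updates(self, brokers):
        # Each broker receives its own copy of the controller's current view.
        for broker in brokers:
            broker.cache = copy.deepcopy(self.metadata)


zk_snapshot = {
    "topics": {"orders": {0: {"leader": 1, "isr": [1, 2]}}},
    "brokers": {"running": [1, 2], "shutting_down": []},
    "in_progress": {"preferred_leader_election": [], "reassignment": [("orders", 0)]},
}

controller = Controller(zk_snapshot)
brokers = [BrokerCache(), BrokerCache()]
controller.push_updates(brokers)

controller.metadata["topics"]["orders"][0]["leader"] = 2      # leader changes
stale = brokers[0].cache["topics"]["orders"][0]["leader"]     # not yet updated
controller.push_updates(brokers)
fresh = brokers[0].cache["topics"]["orders"][0]["leader"]     # after the push
print(stale, fresh)
```

The three top-level keys correspond to the three kinds of data listed above; a broker's view lags behind the controller's until the next update arrives.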