1. What is a controller

The controller is the core component of Apache Kafka. Its main role is to manage and coordinate the entire Kafka cluster with the help of Apache ZooKeeper. Any broker in the cluster can act as the controller, but only one broker holds the role at a time, performing its administrative and coordinating duties. In other words, every functioning Kafka cluster has exactly one controller at any given moment.

2. Functions of ZooKeeper in Kafka

ZooKeeper is an integral part of Kafka, so it’s important to understand how ZooKeeper works in Kafka.

ZooKeeper is a highly reliable distributed coordination service framework. It uses a data model similar to the tree structure of a file system, where the root directory also starts with “/”. Each node in this tree is called a ZNode and holds a piece of coordination metadata. In terms of lifetime, ZNodes are divided into persistent ZNodes and ephemeral (temporary) ZNodes. Persistent ZNodes do not disappear when the ZooKeeper cluster restarts, while an ephemeral ZNode is bound to the ZooKeeper session that created it; once the session ends, the node is automatically deleted.
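To make the two lifetimes concrete, here is a minimal sketch using the official ZooKeeper Java client. The connection string, paths, and payloads are placeholder assumptions for illustration, not nodes that Kafka itself creates.

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZNodeLifetimes {
    public static void main(String[] args) throws Exception {
        // Placeholder ensemble address and session timeout; production code
        // would also wait for the connection to be established.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> {});

        // A persistent ZNode survives client disconnects and ZooKeeper restarts.
        zk.create("/demo-persistent", "metadata".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // An ephemeral ZNode is bound to this session and is removed
        // automatically as soon as the session ends.
        zk.create("/demo-ephemeral", "heartbeat".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Closing the session deletes /demo-ephemeral but not /demo-persistent.
        zk.close();
    }
}
```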

In Kafka, ZooKeeper has the following functions:

  1. ZooKeeper records the live status of all brokers. Each broker maintains a session with ZooKeeper through heartbeats, and ZooKeeper maintains the list of running brokers that belong to the cluster (see the sketch after this list).
  2. A Kafka cluster has multiple brokers, one of which is elected as the controller. ZooKeeper is the arbiter of that election.
  3. ZooKeeper records ISR information and updates it in real time. If a member is found to be abnormal, it is removed immediately.
  4. ZooKeeper keeps the registration information of brokers. Broker registrations are ephemeral nodes, so as soon as a broker's session with ZooKeeper ends, its registration disappears.
  5. ZooKeeper keeps topic-specific configurations, such as the list of topics, the number of partitions per topic, the location of replicas, and so on.
  6. ZooKeeper notifies Kafka when the cluster changes, for example when a topic is created or deleted, or a broker comes online or goes offline.
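As an illustration of points 1 and 4 above, the sketch below lists the live brokers by reading the ephemeral registrations under /brokers/ids. The connection string is an assumption, and error handling is omitted for brevity.

```java
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class LiveBrokers {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> {});

        // Each running broker registers an ephemeral child under /brokers/ids,
        // so the children of that path are exactly the ids of the live brokers.
        List<String> brokerIds = zk.getChildren("/brokers/ids", false);
        for (String id : brokerIds) {
            // The node's payload is a small JSON blob with the broker's endpoints.
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println("broker " + id + " -> " + new String(data));
        }
        zk.close();
    }
}
```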

3. Functions of the controller

  1. Topic management: the controller handles topic creation, deletion, and partition changes. That is, when you run the kafka-topics script, most of the background work is carried out by the controller.

  2. Partition reassignment: the controller registers a PartitionReassignmentListener on the /admin/reassign_partitions node in ZooKeeper to handle partition reassignment actions. Likewise, it registers an IsrChangeNotificationListener on the /isr_change_notification node to handle ISR set changes, and a PreferredReplicaElectionListener on the /admin/preferred_replica_election node to handle preferred replica elections.

  3. Preferred leader election: Kafka provides a way to move partition leadership back to the preferred replica so that leadership stays evenly spread and no broker is overloaded.

  4. Cluster membership management: the controller registers a BrokerChangeListener on the /brokers/ids node in ZooKeeper to handle broker changes. Each broker creates an ephemeral node under /brokers/ids when it starts; when a broker goes down, its ephemeral node is deleted and the controller's listener senses that the broker is offline (a sketch of this watch pattern follows the list).

  5. Data service: the controller holds the most complete cluster metadata, and the other brokers regularly receive metadata updates from the controller to keep their own copies current.

  6. Start and manage partition state machines and replica state machines.
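The watch pattern behind item 4 can be sketched as follows. This is not Kafka's actual BrokerChangeListener code, just the underlying ZooKeeper idiom: read the children of /brokers/ids with a watcher attached, and re-arm the watch each time it fires, because ZooKeeper watches are one-shot.

```java
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class BrokerChangeWatcher implements Watcher {
    private final ZooKeeper zk;

    public BrokerChangeWatcher(ZooKeeper zk) {
        this.zk = zk;
    }

    // Registers the watch and returns the current membership snapshot.
    public List<String> watchBrokers() throws Exception {
        // Passing "this" arms the watch for the next membership change.
        return zk.getChildren("/brokers/ids", this);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                // A broker came online or went offline: re-read the list,
                // which also re-arms the watch in the same call.
                System.out.println("live brokers changed: " + watchBrokers());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```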

4. Controller election

Each broker acts as a ZooKeeper client and attempts to create an ephemeral /controller node on the ZooKeeper server, but only one broker can succeed in creating it. Because /controller is an ephemeral node, it is deleted when the current controller fails or its session expires. At that point all brokers hold a new election by once again attempting to create the ephemeral /controller node.
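A minimal sketch of this election trick, assuming the ZooKeeper Java client. Note that the real /controller node stores a small JSON payload; a bare broker id is used here to keep the example short.

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElection {
    // Tries to become the controller by creating the ephemeral /controller
    // node. Returns true if this broker won the election.
    static boolean tryElect(ZooKeeper zk, int brokerId) throws Exception {
        try {
            zk.create("/controller",
                    String.valueOf(brokerId).getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
            return true; // we created the node first, so we are the controller
        } catch (KeeperException.NodeExistsException e) {
            // Another broker won. Watch the node so we can run a new
            // election when the current controller's session ends.
            zk.exists("/controller", event -> {
                // A NodeDeleted event here would trigger tryElect again.
            });
            return false;
        }
    }
}
```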

The broker that wins the election stores its information on the ZooKeeper /controller node. In addition, cluster-related metadata is also stored in ZooKeeper, and the controller reads this metadata to build a ControllerContext.
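As a rough illustration, a stripped-down context could be assembled like this. Kafka's real ControllerContext holds far more (partition leaders, ISR sets, epochs, and so on), so the class and field names here are hypothetical.

```java
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// A hypothetical, minimal stand-in for the controller's cached cluster view.
public class MiniControllerContext {
    List<String> liveBrokerIds;
    List<String> topics;

    static MiniControllerContext load(ZooKeeper zk) throws Exception {
        MiniControllerContext ctx = new MiniControllerContext();
        // Live brokers are the ephemeral children of /brokers/ids.
        ctx.liveBrokerIds = zk.getChildren("/brokers/ids", false);
        // Topic names are the children of /brokers/topics.
        ctx.topics = zk.getChildren("/brokers/topics", false);
        return ctx;
    }
}
```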

5. Controller Failover

As we noted earlier, only one broker acts as the controller in a Kafka cluster, which introduces the risk of a single point of failure. How does Kafka cope with this? The answer is to provide failover for the controller.

Failover means that when the running controller suddenly goes down or terminates unexpectedly, Kafka can quickly sense it and immediately elect a new controller to replace the failed one. This process is fully automatic and requires no manual intervention.

Suppose Broker 0 is initially the controller. When Broker 0 goes down, ZooKeeper senses it through the Watch mechanism and deletes the ephemeral /controller node. All surviving brokers then campaign to become the new controller; say Broker 3 wins the election and successfully recreates the /controller node in ZooKeeper. Broker 3 then reads the cluster metadata from ZooKeeper and initializes it into its own cache. At this point the controller failover is complete, and Broker 3 can take up its normal duties.

ZooKeeper also has a controller-related /controller_epoch node. It is persistent and holds an integer, the controller_epoch value, which records the number of times the controller has changed, that is, the generation of the current controller, also known as the “controller epoch”.

controller_epoch has an initial value of 1, the epoch of the cluster's first controller, and it is incremented by 1 each time a new controller is elected. Every request that interacts with the controller carries a controller_epoch field; a request carrying an epoch smaller than the current value must come from an already-expired controller, so it is treated as invalid. In this way Kafka guarantees that there is a single authoritative controller at any time.
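A hypothetical fencing check in the spirit of this mechanism might look as follows; the class and method names are invented for illustration.

```java
// Sketch of epoch-based fencing: a request stamped with an older epoch must
// come from a controller that has since been replaced, so it is rejected.
public class EpochFence {
    private int currentControllerEpoch = 1; // epoch of the first controller

    synchronized boolean accept(int requestEpoch) {
        if (requestEpoch < currentControllerEpoch) {
            return false; // stale request from an expired ("zombie") controller
        }
        // A larger epoch means a newer controller was elected; adopt it.
        currentControllerEpoch = requestEpoch;
        return true;
    }
}
```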

6. Internal design principle of controller

After the election succeeds, the controller reads the data of each relevant node in ZooKeeper to initialize its context information, and it must then keep that context up to date. For example, when several partitions are added to a topic, the controller must update the context while creating those partitions, and the changes must also be synchronized to the other broker nodes.

These changes may be triggered by listeners or by timed tasks, but they are all eventually converted into events and pushed into a LinkedBlockingQueue in the order in which they occurred. A single ControllerEventThread then processes the events in FIFO order, which avoids the locking that would otherwise be needed to keep multiple threads safe.
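The pattern itself is easy to sketch in isolation. The following is not Kafka's actual ControllerEventThread code, just the single-thread-plus-queue idiom it implements.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class ControllerEventLoop {
    // Events from listeners and timed tasks all funnel into one FIFO queue.
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // The single event thread is the only code that touches controller state,
    // so no locks are needed around that state.
    private final Thread eventThread = new Thread(() -> {
        try {
            while (true) {
                queue.take().run(); // strict FIFO processing
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop the loop on shutdown
        }
    }, "controller-event-thread");

    public void start() {
        eventThread.start();
    }

    // Any thread (a Watch callback, a scheduled task, ...) may enqueue an
    // event; offer never blocks because the queue is unbounded.
    public void put(Runnable event) {
        queue.offer(event);
    }
}
```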

Prior to Kafka 0.11, the controller design was cumbersome and the code was somewhat confusing, which led to many controller bugs in the community that could not be properly fixed.

The old controller was multithreaded, creating many threads internally. For example, it created a socket connection to every broker, plus a dedicated thread per broker for sending requests; the more brokers in the cluster, the more threads the controller had to create.

In addition, the controller's ZooKeeper session had its own thread to handle notification callbacks from the Watch mechanism, and beyond these the controller created extra I/O threads for topic deletion.

Worse than the multithreading itself, all of these threads also accessed the shared controller cache. As we all know, multithreaded access to shared mutable data is the biggest difficulty in maintaining thread safety, so to protect the data the controller had to make extensive use of ReentrantLock, which further slowed down the processing of the entire controller.

For these reasons, the community reworked the controller's underlying design in 0.11. The biggest improvement was replacing the multithreaded scheme with a single thread plus an event queue for handling these shared resources (topic operations and the like).

The main benefit of this design is that the state in the controller cache is handled by only one thread, so no heavyweight synchronization mechanism is needed to maintain thread safety. With no multithreaded concurrent access to worry about, it is much easier for the community to locate and diagnose controller problems.