1. Why did Zookeeper appear? What is Zookeeper?

  1. Background: Traditional centralized architectures can no longer meet the performance requirements of rapidly growing large-scale Internet systems, so the industry has increasingly adopted distributed architectures. Deploying across multiple computers raises the question of how to coordinate the related services. Distributed systems can communicate and coordinate only by passing messages, and the network introduces many sources of instability: communication failures, the three possible outcomes of a request (success, failure, timeout), network partitions, crashed service nodes, and so on. A framework or middleware was urgently needed to solve the coordination problem between distributed nodes.
  2. Before Zookeeper: 2PC, 3PC, and Paxos. Distributed nodes need a coordinator to unify the execution logic of all participating nodes (participants). The most typical scenario is an interbank transfer: we call the transfer service of the sending bank to debit the sender's account, and we call the deposit service of the receiving bank to credit the recipient. The two calls have an obvious order, and rollback must be supported for safety (a distributed transaction), yet the services do not know each other's status. A coordinator is therefore abstracted out to schedule the services and ensure data consistency in the distributed system. (Space does not allow a detailed discussion of these earlier algorithms; if you are interested, see the book From Paxos to Zookeeper.)
  3. Zookeeper uses the Zookeeper Atomic Broadcast (ZAB) consistency protocol instead of the Paxos algorithm, which will be described later.
  4. What is Zookeeper? Zookeeper, an open source implementation of Google Chubby, is a typical distributed data consistency solution.
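
The coordinator idea from item 2 can be illustrated with a minimal two-phase commit sketch. Everything here is hypothetical and for illustration only: the `Participant` interface, the `Account` class, and the `run` method are assumed names, and the sketch ignores exactly the hard parts (timeouts, crashed nodes, durable logs) that motivated 3PC, Paxos, and ZAB.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical two-phase commit sketch; all names are illustrative. */
class TwoPhaseCommitSketch {
    interface Participant {
        boolean prepare();  // phase 1: vote yes (true) or no (false)
        void commit();      // phase 2 when every participant voted yes
        void rollback();    // phase 2 when any participant voted no
    }

    /** A bank account service that checks a balance change before applying it. */
    static class Account implements Participant {
        long balance;
        private final long delta;

        Account(long balance, long delta) {
            this.balance = balance;
            this.delta = delta;
        }

        public boolean prepare() { return balance + delta >= 0; }
        public void commit() { balance += delta; }
        public void rollback() { /* prepare() applied nothing, so nothing to undo */ }
    }

    /** Coordinator: commit only if every participant votes yes, otherwise roll back. */
    static boolean run(List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : prepared) {
                    q.rollback();
                }
                return false;
            }
            prepared.add(p);
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        Account sender = new Account(100, -30);
        Account receiver = new Account(0, 30);
        boolean ok = run(Arrays.asList(sender, receiver));
        System.out.println(ok + " " + sender.balance + " " + receiver.balance); // prints "true 70 30"
    }
}
```

Neither account changes unless both vote yes in the prepare phase, which is the consistency guarantee the coordinator exists to provide.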

2. How does Zookeeper coordinate distributed nodes?

  1. Communication between distributed nodes is essentially inter-process communication between server nodes, so it helps to first recall how the operating system defines inter-process communication: (1) communication through pipes (and similar mechanisms such as message queues, shared memory, and semaphores, all of which essentially rely on resources shared between processes on the same computer); (2) communication through sockets (the basis of RPC), which lets two processes on different computers communicate.
  2. Zookeeper uses a shared, tree-structured namespace to coordinate multiple processes in distributed programs.

2.1 What does this namespace look like?

  1. What is this namespace? It is a data model held in the memory of a Zookeeper server, made up of a series of data nodes (also known as ZNodes).

Each ZNode stores a stat object, child references, and a data field:
  1. The data field holds the data value stored by the ZNode;
  2. The child references point to the ZNode's child nodes;
  3. The stat object stores the current state of the ZNode.
    [zk: localhost:2181(CONNECTED) 0] get /app1/p_1
    127.0.0.1
    // The following fields are the stat
    cZxid = ...                           // ZXID of the transaction that created the ZNode
    ctime = Thu Apr 18 15:17:11 CST 2019  // creation time
    mZxid = 0xdd59                        // ZXID of the last modification
    mtime = Thu Apr 18 15:17:11 CST 2019  // time the node was last updated
    pZxid = 0xdd62                        // ZXID of the last change to the child list; changes to a child's content do not affect it
    cversion = 4                          // child list version number
    dataVersion = 0                       // data version number
    aclVersion = 0                        // ACL version number
    ephemeralOwner = 0x0                  // session ID of the session that created the node; 0 for a persistent node
    dataLength = 9                        // length of the data field
    numChildren = 4                       // number of child nodes

You may not understand these parameters at the moment, but consider that each ZNode has a stat object that stores all the state information for that data node.
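
To make the three parts of a ZNode concrete, here is a minimal in-memory sketch of the tree. It is not Zookeeper's actual implementation: the class `ZNodeTreeSketch`, its methods, and the reduced `Stat` (only four of the fields listed above) are assumptions made for illustration, and paths are assumed to be absolute.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of a ZNode tree; not Zookeeper's real data structure. */
class ZNodeTreeSketch {
    /** A reduced stat object holding only a few of the fields listed above. */
    static class Stat {
        int dataVersion;  // incremented on every data update
        int cversion;     // incremented when the child list changes
        int dataLength;   // length of the data field in bytes
        int numChildren;  // number of direct children
    }

    static class ZNode {
        byte[] data = new byte[0];
        final Map<String, ZNode> children = new TreeMap<>();
        final Stat stat = new Stat();
    }

    private final ZNode root = new ZNode();

    /** Walks an absolute path from the root; returns null if a segment is missing. */
    ZNode find(String path) {
        ZNode node = root;
        for (String part : path.split("/")) {
            if (part.isEmpty()) {
                continue;
            }
            node = node.children.get(part);
            if (node == null) {
                return null;
            }
        }
        return node;
    }

    /** Creates a node under an existing parent and updates the parent's stat. */
    ZNode create(String path, String data) {
        int slash = path.lastIndexOf('/');
        ZNode parent = find(path.substring(0, slash));
        String name = path.substring(slash + 1);
        if (parent == null || parent.children.containsKey(name)) {
            throw new IllegalStateException("cannot create " + path);
        }
        ZNode node = new ZNode();
        node.data = data.getBytes(StandardCharsets.UTF_8);
        node.stat.dataLength = node.data.length;
        parent.children.put(name, node);
        parent.stat.cversion++;
        parent.stat.numChildren = parent.children.size();
        return node;
    }

    /** Replaces a node's data and bumps its data version. */
    void setData(String path, String data) {
        ZNode node = find(path);
        if (node == null) {
            throw new IllegalStateException("no node " + path);
        }
        node.data = data.getBytes(StandardCharsets.UTF_8);
        node.stat.dataLength = node.data.length;
        node.stat.dataVersion++;
    }

    public static void main(String[] args) {
        ZNodeTreeSketch tree = new ZNodeTreeSketch();
        tree.create("/app1", "");
        tree.create("/app1/p_1", "127.0.0.1");
        System.out.println(tree.find("/app1/p_1").stat.dataLength); // prints 9
    }
}
```

The printed length matches the dataLength = 9 in the listing above, because the data field there also holds the nine bytes of "127.0.0.1".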

  1. ZNodes can be persistent or ephemeral, and either kind can additionally be sequential. Combined, this gives four node types: PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, and EPHEMERAL_SEQUENTIAL.

Once created, a persistent node stays on the Zookeeper server until it is explicitly deleted. A persistent sequential node behaves the same way, except that the parent maintains a creation order for its first-level children by appending a numeric suffix to the node name. An ephemeral node's lifecycle is tied to the client session: the session ID is recorded in the ephemeralOwner field of the stat object, and when the client session ends the ephemeral node is removed automatically. An ephemeral node can only be a leaf; it cannot have child ZNodes.
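
The behavior of these node types can be sketched with a small simulation. This is not the Zookeeper client API: `NodeTypeSketch` and its methods are hypothetical, the namespace is flattened into a map, and parent nodes are not checked. The one detail taken from Zookeeper itself is the sequential suffix, a per-parent counter formatted as ten zero-padded digits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical flat-namespace simulation of node types; not the Zookeeper API. */
class NodeTypeSketch {
    // path -> owning session ID (0 means persistent, like ephemeralOwner = 0x0)
    private final Map<String, Long> owners = new TreeMap<>();
    // parent path -> how many sequential children have been created under it
    private final Map<String, Integer> counters = new HashMap<>();

    /** Creates a node and returns the actual path, including any sequence suffix. */
    String create(String path, long sessionId, boolean ephemeral, boolean sequential) {
        String actual = path;
        if (sequential) {
            String parent = path.substring(0, path.lastIndexOf('/'));
            int seq = counters.merge(parent, 1, Integer::sum) - 1;
            actual = path + String.format("%010d", seq);
        }
        if (owners.containsKey(actual)) {
            throw new IllegalStateException("node exists: " + actual);
        }
        owners.put(actual, ephemeral ? sessionId : 0L);
        return actual;
    }

    boolean exists(String path) {
        return owners.containsKey(path);
    }

    /** Ending a session removes every ephemeral node that the session created. */
    void closeSession(long sessionId) {
        if (sessionId == 0L) {
            return; // 0 marks persistent nodes, which no session owns
        }
        owners.values().removeIf(owner -> owner == sessionId);
    }

    public static void main(String[] args) {
        NodeTypeSketch zk = new NodeTypeSketch();
        String first = zk.create("/locks/lock-", 7L, true, true);
        String second = zk.create("/locks/lock-", 8L, true, true);
        System.out.println(first);  // prints /locks/lock-0000000000
        System.out.println(second); // prints /locks/lock-0000000001
        zk.closeSession(7L);
        System.out.println(zk.exists(first) + " " + zk.exists(second)); // prints "false true"
    }
}
```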

  1. How does Zookeeper use ZNodes to coordinate distributed nodes? Take Kafka as an example. (To learn Kafka itself, I recommend Zhu Zhonghua's book In-Depth Understanding of Kafka; the basic concepts are skipped here.) As is well known, Kafka maintains its Broker cluster through Zookeeper. Kafka is distributed messaging middleware with multiple Broker nodes that may not be located on the same server, so it needs Zookeeper to register each Broker and to learn each Broker's data and load status for load balancing.

(1) Broker registration. When a new Broker starts, it registers itself with Zookeeper by creating a child node under the parent path /brokers/ids, for example /brokers/ids/2, where 2 is the Broker's ID. The data field of the new ZNode is filled with the Broker's IP address and port, and the node is created as an ephemeral node, so it is deleted automatically when the Broker can no longer stay in contact with Zookeeper. Kafka can therefore obtain the Brokers currently registered in the cluster from the children of the /brokers/ids path maintained by Zookeeper.

(2) Topic registration. A topic in Kafka is an abstract concept, a logical grouping of partitioned data. How do we know which Brokers hold these partitions and their replicas? (Note: Kafka's partition replica mechanism places the other replicas on Brokers different from the one holding the leader replica.) Since the partitions of each topic can be spread across different Brokers, every time Kafka creates a topic, a child node named after the topic is created under the /brokers/topics path in Zookeeper, and each Broker that holds partitions of that topic registers its Broker ID and partition count under that child node. For example, in /brokers/topics/login/3, login is the topic name and 3 is the Broker ID, and the data field of that ZNode holds the number of partitions the Broker has for the topic, e.g. 2. Kafka can use this information to locate a topic.

(3) Obtaining Broker state and load balancing the Broker cluster

  1. Producer load balancing: for dynamic load balancing, producers need to be aware of Broker nodes joining and leaving, topics being added and removed, and changes in the relationship between Brokers and topics. Zookeeper's Watcher mechanism lets producers obtain this information dynamically.

2.2 Watcher mechanism (introduced through the Kafka example above)

2.2.1 Let's start with a Watcher example (taken from From Paxos to Zookeeper):

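    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooKeeper;

    public class ZooKeeper_Constructor_Usage_Simple implements Watcher {
        // Released once the session reaches the SyncConnected state.
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

        public static void main(String[] args) throws Exception {
            // Create a Zookeeper session: connect string, session timeout (ms), default Watcher.
            ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181",
                    5000,
                    new ZooKeeper_Constructor_Usage_Simple());
            System.out.println(zookeeper.getState());
            try {
                // The constructor returns before the connection is ready, so wait here.
                connectedSemaphore.await();
            } catch (InterruptedException e) {}
            System.out.println("ZooKeeper session established.");
        }

        @Override
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event:" + event);
            if (KeeperState.SyncConnected == event.getState()) {
                connectedSemaphore.countDown();
            }
        }
    }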

Look at the main() method first. We create a new ZooKeeper object to try to establish a connection with the Zookeeper server, passing in a Watcher that listens for the WatchedEvent events returned by the server. When the server has successfully established the connection with the client, it sends a SyncConnected event to the client, and the client responds to it in the Watcher's process() method. Watchers in Kafka work the same way: when the value on the corresponding Zookeeper node changes, an event is generated and sent to the corresponding Watcher for processing.

Registering a Watcher through getData():

    public byte[] getData(String path, Watcher watcher, Stat stat)

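We can use this API to watch a node dynamically: a Watcher registered through getData() is notified when the node's data changes (NodeDataChanged) or when the node is deleted (NodeDeleted). For example, to monitor the state of the Broker cluster in Kafka, we can register a Watcher on each Broker's node under /brokers/ids; when a Broker goes offline and its ephemeral node disappears, the Zookeeper server sends a KeeperState: SyncConnected, EventType: NodeDeleted event, which can be handled in the Watcher's process() method. To be notified when children are added to or removed from /brokers/ids itself, register the Watcher on the parent with getChildren() instead, which produces a NodeChildrenChanged event.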

2.2.2 If there is more than one Watcher, how does the client know which Watcher should handle an event returned by the server?

The process has three parts: the client registers the Watcher, the server processes the Watcher (mainly by sending events to the client), and the client calls back the Watcher (mainly by calling process() to handle the event).

  1. First, when the client registers a Watcher, it marks its request to the server as "listening with a Watcher". It also creates a WatchRegistration object locally to hold the mapping between the node path of the request and the Watcher. If the server responds successfully, this mapping is moved into the client's local ZKWatchManager for use in the callback step. On the server, the request is recorded in a Map<String (node path), Set<Watcher>>; if the map already exists, the Watcher is simply put into it.
  2. How does the server generate events? Take the NodeDataChanged event as an example: which operation changes a ZNode's data field? The server-side setData(). So when a client calls setData() with the path of the node to update, the server looks up all listeners registered on that path in its map, generates a NodeDataChanged event, and sends the event to the corresponding clients over the network.
  3. On receiving the event, the client looks up the matching Watchers in ZKWatchManager and places them in a waiting queue. The EventThread thread takes them from the queue in an endless loop and calls each Watcher's process() method in order.
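
The three steps above can be sketched in a single process. This is a simplification under stated assumptions, not Zookeeper's code: `WatchDispatchSketch`, its `Watcher` interface, and the method names are hypothetical, the network hop between server and client is omitted, and the queue is drained by an explicit call rather than by a dedicated thread. Removing watchers when they fire reflects the fact that standard Zookeeper watches are one-time triggers.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical single-process sketch of watcher registration and dispatch. */
class WatchDispatchSketch {
    interface Watcher {
        void process(String eventType, String path);
    }

    // Registry in the spirit of Map<String (node path), Set<Watcher>>.
    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
    // Stand-in for the client's waiting queue that the event thread drains.
    private final Queue<Runnable> waitingEvents = new ArrayDeque<>();

    /** Step 1: register a watcher for a node path. */
    void register(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, p -> new LinkedHashSet<>()).add(watcher);
    }

    /** Step 2: a data update triggers the path's watchers, which are removed and queued. */
    void setData(String path) {
        Set<Watcher> triggered = watchTable.remove(path);
        if (triggered == null) {
            return;
        }
        for (Watcher w : triggered) {
            waitingEvents.add(() -> w.process("NodeDataChanged", path));
        }
    }

    /** Step 3: run queued callbacks in order; returns how many were run. */
    int drain() {
        int count = 0;
        Runnable next;
        while ((next = waitingEvents.poll()) != null) {
            next.run();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        WatchDispatchSketch zk = new WatchDispatchSketch();
        zk.register("/brokers/ids/2", (type, path) -> System.out.println(type + " on " + path));
        zk.setData("/brokers/ids/2");
        System.out.println(zk.drain()); // prints 1 after the callback's own output
        zk.setData("/brokers/ids/2");
        System.out.println(zk.drain()); // prints 0: the watch already fired once
    }
}
```

Because the path is the key of the registry, an event for one node reaches only the watchers registered on that node, which answers the question in the heading of this subsection.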

3. Summary:

  1. Zookeeper maintains a tree structure of ZNodes of different types to coordinate multiple processes in a distributed environment.
  2. Zookeeper maintains its own cluster through a Leader-Follower-Observer mechanism and uses the ZAB protocol to ensure an acceptable level of data consistency and availability across the cluster.