This is my 25th day of the Gwen Challenge

ZooKeeper

1. Introduction

1. Concept

ZooKeeper is an open-source coordination service for distributed applications. It is an open-source implementation of Google’s Chubby and an important component of Hadoop and HBase. It provides consistency services for distributed applications, including configuration maintenance, domain name service, distributed synchronization, and group services.

ZooKeeper aims to encapsulate key services that are complex and error-prone, and provide users with easy-to-use interfaces, efficient performance, and stable functions.

ZooKeeper offers a simple set of primitives and provides Java and C interfaces.

2. Role

In the earlier Redis discussion, solving one problem under heavy concurrency often introduced new ones. ZooKeeper was created to provide high performance, high reliability, and strictly ordered access. High performance means ZooKeeper can be used in large distributed systems; high reliability means the failure of a single node does not bring the service down; ordered access lets clients implement complex synchronization operations.

Unlike typical file systems designed for storage, ZooKeeper data is kept in memory, which means ZooKeeper can achieve high throughput and low latency.

3. Characteristics

  1. Like Redis, ZooKeeper itself is replicated across the hosts of an ensemble.

    Following the earlier line of reasoning: if the cluster has a leader, that leader will eventually fail. While a new leader is being elected, the cluster has no owner, and all operations during that window fail.

  2. ZooKeeper therefore has two states: available (a leader exists) and unavailable (no leader, an election is in progress). (How long is the unavailable/recovery window?)

    ZooKeeper elects a new leader in under 200 milliseconds, and as followers recover and start processing requests again, throughput rises back up.

  3. ZooKeeper primitives.

    1. ZooKeeper stamps each update with a number that reflects the order of all ZooKeeper transactions. Subsequent operations can use this ordering to implement higher-level abstractions, such as synchronization primitives.
    2. A primitive (a term from operating systems and computer networking) is a sequence of instructions that completes a specific function. It is indivisible: once started, its execution must run to completion without being interrupted.
  4. Efficiency.

    1. It is particularly fast for read-dominant workloads. ZooKeeper applications run on thousands of machines, and they perform best when reads are much more common than writes, at a ratio of around 10:1.

    Note: ZooKeeper must not be used as a database.

  5. ZooKeeper uses a directory-tree structure:

    ZooKeeper stores all data in memory. The data model is a ZNode tree: each slash-separated (/) path identifies a ZNode, for example /foo/path1.

    1. A node can store up to 1MB of data (note: a node's data volume includes not only the data it stores itself, but also the names of all its child nodes counted in bytes, so the number of children of a ZNode is not unlimited). When a client connects to a server node, a session is established. The node types are:

      1. PERSISTENT – Persistent node: once the ZNode is created, it remains in ZooKeeper until it is explicitly removed.
      2. PERSISTENT_SEQUENTIAL – Persistent sequential node: the node persists even after the creating client disconnects from ZooKeeper, and ZooKeeper appends a monotonically increasing sequence number to its name, for example: /lock/0000000001, /lock/0000000002, /lock/0000000003.
      3. EPHEMERAL – Ephemeral (temporary) node: an ephemeral node is tied to the client session (much like renewing a Redis lock from a watchdog thread: as long as the session lives, the lock lives). When the session ends, the node is deleted.
      4. EPHEMERAL_SEQUENTIAL – Ephemeral sequential node: like an ephemeral node it is deleted when the client's session ends, and like a sequential node its name is given a sequence number, for example: /lock/0000000001, /lock/0000000002, /lock/0000000003.
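
    The four node types map directly onto the CreateMode enum in the Java client. The sketch below is illustrative only (the ensemble address 127.0.0.1:2181 and the /demo paths are placeholder assumptions, not taken from this post):

    import org.apache.zookeeper.*;
    import java.util.concurrent.CountDownLatch;

    public class NodeTypesDemo {
        public static void main(String[] args) throws Exception {
            CountDownLatch connected = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000,
                    e -> { if (e.getState() == Watcher.Event.KeeperState.SyncConnected) connected.countDown(); });
            connected.await();

            // PERSISTENT: survives until explicitly deleted
            zk.create("/demo", "d".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            // PERSISTENT_SEQUENTIAL: the server appends an increasing suffix, e.g. /demo/seq-0000000000
            System.out.println(zk.create("/demo/seq-", "d".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL));
            // EPHEMERAL: removed automatically when this session ends
            zk.create("/demo/eph", "d".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // EPHEMERAL_SEQUENTIAL: ephemeral + sequence suffix, the building block for locks and queues
            System.out.println(zk.create("/demo/lock-", "d".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));

            zk.close(); // the two ephemeral nodes created above disappear at this point
        }
    }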

    2. Session: a session is the connection between a client and the server. In ZooKeeper a client connection is a long-lived TCP connection. When a client starts, it first establishes a TCP connection to a server, and the client's life cycle begins with that first connection. Over this connection the client keeps the session alive with heartbeats, sends requests and receives responses, and also receives Watch event notifications from the server. If the connection drops because of server overload, a network failure, or a client disconnect, the session remains valid as long as the client can reconnect to any server in the cluster within the configured sessionTimeout.

    3. Nodes:

      1. Machine node: the machines that make up the cluster are called machine nodes.

      2. Znode: Data unit in the data model

      3. Node attributes:

        The main attributes are zxID, version, and ACL.

    4. Version:

      1. ZooKeeper stores data on each ZNode. For every ZNode, ZooKeeper maintains a data structure called Stat, which records three version numbers for the node:
        1. version (the version of the node's data);
        2. cversion (the version of the node's children);
        3. aversion (the version of the node's ACL, similar to permission control in UNIX file systems).
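
      A minimal sketch (the path and values are illustrative assumptions) of how the version in Stat enables optimistic concurrency: setData only succeeds when the supplied version matches the node's current version.

      import org.apache.zookeeper.KeeperException;
      import org.apache.zookeeper.ZooKeeper;
      import org.apache.zookeeper.data.Stat;

      public class VersionedUpdate {
          // zk is an already-connected client; the path and new value are only examples
          static void updateIfUnchanged(ZooKeeper zk, String path, byte[] newValue) throws Exception {
              Stat stat = new Stat();
              zk.getData(path, false, stat);              // fills version, cversion, aversion (plus czxid, mzxid, ...)
              try {
                  // Conditional write: applied only if nobody modified the node since our read
                  zk.setData(path, newValue, stat.getVersion());
              } catch (KeeperException.BadVersionException e) {
                  System.out.println("concurrent update detected - re-read and retry");
              }
              // Passing a version of -1 would skip the check and overwrite unconditionally
          }
      }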
  6. Features:

    1. Sequential consistency – Updates from clients are applied in the order they are sent.
    2. Atomicity – Update success or failure. No partial results.
    3. Single system image – Clients will see the same service view no matter which server they connect to. That is, even if a client fails over to another server with the same session, the client never sees an older view of the system.
    4. Reliability – once an update has been applied, it persists until a client overwrites it with another update.
    5. Timeliness – the client's view of the system is guaranteed to be up to date within a certain time bound, i.e. consistency is maintained.

4. Simple API

One of ZooKeeper’s design goals is to provide a very simple programming interface. Therefore, it only supports the following operations:

  • create: creates a node at a location in the tree
  • delete: deletes a node
  • exists: tests whether a node exists at a location
  • get data: reads the data of a node
  • set data: writes data to a node
  • get children: retrieves the list of children of a node
  • sync: waits for data to propagate
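
  Under the assumption of a locally running server at 127.0.0.1:2181 (a placeholder, not from the original post), the whole surface can be exercised in a few lines; each call below corresponds to one of the operations listed above:

  import org.apache.zookeeper.*;
  import org.apache.zookeeper.data.Stat;
  import java.util.List;
  import java.util.concurrent.CountDownLatch;

  public class SimpleApiDemo {
      public static void main(String[] args) throws Exception {
          CountDownLatch up = new CountDownLatch(1);
          ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000,
                  e -> { if (e.getState() == Watcher.Event.KeeperState.SyncConnected) up.countDown(); });
          up.await();

          zk.create("/app", "v1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // create
          Stat stat = zk.exists("/app", false);                                                    // exists
          byte[] data = zk.getData("/app", false, stat);                                           // get data
          zk.setData("/app", "v2".getBytes(), stat.getVersion());                                  // set data
          List<String> children = zk.getChildren("/", false);                                      // get children
          zk.sync("/app", (rc, path, ctx) -> {}, null);                                            // wait for propagation
          zk.delete("/app", -1);                                                                   // delete
          zk.close();
      }
  }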

2. Installing and using ZooKeeper

  1. Prepare four virtual machines (the same preparation as before, not repeated here). A JDK is also needed; there are plenty of online tutorials, so that is not repeated either.

    Configure the Java environment variables (similar to setting them on Windows):

    # Java Environment
    export JAVA_HOME=/root/Java/jdk1.8
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib
  2. Download ZooKeeper and edit conf/zoo.cfg:

    # The number of milliseconds of each tick
    tickTime=2000                 # heartbeat interval: 2s
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10                  # initial sync delay: 2000*10 ms
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5                   # 2000*5 ms
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/var/az/zk
    # the port at which the clients will connect
    clientPort=2181               # client connection port
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    
    # The ZooKeeper cluster is planned manually.
    # Port 3888: in a leaderless cluster, nodes connect to each other on 3888 to run the leader election.
    # Port 2888: once a leader exists, all other nodes connect to it on 2888 for normal communication.
    # How is the leader chosen? By the largest id, which is why the election is jokingly called "modest":
    # every node defers to the one with the bigger id.
    # node01 ... node04 are the hostnames of the virtual machines.
    server.1=node01:2888:3888
    server.2=node02:2888:3888
    server.3=node03:2888:3888
    server.4=node04:2888:3888
    
    # The same section written with IP addresses instead of hostnames:
    # server.2=192.168.40.131:2888:3888
    # server.3=192.168.40.132:2888:3888
    # server.4=192.168.40.133:2888:3888
    # server.5=192.168.40.134:2888:3888
    
    # Create the data directory referenced above (any location works, just remember where it is)
    # and write this server's id into the myid file:
    mkdir -p var/az/zk
    cd var/az/zk
    vi myid            # ------ put the server id here, e.g. 1 ------
    
    # Configure environment variables:
    # zookeeper
    export ZOOKEEPER_HOME=/root/zookeeper/apache-zookeeper-3.5.8
    export PATH=$PATH:$ZOOKEEPER_HOME/bin
    
    # Start ZooKeeper:
    zkServer.sh help
    zkServer.sh start-foreground
    
    # Start three machines one after another. Until a quorum exists the first one logs connection
    # errors, which is expected. Once a leader has been elected the log looks like this:
    [myid:3] - INFO [QuorumPeer[myid=3](plain=[0:0:0:0:0:0:0:0]:2181)(secure=disabled):ZooKeeperServer@166] - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /root/zookeeper/apache-zookeeper-3.5.8-bin/conf/var/az/zk/version-2 snapdir /root/zookeeper/apache-zookeeper-3.5.8-bin/conf/var/az/zk/version-2
    2020-11-19 16:45:05,151 [myid:3] - INFO [QuorumPeer[myid=3](plain=[0:0:0:0:0:0:0:0]:2181)(secure=disabled):Follower@69] - FOLLOWING - LEADER ELECTION TOOK - 292 MS
    2020-11-19 16:45:06,169 [myid:3] - INFO [QuorumPeer[myid=3](plain=[0:0:0:0:0:0:0:0]:2181)(secure=disabled):Learner@391] - Getting a diff from the leader 0x0     # a leader has been chosen
    2020-11-19 16:45:06,172 [myid:3] - INFO [QuorumPeer[myid=3](plain=[0:0:0:0:0:0:0:0]:2181)(secure=disabled):Learner@546] - Learner received NEWLEADER message
    2020-11-19 16:45:06,206 [myid:3] - INFO [QuorumPeer[myid=3](plain=[0:0:0:0:0:0:0:0]:2181)(secure=disabled):Learner@529] - Learner received UPTODATE message
    2020-11-19 16:45:06,212 [myid:3] - INFO [QuorumPeer[myid=3](plain=[0:0:0:0:0:0:0:0]:2181)(secure=disabled):CommitProcessor@256] - Configuring CommitProcessor with 1 worker threads.

ZooKeeper loads the zoo.cfg file by default. Hostnames such as node01 were not mapped in /etc/hosts, so node01 could not be resolved; the entries were later changed to IP addresses, and the whole environment was built locally.

Then start the fourth ZooKeeper instance: it joins as a follower directly, without triggering a new election.

3. Parameter description

zkCli.sh            # connect to ZooKeeper; by default it connects to the local server
help                # list the available commands
#############
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path
#############


create /ooxx ""
create /ooxx/xxoo ""
set /ooxx "hello"

All write operations in ZK (create, delete, setData) are forwarded to the leader. The leader maintains a monotonically increasing counter, so you will see a 64-bit transaction id such as cZxid. The high 32 bits identify the leader epoch (they show as zeros while the epoch is 0); when the leader changes, the new leader bumps this epoch prefix and restarts the low 32-bit counter.

When we open a connection locally on node5, the log shows that a session has been established:

2020-11-19 18:45:32,461 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x5000002a4e30001, negotiated timeout = 30000

create -e /xoxo "sdfsdf"   # create an ephemeral node from the node5 session
# Then connect a client on node2 and look at the data:
get /xoxo                  # the data is visible there
# Quit the client on node5; the node (and its data) no longer exists on node2:
ls /
##########
[ooxx, zookeeper]
##########

Another scenario: when the server a client is connected to fails and the client's connection fails over to another server, is the ephemeral node created by that client lost?

The answer is no: after the server goes down the client fails over, and because the ephemeral node is bound to the sessionId (which the other servers in the cluster also know about), the node survives. In a nutshell: ephemeral nodes are bound to session ids, maintaining them relies on the unified view, and they consume transaction ids.
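
A small sketch of the same experiment in Java (the server address is a placeholder assumption; /xoxo is the path used in the zkCli demo above): one session creates an ephemeral node, a second session watches it and is notified the moment the first session closes.

import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;

public class EphemeralSessionDemo {
    static ZooKeeper connect(String addr) throws Exception {
        CountDownLatch up = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(addr, 3000,
                e -> { if (e.getState() == Watcher.Event.KeeperState.SyncConnected) up.countDown(); });
        up.await();
        return zk;
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper owner = connect("127.0.0.1:2181");     // plays the role of the node5 client
        ZooKeeper observer = connect("127.0.0.1:2181");  // plays the role of the node2 client

        owner.create("/xoxo", "sdfsdf".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        CountDownLatch gone = new CountDownLatch(1);
        observer.exists("/xoxo", e -> {                  // watch the ephemeral node from the other session
            if (e.getType() == Watcher.Event.EventType.NodeDeleted) gone.countDown();
        });

        owner.close();   // ending the owning session deletes the ephemeral node ...
        gone.await();    // ... and the other session is notified through its watch
        System.out.println("/xoxo disappeared when the owning session ended");
        observer.close();
    }
}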

How to avoid writes overwriting each other when multiple CLI clients write concurrently:

For concurrent writes, create nodes with create -s so that each creation gets its own sequence-numbered node and no client overwrites another's data.

# node2 creates:
[zk: localhost:2181(CONNECTED) 1] create -s /abc "sdf"
Created /abc0000000004
# node5 creates:
[zk: localhost:2181(CONNECTED) 2] create -s /abc "sdf"
Created /abc0000000006

4. Application:

  1. Unified configuration management: up to 1 MB of data per node, combined with the id and path structure

  2. Group management: based on the path structure and group naming

  3. Unified naming: sequential nodes

  4. Synchronization: ephemeral nodes

  5. Application Scenarios:

    1. (How do you design a distributed lock?)
      1. Use ephemeral nodes
      2. Hang them under a parent node and create them with -s (sequential), so the parent can hold a queue of lock requests
      3. Queue the lock requests; the first node in the sequence holds the lock
    2. Distributed queue
    3. Distributed configuration Center

4. Node communication

  1. Check node connections

    [root@node5 ~]# netstat -natp | egrep '(2888|3888)'
    tcp  0  0 ::ffff:192.168.40.134:2888   :::*                          LISTEN       1452/java
    tcp  0  0 ::ffff:192.168.40.134:3888   :::*                          LISTEN       1452/java
    tcp  0  0 ::ffff:192.168.40.134:37978  ::ffff:192.168.40.133:3888    ESTABLISHED  1452/java
    tcp  0  0 ::ffff:192.168.40.134:2888   ::ffff:192.168.40.133:46868   ESTABLISHED  1452/java
    tcp  0  0 ::ffff:192.168.40.134:35986  ::ffff:192.168.40.131:3888    ESTABLISHED  1452/java
    tcp  0  0 ::ffff:192.168.40.134:2888   ::ffff:192.168.40.132:59710   ESTABLISHED  1452/java
    tcp  0  0 ::ffff:192.168.40.134:2888   ::ffff:192.168.40.131:40444   ESTABLISHED  1452/java
    tcp  0  0 ::ffff:192.168.40.134:52642  ::ffff:192.168.40.132:3888    ESTABLISHED  1452/java

5. Zookeeper principles and knowledge

  1. A break from tradition: the Leader, Follower, and Observer roles replace the original master/slave concept.

    1. The Leader server provides read and write services for clients.

    2. Both Followers and Observers serve read requests. The difference is that an Observer does not take part in the leader election or in voting on write operations.

    3. Observers improve the cluster's read performance without affecting write performance.

1. Paxos — The soul of ZooKeeper

  1. Explain from: www.douban.com/note/208430…

  2. Originally planned to introduce ZK Server after introducing ZK Client, but found that ZK Server contains too much content, not a simple single Blog can be done. It was better to start with the basics.

    So what are the basics of ZK Server? I think it’s Paxos. So this article introduces Paxos and its corresponding implementation in ZK Server.

    First, Paxos is a consistency algorithm based on messaging. Proposed by Leslie Lamport in 1990, Paxos has been widely used in distributed computing in recent years. Google’s Chubby and Apache’s Zookeeper are all implemented based on its theory. Paxos is also considered to be the only distributed consistency algorithm so far, and the other algorithms are improvements or simplifications of Paxos. One problem to mention is that Paxos has a premise: there is no Byzantine general problem. This means that Paxos only works in a trusted computing environment that can’t be corrupted by an intrusion.

    A detailed description of Paxos can be found on Wikipedia: zh.wikipedia.org/zh-cn/Paxos… The mapping between the Paxos story and ZK Server follows the story below.

    Paxos describes a situation where there is an Island called Paxos where there is a group of people, and everything on the Island is decided by a special group of people, senators. The Senator Count is fixed and cannot be changed. Each change in environmental affairs on the island requires the adoption of a Proposal, and each Proposal has a PID number, which is always increasing and cannot be reversed. Every motion requires a majority of Senators to take effect. Each member will only agree to proposals greater than their current number, both those in force and those not in force. If an MP receives a proposal of less than or equal to the current number, he will reject it and tell the other party: your proposal has already been made. The current number here is the number that each senator keeps in his notebook, and he keeps updating it. There is no guarantee that the number on all members’ notebooks will always be the same throughout parliament. Now parliament has one goal: to make sure that all members agree on the proposals.

    All right, now that parliament is in operation, all members start their notebooks with the number zero on them. One lawmaker sent a proposal to set the electricity rate at 1 yuan per kilowatt hour. First he looked in his notebook and said, well, the current motion number is 0, so my motion number is 1, so he sent a message to all the senators: Motion number 1, set the electricity rate at 1 yuan per kilowatt hour. The other member received the message and checked his notepad, oh, the current motion number is 0, the motion is acceptable, so he wrote down the motion and replied: I accept your motion number 1, and he wrote down in his notepad: the current motion number is 1. The member who initiated the motion received more than half of the responses and immediately sent a notice to everyone: Motion 1 is effective! The Congressman who received the bill would modify his notepad, changing a good proposal from a record to a formal law. When someone asked him how much the electricity bill was, he would check the law and tell the other party: 1 yuan/KWH.

    Now look at how conflicts are resolved: suppose there are three senators S1–S3, and both S1 and S2 propose a motion at the same time: motion number 1, setting the electricity rate. S1 wants 1 yuan per kWh, S2 wants 2 yuan per kWh. S3 receives S1's proposal first, so he does the same thing as before. He then receives S2's proposal, checks his notebook and sees that this proposal is less than or equal to his current number 1, so he rejects it: sorry, that motion number has already been used. S2's proposal is rejected, and S1 issues the formal decree: motion 1 is in force. S2 asks S1 or S3 for the contents of decree 1, updates his records, and can then choose to proceed with motion number 2.

    Okay, I think that’s the best part of Paxos. Now let’s see how Paxos is implemented in ZK Server.

    Island — ZK Server Cluster: Cluster

    Senator — ZK Server: Service node

    Proposal (Proposal) – ZNode Change (Create/Delete/SetData…).

    Proposal id (PID) — zxid (ZooKeeper Transaction Id)

    Official decree – All ZNodes and their data

    It seems that all the key concepts correspond to each other, but wait a minute, the lawmakers of Paxos island are supposed to be equal, and it seems that ZK Server has a concept of Leader. Yes, in fact, the concept of Leader should also belong to the Paxos category. If parliamentarians are all equal, at some point there will be a “live lock” due to the proposed conflict. Lamport, The author of Paxos, elaborated on this problem and proposed a solution in his article “The Part-time Parliament” — set up a president among all members of Parliament, and only The president has The right to make proposals. If members of Parliament have their own proposals, they must be sent to The president and put forward by The president. Ok, we have one more character: president.

    President – ZK Server Leader

    Another question arises: how is the president chosen? That is a long story in itself. The Taobao core-system team's blog has an article on how the president is elected; interested readers can look at: rdc.taobao.com/blog/cs/?p=…

    Now let’s assume that the president has been chosen. Let’s see how ZK Server is implemented.

    Case 1 (a query):

    When Client goes to a ZK Server to ask about a certain law (ZNode data), the legislator takes out his notebook (Local storage) without hesitation, looks up the law and tells him the result, while stating: my data may not be the latest. You want the latest figures? No problem. Hold on. I’ll get back to you when I Sync with the president.

    Case 2 (a write request):

    A commoner (Client) goes to a senator (ZK Server) and asks the government to return the ten thousand yuan it owes him. The senator asks him to wait in the office while he raises the issue with the president. The president asks all the senators for their opinion; most say the money owed must be returned, so the president issues a decree: take ten thousand yuan out of the Treasury to repay him. The Treasury's total assets drop from 1,000,000 to 990,000 yuan. The commoner gets his money back (the Client call returns).

    Two phases: the member reports the request to the leader; the leader then consults the members, and the majority opinion determines the outcome.

    Case 3 (the leader goes down):

    The president suddenly dies. One after another the senators find that the president cannot be reached, so they announce an election and choose a new president. While the election is under way the government shuts down and refuses the commoners' requests.

2. Two-phase commit

Generally, two-phase commit protocol is considered as a consistency protocol to ensure data consistency in distributed systems. At present, most relational databases use two-phase commit protocol to complete distributed transaction processing.

As the name implies, the two-phase commit protocol divides the process of submitting a transaction into two phases.

The following is the processing logic of the two phases. Put simply, two-phase commit splits the handling of a transaction into a voting phase and an execution phase; its core idea is to try each transaction first and commit it only afterwards. Two-phase commit can therefore be regarded as a strongly consistent algorithm.

Phase one: Submit the transaction request

Phase two: Perform the transaction commit

Schematic diagram of the transaction commit process:

Conditions under which the transaction is interrupted (aborted)

Schematic diagram of transaction interrupt process:
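
A hedged, in-memory sketch of the two phases (this is not ZooKeeper code; the Participant interface, the txId parameter, and the boolean vote are simplifications for illustration): the coordinator collects votes first, and only if every participant votes yes does it issue the commit; otherwise it interrupts (rolls back) the transaction.

import java.util.List;

public class TwoPhaseCommitSketch {
    interface Participant {
        boolean prepare(String txId);   // phase 1: vote - can you commit? (write undo/redo log, lock resources)
        void commit(String txId);       // phase 2a: make the change permanent
        void rollback(String txId);     // phase 2b: undo the tentative change
    }

    // Returns true if the transaction committed, false if it was interrupted
    static boolean run(String txId, List<Participant> participants) {
        // Phase one: submit the transaction request and collect every vote
        boolean allYes = true;
        for (Participant p : participants) {
            allYes &= p.prepare(txId);          // every single participant must vote yes
        }

        // Phase two: perform the commit, or interrupt the transaction
        if (allYes) {
            participants.forEach(p -> p.commit(txId));
            return true;
        }
        participants.forEach(p -> p.rollback(txId));
        return false;
    }
}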

3. ZAB

  1. ZAB (ZooKeeper Atomic Broadcast) is an atomic broadcast protocol with crash recovery, designed specifically for the distributed coordination service ZooKeeper. ZooKeeper relies mainly on ZAB to achieve distributed data consistency; on top of this protocol it implements an active/standby architecture that keeps the replicas in the cluster consistent.

  2. ZAB protocol includes two basic modes:

    1. Crash recovery: when the service framework is starting up, or when the leader is lost through a network interruption, crash, or restart, ZAB enters recovery mode and elects a new leader. Once a new leader has been elected and more than half of the machines in the cluster have finished synchronizing their state with it, ZAB leaves recovery mode and message broadcasting begins.
    2. Message broadcast: after the leader election succeeds and more than half of the machines have synchronized their data with the leader, the whole framework enters message-broadcast mode. When another ZAB-compliant server starts up and joins a cluster that already has a broadcasting leader, the newcomer automatically puts itself into data-recovery mode:
      1. Find the Leader server for data synchronization;
      2. Participate in the broadcast process;

    Diagram of zAB protocol

    Suppose the cluster already has a leader and a client connects to a follower. After the connection is established:

    1. The client issues create /ooxx.

    2. The follower receives the command and forwards it to the leader, which decides whether the node should be created.

    3. The leader assigns the transaction an automatically generated zxid (say 8) and maintains a transaction queue;

    4. The leader sends out a proposal (the broadcast feature) telling the followers to create the node:

      1. Each follower logs the proposal and replies OK to the leader.
      2. Once more than half of the servers have replied OK, the creation is considered successful.
    5. The change is written to memory;

    6. In the end, data consistency is maintained across the cluster.

    7. Each follower that applied the write successfully sends OK;

    8. The final result is returned to the client.
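
    A minimal sketch of the quorum logic in steps 4–7 (the Follower interface here is an illustrative assumption, not ZooKeeper's real internal API): unlike two-phase commit, the leader only needs ACKs from a majority of the ensemble, itself included, before broadcasting the commit.

    import java.util.List;

    public class ZabBroadcastSketch {
        interface Follower {
            boolean logProposal(long zxid, byte[] txn);   // follower writes the proposal to its log and ACKs
            void commit(long zxid);                       // follower applies the transaction to its in-memory tree
        }

        // Returns true if the proposal reached a quorum and was committed
        static boolean broadcast(long zxid, byte[] txn, List<Follower> followers) {
            int acks = 1;                                  // the leader counts itself
            for (Follower f : followers) {
                if (f.logProposal(zxid, txn)) acks++;
            }
            int ensembleSize = followers.size() + 1;
            if (acks > ensembleSize / 2) {                 // more than half of the machines said OK
                followers.forEach(f -> f.commit(zxid));    // broadcast COMMIT; the result goes back to the client
                return true;
            }
            return false;
        }
    }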

    Analysis of the cluster built above:

    First scenario: leader election;

    Second scenario (as in the figure above): the leader hangs. node03, which has not yet synchronized the latest transaction, is the first to notice that the leader is dead; the other two nodes have already synchronized it but have not yet noticed.

    1. node03 votes for itself (+1) and broadcasts the vote; node01 compares the incoming vote (myid 3, zxid Z7) with its own state, finds that Z7 is lower than its Z8, and rejects the vote.

      1. Having rejected it, node01 initiates its own vote (+1 for itself) and broadcasts it;

      2. node02 does the same, votes for itself (+1), and also broadcasts its own version of the data to the other nodes;

      3. node03 compares the data received from node01 and node02 and in the end votes for node02 (+1). At this point the analysis from node03's side is complete.

        Are node01 and node02 handled synchronously (blocking) or asynchronously? If asynchronously, one case is:

        1. node01's vote is rejected by node02 (M2 > M1), because node03 has already triggered node02 to vote.
          1. Does node03 compare node01's vote as soon as it arrives, or does it wait and compare node02's again, i.e. block (deadlock)? (I don't think that is how it works.)
          2. Or does it first compare and vote for node01, then compare node02's data and switch its vote to node02?
        2. Or node02's vote is rejected, and node01's vote is also rejected,
    2. My understanding according to the video is as follows:

      1. Whether node03's vote or node01's vote reaches node02 first, the rejected votes all end up being redirected to node02 (the node with the newest data).

      2. node01 first votes for itself; after rejection node03 also votes for it, giving node01 two votes. node02 votes for itself after its own vote is rejected, and node01 eventually switches its vote to node02.

      3. However the votes are triggered, node02 eventually collects node03's vote and node01's vote and becomes the final leader.

        (If any reader knows this part well, I would appreciate a short explanation; this is as far as my own understanding goes.)

If the election cannot complete, the whole ZooKeeper cluster is in the unavailable state described earlier.

4. Watcher — monitoring and observation

Unified view: Directory tree structure

Let’s say we have two clients that want to know whether the other has hung or not using ZooKeeper. There are several ways. The difference lies in directivity and timeliness.

  • Sending heartbeats to each other, implemented by the developers themselves
  • Watcher, based on ZooKeeper: when a node such as /foo/bar disappears, an event is generated, and the callback registered beforehand is invoked. The callback only fires for watches that were registered. We will demonstrate this later with the API.

In ZooKeeper, Watcher mechanism is introduced to realize the function of distributed notification.

ZooKeeper allows a client to register a Watcher listener with the server. When the Watcher is triggered by some specific event on the server, an event notification is sent to the specified client for distributed notification.

1. What is a Watcher? (from blog.csdn.net/hollis_chua…)

Watcher is one of ZooKeeper's core features. A client can watch a node and its children for data changes; as soon as the state changes, the ZooKeeper server notifies every client that has set a Watcher on that node, so each client quickly learns that the node it is watching has changed and runs its own handling logic.

With a brief introduction to Watcher, let’s take a look at how ZooKeeper implements service registration and discovery. Zookeeper service registration and discovery mainly applies zooKeeper’s ZNode data model and Watcher mechanism. The general process is as follows:

  • Service registration: when a service provider (Provider) starts, it registers its service information with the ZooKeeper server, i.e. it creates a node such as com.xxx.user.register for a user-registration service and stores the service data (for example the provider's IP address and port) on that node.
  • Service discovery: when a service consumer (Consumer) starts, it fetches the registered service information from the ZooKeeper server and sets a Watch on it. After obtaining the information, it caches the provider's details locally and then invokes the service.
  • Service notification: once a service provider goes down for some reason and stops serving, its connection to the ZooKeeper server is broken and the corresponding service node (for example the user-registration service com.xxx.user.register) is deleted. The ZooKeeper server then asynchronously notifies every consumer that has set a Watch on that service; on receiving the notification, each consumer pulls the latest provider list and refreshes its local cache.
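
A minimal sketch of that flow with the Java client (the /services prefix, the provider- child names, and the already-connected zk instance are assumptions for illustration; only the service name com.xxx.user.register comes from the text above):

import org.apache.zookeeper.*;
import java.util.List;

public class ServiceRegistrySketch {
    private final ZooKeeper zk;

    public ServiceRegistrySketch(ZooKeeper zk) { this.zk = zk; }

    // Provider side: register an ephemeral, sequence-numbered child under the service path.
    // If the provider dies, its session ends and the node is removed automatically (service notification).
    public void register(String service, String hostPort) throws Exception {
        String base = "/services/" + service;                       // e.g. /services/com.xxx.user.register
        if (zk.exists(base, false) == null) {
            zk.create(base, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.create(base + "/provider-", hostPort.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    // Consumer side: read the provider list and leave a watch so that a
    // NodeChildrenChanged event fires whenever providers come or go (discovery + notification).
    public List<String> discover(String service) throws Exception {
        return zk.getChildren("/services/" + service,
                e -> System.out.println("provider list changed: " + e.getType()));
    }
}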

1. The test

Connecting to ZooKeeper from IDEA was very slow and I don't know why; after switching the client to version 3.4.9 the problem went away (about 25 ms).

package com.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/** Hello world! */
public class App 
{
    public static void main( String[] args ) throws Exception
    {
        System.out.println( "Hello World!" );


        // ZK has a session concept, not a connection pool concept
        // There are two kinds of watch. This one is the default watcher passed to the ZooKeeper constructor; it is not tied to any path or node.
        // Watch registration only happens on read calls: getData, exists, getChildren ...

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final ZooKeeper zk = new ZooKeeper("192.168.40.131:2181,192.168.40.132:2181,192.168.40.133:2181,192.168.40.134:2181", 3000, new Watcher() {
                    // Watch callback method
                    @Override
                    public void process(WatchedEvent event) {
                        Event.KeeperState state = event.getState();
                        Event.EventType type = event.getType();
                        String path = event.getPath();
                        System.out.println("new zk watch:"+event.toString());

                        switch (state) {
                            case Unknown:
                                break;
                            case Disconnected:
                                break;
                            case NoSyncConnected:
                                break;
                            case SyncConnected:
                                System.out.println("connected");
                                countDownLatch.countDown();
                                System.out.println("end Time : "+System.currentTimeMillis());
                                break;
                            case AuthFailed:
                                break;
                            case ConnectedReadOnly:
                                break;
                            case SaslAuthenticated:
                                break;
                            case Expired:
                                break;
                            case Closed:
                                break;
                        }

                        switch (type) {
                            case None:
                                break;
                            case NodeCreated:
                                break;
                            case NodeDeleted:
                                break;
                            case NodeDataChanged:
                                break;
                            case NodeChildrenChanged:
                                break;
                            case DataWatchRemoved:
                                break;
                            case ChildWatchRemoved:
                                break;
                        }
                    }
                });

        // For ZooKeeper, the connection is established asynchronously, so the state read below may still be CONNECTING
        // How do we wait for the asynchronous connect to finish? With the CountDownLatch
        try {
            System.out.println("start Time : "+System.currentTimeMillis());
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ZooKeeper.States states = zk.getState();
        switch (states) {
            case CONNECTING:
                System.out.println("ing ……");
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                System.out.println("ed ……");
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }


        // Create a node in two ways:
        try {
            String pathName = zk.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        final Stat stat = new Stat();

        byte[] node = zk.getData("/ooxx", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("getData watch" + event.toString());
                // Continue monitoring after the value is set
                try {
                    // Passing true would re-register the default watcher from new ZooKeeper(); here we re-register this watcher instead
                    zk.getData("/ooxx", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);

        System.out.println(new String(node));

        // Trigger the callback
        Stat stat1 = zk.setData("/ooxx", "newData".getBytes(), 0);
        // Will this second call trigger the watch again? It uses the version returned by the first setData
        Stat stat2 = zk.setData("/ooxx", "newData01".getBytes(), stat1.getVersion());

        System.out.println("-------------async start-------------");
        zk.getData("/ooxx".false.new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
                System.out.println("-------------async start-------------");
                System.out.println(new String(bytes));
            }
        }, "abc");
        System.out.println("-------------async call-------------");

        Thread.sleep(2222222);
    }
}

Distributed coordination — Configuration

ZooKeeper does distributed coordination, so what exactly does it coordinate? Where should configuration be written? Do operations staff have to log on to every machine and change it one by one? Configuration files can live in any shared location: Redis, a database, or ZK. ZK's callback (watch) mechanism removes the need for polling. ZK can also implement distributed locks, a frequent interview topic; even if you rarely use one in practice, the question touches a lot of knowledge points.

1. Use ZooKeeper to implement the distributed configuration center

TestConfig.java — the test code

package com.config;


import com.utils.MyConf;
import com.utils.WatchCallBack;
import com.utils.ZKUtils;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestConfig {
    ZooKeeper zk;

    @Before
    /** connect; the ZooKeeper address is hard-coded in ZKUtils */
    public void conn() {
        zk = ZKUtils.getZK();
    }

    @After
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void getConf() throws Exception {
        // will be
        WatchCallBack watchCallBack = new WatchCallBack();
        watchCallBack.setZk(zk);
        MyConf myConf = new MyConf();
        watchCallBack.setConf(myConf);

        watchCallBack.aWait();
        //1, the node does not exist
        //2, the node exists
        while (true) {
            if (myConf.getConf().equals("")) {
                System.out.println("conf diu le ......");
                watchCallBack.aWait();
            } else {
                System.out.println("In getConf Test, myConf is:" + myConf.getConf());
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

ZKUtils.java — connection helper that configures the server IPs/port and returns the ZooKeeper instance

package com.utils;

import org.apache.zookeeper.ZooKeeper;


import java.util.concurrent.CountDownLatch;

public class ZKUtils {

    private static ZooKeeper zk;

    // Prepare the nodes in zkCli first: create /testLock ; create /testLock/AppConf ; set /testLock/AppConf "hello,conf"
    // The path after the last port is a chroot: all subsequent operations are performed relative to it
    private static String address = "192.168.40.131:2181,192.168.40.132:2181,192.168.40.133:2181,192.168.40.134:2181/testConf";

    private static DefaultWatch watch = new DefaultWatch();

    private static CountDownLatch init = new CountDownLatch(1);

    public static ZooKeeper getZK() {
        try {
            zk = new ZooKeeper(address, 1000, watch);// The new ZooKeeper object needs to use DefaultWatch
            watch.setLatch(init);
            init.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }
}

WatchCallBack.java implements several interfaces at once, acting as both a Watcher and the async callbacks. Overriding each callback method is the core of the Reactor-style model here.

package com.utils;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    ZooKeeper zk;
    MyConf conf;
    CountDownLatch latch = new CountDownLatch(1);

    public WatchCallBack() {
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    public MyConf getConf() {
        return conf;
    }

    public void setConf(MyConf conf) {
        this.conf = conf;
    }



    public void aWait() throws InterruptedException {
        zk.exists("/AppConf", this, this, "ABC");
        // If the node does not exist, it will not block forever, because the event is registered, and the thread will be unlocked after the node is created by other threads
        latch.await();
        System.out.println("awaitExists finish ......");
    }

    // Register whether the node exists
    @Override
    /** * DataCallback interface implementation */
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if (data != null) {
            String str = new String(data);
            System.out.println("In WatchCallBack, data is: "+ str); conf.setConf(str); latch.countDown(); }}// Data acquisition
    @Override
    /** * The StatCallback interface implements */
    public void processResult(int i, String s, Object o, Stat stat) {
        if (stat != null) {
            zk.getData("/AppConf", this, this, "sdfs");
        }
    }

    // Event monitoring
    @Override
    /** ** Watcher interface implementation */
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                zk.getData("/AppConf".this.this."sdfs");
                break;
            case NodeDeleted:
                // tolerance: the node was deleted, so clear the conf and re-arm the latch
                conf.setConf("");
                latch = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                zk.getData("/AppConf".this.this."sdfs");
                break;
            case NodeChildrenChanged:
                break;
        }
    }
}

MyConf.java

package com.utils;


// This class is used to store configuration data in the future. In real development, the configuration might be a complex JSON, XML, and so on
public class MyConf {
    private String conf;

    public String getConf() {
        return conf;
    }

    public void setConf(String conf) {
        System.out.println("MyConf.conf is set to: " + conf);
        this.conf = conf;
    }
}

DefaultWatch.java implements the default watcher class that is passed as a parameter to the new ZooKeeper(...) constructor.

package com.utils;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

public class DefaultWatch implements Watcher {

    CountDownLatch latch;

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void process(WatchedEvent event) {

        System.out.println(event.toString());

        switch (event.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                System.out.println("In DefaultWatch, SyncConnected, connected successfully.");
                latch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
    }
}

During the run, the test changes node data and deletes nodes.

2. Distributed lock based on Zookeeper

  1. For lock:

    1. Ensure uniqueness, i.e. only one can acquire the lock;
  2. After obtaining the lock, a problem can occur: the holder dies before finishing its task, leaving a deadlock. (Solved with an ephemeral node tied to the session.)

  3. Release the lock

  4. How do I notify other threads when the lock is deleted

    1. Active polling or heartbeats: drawbacks are load on the server and delay
    2. Watch: removes the delay; drawback: load, because every waiter would be notified at once
    3. Sequential nodes + watch: who does each waiter watch? Only its immediate predecessor. When the smallest node (the lock holder) releases the lock, ZK calls back only the next node in line.

    The ZooKeeper connection code is the same as above; just change the chroot directory to /testLock.

    Test case TestLock.java

    package com.lock;
    
    import com.utils.ZKUtils;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    public class TestLock {
    
        ZooKeeper zk;
    
    
        @Before
        public void conn() {
            zk = ZKUtils.getZK();
        }
    
        @After
        public void close() {
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Test
        public void lock() {
    
    
            // 10 threads here stand in for 10 external machines; how many threads the JVM really creates is not the point.
            // The "thread state" has been moved out into ZooKeeper, so everything depends on that external state.
            for (int i = 0; i <10 ; i++) {
    
                new Thread(()->{
    
                    // This object may appear on up to 10 machines
                    WatchCallBack watch = new WatchCallBack();
                    watch.setZk(zk);
                    String threadName = Thread.currentThread().getName();
                    watch.setThreadName(threadName);
                    // Each thread grabs the lock
                    watch.tryLock();
    
    
    
                    / / work
                    System.out.println("Work...");
    
                    // A potential problem: the first lock might be released before the next waiter has registered its watch.
                    // If that release notification is missed, the chain is broken and the remaining threads block forever.
                    // One fix is to also write data when releasing, so late watchers can still detect the release.
                    /*
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    */
                    / / releases the lock
                    watch.unLock();
    
    
                }).start();
            }
    
    
            while (true) {
            }
        }
    }

    WatchCallBack.java

    package com.lock;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    
        ZooKeeper zk;
        String threadName;
        String pathName;
    
        CountDownLatch latch = new CountDownLatch(1);
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    
    
        public String getThreadName() {
            return threadName;
        }
    
        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }
    
    
        public String getPathName() {
            return pathName;
        }
    
        public void setPathName(String pathName) {
            this.pathName = pathName;
        }
    
        public void tryLock() {
            try {
                System.out.println(threadName+" create ----------");
                //if(zk.getData("")) reentrant lock
                zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void unLock() {
            try {
                zk.delete(pathName, -1);
                System.out.println(threadName+" over work----");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }

        @Override
        /*
        * Watcher接口
        * */
        public void process(WatchedEvent event) {
            // If the first node's lock is released, only the second node in line receives the callback event.
            // If a waiter B is notified because the node it was watching (A) crashed rather than released the lock,
            // B simply re-checks the children and goes back to watching whichever node is now directly in front of it.
            switch (event.getType()) {
                case None:
                    break;
                case NodeCreated:
                    break;
                case NodeDeleted:
                    zk.getChildren("/".false.this."sdf");
    
                    break;
                case NodeDataChanged:
                    break;
                case NodeChildrenChanged:
                    break;
            }
        }

        @Override
        /* * String CallBack * */
        public void processResult(int rc, String path, Object ctx, String name) {
            if (name != null) {
                System.out.println(threadName+" create node : "+name);
                pathName = name;
                zk.getChildren("/".false.this."sdf"); }}@Override
        /* * StatCallback interface * * */
        public void processResult(int i, String s, Object o, Stat stat) {
        }

        @Override
        /* * Children2Callback * */
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            // To enter this method, you must see the previous node
    /*
            System.out.println(threadName + " lock locks----------");
            for (String child : children) {
                System.out.println(child);
            }
    */
    
            Collections.sort(children);
            int index = children.indexOf(pathName.substring(1));
    
    
            // Is it the first one to happen
            if (index==0) {//yes
                System.out.println(threadName+"i am first....");
                try {
                    // Remove delay locking between threads
                    zk.setData("/",threadName.getBytes(),-1);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }else{
                //no
                // You need to check the status, because you don't know if the previous node is down
                zk.exists("/"+children.get(index-1), this.this."sdf"); }}}Copy the code