This section describes important concepts of ZooKeeper

ZooKeeper is a software project of the Apache Software Foundation. It provides open-source distributed configuration, synchronization, and naming registry services for large-scale distributed systems. ZooKeeper is a typical distributed data-consistency solution.

Objective: ZooKeeper is dedicated to providing distributed coordination services with high performance, high availability, and strict sequential access control capabilities for distributed applications.

  1. High performance: all data is stored in memory, and every server can serve clients' non-transaction requests directly. This makes ZooKeeper especially suitable for read-heavy scenarios, but less suitable for transaction (write)-heavy ones.
  2. High availability: ZooKeeper is usually deployed as a cluster. As long as more than half of the machines in the cluster work properly, the whole cluster can keep serving requests.
  3. Strictly ordered access: for every transaction request from a client, ZooKeeper assigns a global transaction ID (zxid), which reflects the order of all transaction operations.

ZooKeeper cluster roles

  1. Leader: the core of the cluster; the sole scheduler and processor of transaction requests, guaranteeing the order of transaction processing. All write requests must be forwarded to the leader, which numbers and executes the operations.
  2. Follower: processes clients' non-transaction requests, forwards transaction requests to the leader, and takes part in leader elections.
  3. Observer: processes non-transaction requests independently; forwards transaction requests to the leader; does not take part in leader elections or in the 2PC data-synchronization process.

For details on the 2PC data-synchronization process, see the Message broadcast part of the ZooKeeper data synchronization process section below.

ZooKeeper node features

Node names must be unique among siblings

If an existing node is created again, a message is displayed indicating that the node already exists.

When creating a node, you must specify the full path of the node

$ ls /runoob
$ create /runoob/child 0
$ create /runoob/child/ch01 0

Ephemeral (temporary) nodes are removed when the session that created them is closed

Sequential nodes are numbered automatically

#Create a sequential node: create -s appends a sequence number to the node name
$ create -s /aha

Created /aha0000000003

Nodes provide a watch mechanism

See the later section that demonstrates zkClient's listening events.

The delete command can only delete nodes layer by layer

An error is reported when the delete command is used to delete a node with child nodes.

To delete a node together with its children, use the deleteall command.
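
For reference, here is a minimal Java sketch of the node features above, using the zkClient library that is introduced later in this article; the connect string and the /runoob paths are assumptions for illustration only.

import org.I0Itec.zkclient.ZkClient;

public class NodeFeatureDemo {

    public static void main(String[] args) {
        // Assumed address; replace it with your own ZooKeeper server
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 10000);

        // Node names must be unique among siblings: creating /runoob twice throws ZkNodeExistsException
        zkClient.createPersistent("/runoob", "data");

        // The full path must be specified; passing true asks the client to create missing parent nodes
        zkClient.createPersistent("/runoob/child/ch01", true);

        // Ephemeral node: removed automatically when this session is closed
        zkClient.createEphemeral("/runoob/temp", "temp-value");

        // Sequential node: an increasing sequence number is appended to the name
        String created = zkClient.createPersistentSequential("/runoob/seq-", "seq-value");
        System.out.println("Created: " + created);

        // Like the CLI's deleteall, deleteRecursive removes a node together with all its children
        zkClient.deleteRecursive("/runoob");

        zkClient.close();
    }
}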

ZooKeeper permission control

Zk Access Control lists (ACLs) are very important in production environments.

The composition of the ACL

A ZooKeeper ACL defines permissions in the form scheme:id:permissions.

  1. Scheme: the authentication mechanism; the built-in schemes include world, auth, digest, and ip.

    • world: the default scheme, meaning everyone can access the node
    • auth: an authenticated user
    • digest: authentication with a username and password
    • ip: authentication by IP address; a single IP or an address segment can be specified
  2. Id: the user that is allowed access.

  3. Permissions: a permission string combining the letters c, d, r, w, a, where each letter stands for a different permission: create (c), delete (d), read (r), write (w), and admin (a, the permission to perform ACL operations). Note that d (delete) applies to child nodes, while the other permissions apply to the node itself.
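
For reference, the scheme:id:permissions structure maps onto the native ZooKeeper client's Id, ACL, and ZooDefs.Perms classes. Below is a minimal sketch; the username, password, connect string, and node path are illustrative assumptions.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

public class AclDemo {

    public static void main(String[] args) throws Exception {
        // scheme = digest, id = "user:<encrypted password>"
        Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("user:111111"));

        // permissions = r + w + d + a (no create), i.e. the "rwda" string used on the CLI
        int perms = ZooDefs.Perms.READ | ZooDefs.Perms.WRITE
                | ZooDefs.Perms.DELETE | ZooDefs.Perms.ADMIN;
        List<ACL> acl = Collections.singletonList(new ACL(perms, id));

        // Connect and wait until the session is established
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // The session must carry matching auth info before it can read the node back
        zk.addAuthInfo("digest", "user:111111".getBytes());
        zk.create("/acl-demo", "aha-data".getBytes(), acl, CreateMode.PERSISTENT);
        zk.close();
    }
}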

Using digest mode for permission control

Write a utility class to generate the encrypted username:password string needed for a digest-mode user

Importing Maven dependencies

<!-- Keep the ZooKeeper client dependency at the same version as the ZooKeeper server -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.6.3</version>
  <exclusions>
    <!-- The zookeeper package pulls in log4j, which conflicts with Spring Boot's logback -->
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Write utility classes

package com.aha.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.security.NoSuchAlgorithmException;

/**
 * ZooKeeper utility class
 *
 * @author: WT
 * @date: 2021/12/1
 */
@Slf4j
public class ZkUtils {

    /**
     * Generates the encrypted string for a digest username and password
     *
     * @param idPassword plaintext "username:password"
     * @return the encrypted digest string
     * @throws NoSuchAlgorithmException
     */
    public static String getDigestAuthStr(String idPassword) throws NoSuchAlgorithmException {
        return DigestAuthenticationProvider.generateDigest(idPassword);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // user:svY1VPBeOtL797NihAvQMUtFLHs=
        log.info(ZkUtils.getDigestAuthStr("user:111111"));
    }
}

Set the ACL on the node using the generated digest string

#Create the /aha node and give user:111111 read, write, delete, and admin permissions
$ create /aha aha-data digest:user:svY1VPBeOtL797NihAvQMUtFLHs=:rwda

#Access /aha without adding auth first
$ ls /aha

#Insufficient permission is returned
[zk: localhost:2181(CONNECTED) 2] get /aha
Insufficient permission : /aha

#Add the user's auth and then read the node
$ addauth digest user:111111

$ get /aha

#Now the node can be read normally
[zk: localhost:2181(CONNECTED) 7] get /aha
aha-data

Pitfall guide:

#Create the /aha node and give user:111111 read, write, delete, and admin permissions
$ create /aha aha-data digest:user:svY1VPBeOtL797NihAvQMUtFLHs=:rwda

When executing this command, be sure to set a value for the node. The create command syntax is create [-s] [-e] [-c] [-t ttl] path [data] [acl], so if you specify an ACL without specifying data, the ACL string is stored as the node's data and no ACL is set.

Using auth mode for permission control

The differences between auth mode and Digest mode are as follows:

  1. Before setting a node's permissions in auth mode, add the user first: addauth digest user:pwd
  2. When setting a node's permissions, you only need to specify the username, as shown in the following example
#Add user
$ addauth digest aha:111

#Add a node and set permissions
$ create /auth auth-data auth:aha:cdwra

#If you want to operate /auth after connecting to other clients or reconnecting to this client, you need to log in as the user first, otherwise you will report insufficient permissions
$ addauth digest aha:111
#Then the node can be operated on

Using ip mode for permission control

#Grant 10.211.55.4 the cdrwa permissions
$ create /ip ip-data ip:10.211.55.4:cdrwa

#Operate on the /ip node from a machine that is not 10.211.55.4
$ get /ip

#Insufficient permission is returned
[zk: localhost:2181(CONNECTED) 0] ls /ip 
Insufficient permission : /ip

#Operate on the /ip node from 10.211.55.4
$ get /ip

#Returns normally

#Multiple IP addresses can be used
$ setAcl /ip ip:10.211.55.4:cdwra,ip:10.211.55.5:cdwra

#An IP address segment can also be used
$ setAcl /ip ip:10.211.55.4/24:cdwra

ZooKeeper read/write mechanism

ZooKeeper uses a single master process, the leader, to receive and process all transaction requests from clients. Note that this applies only to transaction requests; non-transaction requests can be handled directly by followers or observers.

Transaction requests are requests that involve data modification: add, modify, delete

Non-transactional requests are: query operations

When a non-leader node receives a transaction, it forwards the request to the Leader node for processing.

ZooKeeper data synchronization process

ZooKeeper relies mainly on the ZAB (ZooKeeper Atomic Broadcast) protocol to keep distributed data consistent.

ZAB protocol, similar to Paxos, is also a data consistency algorithm, which includes two parts: message broadcast and crash recovery.

Message broadcast

After the leader receives a transaction request (either directly from a client or forwarded by a follower or observer), it turns the request into a proposal and broadcasts it to all follower nodes (observer nodes are excluded). If more than half of the followers in the cluster return a correct ACK to the leader, the leader sends a COMMIT message to all followers to commit the proposal. This process can be viewed as a 2PC-style transaction commit, as shown below:

Note: The Observer node is only responsible for synchronizing the leader data and does not participate in the 2PC process.

ACK: acknowledge character
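
The following is not ZooKeeper's actual code; it is only a simplified Java sketch of the quorum logic described above, and Proposal and FollowerChannel are hypothetical stand-in types.

import java.util.List;

public class ZabBroadcastSketch {

    // Hypothetical stand-in for a proposal carrying a zxid and the transaction payload
    record Proposal(long zxid, byte[] txn) {}

    // Hypothetical channel from the leader to one follower
    interface FollowerChannel {
        boolean sendProposalAndAwaitAck(Proposal proposal); // phase 1: PROPOSAL -> ACK
        void sendCommit(Proposal proposal);                 // phase 2: COMMIT
    }

    static boolean broadcast(Proposal proposal, List<FollowerChannel> followers, int clusterSize) {
        int acks = 1; // the leader implicitly accepts its own proposal
        for (FollowerChannel follower : followers) {
            if (follower.sendProposalAndAwaitAck(proposal)) {
                acks++;
            }
        }
        // Commit only after more than half of the cluster (leader included) has acknowledged
        if (acks > clusterSize / 2) {
            followers.forEach(follower -> follower.sendCommit(proposal));
            return true;
        }
        return false;
    }
}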

Crash recovery

Message broadcast works well in the normal case, but once the leader crashes, or loses contact with more than half of the followers because of network problems, the cluster enters crash-recovery mode and must elect a new leader. During this process two data-inconsistency risks can arise, which the ZAB protocol must avoid.

The recovery mode of the ZAB protocol uses the following policies:

  1. Elect the node with the largest zxid as the new leader
  2. The new leader processes the uncommitted messages in its transaction log

ZooKeeper leader election

Introduction to Important Parameters

  1. Server ID (myid): the larger the value, the greater its weight in the election algorithm
  2. Global transaction ID (zxid): the larger the value, the newer the data and the greater its weight
  3. Logical clock (epoch / logical clock): all votes in the same round carry the same logical-clock value; it is incremented after each round of voting and is used to check whether votes belong to the same round

Election Status

  1. LOOKING: election in progress
  2. FOLLOWING: follower state; synchronizes with the leader and takes part in voting
  3. OBSERVING: observer state; synchronizes data from the leader but does not take part in voting
  4. LEADING: leader state

Introduction to election Process

The election algorithm is similar to Paxos.

Each node starts in the LOOKING state and then enters the election process. This section takes a cluster of three machines as an example. When the first server, server1, starts, a leader election cannot take place yet. When the second server, server2, starts, the two servers can communicate with each other and enter the leader-election process.

  1. Send votes: each server issues a vote. In the initial state, server1 and server2 each vote for themselves as leader. Each vote contains the chosen server's myid, zxid, and epoch, written here as (myid, zxid): server1 votes (1, 0), server2 votes (2, 0), and each sends its vote to the other machines in the cluster.

  2. Check votes: when a server in the cluster receives a vote, it first checks its validity, for example whether it belongs to the current epoch and whether it comes from a server in the LOOKING state.

  3. Compare votes: each server compares every received vote with its own. The comparison rules are as follows (see the sketch after this list):

    • Compare epoch first
    • Then compare zxid; the server with the larger zxid is preferred as leader
    • If the zxids are equal, compare myid; the server with the larger myid becomes the leader
  4. Count votes: after each round of voting, every server counts the votes it has received and checks whether more than half of the machines have accepted the same vote. Here both server1 and server2 find that two machines in the cluster have accepted the vote (2, 0), so server2 is chosen as the leader.

  5. Change server state: once the leader is determined, each server updates its state, to FOLLOWING if it is a follower or to LEADING if it is the leader. When server3 starts later, it simply sets itself to FOLLOWING.
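
A minimal Java sketch of the vote-comparison rule from step 3; Vote is a simplified stand-in for ZooKeeper's internal vote structure, not the real class.

public class VoteComparisonSketch {

    // Simplified vote: (epoch, zxid, myid)
    record Vote(long epoch, long zxid, long myid) {}

    /** Returns true if the received vote should replace our current vote. */
    static boolean isBetter(Vote received, Vote current) {
        if (received.epoch() != current.epoch()) {
            return received.epoch() > current.epoch();   // 1. compare epoch first
        }
        if (received.zxid() != current.zxid()) {
            return received.zxid() > current.zxid();     // 2. then the larger zxid wins
        }
        return received.myid() > current.myid();         // 3. finally the larger myid wins
    }

    public static void main(String[] args) {
        Vote server1 = new Vote(0, 0, 1);
        Vote server2 = new Vote(0, 0, 2);
        // With equal epoch and zxid, server2's larger myid wins, matching the example above
        System.out.println(isBetter(server2, server1)); // true
    }
}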

Installing ZooKeeper

Install the JDK

ZooKeeper is developed in Java, so install a JDK first. For details, see JDK installation on multiple platforms.

Install Zookeeper in single-node mode

Node planning

Here the master machine is used for the single-node demonstration; its address is 10.211.55.3.

Installation directory: /home/parallels/Downloads/zookeeper.

Go to the ZooKeeper official website to get the download address for the version you want to install: https://zookeeper.apache.org/releases.html.

Note: do not choose the source package here; download the binary (-bin) package.

#Download the tarball
$ wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

#Unpack it
$ tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz

#I installed it under /home/parallels/Downloads

$ cd conf
$ cp zoo_sample.cfg zoo.cfg

#Start from the conf directory
$ ../bin/zkServer.sh start

#Check the server status
$ ../bin/zkServer.sh status

#Stop the service
$ ../bin/zkServer.sh stop

So the installation of the single-node ZK is basically done. The default configuration file used is conf/zoo.cfg.

Configuring environment Variables

Set an environment variable to make the commands easier to use.

#Edit the configuration file
$ vim /etc/environment

#Add before PATH
ZK_HOME="/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin"
#Modify PATH by appending the following at its end
:$ZK_HOME/bin


Use the zkCli client to operate the zkServer

Ensure that the zkServer runs properly before performing operations

Basic operation

#Connect to the ZooKeeper server
$ zkCli.sh

#If no IP address or port is specified, 127.0.0.1:2181 is used by default. The following specifies the IP address and port explicitly
$ zkCli.sh -server 127.0.0.1:2181

#Once connected, it behaves like a Linux file system
$ ls /

#Create a node and assign it a value; nodes are persistent by default
$ create /aha aha

#View the value of the node
$ get /aha

#Create a child node under /aha with a value
$ create /aha/child child

#View the directory structure: ls /
#aha and zookeeper are listed, but you cannot cd into /aha: ZooKeeper maintains a virtual directory structure and has no cd command. The stat command below is important
$ stat /aha

#The following information is returned
cZxid = 0x200000002                    # zxid of the transaction that created /aha
ctime = Wed May 12 13:25:46 CST 2021   # creation time
mZxid = 0x200000002                    # zxid of the last modification
mtime = Wed May 12 13:25:46 CST 2021   # last modification time
pZxid = 0x200000003                    # zxid of the last change to the child list; adding or removing children changes it,
                                       # but modifying a child's data does not affect the pZxid
dataVersion = 0                        # data version, incremented by 1 on each data change
aclVersion = 0                         # ACL version, incremented by 1 on each ACL change
ephemeralOwner = 0x0                   # session id of the owning session if this is an ephemeral node, otherwise 0
dataLength = 9                         # length of the node's data
numChildren = 1                        # number of children of this node

#Create an ephemeral node: create -e
$ create -e /aha/temp temp-value

#Create a sequential node: create -s appends a sequence number to the node name
$ create -s /aha/aha

Created /aha/aha0000000003

#The same applies when creating ephemeral sequential nodes
$ create -e -s 

#Modify the value of a node with the set command
$ set path data [-v version]
#-v version is optional; the version number acts as an optimistic lock: the value is only set when the version matches
#The set fails when the version is incorrect
$ set /LOCK/aha uuu -v 3

#The sync command is followed by the path to synchronize
$ sync /aha

You can use help directly on the client to see how to use the command

Ephemeral (temporary) nodes cannot have child nodes.

Set up a ZK cluster

Node planning

Three machines, master01, node01, and node02, are used to build the cluster for this demonstration; their addresses are 10.211.55.3, 10.211.55.4, and 10.211.55.5 respectively.

Installation directory: /home/parallels/Downloads/zookeeper.

The first step is to install the JDK and download the installation package. The configuration of ZK environment variables is described in the single node section.

Set a hostname on each machine so it is easier to tell which one is which when operating on multiple hosts:

$ hostnamectl set-hostname master01
$ hostnamectl set-hostname node01
$ hostnamectl set-hostname node02

After setting the hostnames, restart the machines and reconnect to see the new names.

FAQ: to prevent split-brain, the number of nodes in a ZooKeeper cluster should be 2N + 1 (an odd number).

Modify the zoo.cfg configuration

#In conf, copy a zoo.cfg file
$ cp zoo_sample.cfg zoo.cfg

Zoo.cfg file:

dataDir=/tmp/zookeeper: the directory where ZooKeeper data is saved. The default is under /tmp, which is meant for temporary data, so it should be changed.

Description of ZooKeeper's three ports:

  1. 2181: serves client connections
  2. 2888: used for communication between machines in the cluster
  3. 3888: used for leader elections

tickTime: heartbeat interval in milliseconds. If no heartbeat is received within 2 tickTime intervals, the session is considered timed out.

initLimit=10: the maximum time (as a number of heartbeat intervals) allowed for initializing a connection; the connection times out if this value is exceeded.

syncLimit=5: the timeout (in heartbeat intervals) for message synchronization between the leader and followers in the cluster.

clientPort: the port the server listens on for client connections.

Modify the zoo.cfg file:

#1. Add the following lines to the end of zoo.cfg on all three machines
server.1=10.211.55.3:2888:3888
server.2=10.211.55.4:2888:3888
server.3=10.211.55.5:2888:3888

#2. Modify the dataDir in zoo.cfg
dataDir=/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/data

Further reading: if an observer node is needed, add a line such as server.4=10.211.55.6:2888:3888:observer

Add myid

Create a file named myid in the directory configured as dataDir. Its content is the n of the corresponding server.n entry in zoo.cfg; for example, write 1 on 10.211.55.3.

Operate the other nodes in sequence.

Start the cluster

#1. Start the cluster on all three machines - note that all three machines need to be started manually
$ zkServer.sh start 

#2. View the cluster status and run the command on the node
$ zkServer.sh status

#The following is displayed on a follower node
root@master01:/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/conf# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

#The following is displayed on the leader node
root@node01:/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/conf# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

Pitfall notes:

When telnet to a port fails, there are two possible causes:

  1. Firewall problems: In this case, you can turn off the firewall or add ports to the firewall

  2. The target machine is not listening on the port at all: to troubleshoot, run netstat -lntup (optionally combined with grep) on the target machine.

Test the cluster using the client

#Note that port 2181 is used for this connection
$ zkCli.sh -server 10.211.55.4:2181

ZooKeeper GUI tool

https://github.com/zzhang5/zooinspector

Spring Boot integration with ZooKeeper

Using zkClient to operate ZooKeeper

Add Maven dependencies

<!-- Keep the ZooKeeper client dependency at the same version as the ZooKeeper server -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.6.3</version>
  <exclusions>
    <!-- The zookeeper package pulls in log4j, which conflicts with Spring Boot's logback -->
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.11</version>
</dependency>

Write the configuration

zookeeper:
  # Separate multiple ZooKeeper server addresses with commas, e.g. ip1:port1,ip2:port2,ip3:port3
  address: 10.211.55.3:2181,10.211.55.4:2181,10.211.55.5:2181
  retryCount: 5               # Number of retries
  initElapsedTimeMs: 1000     # Initial retry interval
  maxElapsedTimeMs: 5000      # Maximum retry interval
  sessionTimeoutMs: 30000     # Session timeout
  connectionTimeoutMs: 10000  # Connection timeout

Writing configuration classes

package com.aha.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * ZooKeeper connection configuration class
 *
 * @author: WT
 * @date: 2021/11/22
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "zookeeper")
public class ZkClientProperties {

    /** Number of retries */
    private int retryCount;

    /** Initial retry interval */
    private int initElapsedTimeMs;

    /** Maximum retry interval */
    private int maxElapsedTimeMs;

    /** Connection address */
    private String address;

    /**Session expiration time */
    private int sessionTimeoutMs;

    /** Connection timeout time */
    private int connectionTimeoutMs;

}

Register the zkClient bean

package com.aha.client;

import com.aha.config.ZkClientProperties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Generates the ZooKeeper client
 *
 * @author: WT
 * @date: 2021/11/22 18:18
 */
@Configuration
public class ZookeeperClient {

    @Bean
    public static ZkClient zkClient(ZkClientProperties zookeeperProperties) {
        // See ZkClient's constructors if you need to pass more parameters
        return new ZkClient(new ZkConnection(zookeeperProperties.getAddress()),
                zookeeperProperties.getConnectionTimeoutMs());
    }
}

Demonstrating zkClient's listening events

Write the Service demo class

package com.aha.lock.service;


import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.stereotype.Service;

/**
 * Tests zkClient's listening events
 *
 * @author: WT
 * @date: 2021/11/23 14:53
 */
@Slf4j
@Service
public class ZkWatcher {

    private final ZkClient zkClient;
    private static final String NODE_PATH = "/path";
    private static final String LOG_SEPARATOR = "++++++++++++++++++++++++++++++++++++++++++++";

    public ZkWatcher (ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    public void testSubscribeChildChanges() throws InterruptedException {

        /*
         * parentPath: the node we subscribed to
         * currentChildren: the current children of that node
         * Creation and deletion of the subscribed node or of its children are monitored and delivered as events;
         * the callback can fire repeatedly as a result of repeated operations
         */
        zkClient.subscribeChildChanges(NODE_PATH, (parentPath, currentChildren) -> {
            log.info("The current node is :{}, and the children under the current node are :{}", parentPath, currentChildren);
            log.info(LOG_SEPARATOR);
        });

        operateNode(zkClient);

    }

    public void testSubscribeDataChanges() throws InterruptedException {

        zkClient.subscribeDataChanges(NODE_PATH, new IZkDataListener() {

            /**
             * Listens for node deletion
             *
             * @param path the node path
             * @throws Exception exception
             */
            @Override
            public void handleDataDeleted(String path) throws Exception {
                log.info("The deleted node is: {}", path);
                log.info(LOG_SEPARATOR);
            }

            /**
             * Listens for create and update operations on the node
             *
             * @param path the node path
             * @param data the changed content
             * @throws Exception exception
             */
            @Override
            public void handleDataChange(String path, Object data) throws Exception {
                log.info("The changed node is: {}, the changed content is: {}", path, data);
                log.info(LOG_SEPARATOR);
            }
        });

        operateNode(zkClient);
    }

    public static void operateNode(ZkClient zkClient) throws InterruptedException {

        // Delete recursively
        zkClient.deleteRecursive(NODE_PATH);

        // An exception will be thrown if the node already exists: ZkNodeExistsException
        zkClient.createPersistent(NODE_PATH);
        Thread.sleep(1000);

        // SubscribeChildChanges does not listen to update operations on the parent node; SubscribeDataChanges listens for update operations on nodes
        zkClient.writeData(NODE_PATH,"The parent node has changed");
        Thread.sleep(1000);

        zkClient.createPersistent(NODE_PATH + "/" + "c1", "c1 content");
        Thread.sleep(1000);

        zkClient.createPersistent(NODE_PATH + "/" + "c2", "c2 content");
        Thread.sleep(1000);

        // Does not listen for update operations on child nodes
        zkClient.writeData(NODE_PATH + "/" + "c1", "c1 new content");
        Thread.sleep(1000);

        zkClient.delete(NODE_PATH + "/c2");
        Thread.sleep(1000);

        // A ZkException will be thrown if a node deleted using this method is not empty
        // zkClient.delete(NODE_PATH);

        // Recursive deletion: the delete listener fires twice, once for /path/c1 and once for /path itself
        zkClient.deleteRecursive(NODE_PATH);

        // This delay is necessary: the listeners are asynchronous, and if the method returned immediately they would be closed before the remaining events are delivered
        Thread.sleep(10000);

        log.info("End of delay"); }}Copy the code

Writing test Controllers

package com.aha.lock.controller;

import com.aha.lock.service.InterprocessMutexLock;
import com.aha.lock.service.ZkWatcher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: WT
 * @date: 2021/11/23 14:13
 */
@RestController
public class TestController {

    private final InterprocessMutexLock interprocessMutexLock;
    private final ZkWatcher zkWatcher;


    public TestController ( InterprocessMutexLock interprocessMutexLock, ZkWatcher zkWatcher ) {
        this.interprocessMutexLock = interprocessMutexLock;
        this.zkWatcher = zkWatcher;
    }

    /** * Tests the SubscribeDataChanges method of zkClient */
    @GetMapping("/zk-client/subscribe/data")
    public void testZkClientSubscribeDataChanges() throws InterruptedException {
        zkWatcher.testSubscribeDataChanges();
    }

    /** * Tests the SubscribeChildChanges method of zkClient */
    @GetMapping("/zk-client/subscribe/child")
    public void testZkClientSubscribeChildChanges() throws InterruptedException {
        zkWatcher.testSubscribeChildChanges();
    }
}

Using Curator to operate ZooKeeper

Add Maven dependencies

<!-- Keep the ZooKeeper client dependency at the same version as the ZooKeeper server -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.6.3</version>
  <exclusions>
    <!-- The zookeeper package pulls in log4j, which conflicts with Spring Boot's logback -->
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes (contains the distributed lock implementations) -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>5.1.0</version>
</dependency>

Write the configuration

zookeeper:
  # Separate multiple ZooKeeper server addresses with commas, e.g. ip1:port1,ip2:port2,ip3:port3
  address: 10.211.55.3:2181,10.211.55.4:2181,10.211.55.5:2181
  retryCount: 5               # Number of retries
  initElapsedTimeMs: 1000     # Initial retry interval
  maxElapsedTimeMs: 5000      # Maximum retry interval
  sessionTimeoutMs: 30000     # Session timeout
  connectionTimeoutMs: 10000  # Connection timeout

Writing configuration classes

package com.aha.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * ZooKeeper connection configuration class
 *
 * @author: WT
 * @date: 2021/11/22
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "zookeeper")
public class ZkClientProperties {

    /** Number of retries */
    private int retryCount;

    /** Initial retry interval */
    private int initElapsedTimeMs;

    /** Maximum retry interval */
    private int maxElapsedTimeMs;

    /** Connection address */
    private String address;

    /**Session expiration time */
    private int sessionTimeoutMs;

    /** Connection timeout time */
    private int connectionTimeoutMs;

}

Register the CuratorFramework bean

package com.aha.client;

import com.aha.config.ZkClientProperties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Generates the ZooKeeper client
 *
 * @author: WT
 * @date: 2021/11/22 18:18
 */
@Configuration
public class ZookeeperClient {

    /**
     * initMethod = "start": after the CuratorFramework bean is created,
     * Spring calls the start() method on the instance
     */
// @Bean(initMethod = "start")
// public CuratorFramework curatorFramework(ZkClientProperties zookeeperProperties) {
// return CuratorFrameworkFactory.newClient(
// zookeeperProperties.getAddress(),
// zookeeperProperties.getSessionTimeoutMs(),
// zookeeperProperties.getConnectionTimeoutMs(),
// new RetryNTimes(zookeeperProperties.getRetryCount(), zookeeperProperties.getInitElapsedTimeMs())
//        );
//    }

    @Bean(initMethod = "start")
    public static CuratorFramework getZkClient(ZkClientProperties zookeeperProperties) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, zookeeperProperties.getRetryCount(), 5000);
        return CuratorFrameworkFactory.builder()
                .connectString(zookeeperProperties.getAddress())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                .retryPolicy(retryPolicy)
                .build();
    }
}

Demonstrating the distributed reentrant exclusive lock provided by Curator

Write the Service demo class

package com.aha.lock.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

/**
 * Tests the distributed reentrant exclusive lock provided by Curator
 *
 * @author: WT
 * @date: 2021/11/22 17:41
 */
@Slf4j
@Service
public class InterprocessMutexLock {

    private final CuratorFramework curatorFramework;
    private int inventory = 40;

    public InterprocessMutexLock (CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    public void test(String lockPath)  {

        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
        // Simulate 50 threads competing for the lock
        for (int i = 0; i < 50; i++) {
            new Thread(new TestThread(i, lock)).start();
        }
    }

    class TestThread implements Runnable {

        private final Integer threadFlag;
        private final InterProcessMutex lock;

        public TestThread(Integer threadFlag, InterProcessMutex lock) {
            this.threadFlag = threadFlag;
            this.lock = lock;
        }

        @Override
        public void run() {

            try {

                lock.acquire();
                log.info("{} thread acquired lock, inventory remaining: {}", threadFlag, inventory);
                if (inventory > 0) {
                    inventory --;
                    log.info("Inventory remaining: {}", inventory);
                } else {
                    log.info("Inventory: {}, thread {} could not get any", inventory, threadFlag);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

Writing test Controllers

package com.aha.lock.controller;

import com.aha.lock.service.InterprocessMutexLock;
import com.aha.lock.service.ZkWatcher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: WT
 * @date: 2021/11/23 14:13
 */
@RestController
public class TestController {

    private final InterprocessMutexLock interprocessMutexLock;
    private final ZkWatcher zkWatcher;


    public TestController ( InterprocessMutexLock interprocessMutexLock, ZkWatcher zkWatcher ) {
        this.interprocessMutexLock = interprocessMutexLock;
        this.zkWatcher = zkWatcher;
    }

    /** Tests the distributed reentrant exclusive lock provided by Curator */
    @GetMapping("/lock/mutex")
    public void testMutexLock() {
        interprocessMutexLock.test("/lock/mutex");
    }
}

For a code example, see the distributed Lock-ZooKeeper project under lock at https://github.com/WT-AHA/JAVA-TREASURE.git

FAQ

Shortcomings of ZooKeeper

  1. ZooKeeper's performance is limited: TPS is probably only a bit over 10,000, because only the leader node can process transaction requests.
  2. ZooKeeper does not serve external requests while electing a leader, and the election process is relatively slow.
  3. ZooKeeper's permission control is relatively simple.
  4. Reads may return stale data: ZooKeeper considers synchronization complete once more than half of the machines have synchronized, so the node you read from may not have been synchronized yet (see the sketch after this list).
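
When stale reads matter, a client can call sync() on a path before reading; this asks the server it is connected to to catch up with the leader first. Below is a minimal sketch with the native ZooKeeper client; the address and path are assumptions.

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SyncBeforeReadSketch {

    public static void main(String[] args) throws Exception {
        // Connect and wait until the session is established
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // sync() is asynchronous: wait for its callback before reading
        CountDownLatch synced = new CountDownLatch(1);
        zk.sync("/aha", (rc, path, ctx) -> synced.countDown(), null);
        synced.await();

        // The connected server has now caught up with the leader for this path
        byte[] data = zk.getData("/aha", false, null);
        System.out.println(new String(data));

        zk.close();
    }
}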

Is ZooKeeper CP or AP?

Whether ZooKeeper is CP or AP depends on the angle of analysis:

  1. During a leader election ZooKeeper suspends external service (it relies on the leader to guarantee data consistency), so it sacrifices availability to ensure consistency, i.e. CP. However, the consistency here is eventual rather than strong consistency: the cluster considers data synchronization complete once more than half of the nodes have synchronized.
  2. When a read request arrives, ZooKeeper does not block until all followers have finished synchronizing; it keeps serving data to preserve availability, i.e. AP.

Therefore, ZooKeeper balances the CAP trade-off by combining eventual (sequential) consistency with the majority mechanism; but if forced to choose, CP is the better answer.

In CAP analysis, P generally must be satisfied (otherwise the system would not really be distributed), so the choice is usually between C and A.

CAP categories of common components:

  1. Eureka is AP
  2. etcd is CP
  3. consul is CP