This section describes important concepts of ZooKeeper.
ZooKeeper is a software project of the Apache Software Foundation. It provides open source distributed configuration services, synchronization services, and name registration for large-scale distributed computing. ZooKeeper is a typical distributed data consistency solution.
Objective: ZooKeeper is dedicated to providing distributed coordination services with high performance, high availability, and strict sequential access control capabilities for distributed applications.
- High performance: Stores all data in memory, and each server directly serves all non-transaction (read) requests from its clients. This makes zk especially well suited to read-heavy scenarios; it is less suited to workloads dominated by transaction (write) requests.
- High availability: Services are generally provided by a cluster. As long as more than half of the machines in the cluster work properly, the entire cluster can work properly.
- Strict access order: zk assigns every transaction request from a client a globally unique transaction ID (zxid), which reflects the order of all transaction operations.
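To see why a zxid can express a global order, it helps to split one into its parts. The sketch below relies on the fact that ZooKeeper packs the leader epoch into the high 32 bits of the 64-bit zxid and a per-epoch counter into the low 32 bits. The class and method names are made up for this illustration and are not part of the ZooKeeper API.

```java
// Hypothetical helper for illustration only; not a ZooKeeper class.
class ZxidDemo {
    // High 32 bits: epoch of the leader that issued the transaction.
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    // Low 32 bits: counter incremented for each transaction within that epoch.
    static long counterOf(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long zxid = 0x200000002L;
        System.out.println("epoch=" + epochOf(zxid) + ", counter=" + counterOf(zxid));
        // Any zxid from a later epoch compares greater than every zxid of an earlier epoch.
        System.out.println(0x300000000L > 0x2FFFFFFFFL);
    }
}
```

Because the epoch occupies the high bits, comparing two zxids as plain numbers orders transactions first by leader epoch and then by position within that epoch.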
Zk cluster roles
Leader: The core of the cluster's work and the only scheduler and handler of transaction requests, which guarantees the order of transaction processing. All write requests must be forwarded to the Leader, which assigns each request its number and then performs the operation.
Follower: Processes non-transaction requests from clients, forwards transaction requests to the Leader, and participates in Leader election.
Observer: Processes non-transaction requests independently and forwards transaction requests to the Leader. It does not participate in Leader election or in the 2PC data synchronization process.
For details on the 2PC data synchronization process, see "Message broadcast" in the "Zk data synchronization process" section.
Zk node features
Node names (keys) at the same level must be unique
If you try to create a node that already exists, a message indicates that the node already exists.
When creating a node, you must specify the full path of the node
$ ls /runoob
$ create /runoob/child 0
$ create /runoob/child/ch01 0
Ephemeral (temporary) nodes are removed when the session that created them is closed
Sequential nodes are numbered automatically
# Create a sequential node: create -s appends a sequence number to the end of the node name
$ create -s /aha
Created /aha0000000003
Nodes provide a watch mechanism
See the later section that demonstrates zkClient's listening events
The delete command can only delete one layer at a time
An error is reported when the delete command is used on a node that still has child nodes.
To delete a node together with its children, use the deleteall command.
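The suffix that create -s appends is a counter kept by the parent node, printed as ten zero-padded decimal digits. A minimal sketch of how such a name is formed follows; the class and method names are illustrative only, and on a real server the counter value is chosen by ZooKeeper, not by the client.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not a ZooKeeper class.
class SequentialNameDemo {
    // Appends a 10-digit, zero-padded counter to the requested path.
    static String sequentialName(String path, int counter) {
        return path + String.format(Locale.ROOT, "%010d", counter);
    }

    public static void main(String[] args) {
        System.out.println(sequentialName("/aha", 3)); // prints /aha0000000003
    }
}
```

The fixed width means sibling sequential nodes sort correctly as plain strings, which is what lock and queue recipes rely on.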
Zk permission control
Zk access control lists (ACLs) are very important in production environments.
The composition of an ACL
A zk ACL defines permissions in the form [scheme:id:permissions]
- scheme: the permission mechanism; the built-in schemes include world, auth, digest, and ip.
  - world: the default scheme, which means anyone can access the node
  - auth: represents a user who has already authenticated
  - digest: authenticates with a user name and password
  - ip: authenticates by IP address; a single IP address or an address segment can be specified
- id: the user that is allowed access.
- permissions: a string combining the letters cdrwa, where each letter stands for one permission: create (c), delete (d), read (r), write (w), and admin (a). Admin is the permission to perform ACL operations on the node. Create and delete apply to operations on child nodes; the other permissions apply to the node itself.
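Each permission letter corresponds to one bit, with the same values as the constants in ZooKeeper's ZooDefs.Perms (READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16). The sketch below converts a permission string such as cdrwa into that bitmask. The class and its methods are made up for this illustration; they are not how the zk command-line client is implemented.

```java
// Hypothetical helper for illustration only; the bit values mirror ZooDefs.Perms.
class AclPermsDemo {
    static final int READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16;

    // Converts a string such as "cdrwa" into the combined permission bitmask.
    static int parse(String perms) {
        int mask = 0;
        for (char c : perms.toCharArray()) {
            switch (c) {
                case 'r': mask |= READ; break;
                case 'w': mask |= WRITE; break;
                case 'c': mask |= CREATE; break;
                case 'd': mask |= DELETE; break;
                case 'a': mask |= ADMIN; break;
                default: throw new IllegalArgumentException("Unknown permission: " + c);
            }
        }
        return mask;
    }

    // True if the bitmask grants the given single permission.
    static boolean allows(int mask, int perm) {
        return (mask & perm) != 0;
    }

    public static void main(String[] args) {
        System.out.println(parse("cdrwa"));                 // all five bits set
        System.out.println(allows(parse("rwda"), CREATE));  // rwda lacks the create bit
    }
}
```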
Using digest mode for permission control
Write a utility class to generate the encrypted user name and password string needed to create a digest-mode user
Import the Maven dependency
<!-- The ZooKeeper client dependency version should match the ZooKeeper server version -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
    <exclusions>
        <!-- The zk package uses log4j for logging, which conflicts with Spring Boot's logback -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Write the utility class
package com.aha.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.security.NoSuchAlgorithmException;

/**
 * ZK utility class
 *
 * @author WT
 * @date 2021/12/1
 */
@Slf4j
public class ZkUtils {

    /**
     * Generates the encrypted string for a digest user name and password
     *
     * @param idPassword the plaintext user name and password, in the form user:password
     * @return the user name followed by the encoded digest
     * @throws NoSuchAlgorithmException if the digest algorithm is unavailable
     */
    public static String getDigestAuthStr(String idPassword) throws NoSuchAlgorithmException {
        return DigestAuthenticationProvider.generateDigest(idPassword);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // user:svY1VPBeOtL797NihAvQMUtFLHs=
        log.info(ZkUtils.getDigestAuthStr("user:111111"));
    }
}
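If you would rather not pull in the server classes just to produce this string, the same value can be computed with the JDK alone. The sketch below assumes what generateDigest does internally: it takes the SHA-1 of the whole user:password string, Base64-encodes it, and prefixes the user name. The class name DigestDemo is made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical stand-alone equivalent of the utility class; illustration only.
class DigestDemo {
    // Returns "user:" + Base64(SHA-1("user:password")).
    static String digest(String idPassword) {
        String user = idPassword.split(":", 2)[0];
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(digest("user:111111"));
    }
}
```

Note that the password itself never appears in the ACL, only its digest, but anyone who can read the ACL can attempt an offline guess, so use a strong password.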
Set the ACL on the node using the generated encrypted string
# Create the /aha node and give user (password 111111) read, write, delete, and admin (ACL) permissions
$ create /aha aha-data digest:user:svY1VPBeOtL797NihAvQMUtFLHs=:rwda
# View the /aha node without logging in as the user
$ ls /aha
# Insufficient permission is reported
[zk: localhost:2181(CONNECTED) 2] get /aha
Insufficient permission : /aha
# Log in as the user, then view the node again
$ addauth digest user:111111
$ get /aha
# The node can now be viewed normally
[zk: localhost:2181(CONNECTED) 7] get /aha
aha-data
Pitfall guide:
# Create the /aha node and give user (password 111111) read, write, delete, and admin (ACL) permissions
$ create /aha aha-data digest:user:svY1VPBeOtL797NihAvQMUtFLHs=:rwda
When executing this command, be sure to supply a data value for the node, because the syntax of the create command is: create [-s] [-e] [-c] [-t ttl] path [data] [acl]. If you pass an ACL without data, the ACL string is stored on the node as its data and no ACL is set.
Using auth mode for permission control
The differences between auth mode and digest mode are as follows:
- Before using auth mode to set the permissions of a node, first add the user with addauth digest user:pwd
- When setting permissions on a node, you only need to give the user name, as shown in the following example
# Add the user
$ addauth digest aha:111
# Create a node and set its permissions
$ create /auth auth-data auth:aha:cdwra
# To operate on /auth from another client, or after this client reconnects, log in as the user first; otherwise insufficient permission is reported
$ addauth digest aha:111
# Now the node can be operated on
Using ip mode for permission control
# Give 10.211.55.4 cdrwa permissions
$ create /ip ip-date ip:10.211.55.4:cdrwa
# Operate on the /ip node from a machine other than 10.211.55.4
$ get /ip
# Insufficient permission is reported
[zk: localhost:2181(CONNECTED) 0] ls /ip
Insufficient permission : /ip
# Operate on the /ip node from 10.211.55.4
$ get /ip
# Returns normally
# Use multiple IP addresses
$ setAcl /ip ip:10.211.55.4:cdwra,ip:10.211.55.5:cdwra
# An IP address segment can also be used
$ setAcl /ip ip:10.211.55.4/24:cdwra
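To make the address-segment form concrete: an id such as 10.211.55.4/24 matches any client whose first 24 bits equal those of 10.211.55.4. The sketch below shows that prefix comparison for IPv4 only. It is an illustration under that assumption, not ZooKeeper's own implementation, and the class and method names are hypothetical.

```java
// Hypothetical helper for illustration only; IPv4 prefix matching.
class IpAclDemo {
    // Packs a dotted IPv4 address into a 32-bit int.
    static int toInt(String ipv4) {
        String[] parts = ipv4.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        int value = 0;
        for (String part : parts) {
            value = (value << 8) | Integer.parseInt(part);
        }
        return value;
    }

    // True if clientIp falls inside the ACL id, given as "a.b.c.d" or "a.b.c.d/bits".
    static boolean matches(String aclId, String clientIp) {
        String[] parts = aclId.split("/");
        int bits = parts.length == 2 ? Integer.parseInt(parts[1]) : 32;
        int mask = bits == 0 ? 0 : -1 << (32 - bits);
        return (toInt(parts[0]) & mask) == (toInt(clientIp) & mask);
    }

    public static void main(String[] args) {
        System.out.println(matches("10.211.55.4/24", "10.211.55.77")); // same /24 network
        System.out.println(matches("10.211.55.4", "10.211.55.5"));     // exact match required
    }
}
```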
Zk read/write mechanism
Zk uses a single master process, the Leader, to receive and process all transaction requests from clients. Note that this applies only to transaction requests; non-transaction requests can be handled directly by followers or observers.
Transaction requests are requests that modify data: create, update, delete
Non-transaction requests are query operations
When a non-leader node receives a transaction request, it forwards the request to the Leader node for processing.
Zk data synchronization process
Zk mainly relies on the ZAB (ZooKeeper Atomic Broadcast) protocol to achieve distributed data consistency.
The ZAB protocol, like Paxos, is a data consistency algorithm. It consists of two parts: message broadcast and crash recovery.
Message broadcast
After receiving a transaction request, either directly from a client or forwarded by an observer or follower node, the leader node converts the request into a proposal and broadcasts it to all follower nodes (but not to observer nodes). Once more than half of the voting servers in the cluster (the leader counts itself) have returned a correct ACK to the leader, the leader sends a COMMIT message to all followers to commit the proposal. This process resembles a 2PC (two-phase commit) transaction commit.
Note: Observer nodes only synchronize the leader's data and do not participate in the 2PC process.
ACK: acknowledgement
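The commit rule in message broadcast comes down to a majority count. The following sketch simulates one proposal in a cluster of one leader and four followers, under the assumption that the leader counts its own acknowledgement and observers are not counted. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation for illustration only; not ZooKeeper code.
class BroadcastDemo {
    // A proposal can be committed once more than half of the voting servers have ACKed it.
    static boolean canCommit(int votingServers, int acks) {
        return acks > votingServers / 2;
    }

    public static void main(String[] args) {
        int votingServers = 5; // 1 leader + 4 followers; observers are not counted
        List<Boolean> followerAcks = Arrays.asList(true, true, false, false);
        int acks = 1;          // the leader acknowledges its own proposal
        for (boolean ack : followerAcks) {
            if (ack) {
                acks++;
            }
        }
        System.out.println("acks=" + acks + ", commit=" + canCommit(votingServers, acks));
    }
}
```

Unlike classic 2PC, the leader does not wait for every participant, so a slow or failed follower cannot block the commit.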
Crash recovery
Message broadcast works well under normal conditions, but once the Leader server crashes, or loses contact with more than half of the followers because of network problems, the cluster enters crash recovery mode and must elect a new Leader server. Two kinds of data inconsistency can arise during this process, and the ZAB protocol has to prevent both.
The recovery mode of the ZAB protocol uses the following policies:
- Elect the node with the largest zxid as the new leader
- The new leader processes the uncommitted messages in its transaction log
Zk leader election
Introduction to important parameters
- Server ID (myid): the larger the number, the greater the weight in the election algorithm
- Global transaction ID (zxid): the larger the value, the newer the data and the greater the weight
- Logical clock (epoch, logicalclock): the value is the same for all votes in one voting round and increases after each round, so it is used to check whether votes belong to the same round
Election status
- LOOKING: campaigning state
- FOLLOWING: follower state; synchronizes with the leader and participates in voting
- OBSERVING: observer state; synchronizes with the leader but does not participate in voting
- LEADING: leader state
Introduction to the election process
The election uses ZooKeeper's FastLeaderElection algorithm, a Paxos-like majority-voting scheme.
Each node starts in the LOOKING state and then enters the election process. This section takes a cluster of three machines as an example. When the first server, server1, starts, it cannot hold a leader election on its own. When the second server, server2, starts, the two servers can communicate with each other and the leader election begins.
- Send votes: Each server issues a vote. In the initial situation, server1 and server2 each vote for themselves as leader. Each vote contains the myid, zxid, and epoch of the chosen server. Written as (myid, zxid), server1 votes (1,0) and server2 votes (2,0), and each sends its vote to the other machines in the cluster.
- Check votes: After a server in the cluster receives a vote, it first checks the vote's validity, for example whether it belongs to the current round (epoch) and whether it comes from a server in the LOOKING state.
- Compare votes: For each vote received, the server compares the other server's vote with its own. The comparison rules are as follows:
  - Compare epoch first; the vote with the larger epoch wins
  - If the epochs are equal, compare zxid; the server with the larger zxid is preferred as leader
  - If the zxids are also equal, compare myid; the server with the larger myid becomes the leader
- Count votes: After each round of voting, every server tallies the votes and checks whether more than half of the machines have accepted the same vote. Here both server1 and server2 find that two machines in the cluster have accepted the vote (2,0), so server2 is elected leader.
- Change server state: Once the leader is determined, each server updates its state: followers change to FOLLOWING and the leader changes to LEADING. When server3 starts afterwards, it changes itself to FOLLOWING.
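The vote comparison step above can be sketched as a single ordering function. The Vote class and the wins method are made-up names for this illustration; they only mirror the epoch, then zxid, then myid rule described in the text and are not ZooKeeper's own classes.

```java
// Hypothetical model of the vote comparison rule; illustration only.
class VoteDemo {
    static final class Vote {
        final long epoch;
        final long zxid;
        final long myid;

        Vote(long epoch, long zxid, long myid) {
            this.epoch = epoch;
            this.zxid = zxid;
            this.myid = myid;
        }
    }

    // True if the received vote should replace the server's current vote.
    static boolean wins(Vote received, Vote current) {
        if (received.epoch != current.epoch) {
            return received.epoch > current.epoch;
        }
        if (received.zxid != current.zxid) {
            return received.zxid > current.zxid;
        }
        return received.myid > current.myid;
    }

    public static void main(String[] args) {
        Vote server1 = new Vote(0, 0, 1); // server1 initially votes for itself
        Vote server2 = new Vote(0, 0, 2); // server2 initially votes for itself
        // server1 receives server2's vote: same epoch and zxid, larger myid.
        System.out.println("server1 switches to server2: " + wins(server2, server1));
        System.out.println("server2 switches to server1: " + wins(server1, server2));
    }
}
```

In the three-machine example, this is why server1 changes its vote to (2,0) while server2 keeps its own, giving (2,0) the two votes needed for a majority.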
Installing ZooKeeper
Install the JDK
ZooKeeper is developed in Java, so install the JDK first. For details, see JDK Installation on Multiple Platforms
Install ZooKeeper in single-node mode
Node planning
The master machine is used for the single-node demonstration; its address is 10.211.55.3.
Installation directory: /home/parallels/Downloads/zookeeper.
Get the download address of the version you want to install from the ZooKeeper website: https://zookeeper.apache.org/releases.html.
Note: do not select the source package here.
# Download the compressed package
$ wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
# Unpack it
$ tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
# I installed it under /home/parallels/Downloads; change into the conf directory of the unpacked package
$ cd conf
$ cp zoo_sample.cfg zoo.cfg
# Start the server from the conf directory
$ ../bin/zkServer.sh start
# Check the startup status of the server
$ ../bin/zkServer.sh status
# Stop the service
$ ../bin/zkServer.sh stop
The single-node installation of zk is now basically complete. The default configuration file used is conf/zoo.cfg.
Configuring environment variables
Setting environment variables makes the commands easier to use
# Edit the configuration file
$ vim /etc/environment
# Add before PATH
ZK_HOME="/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin"
# Modify PATH by appending the following to the end of it
:$ZK_HOME/bin
Use the zk client to operate the zkServer
Ensure that the zkServer is running properly before performing these operations
Basic operation
# Connect to the zkServer. If no IP address or port is specified, 127.0.0.1:2181 is used by default
$ zkCli.sh
# The following specifies the IP address and port explicitly
$ zkCli.sh -server 127.0.0.1:2181
# Once connected, zk behaves much like a Linux file system
$ ls /
# Create a node and assign it a value; nodes are persistent by default
$ create /aha aha
# View the value of the created node
$ get /aha
# Create a child node under /aha and give it a value
$ create /aha/child child
# View the directory structure
$ ls /
# aha and zookeeper are listed, but you cannot enter /aha: zk maintains a virtual directory structure and has no cd command. The stat command, however, is important
$ stat /aha
# Returns the following information
cZxid = 0x200000002 # ID of the transaction that created /aha
ctime = Wed May 12 13:25:46 CST 2021 # creation time
mZxid = 0x200000002 # ID of the transaction that last modified the node
mtime = Wed May 12 13:25:46 CST 2021 # last modification time
pZxid = 0x200000003 # ID of the transaction that last changed the child node list (adding or removing a child changes it; modifying a child's data does not)
dataVersion = 0 # data version, incremented by 1 on each data change
aclVersion = 0 # ACL version, incremented by 1 on each ACL change
ephemeralOwner = 0x0 # session ID of the session that created the node if it is ephemeral; 0x0 for a persistent node
dataLength = 9 # length of the node's data
numChildren = 1 # number of children this node has
# Create an ephemeral (temporary) node with create -e
$ create -e /aha/temp temp-value
# Create a sequential node: create -s appends a sequence number to the end of the node name
$ create -s /aha/aha
Created /aha/aha0000000003
# Ephemeral sequential nodes are created the same way, by combining both options
$ create -e -s
# Use the set command to modify the value of a node
$ set path data [-v version]
# -v version is optional. The version number can be used as an optimistic lock: the value is set only when the given version matches the node's current version
# The set fails when the version is incorrect
$ set /LOCK/aha uuu -v 3
# The sync command is followed by the node path
$ sync /aha
You can run help in the client to see how to use each command
Ephemeral nodes cannot have children
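The optimistic-lock behaviour of set with -v can be pictured with a small in-memory model: a write succeeds only if the caller's expected version equals the node's current data version, and each successful write increments that version. The class below is purely illustrative and involves no ZooKeeper server; a real server rejects a mismatched version with a bad-version error instead of returning false.

```java
// Hypothetical in-memory model of a versioned node; illustration only.
class VersionedNodeDemo {
    private String data;
    private int version = 0;

    VersionedNodeDemo(String data) {
        this.data = data;
    }

    // Mimics "set path data -v version": succeeds only when the versions match.
    synchronized boolean set(String newData, int expectedVersion) {
        if (expectedVersion != version) {
            return false; // someone else modified the node first
        }
        data = newData;
        version++;
        return true;
    }

    synchronized int version() {
        return version;
    }

    synchronized String data() {
        return data;
    }

    public static void main(String[] args) {
        VersionedNodeDemo node = new VersionedNodeDemo("aha");
        System.out.println(node.set("uuu", 3)); // stale version, rejected
        System.out.println(node.set("uuu", 0)); // current version, accepted
        System.out.println(node.version());
    }
}
```

A typical client loop reads the node and its version, computes the new value, and retries the whole read-modify-write if the versioned set is rejected.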
Set up a ZK cluster
Node planning
Three machines, master01, node01, and node02, are used to build the cluster for this demonstration; their addresses are 10.211.55.3, 10.211.55.4, and 10.211.55.5 respectively.
Installation directory: /home/parallels/Downloads/zookeeper.
First install the JDK and download the installation package on each machine. Configuring the ZK environment variables is described in the single-node section.
Set the host name of each machine so that the hosts are easy to tell apart when operating several of them:
$ hostnamectl set-hostname master01
$ hostnamectl set-hostname node01
$ hostnamectl set-hostname node02
After setting the names, restart the machines and reconnect to see the new host names.
FAQ: To prevent split-brain, a ZK cluster should have 2N + 1 (an odd number of) nodes. The cluster needs more than half of its nodes alive, so an even-sized cluster tolerates no more failures than the odd-sized cluster that is one node smaller.
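The arithmetic behind the 2N + 1 recommendation is easy to check. The sketch below computes, for a few cluster sizes, how many nodes form a majority and how many failures the cluster can survive; the class and method names are made up for this example.

```java
// Hypothetical helper for illustration only.
class ClusterSizeDemo {
    // Smallest number of nodes that is more than half of the cluster.
    static int quorum(int nodes) {
        return nodes / 2 + 1;
    }

    // Number of nodes that can fail while a quorum still remains.
    static int toleratedFailures(int nodes) {
        return nodes - quorum(nodes);
    }

    public static void main(String[] args) {
        for (int nodes = 3; nodes <= 6; nodes++) {
            System.out.println(nodes + " nodes: quorum=" + quorum(nodes)
                    + ", tolerated failures=" + toleratedFailures(nodes));
        }
    }
}
```

Three nodes and four nodes both tolerate exactly one failure, so the fourth machine adds cost without adding fault tolerance.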
Modify the zoo.cfg configuration
# In the conf directory, copy zoo_sample.cfg to zoo.cfg
$ cp zoo_sample.cfg zoo.cfg
Parameters in the zoo.cfg file:
dataDir=/tmp/zookeeper: the directory where zk saves its data. The default is under /tmp, a directory meant for temporary data, so change it for real use.
The three zk ports:
- 2181: serves client connections
- 2888: used for communication between the machines in the cluster
- 3888: used for leader election
tickTime: the heartbeat interval in milliseconds. The minimum session timeout is twice the tickTime
initLimit=10: the maximum time, in number of heartbeat intervals, allowed for the initial connection; if it is exceeded, the connection times out
syncLimit=5: the timeout, in number of heartbeat intervals, for synchronizing messages between the leader and followers in the zk cluster
clientPort: the port the server listens on for client connections
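Because initLimit and syncLimit are expressed in ticks, the real timeouts depend on tickTime. The sketch below does the multiplication, assuming tickTime=2000, which is the value shipped in the stock zoo_sample.cfg; check your own file, since the text above does not state it. The class and method names are illustrative.

```java
// Hypothetical helper for illustration only.
class ZooCfgTimingDemo {
    // Time allowed for the initial connection and sync, in milliseconds.
    static int initTimeoutMs(int tickTime, int initLimit) {
        return tickTime * initLimit;
    }

    // Time allowed for a leader-follower sync exchange, in milliseconds.
    static int syncTimeoutMs(int tickTime, int syncLimit) {
        return tickTime * syncLimit;
    }

    // Smallest session timeout the server will grant, in milliseconds.
    static int minSessionTimeoutMs(int tickTime) {
        return 2 * tickTime;
    }

    public static void main(String[] args) {
        int tickTime = 2000; // assumed: value from the stock zoo_sample.cfg
        System.out.println("init timeout ms: " + initTimeoutMs(tickTime, 10));
        System.out.println("sync timeout ms: " + syncTimeoutMs(tickTime, 5));
        System.out.println("min session timeout ms: " + minSessionTimeoutMs(tickTime));
    }
}
```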
Modify the zoo.cfg file:
# 1. Append the following to the end of zoo.cfg on all three machines
server.1=10.211.55.3:2888:3888
server.2=10.211.55.4:2888:3888
server.3=10.211.55.5:2888:3888
# 2. Change dataDir in zoo.cfg
dataDir=/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/data
Further reading: if an observer is needed, add a line like this to the configuration: server.4=10.211.55.6:2888:3888:observer
Add myid
Create a file named myid in the directory configured as dataDir. Its content is the n from server.n in zoo.cfg; for example, write 1 on 10.211.55.3.
Do the same on the other nodes in turn.
Start the cluster
# 1. Start zk on all three machines; note that each machine must be started manually
$ zkServer.sh start
# 2. View the cluster status by running this command on each node
$ zkServer.sh status
# The following information is displayed on a follower node
root@master01:/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/conf# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# The following information is displayed on the leader node
root@node01:/home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/conf# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/parallels/Downloads/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
Pitfall notes:
When a telnet to a port fails, there are two possible causes:
- Firewall problems: turn off the firewall or open the port in the firewall
- The machine you are accessing is not listening on the port at all: to check, run netstat -lntup on that machine (it can be combined with grep)
Test the cluster using the client
# Note that the connection uses port 2181
$ zkCli.sh -server 10.211.55.4:2181
Zk graphical interface tool
https://github.com/zzhang5/zooinspector
Spring Boot integration with zk
Using zkClient to operate zk
Add the Maven dependencies
<!-- The ZooKeeper client dependency version should match the ZooKeeper server version -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
    <exclusions>
        <!-- The zk package uses log4j for logging, which conflicts with Spring Boot's logback -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>
Write the configuration
zookeeper:
  # If there are multiple ZooKeeper server addresses, separate them with commas, such as ip1:port1,ip2:port2,ip3:port3
  address: 10.211.55.3:2181,10.211.55.4:2181,10.211.55.5:2181
  retryCount: 5 # number of retries
  initElapsedTimeMs: 1000 # initial retry interval
  maxElapsedTimeMs: 5000 # maximum retry interval
  sessionTimeoutMs: 30000 # session timeout
  connectionTimeoutMs: 10000 # connection timeout
Write the configuration class
package com.aha.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * ZooKeeper connection configuration class
 *
 * @author WT
 * @date 2021/11/22
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "zookeeper")
public class ZkClientProperties {
    /** Number of retries */
    private int retryCount;
    /** Initial retry interval */
    private int initElapsedTimeMs;
    /** Maximum retry interval */
    private int maxElapsedTimeMs;
    /** Connection address */
    private String address;
    /** Session expiration time */
    private int sessionTimeoutMs;
    /** Connection timeout */
    private int connectionTimeoutMs;
}
Register the zkClient
package com.aha.client;

import com.aha.config.ZkClientProperties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Creates the zk client
 *
 * @author WT
 * @date 2021/11/22 18:18
 */
@Configuration
public class ZookeeperClient {

    // @Bean methods must not be private, otherwise Spring cannot register the bean
    @Bean
    public ZkClient zkClient(ZkClientProperties zookeeperProperties) {
        // See ZkClient's constructors if you need to pass more parameters
        return new ZkClient(new ZkConnection(zookeeperProperties.getAddress()), zookeeperProperties.getConnectionTimeoutMs());
    }
}
Demo: zkClient listening events
Write the Service demo class
package com.aha.lock.service;

import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.stereotype.Service;

/**
 * Tests zkClient's listening events
 *
 * @author WT
 * @date 2021/11/23 14:53
 */
@Slf4j
@Service
public class ZkWatcher {

    private final ZkClient zkClient;
    private static final String NODE_PATH = "/path";
    private static final String LOG_SEPARATOR = "++++++++++++++++++++++++++++++++++++++++++++";

    public ZkWatcher(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    public void testSubscribeChildChanges() throws InterruptedException {
        /*
         * parentPath: the node being listened on
         * currentChildren: the current child nodes of that node
         * The listener fires when the node itself is created or deleted and when
         * child nodes are added or removed. Repeated operations deliver repeated events.
         */
        zkClient.subscribeChildChanges(NODE_PATH, (parentPath, currentChildren) -> {
            log.info("The current node is: {}, and the children under the current node are: {}", parentPath, currentChildren);
            log.info(LOG_SEPARATOR);
        });
        operateNode(zkClient);
    }

    public void testSubscribeDataChanges() throws InterruptedException {
        zkClient.subscribeDataChanges(NODE_PATH, new IZkDataListener() {
            /**
             * Listens for deletion of the node
             *
             * @param path node path
             */
            @Override
            public void handleDataDeleted(String path) throws Exception {
                log.info("The deleted node is: {}", path);
                log.info(LOG_SEPARATOR);
            }

            /**
             * Listens for create and update operations on the node
             *
             * @param path node path
             * @param data the changed content
             */
            @Override
            public void handleDataChange(String path, Object data) throws Exception {
                log.info("The changed node is: {}, the changed content is: {}", path, data);
                log.info(LOG_SEPARATOR);
            }
        });
        operateNode(zkClient);
    }

    public static void operateNode(ZkClient zkClient) throws InterruptedException {
        // Delete recursively so the demo starts from a clean state
        zkClient.deleteRecursive(NODE_PATH);
        // A ZkNodeExistsException is thrown if the node already exists
        zkClient.createPersistent(NODE_PATH);
        Thread.sleep(1000);
        // subscribeChildChanges does not observe updates to the parent node's data; subscribeDataChanges does
        zkClient.writeData(NODE_PATH, "The parent node has changed");
        Thread.sleep(1000);
        zkClient.createPersistent(NODE_PATH + "/" + "c1", "C1 content");
        Thread.sleep(1000);
        zkClient.createPersistent(NODE_PATH + "/" + "c2", "C2 content");
        Thread.sleep(1000);
        // Updates to a child node's data are not observed by either listener
        zkClient.writeData(NODE_PATH + "/" + "c1", "C1 New Content");
        Thread.sleep(1000);
        zkClient.delete(NODE_PATH + "/c2");
        Thread.sleep(1000);
        // A ZkException is thrown if the node deleted with this method is not empty
        // zkClient.delete(NODE_PATH);
        // Recursive deletion triggers two deletions: first /path/c1, then /path
        zkClient.deleteRecursive(NODE_PATH);
        // This delay is necessary: listeners are asynchronous, so if the method returned immediately
        // the last events might not be delivered before the caller finishes
        Thread.sleep(10000);
        log.info("End of delay");
    }
}
Write the test controller
package com.aha.lock.controller;

import com.aha.lock.service.InterprocessMutexLock;
import com.aha.lock.service.ZkWatcher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author WT
 * @date 2021/11/23 14:13
 */
@RestController
public class TestController {

    private final InterprocessMutexLock interprocessMutexLock;
    private final ZkWatcher zkWatcher;

    public TestController(InterprocessMutexLock interprocessMutexLock, ZkWatcher zkWatcher) {
        this.interprocessMutexLock = interprocessMutexLock;
        this.zkWatcher = zkWatcher;
    }

    /** Tests the subscribeDataChanges method of zkClient */
    @GetMapping("/zk-client/subscribe/data")
    public void testZkClientSubscribeDataChanges() throws InterruptedException {
        zkWatcher.testSubscribeDataChanges();
    }

    /** Tests the subscribeChildChanges method of zkClient */
    @GetMapping("/zk-client/subscribe/child")
    public void testZkClientSubscribeChildChanges() throws InterruptedException {
        zkWatcher.testSubscribeChildChanges();
    }
}
Using Curator to operate zk
Add the Maven dependencies
<!-- The ZooKeeper client dependency version should match the ZooKeeper server version -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
    <exclusions>
        <!-- The zk package uses log4j for logging, which conflicts with Spring Boot's logback -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes contains the distributed lock implementation -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>
Write the configuration
zookeeper:
  # If there are multiple ZooKeeper server addresses, separate them with commas, such as ip1:port1,ip2:port2,ip3:port3
  address: 10.211.55.3:2181,10.211.55.4:2181,10.211.55.5:2181
  retryCount: 5 # number of retries
  initElapsedTimeMs: 1000 # initial retry interval
  maxElapsedTimeMs: 5000 # maximum retry interval
  sessionTimeoutMs: 30000 # session timeout
  connectionTimeoutMs: 10000 # connection timeout
Write the configuration class
package com.aha.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * ZooKeeper connection configuration class
 *
 * @author WT
 * @date 2021/11/22
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "zookeeper")
public class ZkClientProperties {
    /** Number of retries */
    private int retryCount;
    /** Initial retry interval */
    private int initElapsedTimeMs;
    /** Maximum retry interval */
    private int maxElapsedTimeMs;
    /** Connection address */
    private String address;
    /** Session expiration time */
    private int sessionTimeoutMs;
    /** Connection timeout */
    private int connectionTimeoutMs;
}
Register the CuratorFramework
package com.aha.client;

import com.aha.config.ZkClientProperties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Creates the zk client
 *
 * @author WT
 * @date 2021/11/22 18:18
 */
@Configuration
public class ZookeeperClient {

    /*
     * initMethod = "start": after the CuratorFramework object is created,
     * Spring calls the start method of the CuratorFramework instance
     */
    // @Bean(initMethod = "start")
    // public CuratorFramework curatorFramework(ZkClientProperties zookeeperProperties) {
    //     return CuratorFrameworkFactory.newClient(
    //             zookeeperProperties.getAddress(),
    //             zookeeperProperties.getSessionTimeoutMs(),
    //             zookeeperProperties.getConnectionTimeoutMs(),
    //             new RetryNTimes(zookeeperProperties.getRetryCount(), zookeeperProperties.getInitElapsedTimeMs())
    //     );
    // }

    // @Bean methods must not be private, otherwise Spring cannot register the bean
    @Bean(initMethod = "start")
    public CuratorFramework getZkClient(ZkClientProperties zookeeperProperties) {
        // Arguments: base sleep time (ms), maximum number of retries, maximum sleep time (ms)
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, zookeeperProperties.getRetryCount(), 5000);
        return CuratorFrameworkFactory.builder()
                .connectString(zookeeperProperties.getAddress())
                .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                .retryPolicy(retryPolicy)
                .build();
    }
}
Demo: the distributed reentrant exclusive lock provided by Curator
Write the Service demo class
package com.aha.lock.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;

/**
 * Tests the distributed reentrant exclusive lock provided by Curator
 *
 * @author WT
 * @date 2021/11/22 17:41
 */
@Slf4j
@Service
public class InterprocessMutexLock {

    private final CuratorFramework curatorFramework;
    private int inventory = 40;

    public InterprocessMutexLock(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    public void test(String lockPath) {
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
        // Simulate 50 threads competing for the lock
        for (int i = 0; i < 50; i++) {
            new Thread(new TestThread(i, lock)).start();
        }
    }

    class TestThread implements Runnable {

        private final Integer threadFlag;
        private final InterProcessMutex lock;

        public TestThread(Integer threadFlag, InterProcessMutex lock) {
            this.threadFlag = threadFlag;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.acquire();
                log.info("{} thread acquired lock, inventory remaining: {}", threadFlag, inventory);
                if (inventory > 0) {
                    inventory--;
                    log.info("Inventory remaining: {}", inventory);
                } else {
                    log.info("Inventory: {}, thread {} found nothing available", inventory, threadFlag);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Write the test controller
package com.aha.lock.controller;

import com.aha.lock.service.InterprocessMutexLock;
import com.aha.lock.service.ZkWatcher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author WT
 * @date 2021/11/23 14:13
 */
@RestController
public class TestController {

    private final InterprocessMutexLock interprocessMutexLock;
    private final ZkWatcher zkWatcher;

    public TestController(InterprocessMutexLock interprocessMutexLock, ZkWatcher zkWatcher) {
        this.interprocessMutexLock = interprocessMutexLock;
        this.zkWatcher = zkWatcher;
    }

    /** Tests the distributed reentrant exclusive lock provided by Curator */
    @GetMapping("/lock/mutex")
    public void testMutexLock() {
        interprocessMutexLock.test("/lock/mutex");
    }
}
For a code example, see the distributed Lock-ZooKeeper project under lock at https://github.com/WT-AHA/JAVA-TREASURE.git
FAQ
Shortcomings of zk
- zk has limited write performance: TPS is probably only a little over 10,000, because only the leader node can process transaction requests.
- zk provides no external service while a leader is being elected, and the election is relatively slow.
- zk's permission control is relatively simple.
- Reads from zk may return old data, because zk considers data synchronization complete once more than half of the machines have synchronized, and the node you read from may be one that has not synchronized yet.
Is zk CP or AP?
The answer differs depending on the angle from which zk is analyzed:
- zk suspends external service during leader election (zk relies on the leader to guarantee data consistency), so it gives up availability to guarantee consistency, i.e. CP. However, consistency here means eventual consistency rather than strong consistency: the cluster considers data synchronization complete once more than half of the nodes have synchronized.
- When a read request arrives, zk serves it without blocking until all followers have synchronized, in order to stay available, which looks like AP.
Therefore, zk strikes a balance within the CAP theorem by combining sequential consistency (a form of eventual consistency) with the majority mechanism, but if you have to pick one, CP is the better answer.
In CAP analysis, P generally must be satisfied, otherwise the system is not really distributed, so the choice is usually between C and A.
CAP categories of common components:
- eureka: AP
- etcd: CP
- consul: CP