1. Introduction
- The Curator framework is a wrapper around the native ZooKeeper API that takes care of many of the very low-level details of ZooKeeper client development.
- It provides abstractions for common ZooKeeper use cases (distributed locks, cluster leader election, shared counters, caching, distributed queues, and so on) and exposes a fluent-style API. It is one of the most useful and popular ZooKeeper clients.
1.1 Deficiencies of the native ZooKeeper API
- Connection objects are created asynchronously, so developers have to write their own code to wait for the connection
- Connections have no automatic reconnection mechanism after a timeout
- A Watcher fires only once and must be registered again after each event
- Recursive creation of tree nodes is not supported
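To make the first point concrete, here is a minimal sketch of the wait-for-connection pattern. It does not use ZooKeeper itself: AsyncConnectSketch, connectAsync and ConnectedCallback are hypothetical names that simulate a connection being reported from another thread, which is roughly how the native client's connected event looks from the caller's side. The caller blocks on a CountDownLatch until the callback arrives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Simulation only: stands in for a client whose connection completes asynchronously. */
final class AsyncConnectSketch {

    /** Hypothetical callback, playing the role of a "connected" Watcher event. */
    interface ConnectedCallback {
        void onConnected();
    }

    /** Returns immediately; the callback fires later from a background thread. */
    static void connectAsync(ConnectedCallback callback) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100); // pretend the handshake takes a moment
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            callback.onConnected();
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Blocks until the connection is reported or the timeout expires. */
    static boolean connectAndWait(long timeoutMs) {
        CountDownLatch connected = new CountDownLatch(1);
        connectAsync(connected::countDown);
        try {
            return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectAndWait(2000));
    }
}
```

With Curator this boilerplate is not needed in the examples that follow: the client is started once and the operations are issued directly.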
1.2 Curator features
- Handles reconnection after a session timeout
- Re-registers Watchers automatically, so they keep firing
- Simplified development API
- Follows the fluent API style
- Provides distributed lock services, shared counters, caching, and other recipes
1.3 Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- Encapsulates advanced features such as Cache event listening, elections, distributed locks, and distributed barriers. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
        <type>jar</type>
    </dependency>
</dependencies>
2. Connecting and closing
- Curator uses the factory and builder design patterns: you supply the connection information and get back a client connected to the ZooKeeper server.
public static void main(String[] args) {
    // Base sleep time of 1 s between retries, at most 3 retries
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory
            .builder()
            .connectString("192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .namespace("create")
            .build();
    client.start(); // Start the client
    log.info(client.isStarted());
    client.close(); // Close the client
}
- connectString: sets the server addresses and port numbers, as comma-separated host:port pairs
- sessionTimeoutMs: sets the session timeout, in milliseconds
- retryPolicy: sets the reconnection (retry) policy
- namespace: sets the root node under which all of the client's paths are resolved
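As a rough illustration of what the retry policy does with its two arguments, the sketch below models a randomized exponential backoff. BackoffSketch, sleepMs and maxSleepMs are hypothetical names, and the formula is an assumption about how ExponentialBackoffRetry grows its sleep time, not a copy of the library's code.

```java
import java.util.Random;

/** Sketch of a randomized exponential backoff (assumed model, not Curator's source). */
final class BackoffSketch {

    /** Largest possible sleep for the given retry; retryCount starts at 0. */
    static long maxSleepMs(long baseSleepTimeMs, int retryCount) {
        // the multiplier is drawn from 1 .. 2^(retryCount + 1) - 1
        return baseSleepTimeMs * ((1L << (retryCount + 1)) - 1);
    }

    /** One randomized sleep, never shorter than baseSleepTimeMs. */
    static long sleepMs(long baseSleepTimeMs, int retryCount, Random random) {
        int multiplier = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return baseSleepTimeMs * multiplier;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + ": sleep " + sleepMs(1000, retry, random)
                    + " ms (at most " + maxSleepMs(1000, retry) + " ms)");
        }
    }
}
```

Under this model, a base of 1000 ms gives a first retry after 1000 ms and a third retry after somewhere between 1000 and 7000 ms.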
2.1 Test template
- Since every test needs a client, you can write a test template that opens the client before each test and closes it afterwards.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger; // assumed: log4j 1.x, as suggested by Logger.getLogger(Class)
import org.junit.After;
import org.junit.Before;

public class CreateTest {
    private static final Logger log = Logger.getLogger(CreateTest.class);
    private String connectString = "192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181";
    CuratorFramework client;
    Integer sessionTimeoutMs = 5000;
    Integer baseSleepTimeMs = 1000;
    Integer maxRetries = 3;
    String namespace = "create";

    @Before
    public void before() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
        client.start();
        log.info("Client is open");
    }

    @After
    public void after() {
        client.close();
        log.info("Client closed");
    }
}
3. Add a node
3.1 Case 1: Simple Creation
@Test
public void testCreate() throws Exception {
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node", "data".getBytes());
    log.info("Create finished");
}
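Because the client in this article is built with .namespace("create"), the path given to forPath is resolved relative to that root, so the node above should appear on the server as /create/node. The helper below sketches that mapping; NamespaceSketch and resolve are hypothetical names used only for illustration, not Curator API.

```java
/** Illustrates how a namespaced client path maps to the real path on the server. */
final class NamespaceSketch {

    /** Prefixes the path with "/" + namespace; an empty namespace leaves the path unchanged. */
    static String resolve(String namespace, String path) {
        if (namespace == null || namespace.isEmpty()) {
            return path;
        }
        String root = "/" + namespace;
        // the client's "/" corresponds to the namespace root itself
        return path.equals("/") ? root : root + path;
    }

    public static void main(String[] args) {
        System.out.println(resolve("create", "/node")); // /create/node
        System.out.println(resolve("create", "/"));     // /create
    }
}
```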
3.2 Case 2: Creating a node with custom permissions
@Test
public void testCreate2() throws Exception {
    // Grant all permissions to a single client IP
    Id ip = new Id("ip", "192.168.233.133");
    List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
    client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acl)
            .forPath("/node1", "data".getBytes());
    log.info("Create finished");
}
3.3 Case 3: Recursive Node Creation
- With .creatingParentsIfNeeded(), any missing parent nodes are created recursively.
@Test
public void testCreate3() throws Exception {
    // Create nodes recursively
    client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node2/node33", "data".getBytes());
    log.info("Create finished");
}
3.4 Case 4: Creating a node asynchronously
- The callback method receives two arguments: the first, curatorFramework, is actually the client itself; the second, curatorEvent, holds the result of the operation.
@Test
public void testCreate4() throws Exception {
    // Create nodes asynchronously
    client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorFramework == client); // true
                    log.info("getResultCode(): " + curatorEvent.getResultCode()); // 0 means the node was created successfully
                    log.info("getType(): " + curatorEvent.getType().toString()); // Operation type: CREATE
                    log.info("getPath(): " + curatorEvent.getPath()); // Path of the node
                }
            })
            .forPath("/node2/node38", "data".getBytes());
    log.info("Create finished");
}
4. Update nodes
4.1 Case 1: Updating a node
@Test
public void testSet() throws Exception {
    client.setData()
            .forPath("/node", "set".getBytes());
    log.info("Update complete");
}
4.2 Case 2: Updating a node with a version number
@Test
public void testSet2() throws Exception {
    client.setData()
            .withVersion(1) // Update only if the node's current data version is 1
            .forPath("/node", "12".getBytes());
    log.info("Update complete");
}
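The version passed to withVersion acts as an optimistic check: in ZooKeeper the update is applied only if the node's current data version equals the expected one, and -1 skips the check. The sketch below simulates that rule in memory; VersionedNodeSketch is a hypothetical class, not part of Curator or ZooKeeper.

```java
/** In-memory simulation of ZooKeeper's version-checked setData. */
final class VersionedNodeSketch {
    private byte[] data = new byte[0];
    private int version = 0; // a freshly created node starts at data version 0

    /**
     * Applies the update and bumps the version if expectedVersion matches
     * the current version or is -1; otherwise leaves the node untouched.
     */
    synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false; // the real server would report a bad-version error here
        }
        data = newData.clone();
        version++;
        return true;
    }

    synchronized int getVersion() {
        return version;
    }

    synchronized String getDataAsString() {
        return new String(data);
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch();
        System.out.println(node.setData("set".getBytes(), 0)); // true, version becomes 1
        System.out.println(node.setData("12".getBytes(), 0));  // false, version 0 is stale
        System.out.println(node.setData("12".getBytes(), 1));  // true, version becomes 2
    }
}
```

This is why a versioned update is safe against lost updates: a writer working from stale data is rejected instead of silently overwriting a newer value.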
4.3 Case 3: Updating a node with a callback method
@Test
public void testSet3() throws Exception {
    client.setData()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorEvent.getResultCode()); // 0
                    log.info(curatorEvent.getType()); // SET_DATA
                    log.info(curatorEvent.getPath()); // /node
                    log.info(curatorEvent.getStat().toString()); // The node's Stat fields, comma-separated
                }
            })
            .forPath("/node", "432".getBytes());
    log.info("Update complete");
}
5. Delete the node
5.1 Case 1: Deleting a Node
@Test
public void testDelete() throws Exception {
    client.delete()
            .forPath("/node");
    log.info("Deletion completed");
}
5.2 Case 2: Deleting nodes Recursively
@Test
public void testDelete1() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded() // Delete the node together with all of its children
            .forPath("/node2");
    log.info("Deletion completed");
}
5.3 Case 3: Deleting a node with a callback method
@Test
public void testDelete3() throws Exception {
    client.delete()
            .deletingChildrenIfNeeded()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorEvent.getType()); // DELETE
                    log.info(curatorEvent.getPath()); // /node1
                }
            })
            .forPath("/node1");
    log.info("Deletion completed");
}
6. View the node
6.1 Case 1: Viewing a Node
@Test
public void testGet() throws Exception {
    byte[] data = client.getData()
            .forPath("/node2");
    log.info(new String(data));
}
6.2 Case 2: Viewing the Value and status of a Node
@Test
public void testGet2() throws Exception {
    Stat stat = new Stat(); // Filled in with the node's status by the call below
    byte[] data = client.getData()
            .storingStatIn(stat)
            .forPath("/node2");
    log.info(new String(data));
    log.info(stat.getVersion());
}
6.3 Case 3: Viewing a node with a callback method
@Test
public void testGet3() throws Exception {
    client.getData()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(new String(curatorEvent.getData())); // 4134134
                    log.info(curatorEvent.getStat().toString()); // The node's Stat fields, comma-separated
                    log.info(curatorEvent.getType().toString()); // GET_DATA
                }
            })
            .forPath("/node2");
}
7. View child nodes
7.1 Case 1: Viewing all child nodes of a node
@Test
public void testChildren() throws Exception {
    List<String> children = client.getChildren()
            .forPath("/");
    log.info(children.toString());
}
7.2 Case 2: Viewing all child nodes of a node with a callback method
@Test
public void testChildren2() throws Exception {
    client.getChildren()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorEvent.getPath()); // /
                    log.info(curatorEvent.getType().toString()); // CHILDREN
                    log.info(curatorEvent.getChildren().toString()); // [node, node2, node3]
                }
            })
            .forPath("/");
}
8. Check whether the node exists
8.1 Case 1: Checking whether a node exists
@Test
public void testExists() throws Exception {
    // checkExists() returns the node's Stat, or null if the node does not exist
    Stat stat = client.checkExists()
            .forPath("/node");
    if (stat != null) {
        log.info(stat.toString());
    } else {
        log.info("Node does not exist");
    }
}
8.2 Case 2: Checking whether a node exists with a callback method
@Test
public void testExists1() throws Exception {
    client.checkExists()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    log.info(curatorEvent.getType().toString()); // EXISTS
                    Stat stat = curatorEvent.getStat();
                    if (stat != null) {
                        log.info(stat.toString()); // The node's Stat fields, comma-separated
                    } else {
                        log.info("Node does not exist");
                    }
                }
            })
            .forPath("/node");
}
9. Watcher
- Curator provides two types of Watcher (Cache) to listen for node changes:
- NodeCache: watches one specific node for creation, data modification, and deletion. (Additions, deletions and modifications of its child nodes are not covered.)
- PathChildrenCache: watches the child nodes of a znode. When a child is added, modified, or deleted, the cache updates its state to hold the latest set of children together with their data and status.
- Unlike a native Watcher, these listeners can fire multiple times without being registered again.
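The difference between a one-shot native Watcher and a reusable cache listener can be sketched with a small in-memory simulation. WatchSketch and its methods are hypothetical and are not Curator classes; the sketch only models the observable behaviour described above, not how the caches are implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of one-shot watchers versus listeners that stay registered. */
final class WatchSketch {

    interface Listener {
        void onChange(String data);
    }

    private final List<Listener> oneShot = new ArrayList<>();
    private final List<Listener> persistent = new ArrayList<>();

    /** Native-style watcher: consumed by the first change. */
    void watchOnce(Listener listener) {
        oneShot.add(listener);
    }

    /** Cache-style listener: stays registered across changes. */
    void listen(Listener listener) {
        persistent.add(listener);
    }

    void setData(String data) {
        List<Listener> fired = new ArrayList<>(oneShot);
        oneShot.clear(); // one-shot watchers are removed before they fire
        for (Listener listener : fired) {
            listener.onChange(data);
        }
        for (Listener listener : persistent) {
            listener.onChange(data);
        }
    }

    public static void main(String[] args) {
        WatchSketch node = new WatchSketch();
        node.watchOnce(data -> System.out.println("one-shot saw " + data));
        node.listen(data -> System.out.println("listener saw " + data));
        node.setData("1"); // both fire
        node.setData("2"); // only the persistent listener fires
    }
}
```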
9.1 Case 1: NodeCache
@Test
public void testWatch() throws Exception {
    // Watch a single node for changes
    final NodeCache nodeCache = new NodeCache(client, "/node22");
    nodeCache.start();
    nodeCache.getListenable()
            .addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData currentData = nodeCache.getCurrentData();
                    if (currentData != null) {
                        log.info(currentData.getPath());
                        log.info(new String(currentData.getData()));
                    } else {
                        log.info("The node was deleted.");
                    }
                }
            });
    Thread.sleep(60000); // Keep the test alive for 60 s so that changes can be observed
    nodeCache.close();
}
9.2 Case 2: PathChildrenCache
@Test
public void testWatch2() throws Exception {
    // Watch the children of a node for changes; true means the children's data is cached as well
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
    pathChildrenCache.start();
    pathChildrenCache.getListenable()
            .addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    log.info(pathChildrenCacheEvent.getType()); // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
                    log.info(pathChildrenCacheEvent.getData().toString()); // ChildData{path='/node22/child', stat=..., data=[50, 50]}
                    log.info(new String(pathChildrenCacheEvent.getData().getData()));
                    log.info(pathChildrenCacheEvent.getData().getPath()); // /node22/child
                    log.info(pathChildrenCacheEvent.getData().getStat().toString()); // The child's Stat fields, comma-separated
                }
            });
    Thread.sleep(60000); // Keep the test alive for 60 s so that changes can be observed
    pathChildrenCache.close();
}
10. Transactions
10.1 Case 1: Creating two nodes using a transaction
@Test
public void testTransaction() throws Exception {
    client.inTransaction()
            .create().forPath("/node100", "100".getBytes())
            .and() // Bridge to the next operation
            .create().forPath("/node101", "101".getBytes())
            .and() // Bridge to the commit
            .commit(); // Submit both operations as one transaction
    log.info("Commit successful");
}
11. Distributed locks
11.1 Using distributed reentrant exclusive locks
- An exclusive lock means that all contenders compete for the same lock node, /lock. When the lock is requested, an internal sequential node is added under /lock; the caller continues when it is its turn and blocks otherwise. When the lock is released, the sequential node it added is deleted. (The basic principle is essentially the same as that of a distributed lock written by hand on top of ZooKeeper.)
@Test
public void testMutex() throws Exception {
    // Exclusive lock
    InterProcessLock lock = new InterProcessMutex(client, "/lock");
    log.info("Waiting to acquire the lock");
    lock.acquire();
    try {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(3000);
            System.out.println(i);
        }
    } finally {
        lock.release(); // Always release, even if the work above fails
    }
    log.info("Lock released");
}
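The queueing rule described above (each contender adds a sequential node, and the lowest sequence number holds the lock) can be sketched without a server. SequenceLockSketch is a hypothetical in-memory model for illustration only; it does not block or talk to ZooKeeper, and it is not how InterProcessMutex is written.

```java
import java.util.TreeSet;

/** In-memory model of a lock built from sequential child nodes. */
final class SequenceLockSketch {
    private final TreeSet<Integer> nodes = new TreeSet<>();
    private int nextSequence = 0;

    /** Adds a sequential "node" under the lock path and returns its sequence number. */
    synchronized int enqueue() {
        int sequence = nextSequence++;
        nodes.add(sequence);
        return sequence;
    }

    /** A contender holds the lock only while its node has the smallest sequence number. */
    synchronized boolean holdsLock(int sequence) {
        return !nodes.isEmpty() && nodes.first() == sequence;
    }

    /** Releasing deletes the node, which lets the next contender proceed. */
    synchronized void release(int sequence) {
        nodes.remove(sequence);
    }

    public static void main(String[] args) {
        SequenceLockSketch lock = new SequenceLockSketch();
        int first = lock.enqueue();
        int second = lock.enqueue();
        System.out.println(lock.holdsLock(first));  // true
        System.out.println(lock.holdsLock(second)); // false
        lock.release(first);
        System.out.println(lock.holdsLock(second)); // true
    }
}
```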
11.2 Using read/write Locks
- Read locks and write locks are two different types of lock, and they interact when both compete for the same lock node:
- While a read lock is held, other read locks can also be acquired, but write locks have to wait;
- While a write lock is held, neither read locks nor write locks can be acquired.
/**
 * While a read lock is held, write locks block and cannot proceed.
 * While a read lock is held, other read locks may still be acquired to read data.
 * While a write lock is held, no other read or write lock may be acquired.
 *
 * @throws Exception
 */
@Test
public void testReadLock() throws Exception {
    InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
    InterProcessLock readLock = interProcessReadWriteLock.readLock();
    log.info("Waiting to acquire the read lock");
    readLock.acquire();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(3000);
            System.out.println(i);
        }
    } finally {
        readLock.release(); // Always release, even if the work above fails
    }
    log.info("Lock released");
}

@Test
public void testWriteLock() throws Exception {
    InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
    InterProcessLock writeLock = interProcessReadWriteLock.writeLock();
    log.info("Waiting to acquire the write lock");
    writeLock.acquire();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(3000);
            System.out.println(i);
        }
    } finally {
        writeLock.release(); // Always release, even if the work above fails
    }
    log.info("Lock released");
}
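The same sharing rules can be observed inside a single JVM with the JDK's ReentrantReadWriteLock, which makes for a quick experiment without a ZooKeeper cluster. This is only an analogy for the distributed lock above: ReadWriteSketch and otherThreadCanLock are hypothetical names, and the JDK lock coordinates threads, not processes.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Demonstrates read/write lock compatibility with the JDK's in-process lock. */
final class ReadWriteSketch {

    /** Tries the lock once from another thread, releasing it again if it was acquired. */
    static boolean otherThreadCanLock(Lock lock) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(() -> {
                boolean acquired = lock.tryLock();
                if (acquired) {
                    lock.unlock();
                }
                return acquired;
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

        rw.readLock().lock(); // this thread is reading
        System.out.println("reader while reading: " + otherThreadCanLock(rw.readLock()));  // true
        System.out.println("writer while reading: " + otherThreadCanLock(rw.writeLock())); // false
        rw.readLock().unlock();

        rw.writeLock().lock(); // this thread is writing
        System.out.println("reader while writing: " + otherThreadCanLock(rw.readLock()));  // false
        System.out.println("writer while writing: " + otherThreadCanLock(rw.writeLock())); // false
        rw.writeLock().unlock();
    }
}
```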