
Introduction to ZooKeeper

ZooKeeper is an open source coordination service for distributed applications. It is an open source implementation of Google's Chubby and an important component of Hadoop and HBase. It provides consistency services for distributed applications, including configuration maintenance, naming, distributed synchronization, and group services.

ZooKeeper aims to encapsulate key services that are complex and error-prone, and to give users an easy-to-use interface with efficient performance and stable behavior.

ZooKeeper exposes a simple set of primitives, with Java and C interfaces.

The ZooKeeper source distribution also provides recipes for distributed exclusive locks, elections, and queues under zookeeper-3.4.3\src\recipes. The lock and queue recipes have Java and C versions; the election recipe has only a Java version.

The description above comes from Baidu Encyclopedia. For details, see the official website: https://zookeeper.apache.org/

Setting up a ZooKeeper cluster

There is plenty of material online about standalone mode, pseudo-cluster mode (multiple nodes on a single server), and full cluster mode, so it is not repeated here.

Introduction to Curator

The native API provided by ZooKeeper is cumbersome to work with. The Curator framework wraps the native API and offers a higher-level API, making ZooKeeper easier and more efficient for client programmers to use.

More information about Curator can be found at curator.apache.org/

Listening with the native ZooKeeper API

The native Java API of ZooKeeper provides the Watcher interface for listening. Implement this interface and override the process(WatchedEvent watchedEvent) method.

Disadvantage: a watch set this way is one-shot. Once it has been triggered it becomes invalid, so if it is needed repeatedly it has to be registered again after every trigger.
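To make the one-shot behavior concrete, here is a minimal in-memory sketch. It is a toy model, not the ZooKeeper client: the class OneShotWatchDemo and its methods watch and change are hypothetical names invented for this illustration. It only shows why a watcher that does not register itself again sees a single notification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model of one-shot watches; this is NOT the ZooKeeper client API.
class OneShotWatchDemo {

    // path -> watchers waiting for the next change on that path
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Register a watcher that is notified of the next change only.
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Simulate a change: the registered watchers are removed first, then notified.
    void change(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> w : fired) {
                w.accept(path);
            }
        }
    }

    // Register once, apply several changes, and count the notifications received.
    static int countWithoutReregistering(int changes) {
        OneShotWatchDemo zk = new OneShotWatchDemo();
        int[] seen = {0};
        zk.watch("/test1", p -> seen[0]++);
        for (int i = 0; i < changes; i++) {
            zk.change("/test1");
        }
        return seen[0];
    }

    // Same, but the watcher registers itself again inside its callback.
    static int countWithReregistering(int changes) {
        OneShotWatchDemo zk = new OneShotWatchDemo();
        int[] seen = {0};
        Consumer<String> watcher = new Consumer<String>() {
            @Override
            public void accept(String p) {
                seen[0]++;
                zk.watch(p, this); // register again for the next change
            }
        };
        zk.watch("/test1", watcher);
        for (int i = 0; i < changes; i++) {
            zk.change("/test1");
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithoutReregistering(3)); // prints 1
        System.out.println(countWithReregistering(3));    // prints 3
    }
}
```

In the real client the re-registration is a new request to the server, so it is not instantaneous as it is in this model.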

The test code and description are as follows:

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZkWatcherTest {

    private static final String ADDR = "192.168.100.1:2181";
    private static final String PATH = "/test1";
    private static ZooKeeper zk = null;

    static {
        try {
            // The Watcher passed here becomes the client's default watcher.
            zk = new ZooKeeper(ADDR, 3000, new WatcherTest());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        test1();
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void test1() {
        try {
            /*
             * getData watch:
             * - node does not exist: the call throws KeeperErrorCode = NoNode
             * - set (modify node value): triggered
             * - get: not triggered
             * - delete current node: triggered; a synchronous call then throws
             *   KeeperErrorCode = NoNode, and re-create/delete no longer triggers
             * - create node: not triggered
             * - synchronous connection: the exists event is not triggered
             *   (asynchronous not tested)
             */
            zk.getData(PATH, true, null);
            /*
             * exists watch:
             * - set (modify node value): triggered
             * - get: not triggered
             * - CRUD on child nodes: not triggered
             * - delete current node: triggered, no exception thrown
             * - create node: triggered
             * - synchronous connection: the exists event is not triggered
             *   (asynchronous not tested)
             */
            zk.exists(PATH, true);
            /*
             * getChildren watch:
             * - delete current node: triggered, an exception is thrown
             * - set (change node value): not triggered
             * - get: not triggered
             * - re-creating the node after deletion: not triggered
             * - creating a first-level child node: triggered
             * - deleting a first-level child node: triggered
             * - changing the value of a first-level child node: not triggered
             * - synchronous connection: the exists event is not triggered
             *   (asynchronous not tested)
             */
            zk.getChildren(PATH, true);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static class WatcherTest implements Watcher {
        public void process(WatchedEvent watchedEvent) {
            System.out.println("watchedEvent: " + watchedEvent);
            try {
                // The watch is invalid once this callback has run. Passing true as the
                // second argument registers the default watcher again.
                zk.getChildren(PATH, true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

With the native Java API provided by ZooKeeper, a Watcher argument is passed in when the client instance is created; this argument may be null. It is registered as the default Watcher, and a watch set with it becomes invalid after it fires once. Passing true as the second argument of getData, exists, or getChildren registers the default Watcher again. The event states each call's watcher can observe, and the event types that trigger it, are listed in the comments above each call in the code.

Advanced listening APIs recommended by Curator

The APIs officially recommended by Curator wrap ZooKeeper's native Java API and handle re-registration and event information well. The events they deliver also carry detailed information, such as the path and value of the changed node, which the native API does not provide.

This style of listening works roughly like comparing a locally cached view with the remote ZooKeeper view.

Note: Curator methods are invoked through a fluent API. The advantages and caveats of this style are documented elsewhere and are not covered here.
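The comparison idea can be sketched in a few lines of plain Java. This is only an illustration of the concept, not Curator's actual implementation: the class ViewDiffDemo and its methods are hypothetical, and the event names are borrowed from the PathChildrenCache event types that appear later in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy illustration: derive events by comparing a cached view with a fresh remote view.
class ViewDiffDemo {

    // Both maps go from child path to node value.
    static List<String> diff(Map<String, String> cached, Map<String, String> remote) {
        List<String> events = new ArrayList<>();
        for (Map.Entry<String, String> e : remote.entrySet()) {
            if (!cached.containsKey(e.getKey())) {
                events.add("CHILD_ADDED " + e.getKey());
            } else if (!e.getValue().equals(cached.get(e.getKey()))) {
                events.add("CHILD_UPDATED " + e.getKey());
            }
        }
        for (String path : cached.keySet()) {
            if (!remote.containsKey(path)) {
                events.add("CHILD_REMOVED " + path);
            }
        }
        return events;
    }

    // Sample data: /a was removed, /b changed its value, /c is new.
    static List<String> sample() {
        Map<String, String> cached = new TreeMap<>();
        cached.put("/zktest1/a", "1");
        cached.put("/zktest1/b", "2");
        Map<String, String> remote = new TreeMap<>();
        remote.put("/zktest1/b", "3");
        remote.put("/zktest1/c", "4");
        return diff(cached, remote);
    }

    public static void main(String[] args) {
        // prints [CHILD_UPDATED /zktest1/b, CHILD_ADDED /zktest1/c, CHILD_REMOVED /zktest1/a]
        System.out.println(sample());
    }
}
```

A consequence of this model is that only the difference between two snapshots is reported; changes that happen and are overwritten between two snapshots do not show up as separate events.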

The officially recommended API provides three listeners, as follows:

  • NodeCache

Listens on a single node. The events cover creation, deletion, and modification of the node at the specified path.

  • PathChildrenCache

Listens on the first-level children of the specified path. It does not listen for operations on the node itself, only for add, delete, and modify operations on its direct children.

  • TreeCache

A combination of NodeCache and PathChildrenCache: it listens on the entire directory tree under the node. The listening depth can be set.
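As a rough illustration of what a listening depth means, the sketch below counts how many levels a path lies below the watched root and checks that count against a limit. The class TreeDepthDemo and its methods are hypothetical helpers written for this article, not part of Curator, and the sketch assumes normalized paths without a trailing slash.

```java
// Toy helper: how deep is a path below a watched root, and is it within the depth limit?
class TreeDepthDemo {

    // Number of levels between root and path; 0 for the root itself, -1 if path is not under root.
    static int depthBelow(String root, String path) {
        if (path.equals(root)) {
            return 0;
        }
        String prefix = root.endsWith("/") ? root : root + "/";
        if (!path.startsWith(prefix)) {
            return -1;
        }
        return path.substring(prefix.length()).split("/").length;
    }

    // True if the path is the root or lies at most maxDepth levels below it.
    static boolean withinDepth(String root, String path, int maxDepth) {
        int depth = depthBelow(root, path);
        return depth >= 0 && depth <= maxDepth;
    }

    public static void main(String[] args) {
        System.out.println(depthBelow("/t1", "/t1/t2/t3/t4"));  // prints 3
        System.out.println(withinDepth("/t1", "/t1/t2", 2));    // prints true
        System.out.println(withinDepth("/t1", "/t1/t2/t3/t4", 2)); // prints false
    }
}
```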

The usage, caveats, and some parameters of these three APIs are shown in the test code below. The details are fairly minor, so they are left in the code comments rather than summarized here.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.RetryNTimes;

public class ZKCurator {

    private static final String ADDR = "192.168.100.1:2181";
    private static final String PATH = "/zktest1";

    public static void main(String[] args) throws InterruptedException {
        final CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ADDR, new RetryNTimes(10, 5000));
        zkClient.start();
        System.out.println("start zkclient...");

        try {
            registerWatcher(zkClient);
            //registerNodeCache(zkClient);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("register watcher end...");
        Thread.sleep(Integer.MAX_VALUE);
        zkClient.close();
    }

    private static void registerWatcher(CuratorFramework zkClient) throws Exception {
        /*
         * PathChildrenCache, observed behavior:
         * - registering the listener when the current node does not exist creates
         *   the node; no exception and no error log
         * - creating a child node: type=CHILD_ADDED
         * - updating a child node: type=CHILD_UPDATED, e.g.
         *   data=ChildData{path='/zktest111/tt1', stat=(omitted), data=[55, 55, 55, 55, 55, 55]}
         * - deleting a child node: type=CHILD_REMOVED
         * - ZK suspended: type=CONNECTION_SUSPENDED
         * - ZK reconnected: type=CONNECTION_RECONNECTED, data=null
         * - updating the root node: not triggered
         * - deleting the root node: not triggered, no exception
         * - creating the root node: not triggered
         * - creating and updating child nodes after that: not triggered
         * - while ZK restarts: the connection to ZK fails
         */
        // Only needed for the commented-out constructor arguments below.
        ExecutorService service = Executors.newFixedThreadPool(3);
        PathChildrenCache watcher = new PathChildrenCache(zkClient, PATH, true/*, false, service*/);
        watcher.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println(pathChildrenCacheEvent);
            }
        });
        /*
         * PathChildrenCache.StartMode:
         * - POST_INITIALIZED_EVENT: asynchronous cache initialization.
         *   1. When the listener starts, all children of the current path are
         *      enumerated and a CHILD_ADDED event is triggered for each.
         *   2. When initialization completes, PathChildrenCacheEvent.Type.INITIALIZED
         *      is triggered.
         * - NORMAL: asynchronous cache initialization; only case 1 above applies.
         * - BUILD_INITIAL_CACHE: the cache is built synchronously before start() returns.
         */
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Registered watcher successful...");
    }

    public static void registerNodeCache(CuratorFramework client) throws Exception {
        /*
         * NodeCache, observed behavior:
         * - node path does not exist: creating it triggers the listener
         * - node path exists: set triggers the listener
         * - node path exists: delete triggers the listener
         * - ZK node goes down: no listener triggered
         * - while reconnecting: no listener triggered
         * - after the node is reconnected: listening resumes
         */
        final NodeCache nodeCache = new NodeCache(client, PATH, false);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("Current node:" + nodeCache.getCurrentData());
            }
        });
        // true: the cache is built before start() returns; the default is false.
        nodeCache.start(true);
    }

    public static void registTreeCache(CuratorFramework client) throws Exception {
        /*
         * TreeCache, observed behavior:
         * - internal state noted during testing: TreeCache.nodeState == LIVE
         * - maxDepth sets the listening depth. For example, if the watched node is
         *   /t1 and the deepest directory is /t1/t2/t3/t4, then maxDepth=3
         * - maximum value of maxDepth: 2147483647; by default the listening depth
         *   reaches the lowest level
         * - on first start, [type=NODE_ADDED] is triggered for the current node and
         *   all nodes below it (down to the maxDepth level); initialization ends
         *   with [type=INITIALIZED]
         * - set: [type=NODE_UPDATED], range [current node, maxDepth level] (closed)
         * - add: [type=NODE_ADDED], range [current node, maxDepth level] (closed)
         * - delete: [type=NODE_REMOVED], range [current node, maxDepth level]
         * - sample event: TreeCacheEvent{type=NODE_ADDED,
         *   data=ChildData{path='/zktest1', stat=(omitted), data=[116, 49]}}
         */
        final TreeCache treeCache = TreeCache.newBuilder(client, PATH).setCacheData(true).setMaxDepth(2).build();
        treeCache.getListenable().addListener(new TreeCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("treeCacheEvent: " + treeCacheEvent);
            }
        });
        // start() takes no start-mode argument.
        treeCache.start();
    }
}

Notes:

1. Curator merely wraps the native ZooKeeper watch events so that client programmers do not have to register the Watcher again themselves. The one-shot nature of the Watcher still exists; Curator simply performs the re-registration. Some scenarios therefore still need care: there is always a delay between a Watcher firing and the Watcher being registered again, so if several events hit ZooKeeper in quick succession, the listener is not guaranteed to observe every state change. How many it observes depends on the speed of the server.

2. As long as the cache has started successfully and the listener has been registered, the listener can in theory handle every event it needs to watch, provided the situation in note 1 does not occur. However, if the connection to ZooKeeper is interrupted during cache.start(), the client will not notice the changes it should be watching after the connection recovers. One workaround I came up with is to use a connection state listener when the ZooKeeper client starts (see the "Other listeners" section below): if the client fails to connect, add this listener, and once the connection is restored, call cache.clearAndRefresh() and remove the connection state listener.

However, this method exists only on PathChildrenCache, which needs it because once the watched node is deleted and then created again, the cache no longer listens. The other two listeners may not need this handling, depending on their internal implementation.
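The workaround in note 2 can be outlined as follows. This is a sketch under stated assumptions, not working Curator code: State, RefreshableCache, StateListener, and ListenerRegistry are hypothetical stand-ins defined locally so the example runs on its own. In a real program their roles would be played by Curator's connection state listener and by PathChildrenCache.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "refresh the cache once the connection is back, then remove the listener".
// Every type in here is a local stand-in, not a Curator class.
class RefreshOnReconnectDemo {

    // Mirrors the connection state names listed in this article.
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    // Hypothetical stand-in for a cache that can rebuild its view.
    interface RefreshableCache {
        void clearAndRefresh();
    }

    interface StateListener {
        void stateChanged(State newState);
    }

    // Hypothetical stand-in for the client's list of connection state listeners.
    static class ListenerRegistry {
        private final List<StateListener> listeners = new ArrayList<>();

        void add(StateListener listener) { listeners.add(listener); }

        void remove(StateListener listener) { listeners.remove(listener); }

        void publish(State state) {
            // Iterate over a copy so a listener may remove itself while being notified.
            for (StateListener listener : new ArrayList<>(listeners)) {
                listener.stateChanged(state);
            }
        }
    }

    // Add a listener that refreshes the cache on the first (re)connection, then removes itself.
    static void refreshOnceWhenConnected(ListenerRegistry registry, RefreshableCache cache) {
        registry.add(new StateListener() {
            @Override
            public void stateChanged(State newState) {
                if (newState == State.CONNECTED || newState == State.RECONNECTED) {
                    cache.clearAndRefresh();
                    registry.remove(this);
                }
            }
        });
    }

    // Publish the given states in order and return how many times the cache was refreshed.
    static int run(State... states) {
        ListenerRegistry registry = new ListenerRegistry();
        int[] refreshes = {0};
        refreshOnceWhenConnected(registry, () -> refreshes[0]++);
        for (State state : states) {
            registry.publish(state);
        }
        return refreshes[0];
    }

    public static void main(String[] args) {
        System.out.println(run(State.SUSPENDED, State.LOST)); // prints 0
        System.out.println(run(State.SUSPENDED, State.RECONNECTED,
                State.SUSPENDED, State.RECONNECTED));         // prints 1
    }
}
```

Removing the listener after the first refresh keeps later reconnects from clearing a cache that is already working.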

Using the native ZooKeeper watcher through Curator

// ADDR and PATH are the constants defined in the earlier examples.
final CuratorFramework client = CuratorFrameworkFactory.builder()
                .retryPolicy(new RetryNTimes(0, 1000))
                .connectionTimeoutMs(4000)
                .sessionTimeoutMs(40000)
                .connectString(ADDR)
                .defaultData(null)
                .build();
client.start();

// forPath(...) issues the call and sets the watch; without it nothing is sent.
client.checkExists().usingWatcher(new Watcher() {
    public void process(WatchedEvent event) {
        // handle the event; register the watcher again here if more events are needed
    }
}).forPath(PATH);

As the code shows, apart from the call style this works in much the same way as the native API. The listener's usage, properties, and trigger events are the same as those of the native watcher, because the argument passed in is a native ZooKeeper Watcher. A CuratorWatcher can also be passed instead.

In this case, you have to register the watcher again yourself after each trigger.

Using Curator's API without the advanced listeners it provides is appropriate for certain business scenarios.

Other listeners

For example, connection status listeners:

final CuratorFramework client = CuratorFrameworkFactory.builder()
                .retryPolicy(new RetryNTimes(0, 1000))
                .connectionTimeoutMs(4000)
                .sessionTimeoutMs(40000)
                .connectString(ADDR)
                .defaultData(null)
                .build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        // react to the new connection state here
    }
});

ConnectionState values:

ConnectionState    Description
CONNECTED          The first successful connection
SUSPENDED          The connection is lost but has not yet timed out
RECONNECTED        A connection has been re-established after SUSPENDED, LOST, or READ_ONLY
LOST               The connection is confirmed lost
READ_ONLY          Read-only mode; this mode is set during client initialization