Conclusion

When Curator is used to access ZooKeeper, the client connection moves through several states. Note: in what follows, T is the session timeout negotiated with the ZK server.

  • SUSPENDED: If the client receives nothing from its ZK server for 2T/3, for example because heartbeats go unanswered, it enters the SUSPENDED state and tries to reconnect to another server.
  • LOST: If the client, having started reconnecting from the SUSPENDED state, reaches a server and that server finds that the previous session has already expired, the server notifies the client and the state changes to LOST.
  • RECONNECTED: After a LOST notification, the client establishes a new session and a new connection, and the state changes to RECONNECTED. If the client only became SUSPENDED after 2T/3 and then reached another server before the ZK cluster considered the session expired, there is no LOST state, only RECONNECTED.
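
The transitions above can be sketched as a small toy model. This is not Curator code: the class name `ConnectionStateModel`, the method `observedStates`, and the `silenceMs` parameter (time between the last message received and the moment the client reaches a server again) are hypothetical names introduced only for illustration, and the thresholds simply restate the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the state transitions described above (not Curator's implementation).
class ConnectionStateModel {
    enum State { SUSPENDED, LOST, RECONNECTED }

    /**
     * @param sessionTimeoutMs the negotiated session timeout T
     * @param silenceMs        time without contact before the client reaches a server again
     * @return the states the client would observe, in order
     */
    static List<State> observedStates(long sessionTimeoutMs, long silenceMs) {
        List<State> states = new ArrayList<>();
        // Less than 2T/3 of silence: the connection is never considered broken.
        if (silenceMs * 3 < sessionTimeoutMs * 2) {
            return states;
        }
        states.add(State.SUSPENDED);
        // T or more of silence: the server has expired the session in the meantime.
        if (silenceMs >= sessionTimeoutMs) {
            states.add(State.LOST);
        }
        states.add(State.RECONNECTED);
        return states;
    }

    public static void main(String[] args) {
        long t = 30_000;
        System.out.println(observedStates(t, 10_000)); // []
        System.out.println(observedStates(t, 25_000)); // [SUSPENDED, RECONNECTED]
        System.out.println(observedStates(t, 40_000)); // [SUSPENDED, LOST, RECONNECTED]
    }
}
```

With T = 30 s, a 25 s outage ends in RECONNECTED with the old session intact, while a 40 s outage passes through LOST first.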

Apart from whether the session has expired, is there any other difference between LOST and RECONNECTED? What else can session expiration trigger? For example, does the TreeCache need to be reinitialized after the session expires? When the session expired, the INITIALIZED event was indeed triggered again after reconnection. If data changed on ZK before the reconnection, then after reconnecting the cache pushes the CONNECTION_RECONNECTED event, the NODE_ADDED and NODE_REMOVED events, and the INITIALIZED event. The INITIALIZED event is emitted even if no data changed.

What if the client only RECONNECTED and the session did not expire? Then there is no INITIALIZED event, whether or not data changed before the reconnection. Any data change is still detected, however.

In other words, the cache triggers the INITIALIZED event only after the session has been rebuilt, but it triggers data change events whether or not the session was rebuilt.
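
The rule can be written out as a small truth table. The sketch below only restates the observations above and does not call Curator; `ReconnectEventsModel`, `eventsAfterReconnect`, and the `NODE_CHANGE_EVENTS` placeholder (shorthand for whatever node events apply) are hypothetical names, and the order of the list is not meant as a guarantee about delivery order.

```java
import java.util.ArrayList;
import java.util.List;

// Restates which cache events were observed after a reconnection (illustrative only).
class ReconnectEventsModel {
    static List<String> eventsAfterReconnect(boolean sessionExpired, boolean dataChanged) {
        List<String> events = new ArrayList<>();
        events.add("CONNECTION_RECONNECTED");
        if (dataChanged) {
            // Data changes are reported whether or not the session was rebuilt.
            events.add("NODE_CHANGE_EVENTS");
        }
        if (sessionExpired) {
            // INITIALIZED is only emitted again when the session had to be rebuilt.
            events.add("INITIALIZED");
        }
        return events;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean expired : flags) {
            for (boolean changed : flags) {
                System.out.println("expired=" + expired + ", changed=" + changed
                        + " -> " + eventsAfterReconnect(expired, changed));
            }
        }
    }
}
```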

If the session has not expired, it is simply carried over transparently: the same session is used when the connection is re-established. So the question is, how does the server know whether the session has expired?

Session expiration is determined by the server, not the client. The server notifies the client that the session has expired.

If the session timeout is T, the server considers the session expired if it receives nothing from the client for T. The client, for its part, sends a heartbeat to the server if it has received nothing from the server for T/3. If it has still received nothing after 2T/3, it tries another server, which leaves it T/3 to find one.

More precisely, the client sends a heartbeat once the time since it last sent a message to the ZK server exceeds sessionTimeout*(2/3)/2. If the heartbeat gets no response, then once the time since it last received a message from the ZK server exceeds sessionTimeout*(2/3), the client enters its session timeout logic and reconnects.
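
The three thresholds are easy to work out for a concrete timeout. The following is a minimal sketch of the arithmetic only; `SessionTiming` and its method names are hypothetical and are not part of the ZooKeeper or Curator API.

```java
// Works out the client-side thresholds described above for a given session timeout T.
class SessionTiming {
    // 2T/3: silence from the server after which the client gives up on the current server.
    static long readTimeoutMs(long sessionTimeoutMs) {
        // Multiply before dividing: with integer types, (2 / 3) evaluates to 0 in Java.
        return sessionTimeoutMs * 2 / 3;
    }

    // (2T/3)/2 = T/3: idle time after which the client sends a heartbeat.
    static long heartbeatIdleMs(long sessionTimeoutMs) {
        return readTimeoutMs(sessionTimeoutMs) / 2;
    }

    // T - 2T/3 = T/3: time left to find another server before the server expires the session.
    static long reconnectBudgetMs(long sessionTimeoutMs) {
        return sessionTimeoutMs - readTimeoutMs(sessionTimeoutMs);
    }

    public static void main(String[] args) {
        long t = 30_000;
        System.out.println("heartbeat after idle ms: " + heartbeatIdleMs(t));   // 10000
        System.out.println("switch server after ms:  " + readTimeoutMs(t));     // 20000
        System.out.println("time left to reconnect:  " + reconnectBudgetMs(t)); // 10000
    }
}
```

For a 30 s session timeout this gives a heartbeat after 10 s of idleness, a server switch after 20 s of silence, and 10 s left to reconnect before the session expires.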

For TreeCache, the INITIALIZED event is triggered once all node information has been loaded (via NODE_ADDED events). Read data from the TreeCache only after the INITIALIZED event has fired; otherwise null pointer exceptions can occur.
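
One way to enforce this ordering is to gate reads behind a latch that is released by the INITIALIZED event. The sketch below uses only the JDK: `InitializedGate` is a hypothetical stand-in for a cache plus its listener, and the event names are passed as plain strings purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-in for a cache whose data must not be read before INITIALIZED has fired.
class InitializedGate {
    private final Map<String, String> nodes = new ConcurrentHashMap<>();
    private final CountDownLatch initialized = new CountDownLatch(1);

    // Called for every cache event; only the two event types of interest are handled.
    void onEvent(String type, String path, String data) {
        switch (type) {
            case "NODE_ADDED":
                nodes.put(path, data);
                break;
            case "INITIALIZED":
                initialized.countDown();
                break;
            default:
                break;
        }
    }

    // Blocks until INITIALIZED has fired, or fails after timeoutMs.
    String read(String path, long timeoutMs) {
        try {
            if (!initialized.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("cache not initialized yet");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for INITIALIZED", e);
        }
        return nodes.get(path);
    }

    public static void main(String[] args) {
        InitializedGate gate = new InitializedGate();
        gate.onEvent("NODE_ADDED", "/example/cache", "root");
        gate.onEvent("INITIALIZED", null, null);
        System.out.println(gate.read("/example/cache", 1000)); // prints "root"
    }
}
```

Failing fast with an exception, rather than returning null, makes a premature read visible at the call site.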

For TreeCache, the cache remains valid even in the SUSPENDED state, and data can still be read from it.

Test code

ZkPathChildrenCacheTest

PathChildrenCache detects only changes to child nodes: adding a child node, deleting a child node, and updating a child node's data. It does not detect data changes on the watched node itself. In the following example, data changes on /example/cache are not detected.

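import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;

public class ZkPathChildrenCacheTest {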
    private static final String PATH = "/example/cache";

    private static PathChildrenCache cache = null;

    public static void main(String[] args) throws Exception {
        CuratorFramework    client = null;

        String zkServers = "172.16.16.47:2182,172.16.16.48:2182,172.16.16.49:2182";
        try {

            client = CuratorFrameworkFactory.newClient(zkServers,
                    new ExponentialBackoffRetry(1000, 3));
            client.start();

            // in this example we will cache data. Notice that this is optional.
            cache = new PathChildrenCache(client, PATH, true);
            cache.start();

            processCommands(client, cache);
        } finally {
            CloseableUtils.closeQuietly(cache);
            CloseableUtils.closeQuietly(client);
        }
    }

    private static void addListener(PathChildrenCache cache)
    {
        // a PathChildrenCacheListener is optional. Here, it's used just to log changes
        PathChildrenCacheListener listener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
            {
                System.out.println("hello.....");
                switch ( event.getType() )
                {
                    case CHILD_ADDED:
                    {
                        System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()) +
                                " ,data:" + new String(event.getData().getData()));
                        break;
                    }

                    case CHILD_UPDATED:
                    {
                        System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()) +
                                " ,data:" + new String(event.getData().getData()));
                        break;
                    }

                    case CHILD_REMOVED:
                    {
                        System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    }
                }
            }
        };
        cache.getListenable().addListener(listener);
    }

    private static void processCommands(CuratorFramework client, PathChildrenCache cache) throws Exception {
        // More scaffolding that does a simple command line processor

        printHelp();

        try {
            addListener(cache);

            BufferedReader  in = new BufferedReader(new InputStreamReader(System.in));
            boolean         done = false;
            while ( !done ) {
                System.out.print(">");

                String      line = in.readLine();
                if ( line == null ) {
                    break;
                }

                String      command = line.trim();
                String[]    parts = command.split("\\s");
                if ( parts.length == 0 ) {
                    continue;
                }
                String      operation = parts[0];
                String      args[] = Arrays.copyOfRange(parts, 1, parts.length);

                if ( operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?") ) {
                    printHelp();
                }
                else if ( operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit") ) {
                    done = true;
                }
                else if ( operation.equals("set") ) {
                    setValue(client, command, args);
                }
                else if ( operation.equals("remove") ) {
                    remove(client, command, args);
                }
                else if ( operation.equals("list") ) {
                    list(cache);
                }

                Thread.sleep(1000); // just to allow the console output to catch up
            }
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
    }

    private static void list(PathChildrenCache cache)
    {
        if ( cache.getCurrentData().size() == 0 ) {
            System.out.println("* empty *");
        }
        else {
            for ( ChildData data : cache.getCurrentData() )
            {
                System.out.println(data.getPath() + " = " + new String(data.getData()));
            }
        }
    }

    private static void remove(CuratorFramework client, String command, String[] args) throws Exception
    {
        if ( args.length != 1 ) {
            System.err.println("syntax error (expected remove <path>): " + command);
            return;
        }

        String      name = args[0];
        if ( name.contains("/") ) {
            System.err.println("Invalid node name: " + name);
            return;
        }
        String      path = ZKPaths.makePath(PATH, name);

        try {
            client.delete().forPath(path);
        } catch ( KeeperException.NoNodeException e ) {
            // ignore: the node is already gone
        }
    }

    private static void setValue(CuratorFramework client, String command, String[] args) throws Exception
    {
        if ( args.length != 2 ) {
            System.err.println("syntax error (expected set <path> <value>): " + command);
            return;
        }

        String      name = args[0];
        if ( name.contains("/") ) {
            System.err.println("Invalid node name: " + name);
            return;
        }
        String      path = ZKPaths.makePath(PATH, name);

        byte[]      bytes = args[1].getBytes();
        try {
            client.setData().forPath(path, bytes);
        } catch ( KeeperException.NoNodeException e ) {
            // the node does not exist yet, so create it together with any missing parents
            client.create().creatingParentContainersIfNeeded().forPath(path, bytes);
        }
    }

    private static void printHelp()
    {
        System.out.println("An example of using PathChildrenCache. This example is driven by entering commands at the prompt:\n");
        System.out.println("set <name> <value>: Adds or updates a node with the given name");
        System.out.println("remove <name>: Deletes the node with the given name");
        System.out.println("list: List the nodes/values in the cache");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

ZkNodeCacheTest

NodeCache detects only data changes on the watched node itself. Deletion of the node is also detected, but the callback must then guard against a null pointer, because no current data exists. Creation of the node is detected as well.

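import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkNodeCacheTest {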

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        String zkServers = "172.16.16.47:2182,172.16.16.48:2182,172.16.16.49:2182";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServers)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();

        NodeCache nodeCache = new NodeCache(client, PATH);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
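            public void nodeChanged() throws Exception {
                // getCurrentData() returns null once the node has been deleted
                ChildData current = nodeCache.getCurrentData();
                if (current == null) {
                    System.out.println("node cache changed, node deleted: " + PATH);
                    return;
                }
                System.out.println("node cache changed, path: " + current.getPath()
                        + " ,data: " + new String(current.getData()));
            }
        });
        nodeCache.start();
        System.in.read();
    }
}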

ZkTreeCacheTest

TreeCache detects data changes on the watched node itself as well as the creation, data update, and deletion of child nodes.

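// MapUtils is assumed to come from Apache Commons Collections 3.x;
// with 4.x, import org.apache.commons.collections4.MapUtils instead
import org.apache.commons.collections.MapUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ZkTreeCacheTest {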
    private static final String PATH = "/example/cache";

    private static CountDownLatch LATCH = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        String zkServer = "172.16.16.47:2182,172.16.16.48:2182,172.16.16.49:2182";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServer)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();

        TreeCache treeCache = new TreeCache(client, PATH);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

                System.out.println("data: " + new String(treeCache.getCurrentData(PATH).getData()));
                Map<String, ChildData> map = treeCache.getCurrentChildren(PATH);
                for (Map.Entry<String, ChildData> entry : map.entrySet()) {
                    String nodeName = entry.getKey();
                    System.out.println("child, key: " + entry.getKey() +
                            " .child, path: " + entry.getValue().getPath() +
                            " , data: " + new String(entry.getValue().getData()));
                    Map<String, ChildData> children = treeCache.getCurrentChildren(ZKPaths.makePath(PATH, nodeName));
                    for (Map.Entry<String, ChildData> childDataEntry : children.entrySet()) {
                        System.out.println("children: key: " + childDataEntry.getKey() +
                                " .child, path: " + childDataEntry.getValue().getPath() +
                                " , data: " + new String(childDataEntry.getValue().getData()));
                    }
                }

                switch (event.getType()) {
                    case INITIALIZED:
                        LATCH.countDown();
                        System.out.println("INITIALIZED");
                        break;
                    case NODE_ADDED:
                        System.out.println("NODE_ADDED, path: " + event.getData().getPath() + " ,data: " + new String(event.getData().getData()));
                        break;
                    case NODE_UPDATED:
                        System.out.println("NODE_UPDATED, path: " + event.getData().getPath() + " ,data: " + new String(event.getData().getData()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("NODE_REMOVED, path: " + event.getData().getPath() + " ,data: " + new String(event.getData().getData()));
                        break;
                    case CONNECTION_RECONNECTED:
                        System.out.println("CONNECTION_RECONNECTED");
                        break;
                    case CONNECTION_LOST:
// LATCH = new CountDownLatch(1);
                        System.out.println("CONNECTION_LOST");
                        break;
                    case CONNECTION_SUSPENDED:
                        System.out.println("CONNECTION_SUSPENDED");
                        break;
                    default:
                        System.out.println("default");
                        break;
                }
            }
        });
        treeCache.start();
        // wait for the INITIALIZED event before reading from the cache
        LATCH.await();
        System.out.println("xxxxx, /example/cache: data, " + new String(treeCache.getCurrentData("/example/cache").getData()));

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Map<String, ChildData> map = treeCache.getCurrentChildren("/example/cache/test");
                        for (Map.Entry<String, ChildData> entry : map.entrySet()) {
                            System.out.println("XXXXXXXXXXXXXXX");
                            System.out.println(entry.getKey());
                            System.out.println(entry.getValue().getPath());
                            System.out.println(new String(entry.getValue().getData()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        thread.start();

        Map<String, ChildData> map = treeCache.getCurrentChildren("/not/exist");
        System.out.println("FFFFFFFFFFFFFFFFFF, /not/exist, children: " + (MapUtils.isEmpty(map) ? "NULL" : map.keySet()));
        if (map == null) {
            // this line is printed: getCurrentChildren returns null for a path that is not in the cache
            System.out.println("NNNNNNNNNNNNNuLL");
        }
        System.in.read();
    }
}
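
As a recap of the three test classes, the scope of each cache can be expressed as a predicate on paths. This is an illustrative sketch, not Curator code: `CacheScope` and `detects` are hypothetical names, the watched path is assumed not to be the root "/", and TreeCache is assumed to use its default (unlimited) depth.

```java
// Summarizes which changed paths each cache type reports for a given watched path.
class CacheScope {
    enum CacheType { PATH_CHILDREN_CACHE, NODE_CACHE, TREE_CACHE }

    static boolean detects(CacheType type, String watched, String changed) {
        boolean self = changed.equals(watched);
        boolean descendant = changed.startsWith(watched + "/");
        // A direct child has no further '/' after the watched prefix.
        boolean directChild = descendant && changed.indexOf('/', watched.length() + 1) < 0;
        switch (type) {
            case NODE_CACHE:
                return self;               // only the watched node itself
            case PATH_CHILDREN_CACHE:
                return directChild;        // only direct children, not the node itself
            case TREE_CACHE:
                return self || descendant; // the node and its whole subtree
            default:
                throw new IllegalArgumentException("unknown cache type: " + type);
        }
    }

    public static void main(String[] args) {
        String watched = "/example/cache";
        String[] changedPaths = {"/example/cache", "/example/cache/test", "/example/cache/test/test-child2"};
        for (CacheType type : CacheType.values()) {
            for (String changed : changedPaths) {
                System.out.println(type + " " + changed + " -> " + detects(type, watched, changed));
            }
        }
    }
}
```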