What is the Watcher mechanism

The Watcher mechanism is one of ZooKeeper's core mechanisms. Core ZooKeeper features such as publish/subscribe and monitoring node changes (node deletion, content changes, or changes to a node's children) are all built on top of it.

The Watcher implementation is essentially the observer pattern, except that it is distributed rather than confined to a single process.

A watcher listens for two kinds of state: the connection state between the client and the server (the notification state) and the state of a node (the event type).

Notification state (KeeperState)

KeeperState describes changes in the connection state between the client and the server. The ZooKeeper Java client has a dedicated enum for it: org.apache.zookeeper.Watcher.Event.KeeperState

Some of the main constants are as follows:

Enum constant | Description
Disconnected | The client is not connected to the server
SyncConnected | Normal connected state
Expired | The session has timed out. When a client connects, the server grants the session a timeout period; if the client does not renew the session in time, it enters the Expired state
NoSyncConnected | Deprecated and unused; the server never generates this state
AuthFailed | Authentication failed and the server rejected the connection
ConnectedReadOnly | The client is connected to a read-only server. Read-only mode has been available since ZooKeeper 3.4. Normally, if more than half of the cluster is unavailable, the whole service stops serving requests. If the client allows read-only mode, it can still read even after the cluster has lost its majority

EventType

The event type is the more useful of the two in practice, because it describes state changes of znodes, which is what publish/subscribe features are built on. The Java client has a corresponding enum for it as well: org.apache.zookeeper.Watcher.Event.EventType

Enum constant | Description
NodeCreated | The node the Watcher is listening on was created
NodeDeleted | The node the Watcher is listening on was deleted
NodeDataChanged | The value of the node the Watcher is listening on was changed
NodeChildrenChanged | The children of the node the Watcher is listening on changed

EventType client implementation of registration and notification

On the client side it essentially comes down to three HashMaps, as the source code shows (in the 3.4.x client they are fields of ZooKeeper.ZKWatchManager):

// The key of each map is a node path, such as /a or /b; the value is the set of all watchers registered for that node

// Watchers on a node's content
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();

// Watchers on a node's existence (creation or deletion)
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();

// Watchers on a node's children
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
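
To make the role of the three maps concrete, here is a minimal sketch of how a client could decide which watchers to fire for an incoming event. It is a simplified model, not ZooKeeper's code: SketchEventType, SketchWatcher, ClientWatchRegistry, register and drain are hypothetical names, and the event-to-map routing is an assumption modelled on the behaviour described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-ins for ZooKeeper's EventType and Watcher; not the real API. */
enum SketchEventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

interface SketchWatcher {
    void process(SketchEventType type, String path);
}

/** Simplified model of the client-side bookkeeping: three maps keyed by node path. */
class ClientWatchRegistry {
    final Map<String, Set<SketchWatcher>> dataWatches = new HashMap<>();
    final Map<String, Set<SketchWatcher>> existWatches = new HashMap<>();
    final Map<String, Set<SketchWatcher>> childWatches = new HashMap<>();

    /** Adds a watcher for a path to one of the three maps. */
    static void register(Map<String, Set<SketchWatcher>> map, String path, SketchWatcher w) {
        map.computeIfAbsent(path, p -> new HashSet<>()).add(w);
    }

    /** Removes and returns every watcher that should fire for the given event. */
    Set<SketchWatcher> materialize(SketchEventType type, String path) {
        Set<SketchWatcher> result = new HashSet<>();
        switch (type) {
            case NodeCreated:
            case NodeDataChanged:
                drain(dataWatches, path, result);
                drain(existWatches, path, result);
                break;
            case NodeChildrenChanged:
                drain(childWatches, path, result);
                break;
            case NodeDeleted:
                drain(dataWatches, path, result);
                drain(existWatches, path, result);
                drain(childWatches, path, result);
                break;
        }
        return result;
    }

    /** Moves all watchers for the path out of the map and into the output set. */
    private static void drain(Map<String, Set<SketchWatcher>> map, String path,
                              Set<SketchWatcher> out) {
        Set<SketchWatcher> removed = map.remove(path);
        if (removed != null) {
            out.addAll(removed);
        }
    }
}
```

Because materialize removes the entries it returns, a second event for the same path finds nothing to fire unless the watcher has been registered again.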

EventType server implementation of registration and notification

The server side has two related maps. Here is the code directly:

/*
 * The key is a node path and the value is the set of client connections
 * watching it. When a node changes, its watchers can be looked up directly
 * and notified one by one.
 */
private final HashMap<String, HashSet<Watcher>> watchTable =
    new HashMap<String, HashSet<Watcher>>();

/**
 * The key is a client watcher and the value is the set of all node paths it
 * listens to. When a client disconnects, the watches associated with it must
 * be removed, that is, the watcher must be taken out of the set for each of
 * its paths in the first map. This map makes it possible to quickly locate
 * all the paths that belong to a watcher.
 */
private final HashMap<Watcher, HashSet<String>> watch2Paths =
    new HashMap<Watcher, HashSet<String>>();
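
The way the two maps are kept in step can be sketched as follows. This is a simplified model under stated assumptions rather than the server's actual code: ConnSketch is a hypothetical stand-in for a client connection, and ServerWatchIndex, addWatch, removeWatcher and watcherCount are illustrative names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a client connection (the server-side "watcher"). */
class ConnSketch {
    final String name;
    ConnSketch(String name) { this.name = name; }
}

/** Simplified model of the server-side index: path -> watchers and watcher -> paths. */
class ServerWatchIndex {
    private final Map<String, Set<ConnSketch>> watchTable = new HashMap<>();
    private final Map<ConnSketch, Set<String>> watch2Paths = new HashMap<>();

    /** Registers a watch by updating both maps together. */
    synchronized void addWatch(String path, ConnSketch conn) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(conn);
        watch2Paths.computeIfAbsent(conn, c -> new HashSet<>()).add(path);
    }

    /** Called when a connection closes: drop it from every path it was watching. */
    synchronized void removeWatcher(ConnSketch conn) {
        Set<String> paths = watch2Paths.remove(conn);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<ConnSketch> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(conn);
                if (watchers.isEmpty()) {
                    watchTable.remove(path); // no watchers left for this path
                }
            }
        }
    }

    /** Number of connections currently watching the path. */
    synchronized int watcherCount(String path) {
        Set<ConnSketch> watchers = watchTable.get(path);
        return watchers == null ? 0 : watchers.size();
    }
}
```

Without the reverse map, cleaning up after a disconnect would mean scanning every path in watchTable; with it, only the paths that connection actually watched are touched.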

It is important to note that the watcher on the server side is not the Watcher object from the client's code but a ServerCnxn, an abstract class that implements the Watcher interface and represents a client connection.

EventType Registration and notification process

When the client invokes the corresponding API (such as getData) to register a watch, it wraps the request in a Packet object and adds it to outgoingQueue for sending. Note that at this point the watcher has not yet been added to the client's HashMap; that step happens only after the server's response comes back.

When the server receives the request, it registers the watcher by updating its two HashMaps, and then responds to the client.

When the client receives the response, it registers the corresponding Watcher in its own HashMap. Registration is now complete.
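
The three registration steps above can be sketched from the client's point of view. This is a deliberately small model, not the real client: PacketSketch, ClientSketch and onResponse are hypothetical names, the result codes are illustrative values, and network I/O is reduced to calling onResponse with the server's result code.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical request packet: the path to read plus the watcher to register on success. */
class PacketSketch {
    final String path;
    final Runnable watcher;

    PacketSketch(String path, Runnable watcher) {
        this.path = path;
        this.watcher = watcher;
    }
}

/** Simplified client: queues requests and registers watchers only after a successful reply. */
class ClientSketch {
    static final int OK = 0;          // illustrative result codes
    static final int NO_NODE = -101;

    final Queue<PacketSketch> outgoingQueue = new ArrayDeque<>(); // FIFO send order
    final Map<String, Set<Runnable>> dataWatches = new HashMap<>();

    /** Step 1: wrap the request in a packet and queue it; nothing is registered yet. */
    void getData(String path, Runnable watcher) {
        outgoingQueue.add(new PacketSketch(path, watcher));
    }

    /** Step 3: handle the server's result code for the packet at the head of the queue. */
    void onResponse(int resultCode) {
        PacketSketch packet = outgoingQueue.remove();
        if (resultCode == OK) {
            // Only a successful response puts the watcher into the client's map.
            dataWatches.computeIfAbsent(packet.path, p -> new HashSet<>()).add(packet.watcher);
        }
    }
}
```

Deferring the client-side registration this way means a failed request (for example, reading a node that does not exist) never leaves a watcher behind that the server knows nothing about.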

The notification process is also simple. After the server modifies a node, it looks up the node's path in watchTable to find all of its watchers and notifies them one by one. On the client side, the watcher's process method is then called, which completes the notification.

Note that event notifications in ZooKeeper are one-off, meaning that the watcher is removed after a notification is made by the server (mainly for performance reasons).
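
A minimal sketch of the one-off behaviour, assuming a single path-to-watchers table: the entry is removed before anyone is notified, so a second change to the same node notifies nobody. RecordingWatcher, OneShotWatchTable and triggerWatch are hypothetical names used for illustration, not ZooKeeper's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical watcher that just records what it was told. */
class RecordingWatcher {
    final List<String> events = new ArrayList<>();

    void process(String eventType, String path) {
        events.add(eventType + " " + path);
    }
}

/** Simplified one-shot trigger: watchers are removed from the table before being notified. */
class OneShotWatchTable {
    private final Map<String, Set<RecordingWatcher>> watchTable = new HashMap<>();

    void addWatch(String path, RecordingWatcher watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    /** Notifies the watchers of a path and returns how many were notified. */
    int triggerWatch(String path, String eventType) {
        Set<RecordingWatcher> watchers = watchTable.remove(path); // one-shot: forget them now
        if (watchers == null) {
            return 0;
        }
        for (RecordingWatcher watcher : watchers) {
            watcher.process(eventType, path);
        }
        return watchers.size();
    }
}
```

In this model, calling triggerWatch twice for the same path notifies the watcher once and then returns 0, because nothing re-registered it in between.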

Features of the Watcher mechanism in ZooKeeper

  • One-time: once triggered, a watcher is removed by ZK on both the client and the server side, so it has to be registered again after each callback if you want to keep listening
  • Ordered: the client wraps its requests in Packet objects and adds them to outgoingQueue, a FIFO queue, so they are sent in first-come-first-served order, which guarantees ordering
  • Lightweight: the server's notification tells the client only that an event occurred, not what the event contained (for example, how the node's content was modified). Likewise, the client does not send the whole Watcher object to the server; the request just carries a boolean that says whether a watch should be set
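
The one-time and lightweight properties together imply a common pattern: inside the callback, read the new value yourself and register the watch again. Here is a minimal sketch against an in-memory stand-in. TinyStore, ChangeListener and ReArmingListener are hypothetical names, and the store is an assumption made for illustration, not ZooKeeper's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical callback: receives only the event type and path, never the new data. */
interface ChangeListener {
    void process(String eventType, String path);
}

/** Simplified in-memory store with one-shot data watches (not ZooKeeper's real API). */
class TinyStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Set<ChangeListener>> dataWatches = new HashMap<>();

    /** Reads a value; a non-null listener is registered for the next change only. */
    String getData(String path, ChangeListener listener) {
        if (listener != null) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(listener);
        }
        return data.get(path);
    }

    /** Writes a value and fires, then forgets, the listeners registered for the path. */
    void setData(String path, String value) {
        data.put(path, value);
        Set<ChangeListener> fired = dataWatches.remove(path); // one-shot removal
        if (fired != null) {
            for (ChangeListener listener : fired) {
                listener.process("NodeDataChanged", path);
            }
        }
    }
}

/** Re-reads the value and re-arms itself inside every callback. */
class ReArmingListener implements ChangeListener {
    private final TinyStore store;
    final List<String> seen = new ArrayList<>();

    ReArmingListener(TinyStore store) {
        this.store = store;
    }

    @Override
    public void process(String eventType, String path) {
        // The event says only that the node changed; fetch the value and watch again.
        seen.add(store.getData(path, this));
    }
}
```

With the real Java client the equivalent step would be to call getData again with the watcher from inside process. Note that a change arriving between the notification and the re-registration is not reported as a separate event, so the callback should always treat the value it reads as the current state.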

Simple code example

Talk is cheap

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * ZooKeeper Watcher.
 * This class is a Watcher (it implements org.apache.zookeeper.Watcher).
 *
 * @author (alienware)
 * @since 2015-6-14
 */
public class ZooKeeperWatcher implements Watcher {
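    /** Session timeout, in milliseconds */
    private static final int SESSION_TIMEOUT = 5000;
    /** ZooKeeper server address list */
    private static final String CONNECTION_ADDR = "ip1:port1,ip2:port2,ip3:port3";
    /** Parent znode path */
    private static final String PARENT_PATH = "/a";
    /** Child znode path */
    private static final String CHILDREN_PATH = "/b/c";
    /** Log prefix used below (missing from the original snippet; this value is an assumed placeholder) */
    private static final String LOG_PREFIX_OF_MAIN = "[Main] ";
    /** Released once the session is connected (missing from the original snippet; assumed to be a one-shot latch) */
    private final CountDownLatch connectedSemaphore = new CountDownLatch(1);
    /** ZooKeeper client handle */
    private ZooKeeper zk = null;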

    /**
     * Create the ZK connection.
     *
     * @param connectAddr    ZK server address list
     * @param sessionTimeout session timeout
     */
    public void createConnection(String connectAddr, int sessionTimeout) {
        this.releaseConnection();
        try {
            // Pass the current object (the ZooKeeperWatcher instance created in main) as the default watcher
            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "Start connecting to ZK server");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Close the ZK connection.
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handle the Watcher notification received from the server.
     */
    @Override
    public void process(WatchedEvent event) {
        
        System.out.println("Enter the process method");
        
        if (event == null) {
            return;
        }
        
        // Get the connection status
        KeeperState keeperState = event.getState();
        // Event type
        EventType eventType = event.getType();
        // Affected path
        String path = event.getPath();
        System.out.println("Connection Status :\t" + keeperState.toString());
        System.out.println("Event Type :\t" + eventType.toString());

        if (KeeperState.SyncConnected == keeperState) {
            if (EventType.None == eventType) {
                // Successfully connected to the ZK server
                System.out.println("Successfully connected to ZK server");
                connectedSemaphore.countDown();
            } else if (EventType.NodeCreated == eventType) {
                // A node was created
                System.out.println("Node Creation");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else if (EventType.NodeDataChanged == eventType) {
                // A node's data was updated
                System.out.println("Node Data Update");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else if (EventType.NodeChildrenChanged == eventType) {
                // A node's children changed
                System.out.println("Child node change");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else if (EventType.NodeDeleted == eventType) {
                // A node was deleted
                System.out.println("Node " + path + " deleted");
            }
        } else if (KeeperState.Disconnected == keeperState) {
            System.out.println("Disconnected from ZK server");
        } else if (KeeperState.AuthFailed == keeperState) {
            System.out.println("Authentication failed");
        } else if (KeeperState.Expired == keeperState) {
            System.out.println("Session expired");
        }
    }
}