This article analyzes the underlying principles of ZooKeeper's Watch mechanism at the source-code level and tries to explain them in as much detail as possible. If anything is wrong or unclear, corrections are welcome. Thank you!

In this article, ZK refers only to Zookeeper.

We know that ephemeral nodes can be used to implement a distributed lock: multiple clients each try to create the same node, and the one that succeeds holds the lock. The clients that fail to create the node watch the ephemeral node instead. When the lock holder releases the lock (deletes the ephemeral node) or disconnects from the ZK server (ZK then removes the ephemeral node automatically), the other clients receive a Watch notification telling them, in effect: hey guys, the lock has been released, come and grab it.

The above is a primitive distributed lock (not suitable for production because of the stampede effect), and it already shows the Watch mechanism being used to notify clients.

Stampede (thundering herd) effect: when many processes are waiting for the same event, they are all woken up when it occurs, but only one of them can actually handle it; the others are blocked again, and the unnecessary wake-ups cause heavy context-switch costs. (From Wikipedia)
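To make this concrete, here is a minimal sketch of the naive lock just described, written against the native client API. The "/lock" path, the empty payload and the retry handling are illustrative assumptions, and the released.await() line is exactly where the stampede happens when many waiters are woken at once.

// A minimal sketch of the naive ephemeral-node lock described above.
// The "/lock" path, empty payload and retry strategy are illustrative assumptions.
// imports: org.apache.zookeeper.*, org.apache.zookeeper.data.Stat, java.util.concurrent.CountDownLatch
public boolean tryLock(ZooKeeper zk) throws KeeperException, InterruptedException {
    try {
        // creating the ephemeral node successfully means we hold the lock
        zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        return true;
    } catch (KeeperException.NodeExistsException e) {
        // someone else holds the lock: watch the node and wait for it to be deleted
        CountDownLatch released = new CountDownLatch(1);
        Stat stat = zk.exists("/lock", event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                released.countDown();
            }
        });
        if (stat == null) {
            return tryLock(zk);  // the holder vanished between create() and exists(): retry
        }
        released.await();        // every waiter wakes up here at once: the stampede effect
        return tryLock(zk);
    }
}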

In some distributed scenarios, only one machine should be doing the work at any given time. The implementation is similar to the simple distributed lock above: each machine tries to create the same ephemeral node on the ZK server; the machine that creates it successfully is elected as the working machine, and the machines that fail watch that node. When the working machine goes down, ZooKeeper removes its ephemeral node and uses the Watch mechanism to notify the other machines, which then race to create the ephemeral node again. Only one machine succeeds, the others watch the node once more, and the machine that created the node takes over from the one that went down.

This is an example of using ZK to solve a single point problem in a distributed scenario.

From the previous description, it is not difficult to see that the Watch mechanism of ZK is a core and widely used function, so let’s analyze how the Watch mechanism is implemented inside ZK.

How is the Watch mechanism implemented?

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.1</version>
</dependency>

First, introduce the POM dependency shown above (ZK's native client API).

How do I use the Watch mechanism

Establishing a connection with the ZooKeeper class

This section describes how the ZooKeeper class establishes a connection with the ZK server, a process that involves the initialization of the Watch-related objects. The underlying details are explained in the following sections.

ZooKeeper clients can use the Watch mechanism to subscribe to notifications of changes to the data or state of a node on the server.

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
      this(connectString, sessionTimeout, watcher, false);
 }
  • connectString: the address(es) of the ZK servers
  • sessionTimeout: the session timeout, in milliseconds
  • watcher: the default Watcher used to receive event notifications

This Watcher is saved as defaultWatcher in the client-side ZKWatchManager and acts as the default watcher for the entire ZooKeeper session.
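For example, a client might be constructed like this (the address and timeout are made-up values); the lambda passed in becomes the defaultWatcher and receives session-level events such as SyncConnected:

// Illustrative values only; the lambda is stored as ZKWatchManager.defaultWatcher for this session.
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {
    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
        System.out.println("session established");
    }
});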

protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig; 
protected final HostProvider hostProvider;

protected final ClientCnxn cnxn;

public ZooKeeper(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        boolean canBeReadOnly,
        HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
        ...
        if (clientConfig == null) {
            // 1. Provide the default configuration
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
    	
    	// 2. Create a ZKWatchManager object based on ZKClientConfig
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
    	
        // 3. Create a ConnectStringParser object
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;
		
        // 4. Establish the connection
        cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
    	
        cnxn.start();
    }
Build ZKClientConfig
public ZKClientConfig() {
    super();
    initFromJavaSystemProperties();
}

public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";

// ZKConfig
private final Map<String, String> properties = new HashMap<String, String>();

protected void handleBackwardCompatibility() {
        properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
        properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
        properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));

        try (ClientX509Util clientX509Util = new ClientX509Util()) {
            putSSLProperties(clientX509Util);
            properties.put(clientX509Util.getSslAuthProviderProperty(),
                    System.getProperty(clientX509Util.getSslAuthProviderProperty()));
        }

        try (X509Util x509Util = newQuorumX509Util()) {
            putSSLProperties(x509Util);
        }
}

/**
 * Initialize all the ZooKeeper client properties which are configurable as
 * java system property
 */
private void initFromJavaSystemProperties() {
      setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
      setProperty(ZOOKEEPER_SERVER_PRINCIPAL, System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
}

public void setProperty(String key, String value) {
       if (null == key) {
            throw new IllegalArgumentException("property key is null.");
       }
       String oldValue = properties.put(key, value);
       if (null != oldValue && !oldValue.equals(value)) {
           LOG.debug("key {}'s value {} is replaced with new value {}", key, oldValue, value);
       }
}

Build the default ZKClientConfig

  • ZKConfig#handleBackwardCompatibility

    • private final Map<String, String> properties = new HashMap<String, String>();

      public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
      public static final String KINIT_COMMAND = "zookeeper.kinit";
      public static final String JGSS_NATIVE = "sun.security.jgss.native";
      ...
      properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
      properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
      properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));
      ...

      Set properties

  • ZKClientConfig#initFromJavaSystemProperties

    public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
    public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";
    setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
    setProperty(ZOOKEEPER_SERVER_PRINCIPAL, System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
Create ZKWatchManager
 watchManager = defaultWatchManager();
 // set the defaultWatcher
 watchManager.defaultWatcher = watcher;
static class ZKWatchManager implements ClientWatchManager {
       private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
    
        private boolean disableAutoWatchReset;

        ZKWatchManager(boolean disableAutoWatchReset) {
            this.disableAutoWatchReset = disableAutoWatchReset;
        }

        protected volatile Watcher defaultWatcher;
        ...
}
ConnectStringParser

This class resolves hosts and ports by splitting the client connectString (including any chroot suffix), and it supports IPv6 literals.

private static final int DEFAULT_PORT = 2181;

private final String chrootPath;

private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();


public ConnectStringParser(String connectString) {
        // parse out chroot, if any
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chrootPath = connectString.substring(off);
            // ignore "/" chroot spec, same as null
            if (chrootPath.length() == 1) {
                this.chrootPath = null;
            } else {
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            }
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }

        List<String> hostsList = split(connectString, ",");
        for (String host : hostsList) {
            int port = DEFAULT_PORT;
            try {
                String[] hostAndPort = ConfigUtils.getHostAndPort(host);
                host = hostAndPort[0];
                if (hostAndPort.length == 2) {
                    port = Integer.parseInt(hostAndPort[1]);
                }
            } catch (ConfigException e) {
                e.printStackTrace();
            }
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
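As a quick usage sketch (the addresses are made up), this is what the parser produces for a connect string that carries a chroot suffix:

// Illustrative only: two servers plus a "/app" chroot suffix.
ConnectStringParser parser = new ConnectStringParser("10.0.0.1:2181,10.0.0.2/app");
parser.getChrootPath();       // "/app"
parser.getServerAddresses();  // [10.0.0.1:2181, 10.0.0.2:2181] -- 10.0.0.2 falls back to DEFAULT_PORT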
ClientCnxn

This class manages socket I/O for clients. ClientCnxn maintains a list of available servers to which it can connect and “transparently” switches servers to which it connects as needed. In plain English, this class manages the connection between the client and the ZK server.

 public ClientCnxn(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher,
        ClientCnxnSocket clientCnxnSocket,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
        this.clientConfig = zooKeeper.getClientConfig();
        initRequestTimeout();
    }

The constructor initializes a number of properties; sendThread and eventThread are the most important ones. For example, with a sessionTimeout of 30000 ms and three servers in the connect string, connectTimeout = 30000 / 3 = 10000 ms and readTimeout = 30000 * 2 / 3 = 20000 ms.

Both threads are created as daemon threads:

 SendThread(ClientCnxnSocket clientCnxnSocket) {
          super(makeThreadName("-SendThread()"));
          state = States.CONNECTING;
          this.clientCnxnSocket = clientCnxnSocket;
          setDaemon(true);
  }
  EventThread() {
          super(makeThreadName("-EventThread"));
          setDaemon(true);
 }
final SendThread sendThread;

final EventThread eventThread; 

public void start() {
    sendThread.start();
    eventThread.start();
 }

Start the send thread and the event thread.

We can see that the ZooKeeper constructor mainly prepares parameters and then starts these two threads, so the connection to the ZK server is actually established asynchronously.

Registering Watchers via the API

In addition, the ZooKeeper client can register a Watcher with the ZooKeeper server through the getData, exists and getChildren interfaces, which makes it easy to add Watch events in different situations; a short usage sketch follows the excerpt below:

  /**
     * The asynchronous version of getData.
     *
     * @see #getData(String, boolean, Stat)
     */
   public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
       getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
   }

    public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
        return exists(path, watch ? watchManager.defaultWatcher : null);
    }

    public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
        return getChildren(path, watch ? watchManager.defaultWatcher : null);
    } 
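As a usage sketch (the "/config" path is an assumption, zk is a connected ZooKeeper instance, and exception handling is omitted): the boolean overloads above fall back to the defaultWatcher, while passing an explicit Watcher registers a dedicated one-shot listener for that path.

// Assumes zk is connected and "/config" exists; checked exceptions omitted for brevity.
byte[] a = zk.getData("/config", true, null);   // registers the session's defaultWatcher for this path

byte[] b = zk.getData("/config", event ->       // registers a dedicated one-shot Watcher instead
        System.out.println("got " + event.getType() + " on " + event.getPath()), null);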

States and Events

interface Event {

        /**
         * Enumeration of states the ZooKeeper may be at the event
         */
        @InterfaceAudience.Public
        enum KeeperState {
          
            @Deprecated
            Unknown(-1),

            Disconnected(0),

            @Deprecated
            NoSyncConnected(1),

            SyncConnected(3),

            AuthFailed(4),

            ConnectedReadOnly(5),

            SaslAuthenticated(6),

            Expired(-112),

            Closed(7);
        }
        ...
}
  • Disconnected
    • EventType.None
  • SyncConnected
    • EventType.NodeCreated
    • EventType.NodeDeleted
    • EventType.NodeDataChanged
    • EventType.NodeChildrenChanged
  • AuthFailed
    • EventType.None
  • Expired
    • EventType.None

These are the event types a client can receive for a server node in each session state. For example, while the client is connected to the server (SyncConnected), it can be notified of node creation and deletion, data changes, and child-node updates.
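A client-side Watcher that distinguishes these cases might look like the following sketch (the handling here is just logging and purely illustrative):

// Illustrative only: separate session-state events from node events.
Watcher watcher = event -> {
    if (event.getType() == Watcher.Event.EventType.None) {
        // session-level notification: SyncConnected, Disconnected, Expired, ...
        System.out.println("session state: " + event.getState());
    } else {
        // node notification: NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged
        System.out.println(event.getType() + " on " + event.getPath());
    }
};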

The above describes ZK's Watch mechanism at the application (API) layer. Next, let's explore the underlying principles of the Watch mechanism.

The underlying mechanism of Watch

The Watch mechanism looks a lot like the observer pattern from design patterns: a target object maintains all of the observer objects that depend on it and actively notifies them when its own state changes, usually by calling a method each observer provides. This pattern is commonly used in real-time event processing systems.

We can think of the Watch mechanism as the “observer mode” in a distributed scenario.

Often when we implement the observer pattern, the core piece of code is the list of observers. ZooKeeper maintains such a list on both the client side and the server side: ZKWatchManager and WatchManager respectively.
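For comparison, a bare-bones observer pattern in plain Java might look like the sketch below (all names here are made up); ZKWatchManager and WatchManager play the role of the observers list on the client side and the server side respectively.

// Minimal observer pattern for comparison; class names are illustrative.
interface Observer {
    void update(String event);
}

class Subject {
    private final java.util.List<Observer> observers = new java.util.ArrayList<>();

    void addObserver(Observer o) {
        observers.add(o);
    }

    void stateChanged(String event) {
        // the subject actively notifies every registered observer
        for (Observer o : observers) {
            o.update(event);
        }
    }
}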

Client Watch registration implementation process

Let’s use the getData method as an example to understand how Watch registration works on the client

ZooKeeper#getData(String path, Watcher watcher, Stat stat)

When sending a request that carries a Watch, the client first marks the request as one that requires Watch monitoring, and then uses the DataWatchRegistration class to save the mapping between the Watcher and the node.

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            // Create DataWatchRegistration object
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
    
        GetDataResponse response = new GetDataResponse();
        // Send the request
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
    	
        return response.getData();
    }

When a client sends a request to a server, it encapsulates the request as a Packet object and adds it to an outgoingQueue

public ReplyHeader submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
    
        ReplyHeader r = new ReplyHeader();
    	
        
        Packet packet = queuePacket(
            h,
            r,
            request,
            response,
            null, null, null, null,
            watchRegistration,
            watchDeregistration);
    
        synchronized (packet) {
            if (requestTimeout > 0) {
                // Wait for request completion with timeout
                waitForPacketFinish(r, packet);
            } else {
                // Wait for request completion infinitely
                while (!packet.finished) {
                    packet.wait();
                }
            }
        }
        if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
            sendThread.cleanAndNotifyState();
        }
        return r;
    }
ClientCnxn#queuePacket(...)
public Packet queuePacket(...) {
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    ...
    packet.watchDeregistration = watchDeregistration;
    ...
    outgoingQueue.add(packet);
    ...
    sendThread.getClientCnxnSocket().packetAdded();
}

Finally, the client sends the request to the server (over Java NIO or Netty). When the server's response comes back, the readResponse method of the SendThread class handles it, and finishPacket() is executed at the end to register the Watch with ZKWatchManager.

The ZooKeeper client can use either Java native NIO or Netty to send requests (the sending details are not covered here; interested readers can refer to the source code).

SendThread#readResponse
// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
 void readResponse(ByteBuffer incomingBuffer) throws IOException {
     ...
     finishPacket(packet);
 }
ClientCnxn#finishPacket
 protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        ...
 }
ZooKeeper.WatchRegistration#register
public void register(int rc) {
    
    if (shouldAddWatch(rc)) {
        // Get the Watcher Map
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            // register the Watch
            watchers.add(watcher);
        }
    }
}

DataWatchRegistration#getWatches

class DataWatchRegistration extends WatchRegistration {

        public DataWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }
}

The watchManager here is a field of the ZooKeeper class:

protected final ZKWatchManager watchManager;

Let’s look at the Map property of the ZKWatchManager class:

 static class ZKWatchManager implements ClientWatchManager {
		
        // The obtained Map
        private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
        private boolean disableAutoWatchReset;
        ...
}

These maps respectively contain different categories of Watch information: dataWatches, existWatches, childWatches and so on.

Conclusion
  1. When the client sends a request, it first encapsulates the information to be sent as a Packet object

  2. The request is then sent via Java NIO or Netty; if a Watch is being registered, the Watch information is carried to the server along with the request

  3. After the response comes back, the client obtains the corresponding Watch Map from ZKWatchManager in the callback method and registers the Watch into that Map

    Note that obtaining the Watch Map is not locked; the synchronized block is only used when the Watch is written into the Map, because only the write operation can cause thread-safety problems.

Server Watch registration implementation process

  • When the ZooKeeper server receives a client request, it first parses the request to determine whether the request contains Watch events. If the request contains Watch events, it stores the Watch events in WatchManager

    On the server side, the entry point is the FinalRequestProcessor#processRequest method

    for (Op readOp : multiReadRecord) {
                        try {
                            Record rec;
                            switch (readOp.getType()) {
                            case OpCode.getChildren:
                                rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                                subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
                                break;
                            case OpCode.getData:
                                // Process the request
                                rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                                GetDataResponse gdr = (GetDataResponse) rec;
                                subResult = new GetDataResult(gdr.getData(), gdr.getStat());
                                break;
                            default:
                                throw new IOException("Invalid type of readOp"); }}catch (KeeperException e) {
                            subResult = new ErrorResult(e.code().intValue());
                        }
                        ((MultiResponse) rsp).add(subResult);
                    }
                    break;
                    ...
FinalRequestProcessor#handleGetDataRequest
  private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
      
        GetDataRequest getDataRequest = (GetDataRequest) request;
        // get the path
        String path = getDataRequest.getPath();
        DataNode n = zks.getZKDatabase().getNode(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
        Stat stat = new Stat();
        
        byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(b, stat);
    }

If getDataRequest.getWatch() returns true, the request requires Watch registration, and the Watch is registered with the server-side WatchManager via zks.getZKDatabase().getData.

We can think of ServerCnxn as the server-side representation of the connection between a client and the server; because it implements Watcher, the server registers it as the Watcher that stands in for that client.

public abstract class ServerCnxn implements Stats, Watcher {

ZKDatabase#getData
 public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        return dataTree.getData(path, stat, watcher);
  }
// DataTree#getData
private IWatchManager dataWatches;

public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        byte[] data = null;
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                // register the Watch
                dataWatches.addWatch(path, watcher);
            }
            data = n.data;
        }
        updateReadStat(path, data == null ? 0 : data.length);
        return data;
    }

Timing of the Watch event on the server

After the server executes the setData method to change a node's data, it calls WatchManager.triggerWatch to fire the data-change event.

public Stat setData(String path, byte data[], ...){
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        ...
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
WatchManager#triggerWatch
  • First, it encapsulates a WatchedEvent object with three properties: session state, event type, and data node.

  • Then it looks up the Watch events registered on the node. If none are found, no Watch was registered on the node; otherwise the Watchers are added to a local set and removed from WatchManager (so they will not fire again).

  • Finally, a notification is sent to the client by calling the process method.

public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
        
        // Encapsulate a WatchedEvent object.
        // type: event type; KeeperState.SyncConnected: session state; path: data node
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        Set<Watcher> watchers = new HashSet<>();
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        synchronized (this) {
            for (String localPath : pathParentIterator.asIterable()) {
                // Get the Watchers Set
                Set<Watcher> thisWatchers = watchTable.get(localPath);
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                // traverse the watchers
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                    if (watcherMode.isRecursive()) {
                        if (type != EventType.NodeChildrenChanged) {
                            watchers.add(watcher);
                        }
                    } else if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher);
                        if (!watcherMode.isPersistent()) {
                            iterator.remove();
                            Set<String> paths = watch2Paths.get(watcher);
                            if (paths != null) {
                                paths.remove(localPath);
                            }
                        }
                    }
                }
                if (thisWatchers.isEmpty()) {
                    watchTable.remove(localPath);
                }
            }
        }
        if (watchers.isEmpty()) {
            ...
            return null;
        }
		// Iterate over the added Watcher
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // Send a notification to the client
            w.process(e);
        }
        ...
        return new WatcherOrBitSet(watchers);
    }

Processing of client callbacks

The client handles all server responses uniformly in the SendThread.readResponse() method.

  • First, the response header is deserialized via replyHdr.deserialize(bbia, "header"); an xid of -1 indicates that the response is a notification. For a notification, the received byte stream is then deserialized into a WatcherEvent object.
  • Next, if the chrootPath attribute is configured on the client, the chroot prefix is stripped from the node path carried in the event.
  • Finally, eventThread.queueEvent() is called to hand the received event over to the EventThread for processing.
if (replyHdr.getXid() == -1) {
    ...
    WatcherEvent event = new WatcherEvent();
    // Deserialize the notification body into a WatcherEvent
    event.deserialize(bbia, "response");
    ...
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        ...
        event.setPath(serverPath.substring(chrootPath.length()));
        ...
    }
    WatchedEvent we = new WatchedEvent(event);
    ...
    eventThread.queueEvent(we);
}

We just need to focus on the eventThread.queueEvent() method.

EventThread.queueEvent
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();
    		
            // Get the registered Watcher events
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // materialize the watchers based on the event
                watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
    
    
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // queue the pair (watch set & event) for later processing
    		// Add Watcher events to waitingEvents
            waitingEvents.add(pair);
}
ZooKeeper.ZKWatchManager#materialize
public Set<Watcher> materialize(...)
{
	Set<Watcher> result = new HashSet<Watcher>();
	...
	// type: indicates the type of the event
	switch (type) {
    ...
	case NodeDataChanged:
	case NodeCreated:
	    synchronized (dataWatches) {
	        addTo(dataWatches.remove(clientPath), result);
	    }
	    synchronized (existWatches) {
	        addTo(existWatches.remove(clientPath), result);
	    }
	    break;
	    ...
	}
	return result;
}

The first step is to look up the registered client Watches in ZKWatchManager according to the event type; once found, they are removed from ZKWatchManager.

In other words, client-side Watchers are one-shot: once triggered, they are deleted.

class EventThread extends ZooKeeperThread {

        private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
        ...

waitingEvents is a blocking queue inside EventThread. The Watchers looked up in ZKWatchManager are paired with the event and added to waitingEvents.

The run method in the EventThread class loops to fetch the Watcher events waiting in the waitingEvents queue for processing.

You can think of it as an endless loop that keeps taking Watcher events off the queue and handing them to their handlers.

public void run() {
	try {
	  isRunning = true;
       
      // Infinite loop
	  while (true) {
         // Retrieve the event
	     Object event = waitingEvents.take();
	     if (event == eventOfDeath) {
	        wasKilled = true;
	     } else {
            // Handle the event
	        processEvent(event);
	     }
	     if (wasKilled)
	        synchronized (waitingEvents) {
	           if (waitingEvents.isEmpty()) {
	              isRunning = false;
	              break;
	           }
	        }
	     }
	     ...
	}

**processEvent(event)**

private void processEvent(Object event) {
  ...
  if (event instanceof WatcherSetEventPair) {
      
      WatcherSetEventPair pair = (WatcherSetEventPair) event;
      for (Watcher watcher : pair.watchers) {
          try {
              // Callback event listener handling
              watcher.process(pair.event);
          } catch (Throwable t) {
              LOG.error("Error while calling watcher ", t); }}}}Copy the code

Conclusion

ZooKeeper implements the Watch mechanism by creating a list of observers on the client and server.

  • When the client calls getData, exists and other interfaces, it first puts the corresponding Watch event into the local ZKWatchManager for management.
  • After receiving the request from the client, the server determines whether the request carries a Watch event and, if so, puts the corresponding event into its WatchManager for management.

When the event is triggered, the server queries the corresponding Watch event through the node path information and informs the client. After receiving the notification, the client first queries the local ZKWatchManager to obtain the corresponding Watch information and processes the callback operation.

This design not only implements the observer pattern in a distributed environment, but also keeps the extra information each side needs to process Watch events stored locally at that side, which reduces the amount of data exchanged and greatly improves performance.

Applications of the Watch mechanism

Configuration center

We can store information such as database configuration in a ZooKeeper data node, and each client in the server cluster adds a Watch on this node. When a service in the cluster starts, it reads this node to obtain its configuration. When the node's data changes, the ZooKeeper server sends a Watch event to every client (push); after receiving the notification, each client in the cluster reads the node's configuration again (pull).

In this way ZooKeeper implements a combined push-pull mechanism: the Watch notification is the push, and the client re-reading the node is the pull.
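A minimal configuration-center client could look like the following sketch (the "/config/db" path and the error handling are assumptions). Because client watches are one-shot, the watch is re-registered on every pull:

// Sketch of the push-pull loop: the Watch notification is the "push", getData is the "pull".
// The "/config/db" path and the error handling are illustrative assumptions.
public byte[] loadConfig(ZooKeeper zk) throws KeeperException, InterruptedException {
    return zk.getData("/config/db", event -> {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            try {
                byte[] latest = loadConfig(zk);  // pull again, which also re-registers the one-shot watch
                // apply the refreshed configuration here ...
            } catch (Exception e) {
                // reconnect/retry handling would go here
            }
        }
    }, null);
}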

The registry

In Dubbo projects, ZooKeeper is often used as the registry (a CP system); it works in much the same way as the configuration center above, in contrast to Eureka's AP design.
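A simplified registry interaction might look like the sketch below (the "/services/user-service" path and the address payload are assumptions, and the parent nodes are presumed to exist): providers register ephemeral nodes, and consumers watch the children list.

// Provider side: register an ephemeral sequential node; it disappears automatically if the provider dies.
// imports: java.util.List, org.apache.zookeeper.*; checked exceptions omitted for brevity.
zk.create("/services/user-service/provider-", "10.0.0.5:20880".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// Consumer side: read the provider list and watch for membership changes.
List<String> providers = zk.getChildren("/services/user-service", event -> {
    if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
        // call getChildren(...) again here to refresh the local list and re-register the watch
    }
});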