This article analyzes how ZooKeeper's Watch mechanism works at the source-code level, in as much detail as possible. Corrections are welcome if anything is wrong or unclear.
In this article, ZK refers only to ZooKeeper.
We know that ephemeral (temporary) nodes can be used to implement a distributed lock. Multiple clients each try to create the same node; the client whose create succeeds holds the lock, and the clients whose create fails watch that ephemeral node. When the lock holder releases the lock (deletes the ephemeral node) or loses its session with the ZK server (ZK then removes the ephemeral node), the other clients receive a Watch notification that says, in effect: the lock has been released, come and grab it.
The above is a primitive distributed lock (not suitable for production because of the stampede effect), and it shows the Watch mechanism being used to notify clients.
Stampede effect: when many processes waiting for the same event are all awakened at once, only one of them can proceed and the rest block again, which causes heavy context-switching overhead. (From Wikipedia)
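The stampede can be made concrete with a toy model. The sketch below is an in-memory stand-in for a single ephemeral lock node; it does not use the ZooKeeper API, and the names ToyLockNode and HerdEffectDemo are made up for illustration. Every client that fails to create the node leaves a one-time watch, and deleting the node notifies all of them even though only one can win.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory stand-in for one ephemeral lock node (not the ZooKeeper API).
class ToyLockNode {
    private String owner;                                  // null means the node does not exist
    private final List<Runnable> watchers = new ArrayList<>();
    int notificationsSent = 0;

    // Mimics "create ephemeral node": succeeds only if nobody holds it.
    synchronized boolean tryCreate(String client) {
        if (owner != null) {
            return false;
        }
        owner = client;
        return true;
    }

    // Mimics registering a one-time watch on the node.
    synchronized void watch(Runnable callback) {
        watchers.add(callback);
    }

    // Mimics deleting the node: every waiting client is notified at once.
    void delete() {
        List<Runnable> toNotify;
        synchronized (this) {
            owner = null;
            toNotify = new ArrayList<>(watchers);
            watchers.clear();
        }
        for (Runnable w : toNotify) {
            notificationsSent++;
            w.run();
        }
    }

    synchronized String owner() {
        return owner;
    }
}

class HerdEffectDemo {
    // Hypothetical helper: the given number of clients compete, then the winner releases once.
    static ToyLockNode run(int clients) {
        ToyLockNode node = new ToyLockNode();
        for (int i = 0; i < clients; i++) {
            String name = "client-" + i;
            if (!node.tryCreate(name)) {
                node.watch(() -> node.tryCreate(name));    // retry when notified
            }
        }
        node.delete();                                     // the first client releases the lock
        return node;
    }

    public static void main(String[] args) {
        ToyLockNode node = run(5);
        System.out.println(node.notificationsSent + " notifications, new owner " + node.owner());
    }
}
```

In this model the number of notifications grows with the number of waiting clients, while only one of them acquires the lock after each release.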
In some distributed scenarios, only one machine should do the work at any time. Each machine tries to create the same ephemeral node on the ZK server, much like the simple distributed lock described above: the machine that creates the node successfully is selected as the working machine, and the machines that fail watch the node. When the working machine goes down, ZooKeeper uses the Watch mechanism to notify the other machines, which try to create the ephemeral node again. Only one of them succeeds and replaces the machine that went down; the others go back to watching the node.
This is an example of using ZK to solve a single point of failure problem in a distributed scenario.
From the previous description, it is not difficult to see that the Watch mechanism of ZK is a core and widely used function, so let’s analyze how the Watch mechanism is implemented inside ZK.
How is the Watch mechanism implemented?
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.1</version>
</dependency>
The dependency above (ZK's native client API) must first be added to the POM.
How to use the Watch mechanism
Establishing a connection with the ZooKeeper class
This section will describe the process of establishing a connection between ZooKeeper class and the ZK server, which involves the initialization of the Watch object. The specific underlying principles will be explained in the following chapters.
ZooKeeper clients can use the Watch mechanism to subscribe to receive notifications when the data or status of a node on the server changes
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
this(connectString, sessionTimeout, watcher, false);
}
- connectString: the address list of the ZK servers
- sessionTimeout: the session timeout in milliseconds
- watcher: the default Watcher that receives events
This Watcher is stored as defaultWatcher in the client's ZKWatchManager and serves as the default watcher for the entire ZooKeeper session.
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
protected final HostProvider hostProvider;
protected final ClientCnxn cnxn;
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
...
if (clientConfig == null) {
// 1. Provide the default configuration
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 2. Create a ZKWatchManager object based on ZKClientConfig
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
// 3. Create a ConnectStringParser object
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
// 4. Establish the connection
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
Build the ZKClientConfig
public ZKClientConfig() {
    super();
    initFromJavaSystemProperties();
}
private final Map<String, String> properties = new HashMap<String, String>();
public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";
// ZKConfig
protected void handleBackwardCompatibility() {
    properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
    properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
    properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));
    try (ClientX509Util clientX509Util = new ClientX509Util()) {
        putSSLProperties(clientX509Util);
        properties.put(clientX509Util.getSslAuthProviderProperty(), System.getProperty(clientX509Util.getSslAuthProviderProperty()));
    }
    try (X509Util x509Util = new QuorumX509Util()) {
        putSSLProperties(x509Util);
    }
}
/**
 * Initialize all the ZooKeeper client properties which are configurable as
 * java system property
 */
private void initFromJavaSystemProperties() {
    setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
    setProperty(ZOOKEEPER_SERVER_PRINCIPAL, System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
}
public void setProperty(String key, String value) {
    if (null == key) {
        throw new IllegalArgumentException("property key is null.");
    }
    String oldValue = properties.put(key, value);
    if (null != oldValue && !oldValue.equals(value)) {
        LOG.debug("key {}'s value {} is replaced with new value {}", key, oldValue, value);
    }
}
Build the default ZKClientConfig
- ZKConfig#handleBackwardCompatibility
private final Map<String, String> properties = new HashMap<String, String>();
public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
public static final String KINIT_COMMAND = "zookeeper.kinit";
public static final String JGSS_NATIVE = "sun.security.jgss.native";
...
properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));
...
- Set properties: ZKClientConfig#initFromJavaSystemProperties
public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal";
setProperty(ZOOKEEPER_REQUEST_TIMEOUT, System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
setProperty(ZOOKEEPER_SERVER_PRINCIPAL, System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL));
Create the ZKWatchManager
watchManager = defaultWatchManager();
// Set defaultWatcher
watchManager.defaultWatcher = watcher;
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;
ZKWatchManager(boolean disableAutoWatchReset) {
this.disableAutoWatchReset = disableAutoWatchReset;
}
protected volatile Watcher defaultWatcher;
...
}
ConnectStringParser
Resolve the host and port by splitting the client connectString and support IPv6 literals
private static final int DEFAULT_PORT = 2181;
private final String chrootPath;
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
public ConnectStringParser(String connectString) {
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
try {
String[] hostAndPort = ConfigUtils.getHostAndPort(host);
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} catch (ConfigException e) {
e.printStackTrace();
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
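To see what the parser produces, here is a simplified sketch of the same splitting logic using only the JDK. ToyConnectString is a hypothetical name, and unlike the real ConnectStringParser the sketch does not validate the chroot path or handle IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of connect-string parsing: optional chroot suffix, then comma-separated hosts.
class ToyConnectString {
    static final int DEFAULT_PORT = 2181;
    final String chrootPath;                         // null when no chroot is given
    final List<String> servers = new ArrayList<>();  // normalized "host:port" entries

    ToyConnectString(String connectString) {
        // Everything from the first '/' onward is the chroot path.
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            chrootPath = chroot.length() == 1 ? null : chroot;  // a bare "/" means no chroot
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String entry : connectString.split(",")) {
            entry = entry.trim();
            if (entry.isEmpty()) {
                continue;
            }
            int colon = entry.lastIndexOf(':');
            String host = colon >= 0 ? entry.substring(0, colon) : entry;
            int port = colon >= 0 ? Integer.parseInt(entry.substring(colon + 1)) : DEFAULT_PORT;
            servers.add(host + ":" + port);
        }
    }

    public static void main(String[] args) {
        ToyConnectString parsed = new ToyConnectString("zk1:2181,zk2:2182,zk3/app");
        System.out.println(parsed.servers + " chroot=" + parsed.chrootPath);
    }
}
```

A host without an explicit port falls back to 2181, and a trailing chroot such as /app is kept separately so it can later be prepended to every client path.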
ClientCnxn
This class manages socket I/O for clients. ClientCnxn maintains a list of available servers to which it can connect and “transparently” switches servers to which it connects as needed. In plain English, this class manages the connection between the client and the ZK server.
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
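The two timeout fields are plain integer arithmetic on the session timeout. The sketch below repeats that arithmetic in isolation; ToyTimeouts and its method names are made up for illustration and are not part of ZooKeeper.

```java
// Reproduces the timeout arithmetic from the constructor above with plain ints.
class ToyTimeouts {
    // Time budget for connecting to one server: the session timeout split across all servers.
    static int connectTimeout(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount;
    }

    // Read timeout: two thirds of the session timeout (integer division).
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30000;
        System.out.println("connectTimeout=" + connectTimeout(sessionTimeoutMs, 3));
        System.out.println("readTimeout=" + readTimeout(sessionTimeoutMs));
    }
}
```

With a 30000 ms session timeout and three servers, the formulas give a connect timeout of 10000 ms per server and a read timeout of 20000 ms.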
The constructor initializes a number of fields; the most important are sendThread and eventThread.
Both threads are set as daemon threads:
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
final SendThread sendThread;
final EventThread eventThread;
public void start() {
sendThread.start();
eventThread.start();
}
Start the send thread and the event thread.
As we can see, the ZooKeeper class establishes a connection with the ZK server by preparing parameters and then starting two threads, so the connection is established asynchronously.
Registering watches through the API
In addition, the ZooKeeper client can register a Watcher with the ZooKeeper server through the getData, exists, and getChildren methods, making it easy to add Watch events in different situations:
/**
* The asynchronous version of getData.
*
* @see #getData(String, boolean, Stat)
*/
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
}
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
return exists(path, watch ? watchManager.defaultWatcher : null);
}
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
return getChildren(path, watch ? watchManager.defaultWatcher : null);
}
States and Events
interface Event {
/** Enumeration of states the ZooKeeper may be at the event */
@InterfaceAudience.Public
enum KeeperState {
@Deprecated
Unknown(-1),
Disconnected(0),
@Deprecated
NoSyncConnected(1),
SyncConnected(3),
AuthFailed(4),
ConnectedReadOnly(5),
SaslAuthenticated(6),
Expired(-112),
Closed(7);
...
}
...
}
- Disconnected: EventType.None
- SyncConnected: EventType.NodeCreated, EventType.NodeDeleted, EventType.NodeDataChanged, EventType.NodeChildrenChanged
- AuthFailed: EventType.None
- Expired: EventType.None
The listing above shows the event types a client can receive in each session state. For example, while a client is connected to a server (SyncConnected), it can be notified when a data node is created or deleted, when its data changes, and when its children change.
The above is a description of ZK’s Watch mechanism at the application layer (API). Next, let’s explore the underlying principle of Watcher mechanism
The underlying mechanism of Watch
The Watch mechanism looks a lot like the observer pattern: a subject maintains a list of the observers that depend on it and actively notifies them when its own state changes, usually by calling a method that each observer provides. The pattern is commonly used in real-time event processing systems.
We can think of the Watch mechanism as the observer pattern in a distributed scenario.
Often when we implement the observer pattern, the core or key piece of code is to create a list of observers. ZooKeeper implements two observer lists on the client side and server side respectively, namely ZKWatchManager and WatchManager.
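As a reminder of what that observer list looks like in its simplest single-process form, here is a minimal sketch. ToySubject is a made-up class used only for illustration; ZooKeeper's two lists are keyed by node path and split across client and server, as the following sections show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal observer pattern: the subject owns the observer list and notifies it on change.
class ToySubject {
    private final List<Consumer<String>> observers = new ArrayList<>();
    private String state;

    void subscribe(Consumer<String> observer) {
        observers.add(observer);
    }

    void setState(String newState) {
        state = newState;
        // Notify every registered observer by calling the method it provided.
        for (Consumer<String> observer : observers) {
            observer.accept(newState);
        }
    }

    String state() {
        return state;
    }

    public static void main(String[] args) {
        ToySubject subject = new ToySubject();
        subject.subscribe(s -> System.out.println("observer saw: " + s));
        subject.setState("changed");
    }
}
```

Note one difference from this sketch: observers here stay subscribed, whereas a standard ZooKeeper watch is removed after it fires once.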
Client Watch registration implementation process
Let’s use the getData method as an example to understand how Watch registration works on the client
ZooKeeper#getData(String path, Watcher watcher, Stat stat)
When sending a request with a Watch, the client first marks the request as one that sets a watch (request.setWatch), and then saves the mapping between the Watcher and the node path in a DataWatchRegistration object.
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// Create DataWatchRegistration object
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// Send the request
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
When a client sends a request to a server, it encapsulates the request as a Packet object and adds it to an outgoingQueue
public ReplyHeader submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(
h,
r,
request,
response,
null, null, null, null,
watchRegistration,
watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
ClientCnxn#queuePacket(...)
public Packet queuePacket(...) {
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
...
packet.watchDeregistration = watchDeregistration;
...
outgoingQueue.add(packet);
...
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
Finally, the ZooKeeper client sends the request to the server. When the response arrives, the readResponse method of the SendThread class, which handles server responses, is called, and at the end it executes finishPacket() to register the Watch with ZKWatchManager.
The ZooKeeper client uses Java native NIO or Netty to send requests (the details of sending requests are not described here; readers can refer to the code).
SendThread#readResponse
// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
...
finishPacket(packet);
}
ClientCnxn#finishPacket
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
...
}
ZooKeeper.WatchRegistration#register
public void register(int rc) {
if (shouldAddWatch(rc)) {
// Get the Watcher Map
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
// Register the Watch
watchers.add(watcher);
}
}
}
DataWatchRegistration#getWatches
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
watchManager is a field of the ZooKeeper class:
protected final ZKWatchManager watchManager;
Let’s look at the Map property of the ZKWatchManager class:
static class ZKWatchManager implements ClientWatchManager {
// The obtained Map
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;
...
}
These maps respectively contain different categories of Watch information: dataWatches, existWatches, childWatches and so on.
Summary
- When a client sends a request, it first encapsulates the request as a Packet object and adds it to the outgoing queue.
- The request is sent over Java NIO or Netty. If a Watch is being registered, the request carries a watch flag to the server.
- After the response arrives, the client obtains the corresponding Watch map from ZKWatchManager in the callback path and registers the Watch in that map.
Note that getWatches itself takes no lock, since it only returns a reference to the map, while the registration step uses the synchronized keyword on that map so that the lookup and the insertion happen atomically. The lock is needed there because registration modifies the shared map.
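The client-side registration step can be modeled in a few lines. The sketch below is a JDK-only approximation of the register logic: ToyClientWatchTable is a hypothetical class, watchers are represented as Runnable, and the rule "register only when the reply code is 0" stands in for shouldAddWatch in the base case.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the client-side watch table: path -> set of watchers.
class ToyClientWatchTable {
    private final Map<String, Set<Runnable>> dataWatches = new HashMap<>();

    // Called when the reply for a request arrives; rc is the reply error code.
    void register(String clientPath, Runnable watcher, int rc) {
        if (rc != 0) {
            return;                       // the request failed, so no watch is recorded
        }
        synchronized (dataWatches) {      // lookup and insert must happen atomically
            dataWatches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
        }
    }

    int count(String clientPath) {
        synchronized (dataWatches) {
            Set<Runnable> watchers = dataWatches.get(clientPath);
            return watchers == null ? 0 : watchers.size();
        }
    }

    public static void main(String[] args) {
        ToyClientWatchTable table = new ToyClientWatchTable();
        table.register("/config", () -> System.out.println("fired"), 0);
        System.out.println("watchers on /config: " + table.count("/config"));
    }
}
```

Because the watchers for a path are kept in a set, registering the same watcher object twice on one path still leaves a single entry.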
Server Watch registration implementation process
When the ZooKeeper server receives a client request, it first parses the request to determine whether it carries a Watch. If it does, the server stores the Watch in WatchManager.
On the server, the request is ultimately handled by the FinalRequestProcessor#processRequest method:
for (Op readOp : multiReadRecord) {
    try {
        Record rec;
        switch (readOp.getType()) {
        case OpCode.getChildren:
            rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
            subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
            break;
        case OpCode.getData:
            // Process the request
            rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
            GetDataResponse gdr = (GetDataResponse) rec;
            subResult = new GetDataResult(gdr.getData(), gdr.getStat());
            break;
        default:
            throw new IOException("Invalid type of readOp");
        }
    } catch (KeeperException e) {
        subResult = new ErrorResult(e.code().intValue());
    }
    ((MultiResponse) rsp).add(subResult);
}
break;
...
FinalRequestProcessor#handleGetDataRequest
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
// Get the path
String path = getDataRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}
When getDataRequest.getWatch() returns true, the request needs a Watch registered, and the Watch is registered with the server-side WatchManager through zks.getZKDatabase().getData.
We can think of ServerCnxn as the connection from a client to the server. Because it implements Watcher, the connection object itself is what the server stores as the watcher for that client.
public abstract class ServerCnxn implements Stats, Watcher {
...
}
ZKDatabase#getData
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
return dataTree.getData(path, stat, watcher);
}
// DataTree#getData
private IWatchManager dataWatches;
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
byte[] data = null;
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// Register the Watch
dataWatches.addWatch(path, watcher);
}
data = n.data;
}
updateReadStat(path, data == null ? 0 : data.length);
return data;
}
When the server triggers a Watch event
After the server executes setData and the node's data changes, it calls the WatchManager.triggerWatch method to trigger the data-change event.
public Stat setData(String path, byte data[], ...) {
Stat s = new Stat();
DataNode n = nodes.get(path);
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
WatchManager#triggerWatch
First, it encapsulates a WatchedEvent object with three properties: session state, event type, and data node.
Then query the Watch events registered on the node. If the value is empty, no Watch events are registered on the node. If there are Watch events, add them to the defined Watchers set and remove them from WatchManager management.
Finally, a notification is sent to the client by calling the process method.
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
    // Encapsulate a WatchedEvent object:
    // type: event type; KeeperState.SyncConnected: session state; path: data node
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    Set<Watcher> watchers = new HashSet<>();
    PathParentIterator pathParentIterator = getPathParentIterator(path);
    synchronized (this) {
        for (String localPath : pathParentIterator.asIterable()) {
            // Get the Watchers Set
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            if (thisWatchers == null || thisWatchers.isEmpty()) {
                continue;
            }
            // Traverse the watchers
            Iterator<Watcher> iterator = thisWatchers.iterator();
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                if (watcherMode.isRecursive()) {
                    if (type != EventType.NodeChildrenChanged) {
                        watchers.add(watcher);
                    }
                } else if (!pathParentIterator.atParentPath()) {
                    watchers.add(watcher);
                    if (!watcherMode.isPersistent()) {
                        iterator.remove();
                        Set<String> paths = watch2Paths.get(watcher);
                        if (paths != null) {
                            paths.remove(localPath);
                        }
                    }
                }
            }
            if (thisWatchers.isEmpty()) {
                watchTable.remove(localPath);
            }
        }
    }
    if (watchers.isEmpty()) {
        ...
        return null;
    }
    // Iterate over the added Watchers
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // Send a notification to the client
        w.process(e);
    }
    ...
    return new WatcherOrBitSet(watchers);
}
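Stripped of persistent and recursive modes and of parent-path handling, the server-side logic reduces to "look up the watchers for the path, remove them, then notify them". The sketch below models only that core; ToyServerWatchManager is a hypothetical class, and watchers are represented as Consumer<String> receiving a text description of the event.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of the server-side watch table with one-time (standard) watches only.
class ToyServerWatchManager {
    private final Map<String, Set<Consumer<String>>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Consumer<String> watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    // Fires and removes all watchers on the path; returns how many were notified.
    int triggerWatch(String path, String eventType) {
        Set<Consumer<String>> fired;
        synchronized (this) {
            fired = watchTable.remove(path);   // removed before notifying, so each fires once
        }
        if (fired == null) {
            return 0;
        }
        for (Consumer<String> watcher : fired) {
            watcher.accept(eventType + ":" + path);
        }
        return fired.size();
    }

    public static void main(String[] args) {
        ToyServerWatchManager manager = new ToyServerWatchManager();
        manager.addWatch("/config", event -> System.out.println("notified: " + event));
        manager.triggerWatch("/config", "NodeDataChanged");
        manager.triggerWatch("/config", "NodeDataChanged");   // nothing left to notify
    }
}
```

The notification is delivered outside the lock, mirroring the way the real method collects watchers inside the synchronized block and calls process afterwards.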
Processing of client callbacks
The client handles all server responses in the SendThread.readResponse() method.
- First, it deserializes the reply header sent by the server with replyHdr.deserialize(bbia, "header"). An xid of -1 indicates that the response is a notification. For a notification, the received byte stream is then deserialized into a WatcherEvent object.
- Then it checks whether the chrootPath attribute is configured on the client. If it is, the chroot prefix must be stripped from the node path in the received event.
- Finally, it calls the eventThread.queueEvent() method to hand the received event over to the EventThread for processing.
if (replyHdr.getXid() == -1) {
    ...
    WatcherEvent event = new WatcherEvent();
    // Deserialize the notification body sent by the server
    event.deserialize(bbia, "response");
    ...
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        ...
        event.setPath(serverPath.substring(chrootPath.length()));
        ...
    }
    WatchedEvent we = new WatchedEvent(event);
    ...
    eventThread.queueEvent(we);
}
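The chroot handling is easiest to follow as a pure function from server path to client path. The sketch below is an approximation under stated assumptions: ToyChroot and toClientPath are made-up names, and paths outside the chroot are simply returned unchanged here, which is a choice of this sketch.

```java
// Maps a server-side path back to the path the client registered, given the client's chroot.
class ToyChroot {
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;                               // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                                      // the chroot itself is the client's root
        }
        if (serverPath.startsWith(chrootPath + "/")) {
            return serverPath.substring(chrootPath.length()); // strip the chroot prefix
        }
        return serverPath;                                   // outside the chroot: unchanged in this sketch
    }

    public static void main(String[] args) {
        System.out.println(toClientPath("/app/config", "/app"));
        System.out.println(toClientPath("/app", "/app"));
    }
}
```

This translation matters because the client-side watch table is keyed by the client path, so the lookup in the next step only succeeds once the prefix has been removed.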
We just need to focus on the eventThread.queueEvent() method.
EventThread.queueEvent
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// Get the registered Watcher events
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
// Add Watcher events to waitingEvents
waitingEvents.add(pair);
}
ZooKeeper.ZKWatchManager#materialize
public Set<Watcher> materialize(...) {
    Set<Watcher> result = new HashSet<Watcher>();
    ...
    // type: the type of the event
    switch (type) {
    ...
    case NodeDataChanged:
    case NodeCreated:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    ...
    }
    return result;
}
First, the client looks up the registered Watchers in ZKWatchManager according to the event type and path. The Watchers it finds are removed from ZKWatchManager at the same time.
In other words, a standard Watcher on the client is one-time: it is deleted once it has been triggered.
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
...
}
waitingEvents is a blocking queue in EventThread. The Watchers found in ZKWatchManager are paired with the event and added to waitingEvents.
The run method of the EventThread class loops, taking the pending entries out of the waitingEvents queue and processing them one at a time.
public void run() {
    try {
        isRunning = true;
        // Infinite loop
        while (true) {
            // Retrieve the event
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // Handle the event
                processEvent(event);
            }
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    ...
}
EventThread#processEvent
private void processEvent(Object event) {
    ...
    if (event instanceof WatcherSetEventPair) {
        WatcherSetEventPair pair = (WatcherSetEventPair) event;
        for (Watcher watcher : pair.watchers) {
            try {
                // Invoke the registered watcher callback
                watcher.process(pair.event);
            } catch (Throwable t) {
                LOG.error("Error while calling watcher ", t);
            }
        }
    }
}
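The queue-plus-consumer structure of EventThread can be reproduced with the JDK alone. The sketch below is a simplified model, not the real class: ToyEventThread, EVENT_OF_DEATH, and runDemo are illustrative names, events are plain strings, and the loop exits as soon as it takes the sentinel instead of checking that the queue is empty.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Toy single-consumer event loop: producers enqueue, one daemon thread dispatches in order.
class ToyEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();   // sentinel that stops the loop
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    ToyEventThread() {
        setDaemon(true);
    }

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();          // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    break;
                }
                try {
                    handled.add("handled " + event);          // stands in for watcher.process(event)
                } catch (Throwable t) {
                    System.err.println("Error while calling watcher: " + t);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Hypothetical helper: runs the loop over the given events and returns what was handled.
    static List<String> runDemo(String... events) {
        ToyEventThread thread = new ToyEventThread();
        thread.start();
        for (String event : events) {
            thread.queueEvent(event);
        }
        thread.queueEventOfDeath();
        try {
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("NodeDataChanged:/a", "NodeDeleted:/b"));
    }
}
```

Because a single thread drains a FIFO queue, callbacks run one at a time in arrival order, and a slow callback delays every event queued behind it.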
Summary
ZooKeeper implements the Watch mechanism by maintaining a list of observers on both the client and the server.
- When the client calls getData, exists, or similar methods, it records the corresponding Watch in the local ZKWatchManager once the response arrives.
- After receiving the request from the client, the server determines whether the request carries a Watch and, if so, records it in WatchManager.
When the event is triggered, the server queries the corresponding Watch event through the node path information and informs the client. After receiving the notification, the client first queries the local ZKWatchManager to obtain the corresponding Watch information and processes the callback operation.
This design not only realizes the observer pattern in a distributed environment, but also keeps the information needed to process Watch events on the side that needs it, so notifications stay small and less data is exchanged between client and server.
Application of Watch mechanism
Configuration center
We can store information such as database configuration items in a ZooKeeper data node. Each service in the cluster acts as a ZooKeeper client: when it starts, it reads the configuration from this node and registers a Watch on it. When the node's data changes, the ZooKeeper server sends a Watch event to each client (push). After receiving the notification, each client reads the node's configuration again (pull).
ZooKeeper implements a push-pull mechanism
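The push-pull cycle, together with the one-time nature of watches, can be sketched without the ZooKeeper library. In the model below, ToyConfigNode and ToyConfigClient are hypothetical classes: the notification carries no data, and the client's refresh both pulls the new value and registers the next watch in the same call.

```java
import java.util.ArrayList;
import java.util.List;

// Toy config node with one-time watches (not the ZooKeeper API).
class ToyConfigNode {
    private String data;
    private final List<Runnable> watchers = new ArrayList<>();

    ToyConfigNode(String data) {
        this.data = data;
    }

    // Reads the data and optionally leaves a one-time watch, like getData with a watcher.
    synchronized String getData(Runnable watcher) {
        if (watcher != null) {
            watchers.add(watcher);
        }
        return data;
    }

    void setData(String newData) {
        List<Runnable> fired;
        synchronized (this) {
            data = newData;
            fired = new ArrayList<>(watchers);
            watchers.clear();                 // watches are consumed when they fire
        }
        for (Runnable watcher : fired) {
            watcher.run();                    // push: the notification carries no data
        }
    }
}

class ToyConfigClient {
    private final ToyConfigNode node;
    String current;

    ToyConfigClient(ToyConfigNode node) {
        this.node = node;
        refresh();
    }

    // Pull: read the latest value and re-register the watch in the same call.
    private void refresh() {
        current = node.getData(this::refresh);
    }

    public static void main(String[] args) {
        ToyConfigNode node = new ToyConfigNode("db.url=v1");
        ToyConfigClient client = new ToyConfigClient(node);
        node.setData("db.url=v2");
        System.out.println(client.current);
    }
}
```

If refresh did not pass a watcher again, the client would see the first change and silently miss every later one, which is why re-registration is part of the pull step.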
Registry
ZooKeeper is often used as a registry in Dubbo projects. It works much like the configuration center and is a CP design, in contrast to Eureka's AP design.