Subscribe and publish
Subscription and publication are among the core functions of the registry. When an existing service provider goes offline or a new provider joins, the consumers and service governance centers that subscribe to the affected interfaces receive notifications from the registry and update their local configuration. The whole process is automatic and needs no human involvement.
(1) ZooKeeper implementation
1. Implementation of publishing
- Both providers and consumers need to register themselves in the registry.
- Providers register so that consumers can discover the service and make remote calls, and so that the service governance center can detect newly added services.
- Consumers register so that the service governance center can discover them.
- Publish code:
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
- Unpublish code:
zkClient.delete(toUrlPath(url));
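To make the publish and unpublish calls concrete, here is a minimal sketch of what they amount to: creating and deleting a node whose path encodes the service URL. FakeZkClient, PublishSketch, and this version of toUrlPath are hypothetical stand-ins written for illustration, not Dubbo's classes, and the /dubbo root and the path layout (root, interface, category, URL-encoded URL) are assumptions about the default setup.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a ZooKeeper client (not Dubbo's client). */
class FakeZkClient {
    // Maps a node path to its "ephemeral" flag.
    final Map<String, Boolean> nodes = new ConcurrentHashMap<>();

    void create(String path, boolean ephemeral) {
        nodes.put(path, ephemeral);
    }

    void delete(String path) {
        nodes.remove(path);
    }
}

class PublishSketch {
    static final String ROOT = "/dubbo"; // assumed default root node

    /** Builds ROOT/<interface>/<category>/<URL-encoded full URL>. */
    static String toUrlPath(String serviceInterface, String category, String fullUrl) {
        try {
            return ROOT + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        FakeZkClient zk = new FakeZkClient();
        String url = "dubbo://10.0.0.1:20880/com.foo.DemoService?dynamic=true";
        String path = toUrlPath("com.foo.DemoService", "providers", url);

        zk.create(path, true); // publish: the dynamic flag becomes the ephemeral flag
        System.out.println(zk.nodes.keySet());

        zk.delete(path); // unpublish
        System.out.println(zk.nodes.isEmpty());
    }
}
```

Passing the dynamic flag as the ephemeral flag means that a dynamically registered node disappears when the session of the process that created it ends, which is how an offline provider drops out of the registry without an explicit unpublish.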
2. Subscription implementation
- Subscriptions usually work in either pull or push mode. In pull mode the client periodically polls the registry for the configuration; in push mode the registry actively pushes the configuration to the client. Dubbo combines the two: it pulls the full data on first start and pulls again each time it receives a change event.
- When a service is exposed, the provider subscribes to the configurators directory to listen for dynamic configuration. When a consumer starts, it subscribes to the providers, routers, and configurators directories, which deliver change notifications for service providers, routing rules, and dynamic configuration respectively.
- Dubbo supports two ZooKeeper client libraries: Apache Curator and zkClient. The default is Curator.
- ZooKeeper uses event notification combined with client pull:
- (1) When the client connects to the registry for the first time, it fetches the full data in the corresponding directory.
- (2) It registers a watcher on the subscribed node. The client keeps a long-lived TCP connection to the registry.
- (3) If the node changes later, the registry actively notifies the client through the watcher's callback.
- (4) After receiving the notification, the client pulls the full data of the node again. When there are many service nodes, these full pulls put heavy pressure on the network.
- ZooKeeper also maintains a version number for each node. The version is incremented on every change, so it records how many times the node has changed.
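The four steps above can be sketched with a small in-memory model. WatchableRegistry and PullOnEventClient are hypothetical classes invented for this illustration and do not call ZooKeeper or Dubbo; the directory name is also just an example. One simplification to keep in mind: real ZooKeeper watches fire once and have to be registered again, whereas the callback here stays registered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical in-memory registry: it only signals a change, the client pulls the data. */
class WatchableRegistry {
    private final Map<String, List<String>> children = new ConcurrentHashMap<>();
    private final Map<String, List<Runnable>> watchers = new ConcurrentHashMap<>();

    /** The "pull": returns a copy of the full child list of a directory. */
    List<String> getChildren(String dir) {
        List<String> current = children.get(dir);
        return current == null ? new ArrayList<String>() : new ArrayList<String>(current);
    }

    /** Registers a callback that fires whenever the children of dir change. */
    void watch(String dir, Runnable callback) {
        watchers.computeIfAbsent(dir, k -> new CopyOnWriteArrayList<>()).add(callback);
    }

    void addChild(String dir, String child) {
        children.computeIfAbsent(dir, k -> new CopyOnWriteArrayList<>()).add(child);
        fire(dir);
    }

    void removeChild(String dir, String child) {
        List<String> list = children.get(dir);
        if (list != null && list.remove(child)) {
            fire(dir);
        }
    }

    private void fire(String dir) {
        List<Runnable> callbacks = watchers.getOrDefault(dir, Collections.<Runnable>emptyList());
        for (Runnable callback : callbacks) {
            callback.run(); // carries no payload: the client must pull
        }
    }
}

class PullOnEventClient {
    volatile List<String> localCache = new ArrayList<>();
    int pulls = 0;

    void subscribe(WatchableRegistry registry, String dir) {
        refresh(registry, dir);                            // (1) first connection: full pull
        registry.watch(dir, () -> refresh(registry, dir)); // (2) watcher; (3)+(4) pull again on notify
    }

    private void refresh(WatchableRegistry registry, String dir) {
        localCache = registry.getChildren(dir);
        pulls++;
    }

    public static void main(String[] args) {
        String dir = "/dubbo/com.foo.DemoService/providers";
        WatchableRegistry registry = new WatchableRegistry();
        registry.addChild(dir, "provider-1");

        PullOnEventClient client = new PullOnEventClient();
        client.subscribe(registry, dir);
        registry.addChild(dir, "provider-2");
        System.out.println(client.localCache + " after " + client.pulls + " pulls");
    }
}
```

Every change triggers one more full pull, which is why the cost grows with the number of service nodes.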
ZooKeeper full subscription
- 1. When the client connects for the first time, it pulls the full data; later updates arrive through event listeners. The service governance center subscribes to the service layer (the service nodes under the root) and to all nodes beneath it.
- 2. Full subscription mainly serves the Dubbo service governance platform. When the platform starts, it subscribes to all interfaces so that it is aware of the status of every service.
- 3. The core code comes from ZookeeperRegistry:
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // Full subscription
        if ("*".equals(url.getServiceInterface())) {
            String root = this.toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
            if (listeners == null) {
                this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                listeners = (ConcurrentMap)this.zkListeners.get(url);
            }
            ChildListener zkListener = (ChildListener)listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        Iterator var3 = currentChilds.iterator();
                        while (var3.hasNext()) {
                            String child = (String)var3.next();
                            child = URL.decode(child);
                            if (!ZookeeperRegistry.this.anyServices.contains(child)) {
                                ZookeeperRegistry.this.anyServices.add(child);
                                ZookeeperRegistry.this.subscribe(url.setPath(child).addParameters(new String[]{"interface", child, "check", String.valueOf(false)}), listener);
                            }
                        }
                    }
                });
                zkListener = (ChildListener)listeners.get(listener);
            }
            this.zkClient.create(root, false);
            List<String> services = this.zkClient.addChildListener(root, zkListener);
            if (services != null && services.size() > 0) {
                Iterator var7 = services.iterator();
                while (var7.hasNext()) {
                    String service = (String)var7.next();
                    service = URL.decode(service);
                    this.anyServices.add(service);
                    this.subscribe(url.setPath(service).addParameters(new String[]{"interface", service, "check", String.valueOf(false)}), listener);
                }
            }
        } else {
            // Subscribe
            List<URL> urls = new ArrayList();
            String[] var13 = this.toCategoriesPath(url);
            int var14 = var13.length;
            for (int var15 = 0; var15 < var14; ++var15) {
                String path = var13[var15];
                ConcurrentMap<NotifyListener, ChildListener> listeners = (ConcurrentMap)this.zkListeners.get(url);
                if (listeners == null) {
                    this.zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                    listeners = (ConcurrentMap)this.zkListeners.get(url);
                }
                ChildListener zkListener = (ChildListener)listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, ZookeeperRegistry.this.toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = (ChildListener)listeners.get(listener);
                }
                this.zkClient.create(path, false);
                List<String> children = this.zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(this.toUrlsWithEmpty(url, path, children));
                }
            }
            this.notify(url, listener, urls);
        }
    } catch (Throwable var11) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
    }
}
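The full-subscription branch is easier to follow once the listener caching and type casts are stripped away. The sketch below keeps only the core idea: every time the children of the root node are reported, each service that has not been seen before is subscribed exactly once. FullSubscriptionSketch and its methods are hypothetical names used for illustration; only the anyServices field name is borrowed from the listing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class FullSubscriptionSketch {
    // Services that already have a subscription; same role as anyServices in the listing.
    private final Set<String> anyServices = ConcurrentHashMap.newKeySet();
    // Records each per-service subscription so the effect can be inspected.
    final List<String> subscribed = new CopyOnWriteArrayList<>();

    /** Called with the children of the root node, on first connect and on every change. */
    void onRootChildren(List<String> currentChildren) {
        for (String service : currentChildren) {
            // Set.add returns false when the element is already present,
            // so the check and the insertion happen in one atomic step.
            if (anyServices.add(service)) {
                subscribeService(service);
            }
        }
    }

    // Stand-in for the per-service subscribe call in the listing.
    private void subscribeService(String service) {
        subscribed.add(service);
    }

    public static void main(String[] args) {
        FullSubscriptionSketch sketch = new FullSubscriptionSketch();
        sketch.onRootChildren(Arrays.asList("com.foo.A", "com.foo.B"));
        // A later change event reports the complete child list again.
        sketch.onRootChildren(Arrays.asList("com.foo.A", "com.foo.B", "com.foo.C"));
        System.out.println(sketch.subscribed);
    }
}
```

The listing checks contains and then calls add as two separate steps; relying on the return value of add is a small variation chosen here so that two concurrent callbacks cannot both subscribe the same service.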
ZooKeeper category subscription
This is the subscription logic for ordinary consumers.
- (1) Determine which categories to subscribe to from the category attribute in the URL: providers, routers, consumers, and configurators.
- (2) Pull the data of the direct child nodes of each category directory and notify the subscriber.
- (3) For providers data, the subscriber updates the Invoker service list managed by the local Directory.
- (4) For routers data, the subscriber updates the local routing rule list.
- (5) For configurators data, the subscriber updates or overwrites the local dynamic parameter list.
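The category logic can be sketched as two small pieces: turning the category attribute into directories to watch, and routing each notification to the matching local list. CategoryDispatchSketch is a hypothetical class, the URLs are plain strings rather than Dubbo URL objects, and falling back to providers when no category is given is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CategoryDispatchSketch {
    static final String DEFAULT_CATEGORY = "providers"; // assumed fallback

    /** Step (1): derive the directories to watch from a comma-separated category value. */
    static List<String> toCategoriesPath(String servicePath, String categoryParam) {
        String value = (categoryParam == null || categoryParam.isEmpty())
                ? DEFAULT_CATEGORY : categoryParam;
        List<String> paths = new ArrayList<>();
        for (String category : value.split(",")) {
            paths.add(servicePath + "/" + category.trim());
        }
        return paths;
    }

    // Local state kept by the subscriber, one list per category.
    final List<String> invokers = new ArrayList<>();
    final List<String> routers = new ArrayList<>();
    final List<String> configurators = new ArrayList<>();

    /** Steps (3) to (5): replace the local list that matches the notified category. */
    void onNotify(String category, List<String> urls) {
        switch (category) {
            case "providers":
                replace(invokers, urls);
                break;
            case "routers":
                replace(routers, urls);
                break;
            case "configurators":
                replace(configurators, urls);
                break;
            default:
                break; // other categories, such as consumers, are ignored in this sketch
        }
    }

    private static void replace(List<String> target, List<String> urls) {
        target.clear();
        target.addAll(urls);
    }

    public static void main(String[] args) {
        System.out.println(toCategoriesPath("/dubbo/com.foo.DemoService",
                "providers,routers,configurators"));
        CategoryDispatchSketch consumer = new CategoryDispatchSketch();
        consumer.onNotify("providers", Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        System.out.println(consumer.invokers.size());
    }
}
```

Each notification carries the complete list for its category, so the subscriber replaces the old list instead of merging into it, which matches the full-pull behaviour described earlier.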