Service subscriptions and publishing

Subscribing and publishing are among the core functions of a registry. In traditional applications, configuration is usually written to a configuration file. To change it, someone edits the file and then manually triggers a reload into memory, for example by restarting the service. This approach is easy to operate in a small cluster, but as the number of nodes grows, its drawbacks become apparent.

A registry solves this problem. When an existing service provider node goes offline, or a new provider comes online, the registry promptly notifies the consumers and the service governance center that subscribe to the corresponding interface, and they update their local configuration. Subsequent invocations then avoid nodes that have gone offline and can reach newly added nodes. The whole process is automatic and needs no human involvement.

Dubbo abstracts this process in its upper layers and provides several implementations of it. This section covers the Zookeeper and Redis implementations.

Zookeeper implements service subscription and publication

1. Publishing implementation

Both service providers and service consumers register themselves in the registry. A provider registers so that consumers learn that the service exists and can initiate remote calls, and so that the service governance center sees that a new provider is online. A consumer registers so that the service governance center can discover it. The Zookeeper publishing code is very simple: it calls the Zookeeper client library to create a node (a directory) in the registry.

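// The second argument tells the client whether the node is ephemeral (true by default for dynamic URLs).
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));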

Unpublishing is also a very simple operation, just delete the corresponding path on Zookeeper.

zkClient.delete(toUrlPath(url));
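To make the path being created and deleted more concrete, here is a minimal sketch of how a service URL could be mapped to a registry path. It assumes the commonly described layout /<root>/<interface>/<category>/<encoded URL>. The class RegistryPathSketch, its toUrlPath signature, and the sample URL are hypothetical and are not Dubbo's own code.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that mirrors the idea behind toUrlPath(url); not Dubbo's implementation. */
final class RegistryPathSketch {
    private RegistryPathSketch() {
    }

    /** Builds /<root>/<interface>/<category>/<encoded-url> (assumed layout). */
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        // The full URL becomes a single path segment, so it has to be URL-encoded.
        String encoded = URLEncoder.encode(fullUrl, StandardCharsets.UTF_8);
        return "/" + root + "/" + serviceInterface + "/" + category + "/" + encoded;
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("dubbo", "com.example.DemoService", "providers",
                "dubbo://10.0.0.1:20880/com.example.DemoService?version=1.0.0"));
    }
}
```

Publishing creates the node at this path, and unpublishing deletes the same path, which is why both operations only need the URL.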

2. Subscription implementation

Subscriptions are usually implemented in one of two ways: pull, where the client periodically polls the registry for the configuration, and push, where the registry actively pushes data to the client. Each has its own advantages and disadvantages. Dubbo currently combines them: the client pulls the full data set when it first starts, and afterwards pulls again each time it receives a change event.

When a service is exposed, the provider subscribes to the configurators directory to listen for dynamic configuration. When a consumer starts, it subscribes to three directories (providers, routers, and configurators) to be notified of changes to service providers, routing rules, and dynamic configuration.

What Zookeeper client implementations are available in Dubbo?

Whether it is a service provider, a consumer, or the service governance center, any node that connects to the Zookeeper registry needs a client. Dubbo wraps the Zookeeper client behind a unified client API in the dubbo-remoting-zookeeper module and implements that API with two different open source clients:

  • Apache Curator
  • ZkClient

Users can set the client attribute of <dubbo:registry> to curator or zkclient to choose the client implementation. If the attribute is not set, Curator is used by default.

The Zookeeper registry uses an "event notification + client pull" model. When a client connects to the registry for the first time, it fetches the full data under the corresponding directory and registers a watcher on the subscribed node. The client keeps a long-lived TCP connection to the registry. Whenever data under a watched node changes, the registry notifies the client through the watcher callback (event notification), and the client then pulls the full data under that node again (client pull). This behavior is fixed by the interface method NotifyListener#notify(List<URL> urls), which expects the full list. The drawback is that if the full data set is large, each pull puts noticeable pressure on the network.
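The notify-then-pull cycle can be illustrated with a small in-memory sketch. Everything here (WatchAndPullSketch, Directory, Watcher, Subscriber) is a hypothetical stand-in rather than the Zookeeper or Dubbo API. The point it shows is that the event carries no data: the subscriber reacts to it by re-reading the complete child list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class WatchAndPullSketch {

    /** Change event; it carries no data, only the signal that the children changed. */
    interface Watcher {
        void childrenChanged();
    }

    /** In-memory stand-in for one registry directory (hypothetical). */
    static final class Directory {
        private final List<String> children = new ArrayList<>();
        private final List<Watcher> watchers = new CopyOnWriteArrayList<>();

        synchronized List<String> getChildren() {
            return new ArrayList<>(children); // always the full list
        }

        void watch(Watcher watcher) {
            watchers.add(watcher);
        }

        void add(String child) {
            synchronized (this) {
                children.add(child);
            }
            fire();
        }

        void remove(String child) {
            synchronized (this) {
                children.remove(child);
            }
            fire();
        }

        private void fire() {
            for (Watcher watcher : watchers) {
                watcher.childrenChanged();
            }
        }
    }

    /** Client side: full pull on first connect, then a full pull on every event. */
    static final class Subscriber {
        private volatile List<String> cache;

        Subscriber(Directory directory) {
            // Register the watcher first so no change is missed, then do the initial pull.
            directory.watch(() -> cache = directory.getChildren());
            cache = directory.getChildren();
        }

        List<String> cached() {
            return cache;
        }
    }
}
```

Because every event leads to a full re-read, the cost of one notification grows with the size of the directory, which is the network pressure mentioned above.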

Each Zookeeper node has its own version number. When a node's data changes, its version number changes and a watcher event is triggered to notify subscribers. The version number counts changes: even if the value stays the same, any update operation still increments it.

What operations are considered transactional operations?

Any create, delete, or update operation from a client, as well as session creation and expiration, is a transactional operation and is executed by the Leader in the Zookeeper ensemble. Even if the client is connected to a non-Leader node, the request is forwarded to the Leader for execution, which guarantees consistency. Because each node has its own version number, updates can use compare-and-set (CAS) on the version to make changes to node data atomic.
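The version check can be sketched with a hypothetical in-memory node. VersionedNodeSketch and its methods are illustrative only, and Zookeeper's real client API differs. The sketch shows the two properties described above: a write succeeds only when the caller's expected version matches, and the version is incremented on every successful write, even if the value is unchanged.

```java
/** Hypothetical in-memory node with a data version; not the Zookeeper API. */
final class VersionedNodeSketch {
    private String data;
    private int version;

    VersionedNodeSketch(String data) {
        this.data = data;
    }

    synchronized String data() {
        return data;
    }

    synchronized int version() {
        return version;
    }

    /** Compare-and-set: writes only if expectedVersion is still the current version. */
    synchronized boolean setData(String newData, int expectedVersion) {
        if (expectedVersion != version) {
            return false; // someone else updated the node first
        }
        data = newData;
        version++; // incremented even when newData equals the old value
        return true;
    }
}
```

A caller that loses the race reads the node again, picks up the new version, and retries its update.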

The first time a client connects to the registry and subscribes, it fetches the full data; afterwards it is updated through listener events. The service governance center handles subscriptions for every service, so its service interface is set to *. It subscribes to the root node and to all child nodes under it. The core code comes from ZookeeperRegistry:

    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                CountDownLatch latch = new CountDownLatch(1);
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
                    if (zkListener instanceof RegistryChildListenerImpl) {
                        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
                // tells the listener to run only after the sync notification of main thread finishes.
                latch.countDown();
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

As the code shows, the first branch exists to support the Dubbo service governance platform, dubbo-admin. On startup the platform subscribes to every interface, so it is aware of the status of each service.

The subscription logic for an ordinary consumer is shown below. It first derives the set of paths to subscribe to from the category in the URL. If the category is *, it subscribes to all four category paths (providers, routers, consumers, and configurators). Otherwise it subscribes to the categories named in the URL, which default to providers.

                CountDownLatch latch = new CountDownLatch(1);
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
                    if (zkListener instanceof RegistryChildListenerImpl) {
                        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
                // tells the listener to run only after the sync notification of main thread finishes.
                latch.countDown();

Note that the categories to subscribe to are taken from the category attribute of the URL (providers, routers, consumers, or configurators). The data is then pulled from the direct child nodes of each category path and passed on as a notification. For the providers category, the subscriber updates the Invoker list managed by its local Directory. For routers, it updates its local list of routing rules. For configurators, it updates its local list of dynamic parameters.
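The category selection can be sketched as a small standalone function. This is an illustrative approximation of what toCategoriesPath(url) computes, not the Dubbo source. The class CategoryPathSketch, the plain String parameters, and the comma-separated category format are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical approximation of the category-to-path mapping; not Dubbo's code. */
final class CategoryPathSketch {
    static final String ANY_VALUE = "*";
    static final String DEFAULT_CATEGORY = "providers";
    static final List<String> ALL_CATEGORIES =
            List.of("providers", "consumers", "routers", "configurators");

    private CategoryPathSketch() {
    }

    /**
     * @param servicePath   path of the service node, e.g. /dubbo/com.example.DemoService
     * @param categoryParam value of the URL's category attribute; may be null
     */
    static List<String> toCategoriesPath(String servicePath, String categoryParam) {
        List<String> categories;
        if (ANY_VALUE.equals(categoryParam)) {
            categories = ALL_CATEGORIES;                 // "*" selects all four
        } else if (categoryParam == null || categoryParam.isEmpty()) {
            categories = List.of(DEFAULT_CATEGORY);      // nothing set: providers only
        } else {
            categories = List.of(categoryParam.split(","));
        }
        List<String> paths = new ArrayList<>();
        for (String category : categories) {
            paths.add(servicePath + "/" + category.trim());
        }
        return paths;
    }
}
```

Each returned path gets its own child listener, which is why the subscription code loops over the result.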

Redis implements subscription publishing

1. Overall process

With Redis as the registry, publishing and subscribing work differently from Zookeeper. The Redis implementation relies on an expiration mechanism and a publish/subscribe channel. When a service provider publishes a service, it creates a key in Redis and publishes a register event message on the channel. After the key is written, the publisher must refresh its expiration time periodically. The RedisRegistry constructor starts a scheduled thread pool, expireExecutor, which repeatedly calls deferExpired() to extend the key's timeout. If the provider goes down and stops renewing, the key times out and is removed, and the service is considered offline.

    private void deferExpired() {
        for (URL url : new HashSet<>(getRegistered())) {
            if (url.getParameter(DYNAMIC_KEY, true)) {
                String key = toCategoryPath(url);
                if (redisClient.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                    redisClient.publish(key, REGISTER);
                }
            }
        }
        if (doExpire) {
            for (Map.Entry<URL, Long> expireEntry : expireCache.entrySet()) {
                if (expireEntry.getValue() < System.currentTimeMillis()) {
                    doNotify(toCategoryPath(expireEntry.getKey()));
                }
            }
        }
        if (admin) {
            clean();
        }
    }

When a subscriber connects to the registry for the first time, it fetches the full data and caches it in local memory. Later changes to the service list are broadcast through the publish/subscribe channel. When a service provider goes offline normally, an unregister event message is broadcast on the channel. On receiving it, the subscriber pulls the data from the registry again and updates its locally cached service list. The same update is triggered by a channel event when a new service provider comes online.

If a service provider crashes rather than going offline normally, no unregister event is broadcast. How does the subscriber learn that the publisher has gone offline? In addition, the Redis publish/subscribe channel is not reliable. Suppose the registry runs in failover mode, the consumer subscribes to a slave node, and the master fails before it has synchronized its data to the slave. How does the subscriber then learn that the service publisher is offline?

With Redis as the registry, the answer depends on the service governance center. When the governance center's scheduled task runs, it triggers the cleanup logic: it fetches all keys from Redis, traverses them, and clears the entries that have expired. After the cleanup, it publishes an unregister event for each affected key on the channel. When consumers receive the unregister event, they delete the corresponding local service data, which keeps the data consistent.
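The renew-and-clean bookkeeping can be sketched without Redis. In this minimal example an in-memory map plays the role of the Redis hash, mapping each service URL to its expiration timestamp. ExpirySketch and its method names are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of "URL -> expiration time"; not RedisRegistry itself. */
final class ExpirySketch {
    private final Map<String, Long> expiryByUrl = new ConcurrentHashMap<>();
    private final long expirePeriodMillis;

    ExpirySketch(long expirePeriodMillis) {
        this.expirePeriodMillis = expirePeriodMillis;
    }

    /** Registers the URL or extends its lease, as a live provider does periodically. */
    void renew(String url, long nowMillis) {
        expiryByUrl.put(url, nowMillis + expirePeriodMillis);
    }

    /**
     * Cleanup pass, as run by the governance center: removes entries whose lease has
     * run out and returns them so the caller can publish an unregister event for each.
     */
    List<String> clean(long nowMillis) {
        List<String> removed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : expiryByUrl.entrySet()) {
            // remove(key, value) skips entries that were renewed in the meantime
            if (entry.getValue() < nowMillis && expiryByUrl.remove(entry.getKey(), entry.getValue())) {
                removed.add(entry.getKey());
            }
        }
        return removed;
    }

    boolean isAlive(String url) {
        return expiryByUrl.containsKey(url);
    }
}
```

A provider that crashes simply stops calling renew, so its entry is picked up by the next cleanup pass even though it never sent an unregister message.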

This is how the Redis registry works.

When the Redis client is initialized, the Redis connection pools (jedisPools) are initialized first. If the registry's cluster mode is set to replicate, a service provider must write to every node in the Redis cluster when it publishes data, while reads go to just one of the nodes. In this mode Redis itself does not need data synchronization configured; consistency is ensured by the client writing to every node.

If the mode is set to failover, or is not set at all, the client reads and writes a single Redis node and tries the next node if that one fails. In this mode Redis must be configured to synchronize data between nodes itself. Initialization also creates the expireExecutor thread pool, whose main job is to extend the expiration time of keys and delete expired keys. It runs at an interval of half the timeout.
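The difference between the two write strategies can be sketched as one loop with an early exit. This is an illustration under assumptions, not RedisRegistry's code: ClusterWriteSketch, the generic node type, and the Consumer-based write operation are all hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of replicate vs. failover writes across several nodes. */
final class ClusterWriteSketch {
    private ClusterWriteSketch() {
    }

    /**
     * Applies writeOp to the nodes in order.
     * replicate == true : try every node, so each reachable node holds the data.
     * replicate == false: failover, stop after the first node that accepts the write.
     *
     * @return the number of nodes written
     * @throws RuntimeException the last failure, if no node accepted the write
     */
    static <N> int write(List<N> nodes, boolean replicate, Consumer<N> writeOp) {
        int written = 0;
        RuntimeException lastFailure = null;
        for (N node : nodes) {
            try {
                writeOp.accept(node);
                written++;
                if (!replicate) {
                    break; // failover: one successful node is enough
                }
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try the next node
            }
        }
        if (written == 0 && lastFailure != null) {
            throw lastFailure;
        }
        return written;
    }
}
```

With failover, nodes after the first successful one never see the write, which is why that mode relies on Redis synchronizing data between nodes itself.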

2. Implementation of publishing

Both service providers and consumers use the registration function. The Redis registration code is shown below:

    @Override
    public void doRegister(URL url) {
        String key = toCategoryPath(url);
        String value = url.toFullString();
        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
        try {
            redisClient.hset(key, value, expire);
            redisClient.publish(key, REGISTER);
        } catch (Throwable t) {
            throw new RpcException("Failed to register service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }

3. Subscription implementation

Service consumers, service providers, and the service governance center all use the registry's subscription function. On the first subscription, an instance of the inner class Notifier is created. Notifier is a thread class that subscribes to the channel asynchronously. While the Notifier thread starts, the main thread goes on to pull the full data from the registry. Later changes in the registry are pushed through the channel that the Notifier thread subscribed to. The subscription code is as follows:

    if (service.endsWith(ANY_VALUE)) {
        if (first) {
            first = false;
            Set<String> keys = redisClient.scan(service);
            if (CollectionUtils.isNotEmpty(keys)) {
                for (String s : keys) {
                    doNotify(s);
                }
            }
            resetSkip();
        }
        redisClient.psubscribe(new NotifySub(), service);
    } else {
        if (first) {
            first = false;
            doNotify(service);
            resetSkip();
        }
        redisClient.psubscribe(new NotifySub(), service + PATH_SEPARATOR + ANY_VALUE); // blocking
    }