
Registry Overview

In Dubbo, the registry is one of the core components. Dubbo uses the registry to register and discover services in a distributed environment, so the registry is the link between distributed nodes. Its main functions are:

  1. Dynamic join. A service provider can expose itself to consumers dynamically through the registry, without each consumer having to update its configuration files.
  2. Dynamic discovery. A consumer can dynamically pick up new configurations, routing rules, and service providers, which take effect without a restart.
  3. Dynamic adjustment. The registry supports dynamic parameter adjustment, and new parameters are automatically pushed to all relevant service nodes.
  4. Unified configuration. This avoids the inconsistencies that arise when each service is configured locally.

The working process

  1. When a service provider starts, it writes its own metadata to the registry and subscribes to configuration metadata.
  2. When a consumer starts, it writes its own metadata to the registry and subscribes to provider, routing, and configuration metadata.
  3. When the service governance center (dubbo-admin) starts, it subscribes to all consumer, provider, routing, and configuration metadata at once.
  4. When a service provider leaves or joins, the provider list in the registry changes, and the change is dynamically notified to consumers and the service governance center.
  5. When a consumer initiates a service call, statistics are reported asynchronously to the monitoring center (dubbo-monitor-simple).
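The register, subscribe, and notify flow above can be modeled in a few lines. This is a minimal in-memory sketch, not Dubbo's API: InMemoryRegistry and its register/unregister/subscribe methods are hypothetical names, and providers are plain "ip:port" strings. As in steps 2 and 4, a new subscriber first receives the full list, and every later change pushes the updated list to it.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical in-memory registry illustrating register -> subscribe -> notify (not Dubbo's API)
class InMemoryRegistry {
    // service name -> current provider addresses (sorted for deterministic output)
    private final Map<String, Set<String>> providers = new HashMap<>();
    // service name -> subscribers that receive the full provider list on every change
    private final Map<String, List<Consumer<List<String>>>> listeners = new HashMap<>();

    synchronized void register(String service, String address) {
        providers.computeIfAbsent(service, k -> new TreeSet<>()).add(address);
        notifyListeners(service);
    }

    synchronized void unregister(String service, String address) {
        Set<String> set = providers.get(service);
        if (set != null && set.remove(address)) {
            notifyListeners(service);
        }
    }

    synchronized void subscribe(String service, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(service, k -> new ArrayList<>()).add(listener);
        // First subscription: deliver the full current list immediately
        listener.accept(snapshot(service));
    }

    private List<String> snapshot(String service) {
        return new ArrayList<>(providers.getOrDefault(service, Collections.emptySet()));
    }

    private void notifyListeners(String service) {
        List<String> current = snapshot(service);
        for (Consumer<List<String>> l : listeners.getOrDefault(service, Collections.emptyList())) {
            l.accept(current);
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        List<String> consumerView = new ArrayList<>();
        registry.register("com.example.EchoService", "192.168.0.1:20880");
        registry.subscribe("com.example.EchoService", list -> {
            consumerView.clear();
            consumerView.addAll(list);
        });
        registry.register("com.example.EchoService", "192.168.0.2:20880");
        registry.unregister("com.example.EchoService", "192.168.0.1:20880");
        System.out.println(consumerView); // [192.168.0.2:20880]
    }
}
```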

Principles overview

Overview of ZooKeeper principles

Directory structure:

+ /dubbo                    // root directory, "dubbo" by default
+-- service                 // service directory, e.g. com.example.dubbo.demo.spi.EchoService
    +-- providers           // provider directory; contains the URL metadata of multiple providers (IP, port, weight, application name, etc.)
    +-- consumers           // consumer directory; contains the URL metadata of multiple consumers (IP, port, weight, application name, etc.)
    +-- routers             // router configuration directory; contains URL metadata for consumer routing policies
    +-- configurators       // dynamic configuration directory; contains URL metadata for dynamically configuring providers

Information stored in each directory:

Directory                        Example stored value
/dubbo/service/providers         dubbo://192.168.0.1:20880/com.alibaba.demo.Service?category=providers&key=value&…
/dubbo/service/consumers         dubbo://192.168.0.1:5002/com.alibaba.demo.Service?category=consumers&key=value&…
/dubbo/service/routers           dubbo://0.0.0.0/com.alibaba.demo.Service?category=routers&key=value&…
/dubbo/service/configurators     dubbo://0.0.0.0/com.alibaba.demo.Service?category=configurators&key=value&…
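Each URL in the table actually ends up as the name of a child node under its category directory. To my understanding of ZookeeperRegistry in 2.7, toUrlPath(url) is the category path plus the URL-encoded full URL, so the slashes inside the URL do not create extra path levels. The sketch below reproduces that mapping with java.net.URLEncoder; ZkPaths and its methods are hypothetical helpers, not Dubbo classes.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper mapping a URL to a ZooKeeper node path: /dubbo/{service}/{category}/{encoded URL}
class ZkPaths {
    static final String ROOT = "/dubbo";

    static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8"); // ':' -> %3A, '/' -> %2F, '?' -> %3F, '=' -> %3D
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    static String categoryPath(String serviceInterface, String category) {
        return ROOT + "/" + encode(serviceInterface) + "/" + category;
    }

    static String urlPath(String serviceInterface, String category, String fullUrl) {
        // Encoding keeps the whole URL inside a single path segment
        return categoryPath(serviceInterface, category) + "/" + encode(fullUrl);
    }

    public static void main(String[] args) {
        String url = "dubbo://192.168.0.1:20880/com.alibaba.demo.Service?category=providers";
        // /dubbo/com.alibaba.demo.Service/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.alibaba.demo.Service%3Fcategory%3Dproviders
        System.out.println(urlPath("com.alibaba.demo.Service", "providers", url));
    }
}
```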

Overview of Redis principles

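Redis also follows Dubbo's abstract Root, Service, Type, URL structure and stores the data in Redis Hashes: Root, Service, and Type (category) together form the key, each URL is a field of that hash, and the field's value is the URL's expiration time.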

key                                          field    value
/dubbo/com.alibaba.demo.Service/providers    URL      expiration timestamp (current time in ms + expirePeriod)
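To illustrate this layout without a Redis server, here is an in-memory stand-in. FakeRedisHash is hypothetical; its hset mimics the Redis HSET reply (1 when the field is new, 0 when an existing field is updated), which is the behavior the renewal code later relies on to publish REGISTER only for new entries. The 60 s expirePeriod is an assumed value.

```java
import java.util.*;

// Hypothetical in-memory stand-in for the Redis hash layout: key -> (field -> value)
class FakeRedisHash {
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Mimics HSET: returns 1 if the field was created, 0 if an existing field was overwritten
    long hset(String key, String field, String value) {
        Map<String, String> hash = store.computeIfAbsent(key, k -> new HashMap<>());
        return hash.put(field, value) == null ? 1 : 0;
    }

    Map<String, String> hgetAll(String key) {
        return store.getOrDefault(key, Collections.emptyMap());
    }

    public static void main(String[] args) {
        FakeRedisHash redis = new FakeRedisHash();
        String key = "/dubbo/com.alibaba.demo.Service/providers"; // root + service + category
        String field = "dubbo://192.168.0.1:20880/com.alibaba.demo.Service?category=providers";
        long expirePeriod = 60_000; // assumed expiration period in ms
        long first = redis.hset(key, field, String.valueOf(System.currentTimeMillis() + expirePeriod));
        long renew = redis.hset(key, field, String.valueOf(System.currentTimeMillis() + expirePeriod));
        System.out.println(first + " " + renew); // 1 0
    }
}
```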

Subscribe/publish

ZooKeeper

Implementation of publication

Both service providers and consumers register themselves in the registry. A provider registers so that consumers become aware of the service and can call it remotely, and so that the service governance center sees that a new provider is online. A consumer registers so that the service governance center can discover it.

// ZookeeperRegistry (fields and other methods omitted)
public class ZookeeperRegistry extends FailbackRegistry {

    // To register, call the ZooKeeper client to create a node in the registry;
    // the second argument (dynamic, true by default) makes it an ephemeral node
    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // To unregister, call the ZooKeeper client to delete the corresponding node
    @Override
    public void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
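The dynamic flag passed to zkClient.create decides whether the provider node is ephemeral. An ephemeral node belongs to the client session that created it, so when a provider crashes and its session ends, its node disappears even though doUnregister was never called. The toy model below illustrates that lifetime rule only; FakeZk, create, and closeSession are hypothetical and no ZooKeeper client is involved.

```java
import java.util.*;

// Hypothetical toy model of ZooKeeper node lifetimes (persistent vs ephemeral)
class FakeZk {
    private final Set<String> nodes = new TreeSet<>();
    // ephemeral node path -> id of the session that created it
    private final Map<String, Long> ephemeralOwners = new HashMap<>();

    void create(String path, boolean ephemeral, long sessionId) {
        if (nodes.add(path) && ephemeral) {
            ephemeralOwners.put(path, sessionId);
        }
    }

    // Explicit removal, the equivalent of doUnregister
    void delete(String path) {
        nodes.remove(path);
        ephemeralOwners.remove(path);
    }

    // When a session ends (client closed or timed out), all of its ephemeral nodes disappear
    void closeSession(long sessionId) {
        ephemeralOwners.entrySet().removeIf(e -> {
            if (e.getValue() == sessionId) {
                nodes.remove(e.getKey());
                return true;
            }
            return false;
        });
    }

    Set<String> list() {
        return Collections.unmodifiableSet(nodes);
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        long providerSession = 1L;
        // dynamic=true (the default) -> ephemeral provider node
        zk.create("/dubbo/com.example.EchoService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880", true, providerSession);
        // dynamic=false -> persistent node that must be removed explicitly
        zk.create("/dubbo/com.example.EchoService/providers/dubbo%3A%2F%2F192.168.0.2%3A20880", false, providerSession);
        zk.closeSession(providerSession); // the provider crashes
        System.out.println(zk.list()); // only the 192.168.0.2 node remains
    }
}
```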
Subscription implementation
  1. There are usually two ways to subscribe: pull and push. With pull, the client periodically polls the registry for configuration; with push, the registry actively pushes data to the client. Each approach has advantages and disadvantages. Dubbo currently pulls the full data on the first subscription and afterwards pulls again whenever it receives an event.
  2. The ZooKeeper registry uses event notification + client pull. When the client first connects to the registry, it fetches the full data in the corresponding directory and registers a watcher on the subscribed node. The client keeps a long-lived TCP connection to the registry. Whenever data under the node changes, the registry notifies the client according to the registered watcher (event notification); after receiving the notification, the client pulls all the data under that node again (client pull).
  3. Each ZooKeeper node has a version number. When a node's data changes (a transaction operation), its version number changes and a watcher event is triggered to notify subscribers. The version number counts changes: even if the value does not change, an update operation still changes the version number.
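The "event notification + client pull" pattern can be sketched without ZooKeeper. In this hypothetical model (WatchedNode and PullingSubscriber are illustrative names), the notification carries no data, each watch fires only once (as ZooKeeper watches do), and the subscriber reacts by pulling the full child list and registering a new watch.

```java
import java.util.*;

// Hypothetical node whose change events carry no data; watches are one-shot
class WatchedNode {
    private final List<String> children = new ArrayList<>();
    private final List<Runnable> watchers = new ArrayList<>();

    // Returns the full child list and optionally leaves a one-shot watch
    synchronized List<String> getChildren(Runnable watcher) {
        if (watcher != null) watchers.add(watcher);
        return new ArrayList<>(children);
    }

    synchronized void addChild(String child) {
        children.add(child);
        fire();
    }

    synchronized void removeChild(String child) {
        if (children.remove(child)) fire();
    }

    private void fire() {
        // Each watch fires once and is then discarded
        List<Runnable> toFire = new ArrayList<>(watchers);
        watchers.clear();
        toFire.forEach(Runnable::run);
    }
}

// Hypothetical subscriber: on every event it pulls the full list again
class PullingSubscriber {
    private final WatchedNode node;
    private List<String> cache = Collections.emptyList();
    private int pulls;

    PullingSubscriber(WatchedNode node) { this.node = node; }

    void start() { pull(); } // first connection: pull the full list

    private void pull() {
        pulls++;
        // Re-register the watch on every pull so the next change is also observed
        cache = node.getChildren(this::pull);
    }

    List<String> cache() { return cache; }
    int pulls() { return pulls; }

    public static void main(String[] args) {
        WatchedNode providers = new WatchedNode();
        providers.addChild("192.168.0.1:20880");
        PullingSubscriber consumer = new PullingSubscriber(providers);
        consumer.start();
        providers.addChild("192.168.0.2:20880");
        providers.removeChild("192.168.0.1:20880");
        System.out.println(consumer.cache() + " after " + consumer.pulls() + " pulls"); // [192.168.0.2:20880] after 3 pulls
    }
}
```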

Transaction operation: any create, update, or delete operation, and any session creation or expiration, on the client is a transaction operation and is executed by the Leader of the ZooKeeper cluster. Even if the client is connected to a non-Leader node, the request is forwarded to the Leader for execution, which guarantees a global order of all transaction operations. Because each node has a version number, a CAS (compare-and-set) on the version number can be used to make data operations on that node atomic.
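A version-number CAS can be illustrated with a minimal sketch in the spirit of ZooKeeper's setData(path, data, expectedVersion). VersionedNode is a hypothetical class: real ZooKeeper rejects a stale version with a BadVersionException, whereas this sketch simply returns false. Note how the version also increases when the same data is written again.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical node with version-checked writes (compare-and-set on the version number)
class VersionedNode {
    // Immutable snapshot of the node's data and its version
    static final class Stat {
        final String data;
        final int version;
        Stat(String data, int version) { this.data = data; this.version = version; }
    }

    private final AtomicReference<Stat> state = new AtomicReference<>(new Stat("", 0));

    Stat get() { return state.get(); }

    // Succeeds only if nobody has written since expectedVersion was read
    boolean setData(String data, int expectedVersion) {
        Stat current = state.get();
        if (current.version != expectedVersion) return false;
        // Every successful write bumps the version, even when the data is unchanged
        return state.compareAndSet(current, new Stat(data, current.version + 1));
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode();
        int v = node.get().version;                         // 0
        System.out.println(node.setData("weight=100", v));  // true, version -> 1
        System.out.println(node.setData("weight=200", v));  // false, version 0 is stale
        System.out.println(node.setData("weight=100", 1));  // true, same data, version -> 2
        System.out.println(node.get().version);             // 2
    }
}
```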

  4. When a client first connects to the registry, it gets the full data on subscription and then updates through listener events. The service governance center subscribes to all services, so its service value is set to the special value *. In addition to the current node, the service governance center also subscribes to all child nodes under it.
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // Full subscription: the service interface is "*" (service governance center)
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            // Get the root path
            String root = toRootPath();
            // Get (or create) the cached listener map for this URL
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            // Get (or create) the ZooKeeper child listener wrapping this NotifyListener
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                // Iterate over all child nodes
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    // A child that has not been subscribed yet is a new service
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            // Create a persistent root node and watch its direct children
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                // Subscribe to each existing service
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            // Partial subscription (the common consumer case)
            List<URL> urls = new ArrayList<>();
            // One subscription path per category of this URL
            for (String path : toCategoriesPath(url)) {
                // Get (or create) the cached listener map for this URL
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                // Get (or create) the ZooKeeper child listener wrapping this NotifyListener
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                zkClient.create(path, false);
                // Register the watcher and get the current children of this path
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // Call NotifyListener to update the local cache
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

// Get the subscription paths based on the URL's category
private String[] toCategoriesPath(URL url) {
    String[] categories;
    if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
        categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
    } else {
        // A partial subscription defaults to the providers category (DEFAULT_CATEGORY = PROVIDERS_CATEGORY)
        categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
    }
    return paths;
}
  5. The category attribute in the URL decides what a subscriber does with a notification:
    1. providers: the subscriber updates the list of Invoker services managed by its local Directory.
    2. routers: the subscriber updates its local list of routing rules.
    3. configurators: the subscriber updates or overrides its local list of dynamic parameters.
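Before the per-category handling above can happen, notified URLs have to be grouped by category. The sketch below does that grouping on plain URL strings, defaulting to providers when no category parameter is present (the same default as DEFAULT_CATEGORY). CategoryRouter and categoryOf are hypothetical helpers operating on strings, not Dubbo URL objects.

```java
import java.util.*;

// Hypothetical helper: group notified URLs by their "category" parameter
class CategoryRouter {
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                String[] kv = pair.split("=", 2);
                if (kv.length == 2 && kv[0].equals("category")) return kv[1];
            }
        }
        return "providers"; // same default as DEFAULT_CATEGORY
    }

    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), k -> new ArrayList<>()).add(url);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> notified = Arrays.asList(
            "dubbo://192.168.0.1:20880/com.alibaba.demo.Service?category=providers",
            "dubbo://0.0.0.0/com.alibaba.demo.Service?category=routers",
            "dubbo://0.0.0.0/com.alibaba.demo.Service?category=configurators",
            "dubbo://192.168.0.2:20880/com.alibaba.demo.Service");
        // Prints: configurators -> 1, providers -> 2, routers -> 1 (one per line)
        groupByCategory(notified).forEach((category, list) ->
            System.out.println(category + " -> " + list.size()));
    }
}
```

Each group would then be handed to the matching handler: providers refresh the Invoker list, routers the routing rules, configurators the dynamic parameters.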

Redis

Publish/subscribe mechanism

Redis publish/subscribe combines an expiration mechanism with a publish/subscribe channel. To publish a service, the provider first writes a key in Redis and then publishes a register event message on the channel. After the service's key has been written, the publisher must periodically refresh its expiration time: the RedisRegistry constructor starts an expireExecutor scheduled thread pool that keeps calling deferExpired() to renew the key's expiration time. If the provider goes down and stops renewing, the stored expiration time passes and the service is considered offline.

Active/passive offline
  1. Active provider offline: an UNREGISTER event message is broadcast on the channel; subscribers receive it and pull data from the registry to update their locally cached service lists.
  2. Passive provider offline: when the server goes down and stops renewing its key, no message is pushed. With Redis as the registry, this case relies on the service governance center. Its periodic task triggers the cleanup logic, which fetches all keys in Redis, traverses them, and deletes the entries whose expiration time has passed. After cleanup, it publishes an unregister event for each affected key on the channel; other consumers that receive the event delete the corresponding provider from their local data, which ensures eventual consistency.
// The constructor starts a scheduled thread pool that renews the keys every (expiration period / 2)
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
    try {
        deferExpired(); // Extend the expiration time
    } catch (Throwable t) { // Defensive fault tolerance
        logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
    }
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS); // initial delay and interval: expiration period / 2

// Renewal
private void deferExpired() {
    for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
        Pool<Jedis> jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                for (URL url : new HashSet<>(getRegistered())) {
                    if (url.getParameter(DYNAMIC_KEY, true)) {
                        String key = toCategoryPath(url);
                        // Refresh this URL's expiration time; hset returns 1 only for a new field,
                        // so REGISTER is published only when the entry did not exist yet
                        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                            jedis.publish(key, REGISTER);
                        }
                    }
                }
                // The service governance center (admin) also cleans up expired entries
                if (admin) {
                    clean(jedis);
                }
                // Without replication, writing to one node is enough
                if (!replicate) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
        }
    }
}
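The renewal above and the governance center's cleanup work together. The following in-memory sketch shows both halves: providers refresh an expiry timestamp (announcing only brand-new fields), and a periodic clean drops expired fields and announces each key that lost a provider. ExpiryCleaner, renew, and clean are hypothetical names, timestamps are passed in explicitly to keep it deterministic, and the "register"/"unregister" strings stand in for the channel messages.

```java
import java.util.*;

// Hypothetical in-memory model of renewal + governance-center cleanup (no Redis involved)
class ExpiryCleaner {
    // key (root/service/category) -> field (provider URL) -> expiry timestamp in ms
    final Map<String, Map<String, Long>> hashes = new HashMap<>();
    // messages published on each key's channel, in order
    final List<String> published = new ArrayList<>();

    // Provider side: write or refresh the expiry; announce only brand-new fields
    void renew(String key, String url, long now, long expirePeriod) {
        Long previous = hashes.computeIfAbsent(key, k -> new HashMap<>()).put(url, now + expirePeriod);
        if (previous == null) published.add(key + " register");
    }

    // Governance side: drop expired fields and announce each key that lost a provider
    void clean(long now) {
        for (Map.Entry<String, Map<String, Long>> entry : hashes.entrySet()) {
            boolean removed = entry.getValue().values().removeIf(expiry -> expiry < now);
            if (removed) published.add(entry.getKey() + " unregister");
        }
    }

    public static void main(String[] args) {
        ExpiryCleaner redis = new ExpiryCleaner();
        String key = "/dubbo/com.alibaba.demo.Service/providers";
        long period = 60_000; // assumed expiration period
        redis.renew(key, "dubbo://192.168.0.1:20880", 0, period);
        redis.renew(key, "dubbo://192.168.0.2:20880", 0, period);
        // Only 192.168.0.1 keeps renewing every period / 2; 192.168.0.2 has crashed
        redis.renew(key, "dubbo://192.168.0.1:20880", 30_000, period);
        redis.renew(key, "dubbo://192.168.0.1:20880", 60_000, period);
        redis.clean(70_000);
        System.out.println(redis.hashes.get(key).keySet()); // [dubbo://192.168.0.1:20880]
        System.out.println(redis.published); // two register messages, then one unregister
    }
}
```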
Implementation of publication
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
    // Get the connection pool
    Pool<Jedis> jedisPool = entry.getValue();
    try {
        try (Jedis jedis = jedisPool.getResource()) {
            // Write the URL as a hash field whose value is its expiration time
            jedis.hset(key, value, expire);
            // Publish the registration message
            jedis.publish(key, REGISTER);
            success = true;
            // Without replication, writing to one node is enough
            if (!replicate) {
                break;
            }
        }
    } catch (Throwable t) {
        exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
}
Subscription implementation

On the first subscription, a Notifier inner class is created. It is a thread class that subscribes to the channel asynchronously once started. After the Notifier thread starts, the main thread goes on to pull all service information from the registry at once. Subsequent changes in the registry are delivered through channel events that the Notifier thread subscribes to.

if (service.endsWith(ANY_VALUE)) {
	// Service governance center, subscribe to all services
    if (first) {
        first = false;
        Set<String> keys = jedis.keys(service);
        if (CollectionUtils.isNotEmpty(keys)) {
            for (String s : keys) {
            	// Set the local cache for the first time
                doNotify(jedis, s);
            }
        }
        resetSkip();
    }
    // Subscription services
    jedis.psubscribe(new NotifySub(jedisPool), service);
} else {
    if (first) {
        first = false;
        // Set the local cache for the first time
        doNotify(jedis, service);
        resetSkip();
    }
    // Subscription services
    jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE);
}
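The split between a one-time full pull and a background listener can be sketched with a thread and a queue. In this hypothetical model, a LinkedBlockingQueue stands in for the Redis channel (the real code uses jedis.psubscribe), and NotifierSketch, STOP, and the "register …"/"unregister …" message format are illustrative assumptions.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical model: full pull on the caller's thread, later events on a notifier thread
class NotifierSketch {
    static final String STOP = "__stop__";
    final BlockingQueue<String> channel = new LinkedBlockingQueue<>(); // stands in for the Redis channel
    final Set<String> localCache = ConcurrentHashMap.newKeySet();
    private final Thread notifier = new Thread(this::listen, "notifier");

    void subscribe(Set<String> registrySnapshot) {
        notifier.start();                    // start listening asynchronously
        localCache.addAll(registrySnapshot); // first time: pull everything at once
    }

    private void listen() {
        try {
            while (true) {
                String msg = channel.take(); // e.g. "register dubbo://..." or "unregister dubbo://..."
                if (STOP.equals(msg)) return;
                String[] parts = msg.split(" ", 2);
                if (parts[0].equals("register")) localCache.add(parts[1]);
                else if (parts[0].equals("unregister")) localCache.remove(parts[1]);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stop the notifier after it has processed everything queued so far
    void shutdown() {
        channel.offer(STOP);
        try {
            notifier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        NotifierSketch consumer = new NotifierSketch();
        consumer.subscribe(new HashSet<>(Arrays.asList("dubbo://192.168.0.1:20880")));
        consumer.channel.offer("register dubbo://192.168.0.2:20880");
        consumer.channel.offer("unregister dubbo://192.168.0.1:20880");
        consumer.shutdown();
        System.out.println(consumer.localCache); // [dubbo://192.168.0.2:20880]
    }
}
```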

Caching mechanisms

After obtaining registration information, consumers and the service governance center cache it locally: one copy in memory, stored in a Properties object, and one persistent file on disk, referenced by a File object.

public abstract class AbstractRegistry implements Registry {
    // Local disk cache, where the special key value.registries records the list of registry centers,
    // and the other keys hold the lists of notified service providers
    private final Properties properties = new Properties();
    // Single-thread pool that writes the file cache asynchronously
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    // Whether to save the file synchronously
    private boolean syncSaveFile;
    // Version counter of local cache changes
    private final AtomicLong lastCacheChanged = new AtomicLong();
    // In-memory service cache, similar to the Redis layout (key: URL, field: category, value: service list)
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
    // Local disk cache file
    private File file;
    // ... other members omitted
}

Cache loading

When a service initializes, the AbstractRegistry constructor loads the persisted registry data from the local disk file into the Properties object and into the in-memory cache. Properties holds the URLs of all service providers, keyed by URL#serviceKey(), with the provider list, routing rule list, configuration rule list, and so on as the value. Because a value is a list, multiple entries are separated by spaces. A special key, value.registries, holds the addresses of all registries. If the registry is disconnected or down during startup, the Dubbo framework automatically loads Invokers from the local cache.

// Cache file naming rule:
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";

// Example file name
dubbo-registry-demo-provider-127.0.0.1-2181.cache

// Example file contents
#Dubbo Registry Cache
#${time}
com.example.dubbo.demo.service.EchoService=empty\://192.168.0.113\:20880/com.example.dubbo.demo.service.EchoService?anyhost\=true&application\=demo-provider&bind.ip\=192.168.0.113&bind.port\=20880&category\=configurators&check\=false&deprecated\=false&dubbo\=2.0.2&dynamic\=true&generic\=false&interface\=com.example.dubbo.demo.service.EchoService&metadata-type\=remote&methods\=hello&pid\=10192&release\=2.7.6&side\=provider&timestamp\=${timestamp}
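The file format follows directly from java.util.Properties: Properties.store escapes ':' and '=' as \: and \=, which is why the sample above looks the way it does. The sketch below reproduces the file name rule and a save/load round trip of space-separated URL lists. RegistryCacheSketch, cacheFileName, save, and load are hypothetical helpers, not AbstractRegistry methods, and they write to a string rather than to disk.

```java
import java.io.*;
import java.util.*;

// Hypothetical helpers showing the cache file name and the Properties-based format
class RegistryCacheSketch {
    static String cacheFileName(String home, String application, String registryAddress) {
        return home + "/.dubbo/dubbo-registry-" + application + "-" + registryAddress.replaceAll(":", "-") + ".cache";
    }

    // One property per service; multiple URLs are joined by spaces
    static String save(Map<String, List<String>> serviceUrls) {
        Properties properties = new Properties();
        serviceUrls.forEach((service, urls) -> properties.setProperty(service, String.join(" ", urls)));
        StringWriter out = new StringWriter();
        try {
            properties.store(out, "Dubbo Registry Cache"); // escapes ':' and '=' as \: and \=
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    static Map<String, List<String>> load(String content) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (String service : properties.stringPropertyNames()) {
            result.put(service, Arrays.asList(properties.getProperty(service).split(" ")));
        }
        return result;
    }

    public static void main(String[] args) {
        // /home/demo/.dubbo/dubbo-registry-demo-provider-127.0.0.1-2181.cache
        System.out.println(cacheFileName("/home/demo", "demo-provider", "127.0.0.1:2181"));
        Map<String, List<String>> cache = new TreeMap<>();
        cache.put("com.example.dubbo.demo.service.EchoService", Arrays.asList(
            "dubbo://192.168.0.113:20880/com.example.dubbo.demo.service.EchoService?category=providers",
            "empty://192.168.0.113:20880/com.example.dubbo.demo.service.EchoService?category=configurators"));
        String text = save(cache);
        System.out.println(text);
        System.out.println(load(text).equals(cache)); // true
    }
}
```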

Save and update the cache

The cache can be saved synchronously or asynchronously. Asynchronous saving uses a thread pool (registryCacheExecutor); if an exception occurs during the save, the task is submitted to the thread pool again to retry. AbstractRegistry#notify encapsulates the update logic for both the memory cache and the file cache. It is invoked when a client first subscribes and receives the full data set, and again whenever a subscription later receives new data.

// Bump the cache version for this change
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
    // Save synchronously
    doSaveProperties(version);
} else {
    // Save asynchronously
    registryCacheExecutor.execute(new SaveProperties(version));
}
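The version counter exists so that a slow, older save does not overwrite a newer snapshot. A minimal sketch of that idea, assuming each save task compares its version with the latest one before writing; VersionedSaver is hypothetical, the injected Executor replaces registryCacheExecutor so the example is deterministic, and appending to a list stands in for writing the Properties file.

```java
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical version-guarded saver: stale save tasks skip themselves
class VersionedSaver {
    private final AtomicLong lastCacheChanged = new AtomicLong();
    private final Executor executor;
    final List<Long> savedVersions = Collections.synchronizedList(new ArrayList<>());

    VersionedSaver(Executor executor) { this.executor = executor; }

    void onChange(boolean syncSaveFile) {
        long version = lastCacheChanged.incrementAndGet();
        if (syncSaveFile) {
            doSave(version);
        } else {
            executor.execute(() -> doSave(version));
        }
    }

    private void doSave(long version) {
        // A newer change exists, so a later task will write a fresher snapshot
        if (version < lastCacheChanged.get()) return;
        savedVersions.add(version); // stands in for writing the Properties file to disk
    }

    public static void main(String[] args) {
        // A manual executor that only queues tasks, so they run after several changes
        Deque<Runnable> pending = new ArrayDeque<>();
        VersionedSaver saver = new VersionedSaver(pending::add);
        saver.onChange(false);
        saver.onChange(false);
        saver.onChange(false);
        while (!pending.isEmpty()) pending.poll().run();
        System.out.println(saver.savedVersions); // [3]: versions 1 and 2 were stale
    }
}
```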

Retry mechanism

Both ZookeeperRegistry and RedisRegistry extend FailbackRegistry, which in turn extends AbstractRegistry. On top of AbstractRegistry, FailbackRegistry adds failure retry as an abstract capability that subclasses can use directly. FailbackRegistry implements generic methods such as register, unregister, subscribe, and unsubscribe, which call template methods (doRegister, doSubscribe, and so on) that subclasses implement. If a template method throws, the generic method catches the exception and adds the URL to the corresponding retry collection, and a timer retries it at a fixed interval (5 s by default); once the retry succeeds, the entry is removed from the collection. Earlier versions drove this with a ScheduledThreadPoolExecutor that called FailbackRegistry#retry(); in 2.7.x each failure is wrapped in a retry task (FailedRegisteredTask and so on, as in the fields below) scheduled on a HashedWheelTimer.

// Set of urls that failed to initiate registration
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

// Set of URLs that failed to unregister
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

// Set of urls that failed to initiate a subscription
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

// Set of urls that failed to unsubscribe
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

// Set of failed URL notifications
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
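The failback pattern can be reduced to a small sketch: the generic register method calls the template method and parks the URL on failure, and a periodically invoked retry() re-attempts it and removes it on success. This follows the simpler "one map plus retry()" design rather than 2.7's per-URL timer tasks; FailbackSketch, Backend, and the attempt counter are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical failback sketch: failed registrations are parked and retried later
class FailbackSketch {
    // Template method implemented by a concrete registry (e.g. a ZooKeeper or Redis backend)
    interface Backend { void doRegister(String url) throws Exception; }

    private final Backend backend;
    // url -> number of failed attempts so far
    final ConcurrentMap<String, Integer> failedRegistered = new ConcurrentHashMap<>();

    FailbackSketch(Backend backend) { this.backend = backend; }

    // Generic method: call the template method, remember the URL on failure
    void register(String url) {
        failedRegistered.remove(url);
        try {
            backend.doRegister(url);
        } catch (Exception e) {
            failedRegistered.merge(url, 1, Integer::sum);
        }
    }

    // Meant to be called periodically (e.g. every 5 s) by a timer
    void retry() {
        for (String url : failedRegistered.keySet()) {
            try {
                backend.doRegister(url);
                failedRegistered.remove(url); // success: stop retrying
            } catch (Exception e) {
                failedRegistered.merge(url, 1, Integer::sum);
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Backend that is unavailable for the first two calls, then recovers
        FailbackSketch registry = new FailbackSketch(url -> {
            if (attempts[0]++ < 2) throw new Exception("registry unavailable");
        });
        registry.register("dubbo://192.168.0.1:20880/com.example.EchoService");
        while (!registry.failedRegistered.isEmpty()) {
            registry.retry(); // in practice a timer triggers this every retry period
        }
        System.out.println("registered after " + attempts[0] + " attempts"); // registered after 3 attempts
    }
}
```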
