Registry — opening chapter
Objective: To explain the role of the registry in the Dubbo framework and to walk through the source code of dubbo-registry-api.
What is a registry?
A service governance framework can be roughly divided into two parts: service communication and service management. Service management in turn covers service registration, service discovery, and manual intervention in services. The service provider (Provider) registers its services with the registry, and the consumer (Consumer) subscribes to the services it needs from the registry; it does not subscribe to all of the services.
The official documentation provides the dependencies between Provider, Consumer, and Registry:
From the figure above, we can clearly see the role of the Registry. As an analogy, the Registry is like a vending machine, the service provider is like a commodity producer, and the consumer is like a customer who buys what they need from the machine, that is, subscribes to the corresponding service. This should give you a sense of the role the registry plays.
Interpreting dubbo-registry-api
First let’s look at the structure under this package:
You can clearly see the four registries internally supported by Dubbo, which are Dubbo, Multicast, ZooKeeper and Redis. They all rely on classes under the support package. I’ll walk you through the design and implementation of the registry in Dubbo from top to bottom based on the dependencies shown above.
(1) RegistryService
This interface is the service interface of the registry module. It provides registration, unregistration, subscription, unsubscription, and lookup of registered data that matches given conditions. I will not post its source code here; you can check the relevant part of the official documentation, which also comes with Chinese comments.
RegistryService source address: dubbo.apache.org/zh-cn/docs/…
As the comments show, the contract of each method is stated in the source. Dubbo uses a bus pattern to pass and store configuration: configuration is carried on the URL, so the relevant settings can be read wherever the URL is available. The URL also has another role here, acting much like a node. When a service provider (Provider) starts, it writes its URL to the registry. When a consumer starts, it subscribes to the service, that is, to the addresses the providers have registered, and writes its own URL as well. To continue the example above: after the producer makes a product, it puts the product into a particular slot of the vending machine; when customers want the product, they buy it from that slot and leave a purchase record. Here are the methods:
- Register. URLs with the same URI but different parameters are allowed to coexist and must not overwrite one another. In other words, each registered URL must be unique as a whole; no two entries may be identical.
void register(URL url);
- Unregister. This is also simple: the commodity producer no longer sells the product, so it has to be taken out of the vending machine and its slot removed. Note that unregistering requires a full URL match.
void unregister(URL url);
- Subscribe. Subscribing is not based on a full URL match but on a condition, which means one subscription can cover multiple services. The listener handles the events raised when the registered data changes.
void subscribe(URL url, NotifyListener listener);
- Unsubscribe. Unsubscribing is based on a full URL match.
void unsubscribe(URL url, NotifyListener listener);
- Look up the registered data. The URL is used as a query condition, and the set of all matching URLs is returned.
List<URL> lookup(URL url);
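To make the contract above concrete, here is a minimal sketch of an in-memory registry. It assumes plain strings in place of Dubbo's URL class, and InMemoryRegistrySketch, SimpleNotifyListener, and serviceOf are hypothetical names invented for this illustration; they are not Dubbo APIs, and the real registries are considerably more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for NotifyListener; URLs are plain strings in this sketch.
interface SimpleNotifyListener {
    void notify(List<String> urls);
}

// Toy in-memory registry that illustrates the contract only; it is not Dubbo's implementation.
final class InMemoryRegistrySketch {
    // A Set stores each full URL once; URLs that differ only in parameters coexist.
    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    // Service name -> listeners subscribed to that service.
    private final Map<String, List<SimpleNotifyListener>> subscribed = new ConcurrentHashMap<>();

    void register(String url) {
        if (registered.add(url)) {
            push(serviceOf(url));
        }
    }

    // Unregistering requires a full URL match.
    void unregister(String url) {
        if (registered.remove(url)) {
            push(serviceOf(url));
        }
    }

    // Subscribing is by condition (here: the service name), not by full URL.
    void subscribe(String service, SimpleNotifyListener listener) {
        subscribed.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.notify(lookup(service)); // the first notification carries the full list
    }

    void unsubscribe(String service, SimpleNotifyListener listener) {
        List<SimpleNotifyListener> listeners = subscribed.get(service);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }

    // Returns every registered URL of the service, sorted for stable output.
    List<String> lookup(String service) {
        List<String> result = new ArrayList<>();
        for (String url : registered) {
            if (serviceOf(url).equals(service)) {
                result.add(url);
            }
        }
        Collections.sort(result);
        return result;
    }

    // Sends the full, current list to every listener of the service.
    private void push(String service) {
        List<String> full = lookup(service);
        List<SimpleNotifyListener> listeners = subscribed.get(service);
        if (listeners != null) {
            for (SimpleNotifyListener listener : listeners) {
                listener.notify(full);
            }
        }
    }

    // Assumes the shape protocol://host:port/service?params
    static String serviceOf(String url) {
        int start = url.indexOf('/', url.indexOf("://") + 3) + 1;
        int end = url.indexOf('?');
        return end < 0 ? url.substring(start) : url.substring(start, end);
    }
}
```

Registering the same full URL twice leaves a single entry, while two URLs that differ only in a parameter such as version are both kept, which is the coexistence rule described for register.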
(2) Registry
The Registry interface is easy to understand: it combines the node methods and the registry service methods in a single interface. Let’s look at the source code:
public interface Registry extends Node, RegistryService {}
As you can see, this interface declares no methods of its own; it only extends the Node and RegistryService interfaces. Node is the node interface, which defines the operations on a node. Here is its source code:
public interface Node {
    // Get the node address
    URL getUrl();
    // Check whether the node is available
    boolean isAvailable();
    // Destroy the node
    void destroy();
}
(3) RegistryFactory
This interface is the registry’s factory interface and is used to obtain Registry objects. Take a look at the source code:
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
The original method carries English comments that describe the contract for connecting to the registry. You can read them directly in the official documentation, which is also available in Chinese.
Address: dubbo.apache.org/zh-cn/docs/…
This interface is an extension point. The @SPI annotation on it has the default value dubbo, which means the default extension is DubboRegistryFactory. The getRegistry method carries the @Adaptive annotation, so an adapter class, RegistryFactory$Adaptive, is generated dynamically for the interface, and it selects the extension implementation named by the url.protocol value. For details about the SPI extension mechanism, please see "dubbo source code analysis (ii) Dubbo extension mechanism SPI".
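The dispatch described above can be pictured with a small hand-written sketch. It is only an approximation under stated assumptions: AdaptiveFactorySketch and addExtension are hypothetical names, URLs are plain strings, and the "registry" returned is just a descriptive string. Dubbo itself generates the adaptive class and loads extensions through its SPI mechanism rather than through a map like this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hand-written approximation of protocol-based dispatch; all names are hypothetical.
final class AdaptiveFactorySketch {
    // Mirrors the default value in @SPI("dubbo").
    static final String DEFAULT_EXTENSION = "dubbo";

    // Extension name -> factory; the product is a String description for brevity.
    private final Map<String, Function<String, String>> extensions = new HashMap<>();

    void addExtension(String name, Function<String, String> factory) {
        extensions.put(name, factory);
    }

    // Mirrors @Adaptive({"protocol"}): choose the extension by the URL's protocol,
    // falling back to the default name when the URL has no protocol.
    String getRegistry(String url) {
        int idx = url.indexOf("://");
        String name = idx > 0 ? url.substring(0, idx) : DEFAULT_EXTENSION;
        Function<String, String> factory = extensions.get(name);
        if (factory == null) {
            throw new IllegalStateException("No registry extension named " + name);
        }
        return factory.apply(url);
    }

    public static void main(String[] args) {
        AdaptiveFactorySketch adaptive = new AdaptiveFactorySketch();
        adaptive.addExtension("dubbo", url -> "dubbo registry for " + url);
        adaptive.addExtension("zookeeper", url -> "zookeeper registry for " + url);
        System.out.println(adaptive.getRegistry("zookeeper://127.0.0.1:2181"));
        System.out.println(adaptive.getRegistry("127.0.0.1:9090"));
    }
}
```

The point of the design is that callers only ever hold the adaptive object; which concrete factory does the work is decided per call from the URL.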
(4) NotifyListener
The interface has only one method, notify, which is triggered when a service change notification is received. Take a look at the source code:
public interface NotifyListener {
    /**
     * Triggered when a service change notification is received.
     * <p>
     * Contract for handling notifications: <br>
     * 1. Notifications are always full notifications per service interface and data type; that is, partial data of the same type for a service is never notified, and users do not need to compare against the result of the previous notification. <br>
     * 2. The first notification on subscribing must be a full notification of all types of data for the service. <br>
     * 3. On change, different types of data may be notified separately, for example providers, consumers, routers, and overrides. A notification may carry only one type, but the data of that type must be full, not incremental. <br>
     * 4. If the data of a type is empty, a URL with the empty protocol and a category parameter identifying the type must be notified. <br>
     * 5. The notifier (that is, the registry implementation) must guarantee the order of notifications, for example by single-threaded push, queue serialization, or version comparison. <br>
     *
     * @param urls the list of registered URLs, never empty, with the same meaning as the return value of {@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}.
     */
    void notify(List<URL> urls);
}
(5) AbstractRegistry under the support package
AbstractRegistry implements the Registry interface and is the abstract base class for registries. To reduce the load on the registry, it caches URLs locally in a properties file, and it also implements the registry methods such as register and subscribe.
Source note address: github.com/CrazyHZM/in…
1. Attributes
// URL separator used in the cache file to separate service provider URLs
private static final char URL_SEPARATOR = ' ';
// Regular expression for splitting URLs, used to parse the list of service provider URLs in the file cache
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Local disk cache; the special key registies records the list of registries, and the other entries are lists of service providers
private final Properties properties = new Properties();
// Executor that writes the cache file
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// Whether to save the file synchronously
private final boolean syncSaveFile;
// Data version number
private final AtomicLong lastCacheChanged = new AtomicLong();
// The set of registered URLs
// A registered URL may belong to a service provider or to a service consumer
private final Set<URL> registered = new ConcurrentHashSet<URL>();
// Listeners subscribed for each URL
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// URLs, grouped by category, that each consumer has been notified of
// The outer key is the consumer's URL.
// The value is a map whose key is a category, such as providers or routers, and whose value is the list of notified URLs in that category
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
// Registry URL
private URL registryUrl;
// Local disk cache file that stores registry data
private File file;
Understanding what these attributes mean helps a great deal in reading the methods that follow. Besides the attributes related to the registry itself, several of them concern the disk cache file and reading and writing it; these support the local URL caching mentioned above. A few points need attention:
- properties: its data is kept in sync with the local file. On startup, data is read from the file into properties, and when the data in properties changes, it is written to the file. Each key in properties maps to a list. For example, the key identifies the consumer’s URL, and the value is a list of service providers, routing rules, and configuration rules. Its meaning is similar to that of the notified attribute. Note that properties has a special key, registies, which records the list of registries.
- lastCacheChanged: every write to the file is a full overwrite rather than an incremental write, so this version number is needed to keep an older version from overwriting a newer one.
- notified: differs from properties in two ways. First, its data comes from the registry rather than from the file. Second, its values are grouped by category.
2. Constructor
Take a look at the source code:
public AbstractRegistry(URL url) {
// Put the URL in registryUrl
setUrl(url);
// Start file save timer
// Read from the URL whether to save the file synchronously, if there is no value default to save the file asynchronously
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// Get the file path
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
// Create a file
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// Write the data in the file to properties
loadProperties();
// Notify the listener of the result of the URL change
notify(url.getBackupUrls());
}
A few points to watch:
- Settings such as whether to save the file synchronously and where to save it are read from the URL first. If the URL does not configure them, the defaults are used.
- Constructing an AbstractRegistry loads the data in the file into properties and notifies listeners of URL changes. Both methods are described below.
3.filterEmpty
protected static List<URL> filterEmpty(URL url, List<URL> urls) {
if (urls == null || urls.isEmpty()) {
List<URL> result = new ArrayList<URL>(1);
result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
return result;
}
return urls;
}
This method is simple enough that the source needs little explanation: it checks whether the URL list is empty and, if so, returns a list holding a single URL, namely the given URL with its protocol set to empty. It is only used in notify, so that a change notification never carries an empty list.
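As a small illustration of the empty-protocol convention, the sketch below shows both sides: replacing an empty list with a single empty:// marker, and dropping such markers again when results are read back, as lookup does. It assumes string URLs that contain "://", and EmptyProtocolSketch and dropEmpty are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates the "empty" protocol marker with string URLs; names are hypothetical.
final class EmptyProtocolSketch {
    static final String EMPTY_PROTOCOL = "empty";

    // If there is nothing to notify, return one marker URL: the subscribed URL
    // with its protocol replaced by "empty". Assumes the URL contains "://".
    static List<String> filterEmpty(String subscribeUrl, List<String> urls) {
        if (urls == null || urls.isEmpty()) {
            List<String> result = new ArrayList<>(1);
            result.add(EMPTY_PROTOCOL + subscribeUrl.substring(subscribeUrl.indexOf("://")));
            return result;
        }
        return urls;
    }

    // The reading side skips marker URLs so callers only see real services.
    static List<String> dropEmpty(List<String> urls) {
        List<String> result = new ArrayList<>();
        for (String u : urls) {
            if (!u.startsWith(EMPTY_PROTOCOL + "://")) {
                result.add(u);
            }
        }
        return result;
    }
}
```

With this convention a listener can tell "all providers are gone" apart from "no notification was sent", because the former still arrives as a non-empty list.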
4.doSaveProperties
This method is quite long, so I will not post the source here; see the annotated source on GitHub if you need it. It mainly saves the in-memory cache, properties, to the file, and it uses the version number to keep data of an older version from overwriting data of a newer version. Its data flow is the opposite of that of loadProperties.
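The version check can be sketched in a few lines. This is a simplified illustration rather than the real method: VersionedSaveSketch is a hypothetical class, a string field stands in for the cache file, the snapshot is passed in by the caller, and all file handling is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration of version-guarded saving; a String stands in for the file.
final class VersionedSaveSketch {
    private final AtomicLong lastCacheChanged = new AtomicLong();
    private String persisted = "";

    // Every change bumps the version; the writer is handed the version it should save.
    long nextVersion() {
        return lastCacheChanged.incrementAndGet();
    }

    // A writer holding an outdated version gives up, so an old snapshot
    // can never overwrite a newer one. Returns true if the snapshot was written.
    synchronized boolean save(long version, String snapshot) {
        if (version < lastCacheChanged.get()) {
            return false;
        }
        persisted = snapshot;
        return true;
    }

    synchronized String persisted() {
        return persisted;
    }

    public static void main(String[] args) {
        VersionedSaveSketch cache = new VersionedSaveSketch();
        long v1 = cache.nextVersion();
        long v2 = cache.nextVersion();
        System.out.println(cache.save(v1, "old data")); // stale writer is skipped
        System.out.println(cache.save(v2, "new data"));
        System.out.println(cache.persisted());
    }
}
```

This matters mostly for asynchronous saving, where several queued write tasks may run after newer changes have already arrived.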
5.loadProperties
private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        try {
            in = new FileInputStream(file);
            // Load the data into the in-memory cache
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry store file " + file + ", data: " + properties);
            }
        } catch (Throwable e) {
            logger.warn("Failed to load registry store file " + file, e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}
This method loads the local disk cache file into the in-memory cache, that is, it writes the data in the file into properties. Compared with doSaveProperties, the key difference is that it calls properties.load where doSaveProperties calls properties.store, so the data flows in the opposite direction.
6.getCacheUrls
public List<URL> getCacheUrls(URL url) {
    for (Map.Entry<Object, Object> entry : properties.entrySet()) {
        // The key is compared with the service key of the given URL
        String key = (String) entry.getKey();
        // The value is the cached list for that key, such as a list of service providers
        String value = (String) entry.getValue();
        if (key != null && key.length() > 0 && key.equals(url.getServiceKey())
                && (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_')
                && value != null && value.length() > 0) {
            // Split the value into the individual URLs
            String[] arr = value.trim().split(URL_SPLIT);
            List<URL> urls = new ArrayList<URL>();
            for (String u : arr) {
                urls.add(URL.valueOf(u));
            }
            return urls;
        }
    }
    return null;
}
This method looks up the entry for the given URL in the in-memory cache, properties, and returns it as a list. It also shows clearly what data format properties stores.
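The storage format can be tried out with java.util.Properties alone. The sketch below is an illustration under assumptions: it uses string URLs, CacheFormatSketch and its methods are hypothetical names, and unlike getCacheUrls it returns an empty list instead of null when nothing is cached.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Shows the "service key -> space-separated URLs" layout; names are hypothetical.
final class CacheFormatSketch {
    static final char URL_SEPARATOR = ' ';
    static final String URL_SPLIT = "\\s+";

    // Join the URLs with the separator and store them under the service key.
    static void put(Properties props, String serviceKey, List<String> urls) {
        props.setProperty(serviceKey, String.join(String.valueOf(URL_SEPARATOR), urls));
    }

    // Split the stored value back into individual URLs.
    static List<String> get(Properties props, String serviceKey) {
        List<String> urls = new ArrayList<>();
        String value = props.getProperty(serviceKey);
        if (value == null || value.trim().isEmpty()) {
            return urls;
        }
        for (String u : value.trim().split(URL_SPLIT)) {
            urls.add(u);
        }
        return urls;
    }

    // Writes the properties to text and reads them back, as a stand-in for the cache file.
    static Properties roundTrip(Properties props) {
        try {
            StringWriter out = new StringWriter();
            props.store(out, "registry cache sketch");
            Properties copy = new Properties();
            copy.load(new StringReader(out.toString()));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because whitespace is the separator, this layout relies on the URLs themselves containing no raw spaces.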
7.lookup
Take a look at the source code:
@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<URL>();
    // Get the URLs, grouped by category, that this consumer URL has already been notified of
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    // Check whether the consumer has already been notified for this subscription
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        for (List<URL> urls : notifiedUrls.values()) {
            for (URL u : urls) {
                // Skip URLs whose protocol is empty
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    // Add the service URL the consumer subscribed to
                    result.add(u);
                }
            }
        }
    } else {
        // AtomicReference holds the URL list delivered to the listener
        final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
        // Listener that is triggered when a service change notification is received
        NotifyListener listener = new NotifyListener() {
            @Override
            public void notify(List<URL> urls) {
                reference.set(urls);
            }
        };
        // Subscribe with the consumer URL to the services already registered in the registry (that is, add a listener for them)
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        List<URL> urls = reference.get();
        if (urls != null && !urls.isEmpty()) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    }
    return result;
}
This method implements lookup from the RegistryService interface and returns the list of service URLs that the consumer URL has subscribed to. A few points in it can be confusing, so I’ll highlight them here:
- A URL in this method may be a consumer URL or a service URL registered in the registry. I state which one is meant in the comments to make the distinction clearer.
- The service URLs a consumer subscribes to must already be registered in the registry.
- The subscribe method and NotifyListener are explained below.
8.register && unregister
These two methods implement the RegistryService interface. Their logic is very simple, so to save space I will not post the code; if you want to see it, check the annotated source on GitHub, whose address I give below. register adds the URL to the registered attribute described above, and unregister removes the URL from it. The real implementation is in the FailbackRegistry class, which I describe below.
9.subscribe && unsubscribe
These two methods implement subscribe and unsubscribe from the RegistryService interface. Here is the code for subscribe:
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// Get a collection of listeners for the services to which the consumer URL has subscribed
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// Add a listener for a service
listeners.add(listener);
}
As you can see from the source code, subscribing here simply adds a service notification listener to subscribed; the actual subscription logic is implemented in the FailbackRegistry class.
10.recover
The recovery method restores registrations and subscriptions after the connection to the registry has been lost and successfully re-established.
protected void recover() throws Exception {
    // register
    // Take a copy of registered from the memory cache and iterate over it
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        for (URL url : recoverRegistered) {
            register(url);
        }
    }
    // subscribe
    // Take a copy of subscribed from the memory cache and iterate over it
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                subscribe(url, listener);
            }
        }
    }
}
11.notify
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// Iterate over the set of listeners subscribing to the URL and notify them
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
// Skip subscriptions that do not match the notified URLs
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// Walk through the listener collection and notify them
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// Classify urls
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// Group by the category parameter of the URL; when it is absent, the default category, providers, is used
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
// Put the new category list into result
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// Get the set of urls to which a consumer was notified (the result of the notification url change)
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
// Add the url corresponding to the consumer
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
// Handle notification listener URL change results
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// Store the notified list for this category in notified.
// Even when a category has no data there is still a URL: one with urls[0].protocol = empty,
// which covers the case where all service providers are gone.
categoryNotified.put(category, categoryList);
// Save to file
saveProperties(url);
// Notify the listener
listener.notify(categoryList);
}
}
The notify method notifies listeners of URL changes, and what it delivers is always full data. Full data means that notifications are made per service interface and data type: partial data of the same type for a service is never notified, so users do not need to compare against the previous notification. A few key points to note here:
- After a subscription is initiated, the full data is fetched and notify is called, so the Registry obtains the full data at that point.
- notify is called every time data in the registry changes. Even if the change itself is incremental, the caller has already processed it, so the urls passed in are still the full data.
- A listener reacts to the notification, for example by creating a new Invoker object when a new service provider starts.
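The grouping step inside notify can be isolated as a short sketch. It assumes string URLs whose category is carried in a category query parameter, and CategoryGroupingSketch, groupByCategory, and categoryOf are hypothetical names; the matching step and the write to the cache file are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Groups notified URLs by their category parameter; names are hypothetical.
final class CategoryGroupingSketch {
    // URLs without a category parameter fall into the providers category.
    static final String DEFAULT_CATEGORY = "providers";

    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String u : urls) {
            result.computeIfAbsent(categoryOf(u), k -> new ArrayList<>()).add(u);
        }
        return result;
    }

    // Reads the category query parameter of a string URL, if present.
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return DEFAULT_CATEGORY;
    }
}
```

Each resulting list is what a listener would receive in one call, which is why a notification always carries the full data of a single category.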
12.saveProperties
Take a look at the source code:
private void saveProperties(URL url) {
    if (file == null) {
        return;
    }
    try {
        // Concatenate the URLs
        StringBuilder buf = new StringBuilder();
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        // Set to properties
        properties.setProperty(url.getServiceKey(), buf.toString());
        // Increment the version number
        long version = lastCacheChanged.incrementAndGet();
        if (syncSaveFile) {
            // Save the data to the file synchronously
            doSaveProperties(version);
        } else {
            // Save the data to the file asynchronously
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}
This method writes the notified data for a single consumer URL into properties and then saves it to the file by calling doSaveProperties. The difference between the two is that doSaveProperties writes the whole of properties to the file, overwriting it in full, whereas saveProperties only updates the entry for a single consumer URL.
13.destroy
This method is called when the JVM shuts down, to unregister and unsubscribe. It does so by calling the unregister and unsubscribe methods; if you need the details, see the source on GitHub.
(6) FailbackRegistry under the support package
I already mentioned FailbackRegistry when describing the AbstractRegistry class. FailbackRegistry extends AbstractRegistry. The actual logic for registering and subscribing is implemented in FailbackRegistry, which also provides a mechanism for retrying failed operations.
Source note address: github.com/CrazyHZM/in…
1. Attributes
// Scheduled task executor
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Failed-retry timer: periodically checks whether any request has failed and, if so, retries it without limit
private final ScheduledFuture<?> retryFuture;
// URLs whose registration failed
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// Unregister a collection of failed urls
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// Subscribe to a collection of failed listeners
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// Unsubscribe the collection of listeners that failed
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// Set of failed URL notifications
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
The properties of this class are easy to understand, and it is obvious that they are related to the retry failure mechanism.
2. Constructor
public FailbackRegistry(URL url) {
super(url);
// Read the retry frequency from the URL, if null, 5000ms by default
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// Create the failed retry timer
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
// Retry
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
The constructor creates a fail-retry timer. The retry frequency is set from the URL. If not set, the default is 5000ms.
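The try/catch around retry() in the constructor is worth a second look: with scheduleWithFixedDelay, an exception that escapes the task suppresses all later runs, so swallowing it keeps the timer alive. The sketch below illustrates this with the JDK alone; RetryTimerSketch and start are hypothetical names, and a plain daemon thread factory stands in for NamedThreadFactory.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Periodic retry timer built on the JDK scheduler; names are hypothetical.
final class RetryTimerSketch {
    // Runs the retry action after every periodMillis; the caller shuts the executor down.
    static ScheduledExecutorService start(final Runnable retry, long periodMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "RetryTimerSketch");
            t.setDaemon(true); // do not keep the JVM alive just for retries
            return t;
        });
        executor.scheduleWithFixedDelay(() -> {
            try {
                retry.run();
            } catch (Throwable t) {
                // Defensive: an escaping exception would cancel all future runs.
                System.err.println("Unexpected error during retry: " + t.getMessage());
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

A fixed delay, as opposed to a fixed rate, means the next attempt is scheduled only after the previous one finishes, so slow retries do not pile up.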
3.register && unregister && subscribe && unsubscribe
These four methods are the concrete implementations of register, unregister, subscribe, and unsubscribe. Because their logic is very similar, they are covered together, and only the source of register is posted below:
public void register(URL url) {
super.register(url);
// First remove the URL from the failed cache
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Send a registration request to the registry
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If startup checking is enabled, throw the exception directly
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record the failed registration request in the failed list and retry it periodically
failedRegistered.add(url);
}
}
If the call fails with an exception, the URL is added to the failure cache and retried later. doRegister, together with doUnregister, doSubscribe, and doUnsubscribe, are abstract methods declared by FailbackRegistry. The reason is that every registry is implemented differently, so the corresponding registration, subscription, and other operations differ as well. Making these four methods abstract lets each subclass concentrate on implementing just them. For example, a Redis-based registry and a ZooKeeper-based registry are certainly not the same, so their registration and subscription operations differ, and each simply implements the abstract methods.
For the other three methods, see my comments on Github if you need them.
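The combination of a template method and a failure cache can be reduced to the following sketch. It is a simplified illustration, not the real class: FailbackSketch and FlakyRegistrySketch are hypothetical, URLs are strings, only registration is covered, the check and skipFailback handling is omitted, and retry() is called by hand instead of by a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Template method plus failure cache, reduced to registration; names are hypothetical.
abstract class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    // Subclasses supply the registry-specific call (ZooKeeper, Redis, and so on).
    protected abstract void doRegister(String url) throws Exception;

    public void register(String url) {
        failedRegistered.remove(url); // a fresh attempt clears any earlier failure
        try {
            doRegister(url);
        } catch (Exception e) {
            failedRegistered.add(url); // remember it for a later retry
        }
    }

    // Retries every failed URL once; successful ones leave the failure cache.
    public void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister(url);
                failedRegistered.remove(url);
            } catch (Exception e) {
                // still failing: keep it for the next round
            }
        }
    }

    public Set<String> failed() {
        return failedRegistered;
    }
}

// A pretend registry that rejects the first few calls, to exercise the retry path.
final class FlakyRegistrySketch extends FailbackSketch {
    private int failuresLeft;
    final List<String> accepted = new ArrayList<>();

    FlakyRegistrySketch(int failures) {
        this.failuresLeft = failures;
    }

    @Override
    protected void doRegister(String url) throws Exception {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new Exception("registry unreachable");
        }
        accepted.add(url);
    }
}
```

The base class owns the bookkeeping, so a concrete registry only has to say how a single request is sent.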
4.notify
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
// Notify url data changes
doNotify(url, listener, urls);
} catch (Exception t) {
// Record the failed notification in the failed list and retry it periodically
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
This notify method overrides the one described above. Through doNotify it ultimately calls back into the notify method of AbstractRegistry, and if that call fails, the notification is recorded in failedNotified to be retried.
5.recover
@Override
protected void recover() throws Exception {
    // register
    // To restore registrations, add them to failedRegistered so that they are retried periodically
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        for (URL url : recoverRegistered) {
            failedRegistered.add(url);
        }
    }
    // subscribe
    // To restore subscriptions, add them to failedSubscribed so that they are retried periodically
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}
This method overrides recover of the parent class: it puts the registrations and subscriptions into the corresponding failure caches, and they are then retried periodically.
6.retry
The retry logic is implemented in this method: the elements stored in failedRegistered, failedUnregistered, failedSubscribed, failedUnsubscribed, and failedNotified are retried. Retrying means invoking the corresponding method and then removing the element from the cache; for example, to retry a registration, doRegister is called first and the URL is then removed from failedRegistered. For detailed comments, go to GitHub.
(7) AbstractRegistryFactory under the support package
This class implements the RegistryFactory interface, declares createRegistry as an abstract method, and manages the Registry objects as a container.
1. Attributes
// Log output
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);
// Lock that guards access to REGISTRIES
private static final ReentrantLock LOCK = new ReentrantLock();
// Registry collection: Map<RegistryAddress, Registry>
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
2.destroyAll
public static void destroyAll() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Close all registries " + getRegistries());
    }
    // Acquire the lock so that the shutdown of registries is not interleaved with other access
    LOCK.lock();
    try {
        for (Registry registry : getRegistries()) {
            try {
                // Destroy the registry
                registry.destroy();
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        // Clear the cache
        REGISTRIES.clear();
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
This method destroys all Registry objects and clears the in-memory cache. The logic is simple. The key point is that access to REGISTRIES is guarded by LOCK.
3.getRegistry
@Override
public Registry getRegistry(URL url) {
    // Normalize the url
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    // Compute the cache key
    String key = url.toServiceString();
    // Lock the registry access process to ensure a single instance per key
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        // Create a Registry object
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // Add to cache
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
This method implements getRegistry from the RegistryFactory interface. I'll cover how the key is computed in a future article on URL. The most important thing to note here is createRegistry, which AbstractRegistryFactory leaves abstract so that subclasses only have to care about that one method. A Redis registry and a ZooKeeper registry, for example, must be created differently, while the shared steps (normalizing the URL, locking, caching) are already implemented in AbstractRegistryFactory. A subclass therefore only needs to implement the abstract method.
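The division of labor between the abstract factory and its subclasses can be sketched in isolation. This is an illustration under assumptions, not Dubbo's code: SketchRegistry, SketchRegistryFactory, and ZkLikeRegistryFactory are hypothetical names, and a plain string stands in for the key computed from the URL.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal registry type, used only in this sketch.
interface SketchRegistry {
    String address();
}

// Same template-method shape as the getRegistry method above, with string keys.
abstract class SketchRegistryFactory {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Map<String, SketchRegistry> REGISTRIES = new ConcurrentHashMap<>();

    SketchRegistry getRegistry(String key) {
        LOCK.lock();
        try {
            SketchRegistry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry; // already created for this key
            }
            registry = createRegistry(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            LOCK.unlock();
        }
    }

    // The only step a concrete factory has to supply.
    protected abstract SketchRegistry createRegistry(String key);
}

// Hypothetical subclass standing in for a ZooKeeper-style factory.
class ZkLikeRegistryFactory extends SketchRegistryFactory {
    int created = 0; // counts how often createRegistry actually runs

    @Override
    protected SketchRegistry createRegistry(String key) {
        created++;
        return () -> "zk-like:" + key;
    }
}
```

Calling getRegistry twice with the same key returns the same instance and runs createRegistry only once, which is the single-instance guarantee the lock is there to protect.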
(8) ConsumerInvokerWrapper && ProviderInvokerWrapper under the Support package
These two classes implement the Invoker interface and are Invoker wrappers for the service consumer and service provider, respectively, which wrap some properties. Let’s look at the source code:
1. ConsumerInvokerWrapper properties
// The Invoker object
private Invoker<T> invoker;
// The original url
private URL originUrl;
// Registry URL
private URL registryUrl;
// Consumer URL
private URL consumerUrl;
// Registry Directory
private RegistryDirectory registryDirectory;
2. ProviderInvokerWrapper properties
// The Invoker object
private Invoker<T> invoker;
// The original url
private URL originUrl;
// Registry URL
private URL registryUrl;
// Service provider URL
private URL providerUrl;
// Whether the provider is registered
private volatile boolean isReg;
Both classes are used in Dubbo QOS. For more information about Dubbo QOS, refer to the official documentation.
QOS website: dubbo.apache.org/zh-cn/docs/…
(9) ProviderConsumerRegTable under the support package
This class is the registration table for service providers and consumers. It stores the Invokers of the service providers and consumers that live in the current JVM process. Like the two wrapper classes above, it is used in QOS, where it supports the logic of the offline command (taking a service offline) and the ls command (listing consumers and providers). We can look at its properties:
// The service provider Invoker collection. The key is computed from the service provider's URL via url.toServiceString()
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
// The service consumer Invoker collection. The key is computed from the service consumer's URL via url.toServiceString()
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
As you can see, this table records the call chains among service providers, consumers, and the registry, so that starting from any one side you can directly find all the call chains associated with it.
This part relates to the implementation of the operations and maintenance (O&M) commands, so I will not go into it here.
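To make the shape of such a table concrete, here is a minimal sketch of adding to and reading from a map of this form. All names are assumptions for illustration (ProviderWrapperSketch, RegTableSketch, and the two static methods), the wrapper is trimmed down to a URL string and a registered flag, and the service key is passed in directly instead of being computed from a URL.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down wrapper: only the provider URL and a registered flag.
class ProviderWrapperSketch {
    final String providerUrl;
    volatile boolean reg;

    ProviderWrapperSketch(String providerUrl) {
        this.providerUrl = providerUrl;
    }
}

// Hypothetical table with the same map shape as providerInvokers above.
class RegTableSketch {
    static final ConcurrentHashMap<String, Set<ProviderWrapperSketch>> providerInvokers =
            new ConcurrentHashMap<>();

    // Adds a wrapper under its service key, creating the set on first use.
    static ProviderWrapperSketch registerProvider(String serviceKey, String providerUrl) {
        ProviderWrapperSketch wrapper = new ProviderWrapperSketch(providerUrl);
        providerInvokers
                .computeIfAbsent(serviceKey, k -> ConcurrentHashMap.newKeySet())
                .add(wrapper);
        return wrapper;
    }

    // Returns every wrapper recorded for a service key, or an empty set.
    static Set<ProviderWrapperSketch> getProviderInvokers(String serviceKey) {
        Set<ProviderWrapperSketch> wrappers = providerInvokers.get(serviceKey);
        return wrappers == null ? Collections.emptySet() : wrappers;
    }
}
```

With a table like this, a command that lists providers only needs to iterate the map, and a command that takes a service offline can look up the wrappers by key and flip their flag.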
(10) SkipFailbackWrapperException under the support package
This class is a custom exception defined by Dubbo and used in FailbackRegistry. Throwing it signals that the failback retry should be skipped for the failed operation.
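One way such a marker exception can be used is sketched below. This is an illustration under assumptions rather than the FailbackRegistry source: SkipFailbackSketchException and FailbackSketch are hypothetical stand-ins, the check flag is passed in directly, and the registry operation is supplied as a Runnable.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the marker exception: it only wraps the real cause.
class SkipFailbackSketchException extends RuntimeException {
    SkipFailbackSketchException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical failback-style register that honours the marker exception.
class FailbackSketch {
    // Failure cache that a periodic retry would work through.
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    void register(String url, boolean check, Runnable doRegister) {
        try {
            doRegister.run();
        } catch (RuntimeException e) {
            Throwable t = e;
            boolean skipFailback = t instanceof SkipFailbackSketchException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause(); // unwrap to report the real cause
                }
                // Fail fast: the URL is not queued for retry.
                throw new IllegalStateException("Failed to register " + url, t);
            }
            // Ordinary failure: remember the URL so a later retry can pick it up.
            failedRegistered.add(url);
        }
    }
}
```

In this sketch an ordinary failure is swallowed and queued, while a failure wrapped in the marker exception is rethrown immediately with its original cause.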
(11) RegistryStatusChecker under the status package
This class implements StatusChecker, the interface for status checks. RegistryStatusChecker is an extension class that checks and reports the status of the registries. Let's have a look at the source code:
@Activate
public class RegistryStatusChecker implements StatusChecker {
    @Override
    public Status check() {
        // Get all registry objects
        Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
        if (registries.isEmpty()) {
            return new Status(Status.Level.UNKNOWN);
        }
        Status.Level level = Status.Level.OK;
        StringBuilder buf = new StringBuilder();
        // Concatenate the addresses from the registry URLs
        for (Registry registry : registries) {
            if (buf.length() > 0) {
                buf.append(",");
            }
            buf.append(registry.getUrl().getAddress());
            // If the registry is unavailable, append "(disconnected)" and set the level to ERROR
            if (!registry.isAvailable()) {
                level = Status.Level.ERROR;
                buf.append("(disconnected)");
            } else {
                buf.append("(connected)");
            }
        }
        // Return the status check result
        return new Status(level, buf.toString());
    }
}
The first thing to note is the @Activate annotation, which causes the RegistryStatusChecker class to be loaded automatically as an extension. The class implements the interface's check method, which checks the status of every registry and returns the result.
The integration classes RegistryProtocol and RegistryDirectory are not closely tied to the core logic of the registry. RegistryProtocol integrates the registry with dubbo-rpc-api, and RegistryDirectory integrates it with dubbo-cluster. If the following explanation is confusing, you can skip this section and come back to it after my articles on RPC and cluster.
(12) RegistryProtocol && RegistryDirectory under the integration package
- RegistryProtocol: It implements the Protocol interface and is an extension class of it, but it should not be regarded as a real protocol. It is a wrapper around the actual protocols (such as dubbo or rmi). A client that has no information about the servers at the start of a request first pulls the service registration information from the registry and then connects to the server directly. In other words, RegistryProtocol is a protocol implementation that discovers service providers through the registry.
- RegistryDirectory: The registry-based service directory, which maintains all available remote or local Invokers. Its Invoker collection is obtained from the registry. It implements the NotifyListener interface and its notify callback method. For example, when a consumer wants to invoke a remote service, it subscribes in the registry to all providers of that service. On subscription, the consumer's NotifyListener.notify method is called back with the URLs of all service providers, and these URLs are converted into Invokers, that is, the remote services are referred.
I will add the source code analysis of these two classes after I have explained the RPC and cluster modules.
Afterword.
The source code for this section is github.com/CrazyHZM/in…
This article explained the internal logic of Dubbo's registry, including service registration, subscription, and service change notification. In the next four articles, I will explain how the dubbo, multicast, zookeeper, and redis registries each implement the registry strategy. If anything is insufficiently explained or wrong, please let me know.