Dubbo Service directory
What is a service catalog
In a cluster, the number of the service provider is not immutable, sometimes there will be extend or shrink capacity, increase or reduce the number of the machine, the changes of information need to be synchronized to the consumer group, the service directory contains all of the information service providers, and according to the number of service providers or configuration change and dynamic change, And encapsulate it as an Invoker list
AbstractDirectory
This class implements the Directory interface and implements the List method, which gets the Invoker list. The implementation logic is a common template method in the subclasses StaticDirectory and RegistryDirectory.
public abstract class AbstractDirectory<T> implements Directory<T> {
// logger
private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
private final URL url;
// Whether to be destroyed
private volatile boolean destroyed = false;
// Consumer group URL
private volatile URL consumerUrl;
// Service routing
private volatile List<Router> routers;
public AbstractDirectory(URL url) {
this(url, null);
}
public AbstractDirectory(URL url, List<Router> routers) {
this(url, url, routers);
}
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null)
throw new IllegalArgumentException("url == null");
this.url = url;
this.consumerUrl = consumerUrl;
setRouters(routers);
}
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// It is up to subclasses to get the invoker directory logic
List<Invoker<T>> invokers = doList(invocation);
// Service routing is required after the invoker list is obtained
List<Router> localRouters = this.routers; // local reference
if(localRouters ! =null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
// Perform service routinginvokers = router.route(invokers, getConsumerUrl(), invocation); }}catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: "+ t.getMessage(), t); }}}return invokers;
}
protected void setRouters(List<Router> routers) {
// Service routing list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
/ / access service routing parameters of the router (tag, script, condition, mock)
String routerkey = url.getParameter(Constants.ROUTER_KEY);
// Obtain the route through SPI mechanism
if(routerkey ! =null && routerkey.length() > 0) {
RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
routers.add(routerFactory.getRouter(url));
}
// The default added types MockInvokersSelector and TagRouter
routers.add(new MockInvokersSelector());
routers.add(new TagRouter());
Collections.sort(routers);
this.routers = routers;
}
// omit other methods.protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}
Copy the code
StaticDirectory
As you can see from the name, it’s a static directory that doesn’t change
public class StaticDirectory<T> extends AbstractDirectory<T> {
private finalList<Invoker<T>> invokers; .@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
returninvokers; }... }Copy the code
You can see that it returns directly to the Invoker list
RegistryDirectory
RegistryDirectory is a dynamic directory. NotifyListener interface is implemented and NotifyListener interface is implemented to dynamically refresh the invoker list. Let’s start with doList logic
@Override
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. The service provider is closed
// 2. Service provider is disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}
List<Invoker<T>> invokers = null;
// Get the invokerk list corresponding to the method name from the cache
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
if(localMethodInvokerMap ! =null && localMethodInvokerMap.size() > 0) {
// Get the method name
String methodName = RpcUtils.getMethodName(invocation);
// Get the method parameter list
Object[] args = RpcUtils.getArguments(invocation);
if(args ! =null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
}
if (invokers == null) {
// Get the Invoker list by method name
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
// Get the Invoker list with * (generalization calls)invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); }}// Returns the invoker list
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
Copy the code
The Invoker list is retrieved from the local cache so where are the values in the local cache updated from? This class implements notify to dynamically refresh the Invoker list
@Override
public synchronized void notify(List<URL> urls) {
/ / store will, routers, configurators under the list of content
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
// Get the protocol name
String protocol = url.getProtocol();
// Get the category parameter value, which defaults to providers
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
/ / according to different category type url encapsulated into routerUrls configuratorUrls, invokerUrls
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer "+ NetUtils.getLocalHost()); }}// Package configuratorUrls as List
configurators
if(configuratorUrls ! =null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// Package routerUrls as List
routers
if(routerUrls ! =null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if(routers ! =null) { // null - do nothingsetRouters(routers); }}/ / the cache
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if(localConfigurators ! =null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
/ / configuration overrideDirectoryUrl
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); }}// Refresh the invoker list
refreshInvoker(invokerUrls);
}
Copy the code
private void refreshInvoker(List<URL> invokerUrls) {
// If the protocol header is Empty, set Forbidden to true, clear the cache, and destroy all invokers
if(invokerUrls ! =null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true;
this.methodInvokerMap = null;
destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
if (invokerUrls.isEmpty() && this.cachedInvokerUrls ! =null) {
// If the invokerUrls is empty but the cache is not empty, add the cache list to the invokerUrls directly
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
// If the cache is empty, add invokerUrls to the cache
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
// Convert the URL to Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// Convert newUrlInvokerMap to
>> mapping
,list
// doList gets a List of methods based on their names from newMethodInvokerMap
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// If there are multiple groups of providers, merge them into the cache
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
// Destroy useless invokers
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e); }}}Copy the code
If the protocol is Empty, clear the cache, destroy all invokers, install the URL as the Invoker List, and then convert the List into the mapping of <method,List< invoker >>. If there are multiple groups of service providers, merge them and add them to the cache. Finally empty the useless invoker. Let’s look at some of the specific operations
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
// Get the protocol configured on the service consumer side
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// Check whether the protocol is supported by the consumer
if(queryProtocols ! =null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break; }}If the service provider protocol header is not supported by the consumer, the current providerUrl is ignored
if(! accept) {continue; }}// Ignore the empty protocol
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// SPI checks whether the server protocol is supported by the consumer. If not, an exception is thrown
if(! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
/ / merge url
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
// Filter duplicate urls
if (keys.contains(key)) {
continue;
}
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// Create a new invoker if the cache does not hit
if (invoker == null) {
try {
boolean enabled = true;
if(url.hasParameter(Constants.DISABLED_KEY)) { enabled = ! url.getParameter(Constants.DISABLED_KEY,false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// Create a new invoker
invoker = newInvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); }}catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if(invoker ! =null) {
// Add the newly created invoker to the cachenewUrlInvokerMap.put(key, invoker); }}else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
Copy the code
The logic is simple: check if the protocol is supported by the consumer, merge the URL to retrieve the invoker from the cache, and create a new invoker if the cache misses
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if(invokersMap ! =null && invokersMap.size() > 0) {
for (Invoker<T> invoker : invokersMap.values()) {
// Get the methods parameter value from the URL
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
if(parameter ! =null && parameter.length() > 0) {
// Get the list of methods
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if(methods ! =null && methods.length > 0) {
for (String method : methods) {
if(method ! =null && method.length() > 0
&& !Constants.ANY_VALUE.equals(method)) {
// Get the invoker list from the cache. If it misses, create a new invoker list and add it to the cache
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = newArrayList<Invoker<T>>(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); }}// Perform service level routing
List<Invoker<T>> newInvokersList = route(invokersList, null);
// Store <*, newInvokersList> mappings
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
if(serviceMethods ! =null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.isEmpty()) {
methodInvokers = newInvokersList;
}
// Perform method level routingnewMethodInvokerMap.put(method, route(methodInvokers, method)); }}// Sort to immutable list
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}
Copy the code
The above logic is relatively simple: traverse the map, obtain the methods array, and store it in the map according to the mapping between method names and method names to the Invoker list. Route based on service and method levels, sort the Invoker list and transform it into an immutable list
private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) {
Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>();
for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) {
String method = entry.getKey();
List<Invoker<T>> invokers = entry.getValue();
Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>();
for (Invoker<T> invoker : invokers) {
// Get the group parameter value
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "");
List<Invoker<T>> groupInvokers = groupMap.get(group);
if (groupInvokers == null) {
groupInvokers = new ArrayList<Invoker<T>>();
groupMap.put(group, groupInvokers);
}
groupInvokers.add(invoker);
}
// If groupMap has only one set of key-value pairs
if (groupMap.size() == 1) {
result.put(method, groupMap.values().iterator().next());
} else if (groupMap.size() > 1) {
// Merge multiple invokers
List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
for (List<Invoker<T>> groupList : groupMap.values()) {
groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
}
result.put(method, groupInvokers);
} else{ result.put(method, invokers); }}return result;
}
Copy the code
If there is only one pair of groupMap, the key-value pair is returned. If there are multiple groupMap pairs, each group of Invokers is merged into Result by cluster class
Finally, there is the logic of destruction
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
List<String> deleted = null;
if(oldUrlInvokerMap ! =null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
// Find the intersection of oldUrlInvokerMap and newUrlInvokerMap and store it in List
deleted
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if(! newInvokers.contains(entry.getValue())) {if (deleted == null) {
deleted = newArrayList<String>(); } deleted.add(entry.getKey()); }}}// Remove invoker from the deleted collection from oldUrlInvokerMap
if(deleted ! =null) {
for (String url : deleted) {
if(url ! =null) {
// Remove invoker from oldUrlInvokerMap
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if(invoker ! =null) {
try {
/ / destroy the invoker
invoker.destroy();
if (logger.isDebugEnabled()) {
logger.debug("destroy invoker[" + invoker.getUrl() + "] success. "); }}catch (Exception e) {
logger.warn("destroy invoker[" + invoker.getUrl() + "] faild. " + e.getMessage(), e);
}
}
}
}
}
}
Copy the code
The above logic is to find the URL to delete from oldUrlInvokerMap according to newUrlInvokerMap, remove it from oldUrlInvokerMap and destroy the Invoekr
conclusion
The logic of the whole service catalog has been seen, mainly to analyze the logic of the dynamic service catalog, and summarize:
- According to different category types the url url encapsulation into routerUrls, configuratorUrls
- Convert the URL to an Invoker list and a method-to-Invoker list mapping, or merge methodInvokerMap if there are multiple groups of providers
- Finally, destroy useless invokers