Brief introduction to Dubbo registration discovery call process
preface
The author has been using Dubbo and Nacos as distributed governance frameworks since contacting with distributed governance. However, I have never studied how Dubbo registers and exposes services before. I wanted to write an article about Dubbo for a long time, but I did not have time to read Dubbo source code. So here to share their own views, if you have different views welcome to discuss together
The environment
- Pull code from Github open source site
- This article uses the 2.6.x branch (3.0 is too new to see, but the overall protocol is based on the 2.6-7 version, see dubbo website for details)
- openjdk8
The composition of dubbo
Dubbo’s overall design
Description:
- Blue is the interface used by the service consumer, green is the interface used by the service provider, and the central axis is the interface or implementation class used by both.
- The overall design of Dubbo can be divided into 10 layers, each layer is unidirectional dependence, and each time the upper layer can be stripped and reused, among which the Sevice layer and Config layer are API layer, and the other layers are SPI. So it’s very extensible
- In the figure, the green blocks are extension interfaces and the blue blocks are implementation classes. The figure only shows the implementation classes used to associate each layer.
- In the figure, the dotted blue line is the initialization process, that is, the assembly chain at startup, the solid red line is the method call process, that is, the run-time call chain, the purple triangle arrow is the inheritance, you can regard the subclass as the same node of the parent class, and the text on the line is the method called.
Each layer specification
- Config configuration layer: external configuration interface, centering on ServiceConfig and ReferenceConfig, can directly initialize configuration classes, or generate configuration classes through Spring configuration parsing
- Proxy ServiceProxy layer: transparent proxy of service interfaces. The client Stub and server Skeleton of the service are generated. The extension interface is ProxyFactory
- Registry layer: encapsulates the registration and discovery of service addresses, centering on service URLS and extending interfaces as RegistryFactory, Registry, and RegistryService
- Cluster routing layer: encapsulates routing and load balancing of multiple providers, Bridges registries, centers on Invoker, and extends interfaces to Cluster, Directory, Router, and LoadBalance
- Monitor monitoring layer: Monitors the number and time of RPC calls. It centers on Statistics and extends interfaces to MonitorFactory, Monitor, and MonitorService
- Protocol Remote Invocation layer: Encapsulates RPC Invocation with Protocol, Invoker, and half interface, based on Invocation and Result
- Exchange information exchange layer: It encapsulates the Request and Response mode, turns synchronous to asynchronous, uses Request and Response as the center, and uses exchange channel, ExchangeClient and ExchangeServer as the expansion interface
- Transport Network transport layer: Abstract MINA and Netty as the unified interface, Message as the center, extended interfaces are Channel, Transporter, Client, Server, Codec
- Serialize data Serialization layer: reusable tools with Serialization, ObjectInput, ObjectOutput, and ThreadPool extensions
Module subcontracting Instructions
- Dubo-common logic module: includes Util classes and common models.
- Dubbo -remoting: An implementation of the Dubbo protocol. If RPC uses RMI, this package is not required. This module contains the Transport layer and exchange layer in layer 10 as the base modules for RPC calls.
- Dubbo-rpc remote call module: Abstracts various protocols, as well as dynamic proxies, containing only one-to-one calls, with no concern for cluster management. This module contains the Protocol and Proxy layers in layer 10. Interface exposure and proxy generation are in this layer.
- Dubbo-cluster cluster module: multiple service providers can be disguised as one provider, including load balancing, fault tolerance, routing, etc. The address list of the cluster can be statically configured or delivered by the registry. This module is a Cluster layer of 10 layers
- Dubbo-registry registry module: clustering based on registries’ distribution addresses and abstractions to various registries.
- Dubo-monitor monitoring module: statistics service call times, call time, call chain tracking service. This layer is the Monitor layer of the 10 layers
- Dubo-config configuration module: it is an EXTERNAL API of Dubbo. Users use Dubbo through config to hide all details of Dubbo. This layer includes the Config layer of layer 10
- Dubbo-container: Is a Standlone container that starts with a simple Main load of Spring. There is no need to use a Web container to load services because services usually do not require Web container features such as Tomcat/JBoss.
- Dubbo-serialization: Serialization of RPC calls (currently there are 5 fastJSON, FST, Hessian2, JDK, kryo) the default is Hessian2. This module is the serialize layer in layer 10
At this point, all Dubbo source modules and design layers are matched. From this information you can actually see that Dubbo was very clear about the module layering when he designed it.
Project startup initialization process details (producer and consumer)
Analytical services
Based on the Meta-INF/Spring. handlers configuration in Dubo.jar, Spring calls back to the DubboNamespaceHandler when it encounters the Dubbo namespace. Label all dubbo, unified use DubboBeanDefinitionParser parsing, based on one-on-one attribute mapping, XML parsing for Bean object tags. When serviceconfig.export () or referenceconfig.get () is initialized, the Bean object is converted to URL format and all Bean properties are converted to URL parameters. Then, the URL is sent to the protocol extension point. Based on the extension point adaptive mechanism, services of different protocols are exposed or referenced according to the protocol header of the URL.
Exposed services
-
1.ServiceConfig parses the URL in the following format: dubbo://service-host/ com.foo.fooservice? Version = 1.0.0. Based on the extension point adaptive mechanism, through the URL dubbo:// protocol header recognition, directly call the export() method of DubboProtocol, open the service port.
-
Providers address registered in the registry, need 2, under the condition of ServiceConfig parsing the URL format for: registry: / / registry – host/org. Apache. Dubbo. Registry. RegistryService? export=URL.encode(“dubbo://service-host/com.foo.FooService? Version =1.0.0”), based on the extension point adaptive mechanism, through the REGISTRY :// protocol header recognition of the URL, the export() method of the RegistryProtocol is called to register the provider URL in the export parameter with the registry first. Dubbo ://service-host/ com.foo.fooservice? Version =1.0.0, then the export() method of the DubboProtocol is invoked to open the service port through the dubbo:// protocol header recognition of the provider URL based on the extension point adaptive mechanism.
Reference service
-
In the case of no registry and direct connection to the provider 3, the URL resolved by ReferenceConfig is in the following format: dubbo://service-host/ com.foo.fooservice? Version = 1.0.0. Based on the extension point adaptive mechanism, the refer() method of DubboProtocol is directly called through the DUbbo :// protocol header recognition of the URL to return the provider reference.
-
Discover reference services from the registry:
In the case that there is a registry and the provider address is found through the registry, 4, the FORMAT of URL resolved by ReferenceConfig is: registry://registry-host/org.apache.dubbo.registry.RegistryService? refer=URL.encode(“consumer://consumer-host/com.foo.FooService? Version = “1.0.0). Based on the extension point adaptive mechanism, the refer() method of RegistryProtocol is called through the registry:// protocol header of the URL. Based on the conditions in the refer parameter, the provider URL is queried, for example: dubbo://service-host/com.foo.FooService? Version = 1.0.0. Based on the extension point adaptive mechanism, the refer() method of DubboProtocol is called to get the provider reference through the dubbo:// protocol header recognition of the provider URL. RegistryProtocol then returns multiple provider references masquerading as a single provider reference through the Cluster extension point.
Summary of project startup initialization details
Regardless of producer or consumer, registry or not, the Protocol interface is indispensable. The DubboProtocol implementation class is used to expose and reference the service when there is no registry to remove and reuse. When there is a registry, Dubbo’s adaptive mechanism is changed to RegistryProtocol as the implementation class for exposing and referencing the service. So Protocol is accused of exposing and referencing services, and its implementation depends on the context in which Dubbo is used.
As shown in figure:
Protocol.java
/** * The exposed interface for remote invocation, where Invoker is converted to Exporter * 1. After receiving a request, the protocol records the source address of the request rpcContext.getContext ().setremoteAddress (); * 2. Export () must be idempotent, i.e. there is no difference between invoking the same URL once and invoking it twice@param<T> Service type Interface type *@paramInvoker Service Invoker interface type converts to URL and then converts to Invoker */
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
1. When a user invokes the invoke() method of the 'Invoker' object returned from the 'refer()' call, the protocol needs to correspond to the 'invoke()' method of the 'Invoker' object. The responsibility of the protocol is to implement the 'Invoker' returned from 'refer()'. In general, the protocol sends remote requests in the 'Invoker' implementation. * 3. When check=false is set in the URL, the implementation must not throw an exception, but try to recover if the connection fails. * /
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
Copy the code
The process by which a producer exposes a service
First, the ServiceConfig class gets the actual class ref that provides the service. HelloWorldImpl), and then generate an instance of AbstractProxyInvoker using ref in the getInvoker method of the ProxyFactory class. Next comes the transition from Invoker to Exporter. The key to Dubbo’s handling of service exposure is the conversion of Invoker to Exporter, shown in red in the image above. The following are two typical implementations of Dubbo and RMI: Dubbo implementation of the Dubbo protocol Invoker turns to my friend in the Export method of the DubboProtocol class, which mainly opens the socket listening service and receives various requests from the client. The communication details are implemented by Dubbo himself.
The process by which a consumer references a service
First, the init method of the ReferenceConfig class calls the refer method of Protocol to generate the Invoker instance (shown in red), which is the key to service consumption. Next, convert Invoker to the interface required by the client (for example, HelloWorld). The details of each protocol such as RMI/Dubbo/Web Service that calls the refer method to generate an Invoker instance are similar to those of the producer but the steps are different.
@SPI("javassist")
public interface ProxyFactory {
/** * Create proxy **@param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/** * Create proxy **@param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
/** * Convert the proxy to Invoker **@param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
Copy the code
From figure 1 shows this interface is used by default JavassistProxyFactory ProxyFactory. Java implementation class, and the interface is can use to producers and consumers, but also can know ProxyFactory: : getProxy method is applicable to the consumer, ProxyFactory::getInvoker applies to producers. It says that both producers and consumers use Invoker, but this getInvoker method is only used by producers. The following chart clearly shows why both producers and consumers have invokers.
Server source code analysis
Project initialization phase
The Dubbo service export process starts when the Spring container publishes a refresh event, and Dubbo immediately executes the service export logic upon receiving the event. The entry method for the service export is onApplicationEvent of ServiceBean. OnApplicationEvent is an event-response method that performs a service export upon receiving a Spring context refresh event. The method code is as follows: Source branch 2.6.x is the onApplicationEvent entry, And 2.7.8 exposure fusion to Springboot OneTimeExecutionApplicationContextEventListener. Java DubboBootstrap after listening. Start method, of course, the use is branch of Java 2.6. x
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean.DisposableBean.ApplicationContextAware.ApplicationListener<ContextRefreshedEvent>, BeanNameAware.ApplicationEventPublisherAware {
private transient volatile boolean exported;
private transient volatile boolean unexported;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// Whether the export is delayed (delayed false) && Whether the export is already performed && Whether the export is cancelled
if(isDelay() && ! isExported() && ! isUnexported()) {if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: "+ getInterface()); } export(); }}private boolean isDelay(a) {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null&& provider ! =null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay == -1);
}
public synchronized void export(a) {
// The current class inherits ServiceConfig. Java so it looks at the current
if(provider ! =null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) { delay = provider.getDelay(); }}if(export ! =null && !export) {
return;
}
if(delay ! =null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run(a) {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else{ doExport(); }}/** * the actual output operation */
protected synchronized void doExport(a) {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
// Check whether interfaceName is valid
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// Check whether the provider is empty. If it is empty, create a new one and initialize it using system variables
checkDefault();
if(provider ! =null) {... }if (module! =null) {... }if(application ! =null) {... }// Check if ref is a generalized service type
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if(StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); }}// ref non-genericService type
else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// Check the necessary fields in the interfaceClass and
tags
checkInterfaceAndMethods(interfaceClass, methods);
// Check the validity of ref
checkRef();
generic = Boolean.FALSE.toString();
}
// Local and stub functions should be the same, used to configure local stubs
if(local ! =null) {
if ("true".equals(local)) {
local = interfaceName + "Local"; } Class<? > localClass;try {
// Get the local stub class
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// Check whether the local stub class can be assigned to the interface class. If it cannot be assigned, an exception will be thrown to remind the user that the local stub class type is invalid
if(! interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface "+ interfaceName); }}if(stub ! =null) {... }// Check whether various objects are empty. If empty, create a new object or throw an exception
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// Export the service
doExportUrls();
CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());
// ProviderModel represents the service ProviderModel, which stores information about the service provider in this object.
// Service configuration information, service instances, etc. Each exported service corresponds to a ProviderModel.
// ApplicationModel holds all providerModels.
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
/** * multi-protocol multi-registry export service * converts the current object to url.java */
private void doExportUrls(a) {
// Load the registry link
List<URL> registryURLs = loadRegistries(true);
// Traverse protocols and export services under each protocol
for (ProtocolConfig protocolConfig : protocols) {
Assembly / / URLdoExportUrlsFor1Protocol(protocolConfig, registryURLs); }}/** * Convert the URL to Invoker pseudocode */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {...//methods is a MethodConfig collection, which stores configuration information for the
tag
if(methods ! =null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// Perform all operations for method. }// end of methods for
}
// Similar to generics judgment
if (ProtocolUtils.isGeneric(generic)) {
...
} else{... }if(! ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else{ map.put(Constants.TOKEN_KEY, token); }}if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify"."false");
}
// Expose services that are urls converted into invokers. String scope = url.getParameter(Constants.SCOPE_KEY);// if scope = none, do nothing
if(! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// scope ! = remote, exported to the local PC
if(! Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); }// scope ! = local, export to remote
if(! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// There is a registry
if(registryURLs ! =null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
...
// Generate Invoker for the service provider class (ref)
// The producer's ProxyFactory gets InvokerInvoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);
// The Invoker is exposed by default DubboProtocol
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
// There is no registry, only export services
else{ Invoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this); Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }}}this.urls.add(url);
}
/** * DubboProtocol. Java export method */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if(isStubSupportEvent && ! isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded.")); }}else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
returnexporter; }}Copy the code
- This is actually the interface that Spring scanned from the project startup to load the urls in ServiceConfig that translate the interface into the URL. When the URL is loaded, the ref(actual implementation class) is converted into Invoker, and the interface is exposed through DubboProtocol after converting into Invoker. At this point the whole start-up process of the producer has gone through.
Consumer source code parsing
Consumers and producers are essentially the same, except in a different order. Producers get Invoker through ref and then expose it through Protocol, while consumers
public synchronized T get(a) {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// Check if ref is empty and init if it is
if (ref == null) {
The init method is mainly used to handle configuration and createProxy classes by calling createProxy
init();
}
return ref;
}
private void init(a) {
// Avoid repeated initialization
if (initialized) {
return;
}
/ /... I'm going to skip a lot of code and make a quick summary
// 1. It is used to check whether the ConsumerConfig instance exists
// 2. This logic is used to load the configuration corresponding to the interface name from system properties or configuration files and assign the result of parsing to the URL field. The URL field is typically used for point-to-point calls.
String resolve = System.getProperty(interfaceName);
// 3. Load the configuration corresponding to the interface name from system properties or configuration files and assign the result of parsing to the URL field. The URL field is typically used for point-to-point calls.
// 4. Check whether several core configuration classes are empty. If they are empty, try to obtain them from other configuration classes.
// 5. Collects various configurations and stores them in the Map.
// 6. Used to handle MethodConfig instances. This instance contains event notification configurations such as onReturn, onThrow, onInvoke, and so on.
// Get the service consumer IP address
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);
// Create proxy
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp"."localhost".0, map);
final boolean isJvmRefer;
// This code determines whether the current class is local, which is called a local call
if (isInjvm() == null) {
// if the url configuration is specified, no local reference is made
if(url ! =null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
}
// Check whether local references are required according to the protocol, scope, and injVM parameters of the URL
For example, if scope=local is explicitly specified, isInjvmRefer returns true
else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false; }}else {
// Get injVM configuration value
isJvmRefer = isInjvm().booleanValue();
}
// Local reference
if (isJvmRefer) {
// Generate a local reference URL with the protocol injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// Call the refer method to build InjvmInvoker instance
// RefProtocol is DubboProtocol for local references
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service "+ interfaceClass.getName()); }}// Remote call
else {
// If the url is not empty, the consumer wants to call point-to-point. This URL can be set in DubboReference's URL property
if(url ! =null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if(us ! =null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
// Check whether the URL protocol is Registry, if the user wants to use the specified registry
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else{ urls.add(ClusterUtils.mergeUrl(url, map)); }}}}// An empty URL indicates that the user is connected to a registry
else { // assemble URL from register center's configuration
// Load the registry URL
List<URL> us = loadRegistries(false);
if(us ! =null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if(monitorUrl ! =null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// Add the refer parameter to the URL and add the URL to the urlsurls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); }}// No registry is configured, an exception is thrown
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config
to your spring config."); }}// There is only one registry
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// Multiple registriesList<Invoker<? >> invokers =newArrayList<Invoker<? > > (); URL registryURL =null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url}}// The invokers will be merged into one through the cluster layer, depending on the strategy set by the consumer
if(registryURL ! =null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null&& consumer ! =null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
Dubo.consumer.check =false
if(c && ! invoker.isAvailable()) { ...// create service proxy
// Create the proxy
return (T) proxyFactory.getProxy(invoker);
}
Copy the code
The code flow above is basically from checking configuration classes and various system information at initialization to Map, and then getting consumer information, Refer and ProxyFactory.getProxy. Here’s what the refer and getProxy do.
Consumers create Invoker
- In the consumer case Invoker is created from Protocol, the refer method, which in turn includes DubboProtocol and RegistryProtocol, meaning Invoker is created with or without a registry.
DubboProtocol
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
/ * * * getClients. This method is used to get the client instance */
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else{ clients[i] = initClient(url); }}returnclients; }}Copy the code
RegistryProtocol
public class RegistryProtocol implements Protocol {
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// Take the registry parameter value and set it to the protocol header
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// Get the registry instance
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// The URL is converted to Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// group= @dubboReference (group=)
String group = qs.get(Constants.GROUP_KEY);
if(group ! =null && group.length() > 0) {
// To merge two groups, go to the cluster routing layer and let this layer choose which group to distribute
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
returndoRefer(getMergeableCluster(), registry, type, url); }}return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// Create an instance
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// Set up the registry instance
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// Generate service consumer links
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// Register service consumers, new node in the consumers directory
if(! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY,true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
// The current consumer sends a subscribed message to the registry, which notifies the concerned producer dynamically to the consumer
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// A registry can have multiple service providers, so you need to consolidate multiple service providers into one
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
returninvoker; }}Copy the code
Producer stop
- You’ve certainly seen a lot of proxy destruction when a producer goes offline, but none of it is an actual implementation class
/**
* Get proxy.
*
* @param cl class loader.
* @param ics interface class array.
* @return Proxy instance.
*/
public static Proxy getProxy(ClassLoader cl, Class
... ics) {
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
if(! ics[i].isInterface())throw new RuntimeException(itf + " is not a interface."); Class<? > tmp =null;
try {
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
if(tmp ! = ics[i])throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
sb.append(itf).append('; ');
}
// use interface class name list as key.
String key = sb.toString();
// get cache by class loader.
// Get the Map for the class load. If not, create a new class loader
// ProxyCacheMap uses WeakHashMap. The entries in this class are weak references and there is not enough memory for dubbo to burst the memory
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}
Proxy proxy = null;
synchronized (cache) {
do{... Get it from cache, forget about it}}long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
for (int i = 0; i < ics.length; i++) {
if(! Modifier.isPublic(ics[i].getModifiers())) { String npkg = ics[i].getPackage().getName();if (pkg == null) {
pkg = npkg;
} else {
if(! pkg.equals(npkg))throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
ccp.addInterface(ics[i]);
for (Method method : ics[i].getMethods()) {
String desc = ReflectUtils.getDesc(method);
if (worked.contains(desc))
continue;
worked.add(desc);
intix = methods.size(); Class<? > rt = method.getReturnType(); Class<? >[] pts = method.getParameterTypes(); StringBuilder code =new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
if(! Void.TYPE.equals(rt)) code.append(" return ").append(asArgument(rt, "ret")).append(";"); methods.add(method); ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); }}if (pkg == null)
pkg = PACKAGE_NAME;
// create ProxyInstance class.
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
ccp.addConstructor(Modifier.PUBLIC, newClass<? >[]{InvocationHandler.class},newClass<? > [0]."handler=$1;"); ccp.addDefaultConstructor(); Class<? > clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// create Proxy class.
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); Class<? > pc = ccm.toClass(); proxy = (Proxy) pc.newInstance(); }catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if(ccp ! =null)
ccp.release();
if(ccm ! =null)
ccm.release();
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
cache.put(key, newWeakReference<Proxy>(proxy)); cache.notifyAll(); }}return proxy;
}
Copy the code
- CCP and CCM are easy to confuse. CCP produces proxy classes for implementing classes, while CCM produces proxy classes.
Problems encountered
I didn’t understand why every time a producer restarts, I will report that there are no available producers.
- In the following figure, when the consumer starts up and the registry notifies the consumer that the producer is available, go back and refresh Invoker. In particular, the class ExchangeClient mentioned above is refreshed. You can see the properties of this class on Invoker. When a consumer calls a producer, it checks whether the client has a producer and raises an RPC exception if it does not. When the consumer is notified, he goes back to create the client, but when the registry notifies the consumer, the process depends on the registry’s load, and it will be notified very quickly when the registry is idle or the number of connections is low.
- The following image shows how the registry notifies consumers immediately when a producer is out of service. In my case, the local test was instantaneous, but I’m not sure, so I might want to see how Nacos handles producer offline. When the notification goes offline, the client in the Invoker will be emptied and the call will naturally be RPC exception. So while scrolling, you can wait one minute before deactivating another node.
summary
Use Dubbo from now to watch his source, found a lot details before you don’t understand or don’t understand, slowly he completed a process look like this design very much, but all calls are so ugly and easy to expose information into a url form, but the Dubbo usually belong to the internal call, will not be the network to see their is no so-called. If you need more details about the call, you can look into it later and fill in the blanks.