Exporting services involves registering services in a registry, which is not required but is a best practice, according to the configuration, and exporting services to a list and starting services. If we define a service provider, we need to first generate an executable Bean, and then the service consumer can request the interface exposed by the service provider over TCP
By default, Dubbo uses Zookeeper as its service registry and Netty as its network communication framework, so extending your knowledge of both frameworks will help you read the source code
Several important concepts:
URL, the Dubbo URL is not java.net.Url. The Dubbo URL indicates the location of a resource. It can represent a registry, a service provider, or a local resource
Invoker, although the website says that this is an entity domain, and all the other models are similar to it, but it’s a bit abstract, and it doesn’t have a clear definition, so let’s take a look at DubboInvoker, and it basically provides the ability to communicate with each other, in other words, Remote calls are basically done with Invoker
Proxy is a Proxy for Invoker. The purpose is to make the user use Invoker transparently, so as to avoid Dubbo code polluting the user code. Both of these can be found in JavassistProxyFactory
Protocol stands for this type of Protocol. If a registry is required, Dubbo uses the RegistryProtocol
doExport
When we talked about Dubbo’s integration with Spring, we talked about the service export entry point, which is called doExport, so let’s look directly at this method
protected synchronized void doExport(a) {
// Omit unimportant code
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// Execute the method export directly
doExportUrls();
CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
Copy the code
This method is not so complicated, just do some verification, and call doExportUrls()
doExportUrls
private void doExportUrls(a) {
List<URL> registryURLs = loadRegistries(true);
for(ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); }}Copy the code
This approach mainly involves loading the registry and then exporting the service based on the registry configuration and protocol configuration
loadRegistries
protected List<URL> loadRegistries(boolean provider) {
// Check whether registry is configured
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if(registries ! =null && !registries.isEmpty()) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if(sysaddress ! =null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if(! map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol"."remote");
} else {
map.put("protocol"."dubbo"); }}// Convert the configuration to a URL
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
// Because of this, make the protocol registry so that the RegistryProtocol can be invoked when adaptive extensions are made
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true) | | (! provider && url.getParameter(Constants.SUBSCRIBE_KEY,true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
Copy the code
doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// Get the name of the Protocol, which determines which extension Protocol to use. If it is dubbo, it is DubboProtocol
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
// Here the assembly parameters can be ignored
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// The if block is very long. This block mainly assembles method properties, such as retries and parameters. I think it's ok not to worry about it
if(methods ! =null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries"."0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if(arguments ! =null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if(argument.getType() ! =null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if(methods ! =null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if(methodName.equals(method.getName())) { Class<? >[] argtypes = methods[i].getParameterTypes();// one callback in the method
if(argument.getIndex() ! = -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:"+ argument.getType()); }}else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) { Class<? > argclazz = argtypes[j];if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if(argument.getIndex() ! = -1&& argument.getIndex() ! = j) {throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if(argument.getIndex() ! = -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg:
or
"
); }}}}// end of methods for
}
// Generic call parameters
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
/ / version
String revision = Version.getVersion(interfaceClass, version);
if(revision ! =null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); }}if(! ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else{ map.put(Constants.TOKEN_KEY, token); }}if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify"."false");
}
// The context of the service export
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider ! =null) {
contextPath = provider.getContextpath();
}
// The address and port exposed by the service
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
/ / URL construction
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if(! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// Export to local
if(! Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); }// Export to remote
if(! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if(registryURLs ! =null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if(monitorUrl ! =null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if(StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } Invoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else{ Invoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this); Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }}}this.urls.add(url);
}
Copy the code
This method is very long, and all the exported logic is in this method. Next, we will focus on exportLocal() and protocol.export().
exportLocal
private void exportLocal(URL url) {
if(! Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
// Create exporters and add them to the exporters listExporter<? > exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); }}Copy the code
Exporting to a local URL is simple. You can create exporters based on the URL and add the Exporter to the URL. Using proxyFactory to get Invoker, proxyFactory is created according to the adaptive extension, the default implementation class is JavassistProxyFactory, mainly provides two methods, to get Proxy and get Invoker
Export to remote
First let’s take a look at wrappers. Some components in Dubbo provide wrappers for enhanced functionality. For example, Protocol has a protocol-FilterWrapper, ProtocolListenerWrapper, QosProtocolWrapper, etc. The wrapper sets the condition that the implementation class has a constructor with a Protocol argument. Set the wrapper in createExtension of ExtensionLoader as follows
Set<Class<? >> wrapperClasses = cachedWrapperClasses;if(wrapperClasses ! =null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
Copy the code
For exporting to remote, we mainly look at the RegistryProtocol, since we usually need a registry, so look at the registryProtocol.export ()
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// Export service. The originInvoker protocol is dubbo, so the exported logic is in DubboProtocol
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
// Get the registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// Determine whether the service needs to be registered
boolean register = registeredProviderUrl.getParameter("register".true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
/ / register
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
Copy the code
RegistryFactory returns ZookeeperRegistryFactory, so the logic of registered in ZookeeperRegistry, it involves the operation of the Zookeeper, there is not much talked about, anyway, not too complicated to operate, in patient understand basic can understand it
Let’s go ahead and look at doLocalExport, the specific logic
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// Swap from cache, if null, export
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// invokerDelegete is a proxy Invoker
finalInvoker<? > invokerDelegete =new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// The protocol here is DubboProtocol
exporter = newExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); }}}return exporter;
}
Copy the code
The logic here is not responsible, but one thing to note is that protocol.export() is a DubboProtocol, that is, holding a DubboProtocol in the RegistryProtocol. You can see this in loadRegistries() in doExportUrls of ServiceConfig
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// Expose the interface
URL url = invoker.getUrl();
// Put The Exporter in cache
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// Determine whether the stub needs to be generated, and the callback
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if(isStubSupportEvent && ! isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded.")); }}else{ stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); }}// Start the service
openServer(url);
optimizeSerialization(url);
return exporter;
}
Copy the code
Here the key is openServer, probably is to open the TCP port, the bottom is to use Netty network communication, about Netty is interesting, interested or can learn more about, here will not expand
conclusion
The overall idea is not complicated. Details (mainly configuration) are not discussed in detail here, because many features of Dubbo are determined by configuration, so I think we can go to the official website to learn about the user manual
Service export basically defines a service provider, an executable Bean, to provide services for consumer requests. Then open the socket to provide the consumer connection, socket communication, the underlying default use of Netty framework