“This article has participated in the good article call order activity, click to view the back end, big front end double track submission, 20,000 yuan prize pool for you to challenge!”
Daily writing component, recently took a demand, let me responsible for implementing a RPC component, improve the efficiency of the company cross-server game development, in order to write this component, as a wave will dubbo in studied, at present the implementation of the component is approaching to the end, so going to dubbo study summarize, and with what RPC journey, Friends who also need to implement RPC, or who are interested in Dubbo, can follow this series.
Before writing an RPC component, I asked a few soul questions and found answers in Dubbo.
What is a service?
A module, a kind of gameplay, as long as the need for remote scheduling can be wrapped with the concept of service, HERE I simply wrapped a replica service, class situation is as follows
Mundane, wait, let’s see how does the provider mark the service
At this point, the basic definition of the service is complete.
Where did the service end up being registered?
On the XML configuration, we have seen a configuration with a registry
Yes, eventually provider-defined services are registered in the registry, and there are a number of supported types
You can see the specific demo instances provided inside, so what is the role of the registry?
The simple description is that the registry is where services are managed, providers place services in this administration, and subscribers take services from this administration if they want to use them, realizing the awareness of services through the registry.
Who will consume the service?
The consumer uses it, as we can see
Also plain code, is the consumer side to get the boss interface, directly call the corresponding interface.
The corresponding provider has XML to define the service registration, and the consumer has XML to define the service subscription information, as you can see
In simple terms, the provider puts the service in the registry, and the subscriber takes it from the registry.
The following source code sample is from Dubbo2.6x. Comments on the source code have been submitted to Github and can be cloned if needed:
Source code parsing github address
When is the service exposure triggered
When designing RPC components, I had to face this problem. In line with the idea of copying Dubbo, I studied the implementation scheme of Dubbo
Dubbo takes the classic XML configuration and, of course, uses NamespaceHandlerSupport to map the node configuration in the XML to the corresponding objects
Handlers are configured under the dubbo-config-spring package to specify the DubboNamespaceHandler
The DubboNamespaceHandler maps the XML configuration to the configuration of the tag to an object, such as a service
To see what ServiceBean does after it is mapped to an object, look first at the ServiceBean structure
It’s a listener by itself, and then CTRL+F12 to see what the methods are
After seeing export expose this method, ALT+F7 found that there are two places to call in addition to annotation Annoatition, respectively
The first is called after the property has been set, as you can see if it is a deferred function it will not be called.
The second is to call export when isDelay is seen, that is, the service exposed by delay is called after listening to the ContextRefreshedEvent event.
You can see this in the export method
The delay delay can be configured for different service configurations, certainly in the XML.
Dubbo’s trigger mechanism is built on NamespaceHandlerSupport to instantiate tags in XML, and after afterPropertiesSet or after listening to a container refresh event thrown by the Spring container, Trigger the exposure of the service.
Let me draw a flow chart to summarize
Since our service configuration is based on the YAML solution and XML is not introduced, I did not use NamespaceHandlerSupport to instantiate. Instead, I wrapped a ServiceBootstrap object like dubbo3.0 solution. Depending on the SmartLifeCycle lifecycle, the yamL configuration is fetched at start and traversed for service exposure. Dubbo3.0 has been tweaked quite a bit, which will be covered later, so keep an eye on the series if you’re interested.
A wave of URL
Before we talk about service exposure, we have to mention a wave of URLS, otherwise the main line is gone, and it’s hard to talk about the rest.
Before I came into contact with Dubbo, MY URL positioning refers to the network address, while in Dubbo, it can be considered as a convention, almost all modules of Dubbo pass parameters through the URL, what is the benefit of this?
Well we can think about it, if there is no agreement, then the parameters of the interaction between different interface will go bad, for a while is a string, a moment is the map, and with the provisions of the uniform, the code will be more normative and unified, when we look at the code will be more clear, and easy to expand, for example, if you want to develop something, Just concatenate the parameters directly to the URL.
As you can see, except for a few basic parameters, many of the parameters actually end up in parameters.
In the project of our company, we refer to the DESIGN of URL to build a metadata structure, namely map, and pass some dynamic parameters of the service through map.
Service exposure process
Before diving into the source code, let’s summarize the steps of service exposure. They are:
- Build, merge, and check configurations.
- URL assembly.
- Service exposure, registration.
I put these three main processes into the flow chart
Follow up on the specific logic of service exposure, i.e., after doExport
protected synchronized void doExport(a) {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// TODO:2021/5/27 Check whether the provider is empty. If the provider is empty, create one and initialize it using system variables
checkDefault();
/** Set various initial values **/
// TODO:2021/5/27 Check whether Application is empty
checkApplication();
// TODO:2021/5/27 Check whether the registry is empty
checkRegistry();
// TODO:2021/5/27 Check whether the protocols are empty
checkProtocol();
// TODO:2021/5/27 Supplementary parameters
appendProperties(this);
// TODO:2021/5/27 Stub validity check
checkStub(interfaceClass);
// TODO:2021/5/27 Mock validity check
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// TODO:2021/5/27 Multi-protocol multi-registry exposure services
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
Copy the code
It boils down to two steps:
-
Verify various configurations and update some configurations;
-
Multi-protocol multi-registry exposure services;
The details of the examination will not be laid out for the moment, because the whole process of exposing services is the focus, and the following part will be revisited after the governance of services, then continue to focus on the doExportUrls method
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls(a) {
// TODO:2021/5/27 Load the registry URL
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
// TODO:2021/5/27 Service exposure based on different protocolsdoExportUrlsFor1Protocol(protocolConfig, registryURLs); }}Copy the code
LoadRegistries is also simple: it is a simple assembly of urls based on the configuration of the registry. It is easier to understand multiple registries here, but what is multiple protocols?
A service that supports both Dubbo and Hessian protocols needs to be exposed to multiple registries using both protocols.
Reference to the logic, in our project, we regulated the registry interface, allowing the registry has a variety of implementation, and even local registry, but is not allowed to have more than one registry, for the moment there is no this kind of demand, and to choose which registry, you just need to configuration in yaml files
Now let’s look at the method doExportUrlsFor1Protocol
As mentioned before the analysis of the service exposure process, Dubbo internally uses urls to carry all kinds of data throughout the life cycle, and the entry actually starts with this method. We will see that the method can be divided into two steps. The first step is to assemble the URL logic. The latter step is where you actually implement the logic that exposes the Dubbo service and so on, so let’s move on to code
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/** The URL of the assembly service starts **/
// TODO:2021/5/27 Obtain the protocol name
String name = protocolConfig.getName();
// TODO:2021/5/27 If empty, the default is dubbo
if (name == null || name.length() == 0) {
name = "dubbo";
}
// TODO:2021/5/27 Set parameters such as MAP
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// TODO:2021/5/27 Add the Application, Module, and provider information to the map
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// TODO:2021/5/27 If the methods configuration list is not empty, iterate over the methods configuration list
if(methods ! =null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// TODO:2021/5/27 Add method names to map
appendParameters(map, method, method.getName());
// TODO:2021/5/27 Add the field information of the methodConfig object to the map
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries"."0"); }}// TODO:2021/5/27 Add the ArgumentConfig list
List<ArgumentConfig> arguments = method.getArguments();
if(arguments ! =null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if(argument.getType() ! =null && argument.getType().length() > 0) {
// TODO:2021/5/27 Get all methods of the interface class using reflection
Method[] methods = interfaceClass.getMethods();
if(methods ! =null && methods.length > 0) {
// TODO:2021/5/27 traverse the methods
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// TODO:2021/5/27 Find target method
if (methodName.equals(method.getName())) {
// TODO:2021/5/27 Get method parameter type by reflectionClass<? >[] argtypes = methods[i].getParameterTypes();// TODO:2021/5/27 If the following table is -1
if(argument.getIndex() ! = -1) {
// TODO:2021/5/27 Check to see if the name of argtypes matches the type in ArgumentConfig
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
// TODO:2021/5/27 If no, throw an exception
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:"+ argument.getType()); }}else {
// TODO:2021/5/27 Walk through the arguments to find the type of argument.type
for (int j = 0; j < argtypes.length; j++) { Class<? > argclazz = argtypes[j];// TODO:2021/5/27 Add the ArgumentConfig field to the Map if it is found
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if(argument.getIndex() ! = -1&& argument.getIndex() ! = j) {throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if(argument.getIndex() ! = -1) {
// TODO:2021/5/27 The type attribute is not configured, but the index attribute is configured, and the index! = -1, is directly added to the map
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg:
or
"
); }}}}// end of methods for
}
// TODO:2021/5/27 If it is a generalization call, set generic and methods in the Map
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// TODO:2021/5/27 Obtain the version number
String revision = Version.getVersion(interfaceClass, version);
// TODO:2021/5/27 Add to map
if(revision ! =null && revision.length() > 0) {
map.put("revision", revision);
}
// TODO:2021/5/27 Get the collection of methods
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
// TODO:2021/5/27 The setting method is *
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// TODO:2021/5/27 Otherwise add to the method set
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); }}// TODO:2021/5/27 Add the token to the map
if(! ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else{ map.put(Constants.TOKEN_KEY, token); }}if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify"."false");
}
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider ! =null) {
contextPath = provider.getContextpath();
}
// TODO:2021/5/27 Obtain the address and port number
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// TODO:2021/5/27 Assembly generates the URL
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
/** End of the URL for the assembly service **/
/* * Follow up on service exposure */
}
Copy the code
This method is so long and smelly that I have deliberately divided it into two parts. This part is currently the URL part of the assembly service, which is simply:
First, put the basic configurations such as Provider, Applicaiton, and Module directly into the map, and then check the method configuration to check whether the configuration method exists, and verify the method signature. If yes, put the configuration into the map, and then add some extra data. For example, the generic call, version number and so on are added to the map, and finally the URL is assembled from the map according to the host and port, which seems to be a little long.
All you have to do is combine the various configurations of the service itself into a map, and then generate urls based on host and port, map, etc.
Let’s look at the follow-up service exposure
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/* * The previous URL assembly */
// TODO:2021/5/27 Load the ConfiguratorFactory and generate a Configurator instance to see if an implementation of the protocol exists
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// TODO:2021/5/27 Configure the URL using the SPI mechanism
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// TODO:2021/5/27 If scope is None, do nothing
if(! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// TODO:2021/5/27 Expose to local if scope is not remote
if(! Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {/** Local service exposure **/
exportLocal(url);
}
// TODO:2021/5/27 If not local, it is exposed to remote
if(! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if(registryURLs ! =null && !registryURLs.isEmpty()) {
// TODO:2021/5/27 Traverse the registry
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// TODO:2021/5/27 Load the monitor connection
URL monitorUrl = loadMonitor(registryURL);
if(monitorUrl ! =null) {
// TODO:2021/5/27 Add one if none is available
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// TODO:2021/5/27 Get the proxy mode based on the URL
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
// TODO:2021/5/27 Add proxy mode to the registry URL
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// TODO:2021/5/24 Get the corresponding proxyFactory through SPI mechanism
/** Get Invoker from proxyFactoryInvoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);
// TODO:2021/5/24 Get the corresponding protocol through SPI mechanism, first RegistryProtocol, and then reinforced by AOP
/** Service exposure **/
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// TODO:2021/5/24 Get the corresponding proxyFactory through SPI mechanism
/** Get Invoker from proxyFactoryInvoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);
// TODO:2021/5/24 Obtain the corresponding protocol through the SPI mechanism
/** Service exposure **/Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }}}this.urls.add(url);
}
Copy the code
The following important place can be thought of as actually traversing the registry for service exposure, only according to the service configuration domain Scope to do some specific exposure processing, for example, if scope is not remote, it is exposed to local, if not local, it is exposed to remote.
This method contains several core extensions, including:
- Local service Exposure
- According to proxyFactory get Invoker
- Remote service exposure, registration
Continue to supplement the flow chart and organize ideas
First, look at the local service exposure logic
private void exportLocal(URL url) {
if(! Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
// TODO:2021/5/27 Obtained InjvmProtocol according to SPI and called the export partyExporter<? > exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local));// Cache it in the collection
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); }}Copy the code
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
Copy the code
The basic logic of exposure to the local is to get the InjvmProtocol according to the SPI mechanism, generate InjvmExporter, and then put it into the collection cache. As for the SPI mechanism, we need to open a special article in the future, and we are interested to keep an eye on this series.
Why should there be local service exposure?
The reason for this is that the same JVM may have internal references to its own services, so the exposed local services can directly consume services from the same JVM when called internally, avoiding network communication.
Let’s look at the generation pattern of the proxyFactory class. We can guess that the proxyFactory class has the ability to generate proxy objects
As you can see, this object is also generated through the SPI mechanism, because the SPI mechanism is also relatively large, in order to avoid confusion, the beginning of the article will be explained later, interested to continue to pay attention.
Through SPI mechanism to get the ProxyFactory implementation object JavassisProxyFactory, the final call code
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO:2021/5/23 Create Wrapper for target class
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// TODO:2021/5/23 Create an anonymous Invoker object and implement the doInvoker method
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName, Class
[] parameterTypes, Object[] arguments) throws Throwable {
// TODO:2021/5/23 Call the Wrapper invokeMethod method, which will eventually call the target method
returnwrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); }}; }Copy the code
The method is to create an anonymous Inovker object that calls the wrapper.invokeMethod method inside the doInvker method, and the invokeMethod eventually calls the target method.
What about wrapper?
The Wrapper is an abstract Class. When you call wrapper. getWrapper to create a subclass, it will be parsed according to the target Class object, get various methods, Class member variables and other information, and generate code such as invokeMethod method. The Class object generated by Javassist can be understood as a proxy instance of BossServiceImpl. Those interested in understanding how this is generated can see the wrapper.makeWrapper method.
Why is it necessary to encapsulate Invoker?
In fact, it is to mask the details of local call or remote call or cluster call, and unified expose an executable, which is convenient for the caller to call, but no matter how encapsulated, in fact, the final call to the target method.
Why encapsulate an Exporter?
This involves the subsequent invocation of the service, and there will be an article on this in the future, if you are interested.
In our RPC framework, instead of using Javassist to generate Proxy objects, we chose to use the Proxy generation mechanism provided by the JDK.
Continue to supplement the flow chart and organize ideas
Next comes remote service exposure
In the latter part of doturlsfor1Protocol, after generating Inovker via proxyFactory, it is necessary to call protocol.export to expose the service. We can see how the protocol is instantiated
Again instantiated by SPI, you can see through breakpoints that some additional operations are done that are intercepted by the AOP aspect first, but the RegisterProtocol that eventually goes to AOP, which is the piece of interest for subsequent analysis.
Let’s see what registerProtocol.export does
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// TODO:2021/5/29 Service exposure
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// TODO:2021/5/23 Get the URL for the registry
URL registryUrl = getRegistryUrl(originInvoker);
final Registry registry = getRegistry(originInvoker);
// TODO:2021/5/23 Get the URL of the service provider that has been registered
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
boolean register = registeredProviderUrl.getParameter("register".true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
// TODO:2021/5/29 Real place to do service registration
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// TODO:2021/5/23 Obtain the OVERRIDE subscription URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// TODO:2021/5/23 Create listener for Override
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
// TODO:2021/5/23 Cache listeners into the collection
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// TODO:2021/5/23 Subscribe to the Registry for Override data
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// TODO:2021/5/23 Create and return DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
Copy the code
In code, this method actually does two things, service exposure and registration:
- DoLocalExport is executed for service exposure
- Load the registry implementation class to register the service with the registry
- Subscribe to the registry for Override data
- Create and return DestroyableExporter
Let’s go ahead and see what doLocalExport does
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// TODO:2021/5/24 Create Invoker as the delegate object
finalInvoker<? > invokerDelegete =new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// TODO:2021/5/24 Call the export method of protocol to expose the service
exporter = newExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); }}}return exporter;
}
Copy the code
(InjvmPortocol); (InjvmPortocol); (InjvmPortocol); (InjvmPortocol); (InjvmPortocol)
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// TODO:2021/5/29 Obtain the service key in the format of group+"/"+serviceName+":"+serviceVersion+":"+port
String key = serviceKey(url);
// TODO:2021/5/29 create exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if(isStubSupportEvent && ! isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded.")); }}else{ stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); }}// TODO:2021/5/24 Start the server
openServer(url);
// TODO:2021/5/29 serialization
optimizeSerialization(url);
return exporter;
}
Copy the code
So you can go to export and you start with a New DubboExporter object, and then you open the service, and then you can see what openServer does
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
// TODO:2021/5/24 Start a service instance
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with overrideserver.reset(url); }}}private ExchangeServer createServer(URL url) {
// TODO:2021/5/29 The server is down. This parameter is mandatory
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// TODO:2021/5/29 Default heartbeat time
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// TODO:2021/5/29 Obtain the remote communication server implementation mode
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if(str ! =null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// TODO:2021/5/29 Add codec DubboCodec implementation
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// TODO:2021/5/29 Start the server
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ")" + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if(str ! =null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if(! supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: "+ str); }}return server;
}
Copy the code
You can see that the end result is a server object created by relying on the remote communication implementation method carried by the URL.
To summarize: doLocalExport ends up opening the server based on the URL and returning to the Exporter.
Let’s move on to registration services
public void register(URL registryUrl, URL registedProviderUrl) {
// TODO:2021/5/29 Get the registry instance
Registry registry = registryFactory.getRegistry(registryUrl);
// TODO:2021/5/29 call register
registry.register(registedProviderUrl);
}
Copy the code
Regsitry generation is ultimately depend on the SPI mechanism, finally to FailbackRegistry. Register
@Override
public void register(URL url) {
super.register(url);
// TODO:2021/5/24 Removed from failed collection
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// TODO:2021/5/24 Initiates a registration request to the registry
doRegister(url);
} catch (Exception e) {
Throwable t = e;
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// TODO:2021/5/29 if an exception occurs, switch to failedRegisteredfailedRegistered.add(url); }}Copy the code
As you can see, the core implementation of registration is in doRegister, but we can also see through the code mechanism that when a registration error is reported, it will be intercepted by trycatch and put into the failedRegistered container. Given the FailbackRegistry class name, you can assume that there is a retry mechanism. Take a look at the constructor
// TODO:2021/5/24 Get the retry frequency parameter from the URL and start the timer for the retry logic
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run(a) {
try {
// TODO:2021/5/29 Retry at a specified time
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
Copy the code
Sure enough, eventually, if an exception occurs in the registration, a scheduled retry is performed.
The retry mechanism is also needed. In our RPC framework, we use yamL to configure the retry time, but instead of using the Executor mechanism, the timer is modeled after dubbo3.0, which is the time wheel mechanism, for better performance.
Looking at the registry core, doRegister, you can see that this method is an abstract method, and since the registry I configured in the XML configuration was Zookeeper, I ended up going to ZookeeperRegistry
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code
Service registration is basically the end of the road, further into the implementation of the registry.
Next, look at the section subscribing to the Registry for Override data
It has said that the registry. The subscribe (overrideSubscribeUrl overrideSubscribeListener). Finally the method is to FailbackRegistry subscribe
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// TODO:2021/5/29 Real place to do subscriptions
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if(urls ! =null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: "+ t.getMessage(), t); }}// TODO:2021/5/29 If the subscription fails, it is placed in the failure containeraddFailedSubscribed(url, listener); }}Copy the code
Similarly, when a subscription fails, it is put into the failure container and the subscription is periodically retried.
Look at core implementation method doSubscribe method, finally to ZookeeperRegistry. DoSubscribe
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// TODO:2021/5/29 Process subscriptions with interface * in the URL parameter, such as monitoring center subscriptions
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
/** ignore **/
} else {
List<URL> urls = new ArrayList<URL>();
// TODO:2021/5/29 Iterate over the classification array
for (String path : toCategoriesPath(url)) {
// TODO:2021/5/29 Get listener collection
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// TODO:2021/5/29 If no, create it
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// TODO:2021/5/29 Get listener
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// TODO:2021/5/29 Notifies service changes and calls NotifyListener
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); }}); zkListener = listeners.get(listener); }// TODO:2021/5/29 create nodes, such as: dubbo/com. Alibaba. Dubbo. Demo. DemoService/will
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if(children ! =null) { urls.addAll(toUrlsWithEmpty(url, path, children)); }}// TODO:2021/5/29 Notify data changes, such as RegistryDirectorynotify(url, listener, urls); }}catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code
This method mainly makes the subscription and listening trigger logic, the specific logic is to subscribe to a service URL, when the service changes the trigger logical changes. In fact, it can be included in the service governance module. There will be a special article on service governance in the future. If you are interested, you can keep paying attention.
Draw a flow chart and organize your thoughts
There are still some things in the service exposure process, and you need to master Dubbo SPI, otherwise some points such as adaptive are difficult to understand, and I spent a lot of time on this article.
At the end I’m going to give you a complete flow chart to walk you through again, there’s still a lot of detail, but it’s not the main body and I’m not going to analyze it, otherwise it will fall apart.
The following service governance, APO and SPI mechanisms will also be expanded on the flowchart. Interested parties can also follow the flowchart link:
Flowchart link
conclusion
If you’re listening to the interviewer’s questions, remember the flow chart above. When you’re done with Dubbo, you’ll find there’s a lot to write about. For example, service application, SPI, DUbbo AOP mechanism, service governance and so on several modules, finally is to bring you a RPC framework, or that the words, want to learn Dubbo can continue to pay attention to this series.