Welcome to follow the official account [sharedCode], dedicated to source code analysis of mainstream middleware. Feel free to contact me directly.
The blogger’s personal website: www.shared-code.com
Preface
In the previous article, we introduced the initialization of ServiceBean. Each exposed service has its own ServiceBean instance, and this class contains the method that exposes the service.
This article goes deep into the source code: from parsing the service's parameters, to connecting to ZooKeeper, to finally starting the Dubbo server on port 20880. The whole exposure path is analyzed here, so this article is for readers who want a deep understanding of Dubbo's source code.
Entry point of service exposure
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    // Expose the service only when all three conditions hold: the service is not configured
    // for delayed export, it has not been exported yet, and it has not been unexported.
    if (isDelay() && !isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // Expose the service
        export();
    }
}
When the service is not configured for delayed export, the export method is called to expose it. This method is inherited from the parent class, ServiceConfig.
ServiceConfig#export
public synchronized void export() {
    // Fall back to the provider configuration for values not set on the service
    if (provider != null) {
        if (export == null) {
            // Whether the service should be published
            export = provider.getExport();
        }
        if (delay == null) {
            // Delay before publishing
            delay = provider.getDelay();
        }
    }
    // If export is false, do not publish; return directly
    if (export != null && !export) {
        return;
    }
    // A delay greater than 0 is configured
    if (delay != null && delay > 0) {
        // Schedule doExport to run once the delay has elapsed
        delayExportExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                // Expose the service
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        // Expose the service immediately
        doExport();
    }
}
Description:
From the code above we can draw two conclusions:
1. Setting the delay property delays service exposure. The unit of delay is milliseconds.
2. If the delay property is not set, or its value is -1, the service is exposed when the Spring context is refreshed.
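The decision made in export can be tried out in isolation. The following is a minimal sketch using only the JDK; the class DelayedExportSketch and its return strings are hypothetical and exist only for illustration, they are not part of Dubbo.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the export decision; not Dubbo code.
final class DelayedExportSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final AtomicBoolean exported = new AtomicBoolean(false);

    /** Returns "skipped", "scheduled" or "immediate" depending on the settings. */
    String export(Boolean export, Integer delayMillis) {
        if (export != null && !export) {
            return "skipped"; // export=false: never publish
        }
        if (delayMillis != null && delayMillis > 0) {
            // Publish later, on the scheduler thread
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
            return "scheduled";
        }
        doExport(); // no delay (null, -1 or 0): publish right away
        return "immediate";
    }

    private void doExport() {
        exported.set(true);
    }

    void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        DelayedExportSketch service = new DelayedExportSketch();
        System.out.println(service.export(null, 50));
        Thread.sleep(200); // give the scheduled task time to run
        System.out.println("exported: " + service.exported.get());
        service.shutdown();
    }
}
```

The sketch keeps the same order of checks as the method above: the export flag first, then the delay.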
doExport
protected synchronized void doExport() {
    // unexported means the service has already been taken offline
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    // exported == true means the service is already exposed; do not publish it again
    if (exported) {
        return;
    }
    exported = true; // Mark the service as exported
    // The service interface to be exposed cannot be empty
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }
    // Check the default configuration, i.e. the provider configuration
    checkDefault();
    // Fall back to the provider configuration
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {
            registries = provider.getRegistries();
        }
        if (monitor == null) {
            monitor = provider.getMonitor();
        }
        if (protocols == null) {
            protocols = provider.getProtocols();
        }
    }
    // Fall back to the module configuration
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    // Fall back to the global application configuration
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    // ref is the implementation class of the exposed service.
    // GenericService is the interface behind Dubbo's generic invocation.
    // A generic implementation is mainly used when the server side has no API interface or model classes.
    // All POJOs in parameters and return values are represented by Map, which is usually used for framework integration.
    // For example, a generic mock framework for remote services can handle all service requests by implementing GenericService.
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            // Load the exposed service interface class via reflection
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // Check the interface and the configured methods
        checkInterfaceAndMethods(interfaceClass, methods);
        // Check that ref is consistent with the service interface
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    // local: if set to true, the default proxy class name is used, i.e. interface name + "Local".
    // It names the client-side local proxy class of the service interface, used to run local logic
    // on the client, such as local caching. The local proxy class must have a constructor that
    // accepts the remote proxy object, e.g. public XxxServiceLocal(XxxService xxxService)
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }
    // Dubbo's local stub configuration. http://dubbo.apache.org/zh-cn/docs/user/demos/local-stub.html
    if (stub != null) {
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            // Load the local stub class
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // The local stub class must implement the service interface
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
    // Check the configuration
    checkApplication();
    checkRegistry();
    checkProtocol();
    appendProperties(this);
    checkStubAndMock(interfaceClass);
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    // Perform the service exposure. The rest of this article follows this call.
    doExportUrls();
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
Step description:
1. Check the configuration.
2. Check the local, stub, and mock settings.
3. Call doExportUrls to expose the service.
The main role of the method above is to check the configuration. The main logic of service exposure is not here.
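The configuration check in doExport follows a fixed fallback order: a value not set on the service is taken from the provider, then the module, then the application. A minimal sketch of that "first non-null wins" rule follows; the helper firstNonNull and the sample values are hypothetical, not Dubbo code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration of the service > provider > module > application fallback.
final class ConfigFallbackSketch {
    /** Returns the first non-null candidate, or null if every level is unset. */
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        return Arrays.stream(candidates).filter(Objects::nonNull).findFirst().orElse(null);
    }

    public static void main(String[] args) {
        List<String> serviceRegistries = null;     // not set on the service
        List<String> providerRegistries = null;    // not set on the provider
        List<String> moduleRegistries = null;      // not set on the module
        List<String> applicationRegistries = Arrays.asList("zookeeper://localhost:2181");
        // Only the application level is configured, so its value is used
        System.out.println(firstNonNull(serviceRegistries, providerRegistries,
                moduleRegistries, applicationRegistries));
    }
}
```

The real method spells this out field by field with if (x == null) checks; the sketch only compresses the same rule into one helper.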
doExportUrls
private void doExportUrls() {
    // Dubbo supports multiple registries
    List<URL> registryURLs = loadRegistries(true);
    // Dubbo also supports multiple protocols
    for (ProtocolConfig protocolConfig : protocols) {
        // Expose the service for this protocol
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
The loadRegistries method returns registry addresses such as the following:
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-service&dubbo=2.6.2&pid=20748&registry=zookeeper&timestamp=1537240877173
## application name & Dubbo version & application PID & registry type & timestamp
If several registries are configured, one address is returned for each.
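To see what such an address carries, it can be taken apart with the JDK's java.net.URI. This is only a sketch: Dubbo uses its own URL class, and the helper queryParams below is a hypothetical name that does no URL decoding.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that inspects a registry address; not Dubbo's URL class.
final class RegistryUrlSketch {
    /** Splits the raw query string into an ordered key/value map (no decoding). */
    static Map<String, String> queryParams(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        URI uri = URI.create("registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService"
                + "?application=dubbo-service&dubbo=2.6.2&pid=20748&registry=zookeeper&timestamp=1537240877173");
        System.out.println(uri.getScheme() + " " + uri.getHost() + ":" + uri.getPort());
        queryParams(uri).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```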
doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // Get the protocol name
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        // If not set, the default is dubbo
        name = "dubbo";
    }
    Map<String, String> map = new HashMap<String, String>();
    // side = provider
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    // dubbo = Dubbo version number
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    // timestamp = current timestamp
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        // pid = PID of the application
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    // The following five calls append the configured parameters to the map
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    if (methods != null && !methods.isEmpty()) {
        // Handling of methods...
    }
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(Constants.GENERIC_KEY, generic);
        map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
    } else {
        // Revision (version) of the service interface
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
        // Get the method names of the service interface
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            // The interface has no methods
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            // The interface has at least one method
            map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // A token is configured
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(Constants.TOKEN_KEY, token);
        }
    }
    // If the protocol is injvm, the service is not registered
    if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
        // Set the register property to false
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // export service
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    // Get the IP
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    // Get the port
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    //-----------------------------------------------------------------------------------------------------
    // The code above assembles the map of parameters, which is now used to build the URL
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        // Apply the configurator registered for this protocol, if there is one
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
    // The publishing scope of the service
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // If scope is none, the service is not published
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // If scope is not remote, publish locally
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            // Local exposure
            exportLocal(url);
        }
        // If scope is not local, publish remotely
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            // There is at least one registry URL
            if (registryURLs != null && !registryURLs.isEmpty()) {
                // Publish to each registry in turn
                for (URL registryURL : registryURLs) {
                    // Copy the registry's dynamic parameter if the service URL does not have one
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    // Use proxyFactory to generate the invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // Bind the invoker to this ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Call Protocol to publish the service
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}
The code above is long, so let's summarize.
In the code, I have separated the two parts with a dashed comment line. The part above the line collects the parameters, puts them into the map, and generates the URL from the map.
The URL is as follows:
dubbo://192.168.59.3:20880/com.dubbo.service.user.UserFacadeService?anyhost=true&application=dubbo-service&bind.ip=192.168.59.3&bind.port=20880&dubbo=2.6.2&generic=false&interface=com.dubbo.service.user.UserFacadeService&methods=getUser&pid=20748&side=provider&stub=com.dubbo.service.user.UserFacadeServiceStub&timestamp=1537240877178
It contains all the parameters of the service. Below the line, Dubbo starts publishing the service.
There is an important property, **scope**, which has three settings:
local: local exposure only
remote: remote exposure only
none: no exposure
In the code above, scope is null in the actual run because it is not configured, so the service is exposed both locally and remotely.
Local exposure is not covered in this article for the time being. The focus is on remote exposure, but there is one caveat:
**Within the same JVM, the locally exposed service is preferred. For example, when a consumer and a provider run in the same Tomcat, the consumer calls the locally exposed service.**
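The three nested scope checks can be condensed into a small truth table. The sketch below reproduces only that decision; the enum Exposure and the method exposuresFor are hypothetical names chosen for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the scope checks in doExportUrlsFor1Protocol.
final class ScopeSketch {
    enum Exposure { LOCAL, REMOTE }

    /** Returns which kinds of exposure happen for a given scope value (null = not configured). */
    static Set<Exposure> exposuresFor(String scope) {
        Set<Exposure> result = EnumSet.noneOf(Exposure.class);
        if ("none".equalsIgnoreCase(scope)) {
            return result;               // none: nothing is exposed
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add(Exposure.LOCAL);  // anything but remote: expose locally
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add(Exposure.REMOTE); // anything but local: expose remotely
        }
        return result;
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> " + exposuresFor(scope));
        }
    }
}
```

Because both inner checks are negative tests, an unset scope passes both of them, which is why the default is local plus remote exposure.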
The three most important lines of service exposure
// Use proxyFactory to generate the invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// Bind the invoker to this ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// Call Protocol to publish the service
Exporter<?> exporter = protocol.export(wrapperInvoker);
The Invoker is generated from registryURL, which looks like this:
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-service&dubbo=2.6.2&pid=10588&registry=zookeeper&timestamp=1537255304679
This is where the protocol type of the Invoker is established. The protocol type in the Invoker is registry, not dubbo, because this Invoker is used to publish the service.
ProxyFactory is an extension implementation generated by Dubbo's SPI extension mechanism. The default implementation is Javassist. After the Invoker is generated, it is bound to the ServiceConfig by wrapping it in a DelegateProviderMetaDataInvoker, which enhances it.
The export method of Protocol is then called to publish the service.
Protocol is an extension proxy class generated through SPI. The code of its export method is as follows:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg0 == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    // Get the current protocol type: dubbo or something else. For the service publication here,
    // registry is passed in, which is used for registration.
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader
            (com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}
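The generated export method boils down to "look up the extension named after the URL's protocol, then delegate to it". A minimal, JDK-only sketch of that dispatch follows; the class AdaptiveDispatchSketch, its register method, and the string results are hypothetical and stand in for the real ExtensionLoader and Exporter types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simplified, hypothetical model of an adaptive extension; not Dubbo's generated class.
final class AdaptiveDispatchSketch {
    private final Map<String, UnaryOperator<String>> extensions = new HashMap<>();

    void register(String name, UnaryOperator<String> exporter) {
        extensions.put(name, exporter);
    }

    /** Picks the extension named after the URL's protocol, defaulting to "dubbo". */
    String export(String url) {
        int idx = url.indexOf("://");
        String extName = idx <= 0 ? "dubbo" : url.substring(0, idx);
        UnaryOperator<String> extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.apply(url);
    }

    public static void main(String[] args) {
        AdaptiveDispatchSketch protocol = new AdaptiveDispatchSketch();
        protocol.register("registry", url -> "RegistryProtocol exported " + url);
        protocol.register("dubbo", url -> "DubboProtocol exported " + url);
        System.out.println(protocol.export("registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(protocol.export("dubbo://192.168.59.3:20880/com.dubbo.service.user.UserFacadeService"));
    }
}
```

The same call site therefore reaches different implementations depending only on the protocol part of the URL carried by the Invoker.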
The extension instance returned here is structured as follows: ProtocolListenerWrapper > ProtocolFilterWrapper > QosProtocolWrapper > RegistryProtocol
ProtocolListenerWrapper, ProtocolFilterWrapper, and QosProtocolWrapper are decorator classes of Protocol, and **the order of these three classes is not fixed**.
In my debugging, the Protocol decorator classes were executed in the following order:
QosProtocolWrapper
> ProtocolFilterWrapper
>QosProtocolWrapper
>RegistryProtocol
QosProtocolWrapper
For the registry export, it starts the Dubbo qos-server, the online operations and maintenance (O&M) platform. Default port: 22222
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Check whether this is the registry export
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // Start Dubbo's online O&M platform, which occupies port 22222 by default
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}
ProtocolListenerWrapper
For everything except the registry export, it attaches listeners to the exported service.
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Check whether this is the registry export; if so, pass it through unchanged
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // Wrap the exporter together with the activated ExporterListener extensions
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
ProtocolFilterWrapper
For everything except the registry export, it adds the filter chain to the Invoker.
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Check whether this is the registry export; if so, pass it through unchanged
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // buildInvokerChain builds the filter chain
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
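The three wrappers share one pattern: each checks whether the URL uses the registry protocol and either does its own work or passes the call through. The sketch below models that pattern with a tiny hypothetical Protocol interface and a trace list; none of these types are Dubbo's, and the nesting order qos > filter > listener is just one possible order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the wrapper chain; the interface and names are not Dubbo's.
final class WrapperChainSketch {
    interface Protocol {
        void export(String url, List<String> trace);
    }

    static boolean isRegistry(String url) {
        return url.startsWith("registry://");
    }

    /** Acts only on registry URLs, like QosProtocolWrapper. */
    static Protocol qos(Protocol next) {
        return (url, trace) -> {
            if (isRegistry(url)) trace.add("qos: start qos-server");
            next.export(url, trace);
        };
    }

    /** Acts only on non-registry URLs, like ProtocolFilterWrapper. */
    static Protocol filter(Protocol next) {
        return (url, trace) -> {
            if (!isRegistry(url)) trace.add("filter: build filter chain");
            next.export(url, trace);
        };
    }

    /** Acts only on non-registry URLs and wraps the result, like ProtocolListenerWrapper. */
    static Protocol listener(Protocol next) {
        return (url, trace) -> {
            next.export(url, trace);
            if (!isRegistry(url)) trace.add("listener: wrap exporter");
        };
    }

    /** Runs one export through the chain and returns the recorded steps. */
    static List<String> run(String url) {
        List<String> trace = new ArrayList<>();
        Protocol core = (u, t) -> t.add("core: export " + u);
        qos(filter(listener(core))).export(url, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("registry://localhost:2181/demo"));
        System.out.println(run("dubbo://192.168.59.3:20880/demo"));
    }
}
```

Since each wrapper decides independently based on the URL, the result does not depend on the order in which the three wrappers are nested, which fits the observation that their order is not fixed.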
RegistryProtocol
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Export locally: start the Dubbo server
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    // Obtain the ZooKeeper address
    URL registryUrl = getRegistryUrl(originInvoker);
    // Connect to ZooKeeper and keep a long-lived connection
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    // Read the register attribute to decide whether to register immediately
    boolean register = registedProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    if (register) {
        // Register the provider URL
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
Apart from doLocalExport, the rest of the code interacts with ZooKeeper: registering URLs, subscribing to override data, and so on.
As we know, a Dubbo consumer calls the provider's methods over the network. By default the communication goes through Netty on port 20880, so a Netty server is started inside the provider. This is done in the doLocalExport method.
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    // key = dubbo protocol URL
    String key = getCacheKey(originInvoker);
    // Look up the cached exporter wrapper for this dubbo protocol URL
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    // If it is null, the dubbo service has not been started yet
    if (exporter == null) {
        synchronized (bounds) { // Synchronization lock
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) { // Double check
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // protocol is used again here, but note that the invoker passed in is invokerDelegete
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
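doLocalExport uses double-checked locking so that the server for a given URL is started only once, even when several threads export at the same time. The following sketch shows the same pattern in isolation; the class LocalExportCacheSketch, the string "exporters", and the counter serverStarts are hypothetical, and a ConcurrentHashMap is assumed for the cache so that the unsynchronized first read is safe.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the double-checked cache in doLocalExport.
final class LocalExportCacheSketch {
    private final Map<String, String> bounds = new ConcurrentHashMap<>();
    final AtomicInteger serverStarts = new AtomicInteger();

    String doLocalExport(String key) {
        String exporter = bounds.get(key);      // first check, without the lock
        if (exporter == null) {
            synchronized (bounds) {
                exporter = bounds.get(key);     // second check, under the lock
                if (exporter == null) {
                    serverStarts.incrementAndGet(); // stands in for starting the server
                    exporter = "exporter@" + key;
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        LocalExportCacheSketch protocol = new LocalExportCacheSketch();
        String key = "dubbo://192.168.59.3:20880/com.dubbo.service.user.UserFacadeService";
        protocol.doLocalExport(key);
        protocol.doLocalExport(key); // served from the cache
        System.out.println("server starts: " + protocol.serverStarts.get());
    }
}
```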
The protocol type in invokerDelegete is dubbo, not registry.
So the execution chain of protocol.export() is as follows: ProtocolListenerWrapper > ProtocolFilterWrapper > QosProtocolWrapper > DubboProtocol
The order of the first three is not fixed. In the end, DubboProtocol is executed.
DubboProtocol
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // Get the URL of the current service
    URL url = invoker.getUrl();
    // Service key, e.g. key = com.dubbo.service.user.UserFacadeService:20880
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // Start the Netty server on the dubbo port (20880) so that it is ready to receive requests
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}
Description:
The URL of the service has the following format:
dubbo://192.168.59.3:20880/com.dubbo.service.user.UserFacadeService?anyhost=true&application=dubbo-service&bind.ip=192.168.59.3&bind.port=20880&dubbo=2.6.2&generic=false&interface=com.dubbo.service.user.UserFacadeService&methods=getUser&pid=20748&side=provider&stub=com.dubbo.service.user.UserFacadeServiceStub&timestamp=1537240877178
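The exporter is stored under a service key derived from this URL. The article only shows the simple form, interface name plus port; the sketch below also handles an optional group and version, which is an assumption about the general format and not something taken from the code above. The class ServiceKeySketch is hypothetical.

```java
// Hypothetical sketch of how a service key could be assembled; not Dubbo's own method.
final class ServiceKeySketch {
    /** Builds "[group/]serviceName[:version]:port"; group and version are optional (assumed format). */
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        return buf.append(':').append(port).toString();
    }

    public static void main(String[] args) {
        // Without group and version this yields the key shown in the DubboProtocol comment
        System.out.println(serviceKey(20880, "com.dubbo.service.user.UserFacadeService", null, null));
    }
}
```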
[Figure: Dubbo protocol service exposure flowchart]
[Figure: Dubbo service exposure execution chain]
[Figure: ZooKeeper directory]