1. The implementation principle of annotation-based configuration parsing
The Dubbo framework supports several configuration parsing methods, such as Schema/ XML/annotations. Annotation-based configuration parsing is not only easy to use, but also more flexible and less code. Left quite a lot of the Spring framework interface can be configured to parse and import of resources, bean scanning and generate injection, etc., here about the Dubbo annotation parsing process is mostly done in invokeBeanFactoryPostProcessor. Dubbo framework relies mainly on the @ EnableDubbo annotation/ServiceAnotationBeanPostProcessor/ReferenceAnnotationBeanPostProcessor front-end processor and DubboConfigConfigu If the user uses the configuration file, the framework generates the corresponding bean as required. For example, on the Service provider side, the class using the Dubbo annotation @service needs to be imported as a bean. Inject proxy objects on the service consumer side for fields or methods that use the @Reference annotation.
When the Spring container startup time, if comments on using the @ import, will trigger a annotation selectImports methods, such as the designated DubboConfigConfigurationRegister EnableDubboConfig annotations, The registerBeanDefinations method is automatically called.
@EnableDubboConfig
@DubboComponentScan
public @interface EnableDubbo {
....
}
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {
...
}
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
...
}
Copy the code
DubboConfigConfigurationRegister configuration register processing class, to the Spring container is registered DubboConfiguration bean, also registered the other like @ Reference annotation processing front processor, framework launch event listeners, etc.
public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {/ / comments on the configuration information AnnotationAttributes attributes. = AnnotationAttributes fromMap ( importingClassMetadata.getAnnotationAttributes( EnableDubboConfig.class.getName())); boolean multiple = attributes.getBoolean("multiple"); registerBeans(registry, DubboConfigConfiguration.Single.class); if (multiple) { registerBeans(registry, DubboConfigConfiguration.Multiple.class); } // Inject the generic bean registerCommonBeans(registry) into the Spring container; }} static void registerCommonBeans(BeanDefinitionRegistry registry) {// Inject Reference annotations preprocessor beans registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME, ReferenceAnnotationBeanPostProcessor.class); . // Inject the Dubbo application framework to start the listener bean, In this application startup registerInfrastructureBean Dubbo (registry, DubboBootstrapApplicationListener BEAN_NAME, DubboBootstrapApplicationListener.class); . }Copy the code
If the user configates a property, such as dubo.application.name, the Spring Bean is automatically created into the container. Registration and configuration object Bean properties binding in registerConfigurationBeanDefination approach.
public class ApplicationConfig extends AbstractConfig { .... private String name; private String version; . } public class DubboConfigConfiguration { @EnableConfigurationBeanBindings({ @EnableConfigurationBeanBinding(prefix = "dubbo.application", type = ApplicationConfig.class), ..... }) public static class Single { } @EnableConfigurationBeanBindings({ @EnableConfigurationBeanBinding(prefix = "dubbo.applications", type = ApplicationConfig.class, multiple = true), ..... }) public static class Multiple { } } @Import(ConfigurationBeanBindingsRegister.class) public @interface EnableConfigurationBeanBindings { EnableConfigurationBeanBinding[] value(); } public class ConfigurationBeanBindingsRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware { .... @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {/ / comments related attributes AnnotationAttributes attributes. = AnnotationAttributes fromMap ( importingClassMetadata.getAnnotationAttributes( EnableConfigurationBeanBindings.class.getName())); / / get annotations associated value AnnotationAttributes [] AnnotationAttributes = attributes. GetAnnotationArray (" value "); ConfigurationBeanBindingRegistrar registrar = new ConfigurationBeanBindingRegistrar(); registrar.setEnvironment(environment); For (AnnotationAttributes Element: annotationAttributes) { registrar.registerConfigurationBeanDefinitions(element, registry); }}... }Copy the code
When a user use @ DubboComponentScan, will activate the DubboComponentScanRegister, At the same time generate ServiceClassPostProcessor and ReferenceAnnotationBeanPostProcessor two processors, used for processing production and consumption.
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {// Scan package path Set<String> packagesToScan = getPackagesToScan(importingClassMetadata); / / generated generates annotation processor registerServiceAnnotationBeanPostProcessor (packagesToScan, registry); // Generate a consumer annotation handler registerCommonBeans(registry); }}Copy the code
The ServiceClassPostProcessor processor implements BeanDefinitionRegistryPostProcessor interface, All the Bean in the Spring container after registering callback postProcessBeanDefinitionRegistry method start scanning @ DubboService annotations and into the container.
public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware, ResourceLoaderAware, BeanClassLoaderAware { @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {// The injection container starts the event listener, and when the fresh event is received, Will start the Dubbo application framework registerBeans (registry, DubboBootstrapApplicationListener. Class); Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); if (! Collectionutils. isEmpty(resolvedPackagesToScan)) {// Trigger the ServiceBean definition and injection registerServiceBeans(resolvedPackagesToScan, registry); } } private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList( DubboService.class, Service.class, com.alibaba.dubbo.config.annotation.Service.class ); private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) { DubboClassPathBeanDefinitionScanner scanner = new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader); BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry); scanner.setBeanNameGenerator(beanNameGenerator); // Specify three annotation methods for scanning, Without scanning. Other types of annotations serviceAnnotationTypes forEach (annotationType - > {scanner. AddIncludeFilter (new AnnotationTypeFilter(annotationType)); }); For (String packageToScan: packagesToScan) {// Inject @dubboService as different beans into the container scanner.scan(packageToScan); Set<BeanDefinitionHolder> beanDefinitionHolders = findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator); if (! CollectionUtils.isEmpty(beanDefinitionHolders)) { for (BeanDefinitionHolder beanDefinitionHolder : BeanDefinitionHolder, registry, scanner) {// registerServiceBean definitions and do data binding and resolution registerServiceBean(beanDefinitionHolder, registry, scanner); } } } } }Copy the code
In actual use, will inject @ Reference in using class notes, can easily remote calls, including the attribute of Dubbo injection is through ReferenceAnnotationProcessor processing, mainly through the access class @ DubboRefence annotation fields or methods, This is done by reflecting a reference to a set field or method, and any annotated object is converted to a ReferenceBean object.
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements ApplicationContextAware, ApplicationListener<ServiceBeanExportedEvent> { @Override protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<? > injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception { .... // Register the ReferenceBean(referencedBeanName, ReferenceBean, Attributes, localServiceBean, injectedType); / / cache injection ReferenceBean cacheInjectedReferenceBean (ReferenceBean injectedElement); // Create proxy, GetOrCreateProxy (referencedBeanName, referenceBean, localServiceBean, injectedType) return getOrCreateProxy(referencedBeanName, localServiceBean, injectedType); } @Override public PropertyValues postProcessPropertyValues( PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {// Find all fields and methods marked with @reference findInjectionMetadata(beanName, bean.getClass(), pvs); Metadata. Inject (bean, beanName, PVS); } catch (BeanCreationException ex) { throw ex; } catch (Throwable ex) { ..... } return pvs; }}Copy the code
2. Implementation principle of service exposure
2.1 Service exposure mechanism
The Dubbo start listener is injected into the DubboUtils#registerCommonBeans method. The DubboBootstrap#start and DubboBootstrap#stop methods are called when the Spring application is started and destroyed.
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener implements Ordered { .... Private void onContextRefreshedEvent(ContextRefreshedEvent event) {// Dubbo framework application starts dubBobootstrap.start (); } private void onContextClosedEvent(ContextClosedEvent event) {// Dubbo framework application stops dubbobootstrap.stop (); }... } public DubboBootstrap start() { .... // Initialize the configuration and listener, pull the registry configuration information, initialize the service metadata initialize(); // exportServices(); if (! IsOnlyRegisterProvider () | | hasExportedServices ()) {/ / exposed service metadata exportMetadataService (); // Register the service provider instance to the registry registerServiceInstance(); } referServices(); . }Copy the code
Dubbo framework divides remote service exposure into two parts. In the first part, the holding service instance is converted to Invoker by proxy, and in the second part, Invoker is converted to Exporter through specific communication protocols such as Dubbo/Thrift. Invoker is an important component of the whole framework. It acts as a “link between the preceding and the following”, which can be a local implementation, a remote implementation, or a cluster implementation.
The protocol instance automatically ADAPTS according to the URL of service exposure and extracts specific protocols. For example, ZooKeeper first creates a registry instance, then extracts the specific service URL corresponding to export, and finally exposes the service using the protocol corresponding to the service URL (the default protocol is Dubbo). After the service is successfully exposed, the service metadata is registered with ZooKeeper, and the URL information is as follows.
Registry: / / 127.0.0.1:2181 / org. Apache. Dubbo. Registry. RegistryService? Application = dubbo - demo - the annotation - provider&dubbo = 2.0.2 & pid = 51310 & registry = zookeeper&release = 2.7.7 & timestamp = 1613395854976Copy the code
After the service instance ref is converted to Invoker, more fine-grained control is provided via RegistryProtocol#export, such as service exposure before registering service metadata. The registry does the following things in sequence when the service is exposed:
-
Delegate specific protocol (Dubbo) for service exposure, and create NettyServer listening port and save the service instance;
-
Create a service center object and establish a TCP connection with the registry.
-
Register service metadata to the registry;
-
Subscribe to a Configurators node that listens for dynamic property changes to the service.
-
Service destruction finishing work, such as closing ports, unregistering service information, etc.
public class ServiceConfig extends ServiceConfigBase {
public synchronized void export() {
if (bootstrap == null) { bootstrap = DubboBootstrap.getInstance(); bootstrap.init(); } / / initialize the service metadata serviceMetadata setVersion (version); serviceMetadata.setGroup(group); serviceMetadata.setDefaultGroup(group); serviceMetadata.setServiceType(getInterfaceClass()); serviceMetadata.setServiceInterfaceName(getInterface()); serviceMetadata.setTarget(getRef()); doExport();Copy the code
}
private void doExportUrls() { ….. / / get the current service corresponding registry instance List registryURLs = ConfigValidationUtils. LoadRegistries (this, true);
// If a service specifies to expose multiple protocols (Dubbo, REST), expose services in sequence for (ProtocolConfig ProtocolConfig: protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ... Invoker<? > invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // Specify a protocol exposure service, such as DubboProtocol Exporter<? > exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); .Copy the code
}
}
public class JavassistProxyFactory extends AbstractProxyFactory { @Override public Invoker getInvoker(T proxy, Class type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName() .indexOf(‘$’) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<? >[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); }}; }}
Obtain the configuration object through reflection and put it into map for subsequent construction of URL parameters (such as application name), mainly distinguish global configuration, add default prefix in front of attribute by default, when the framework obtains URL parameters, create Invoker object through dynamic proxy. An AbstractProxyInvoker instance is generated on the server side. Any real method calls are delegated to the proxy, which then forwards them to the service ref call. There are currently two proxy methods: JavassistProxyFactory and JdkProxyFactory. The principle of JavassistProxyFactory mode is to create the Wrapper subclass, in the subclass to achieve invokeMethod method, method body for each ref method name and method parameters matching verification, if the match can be directly called. Saves the overhead of reflection calls compared to JdkProxyFactory.
After the invocation interceptor is constructed, the Dubbo protocol is called for service exposure. Here is the DubboProtocol#export code. It will be used as key to associate specific service Iovoker according to service group, version, service interface and exposed port. Only the interface exposed for the first time needs to open port listening, trigger the binding method in Exchange, and finally call NettyServer for processing. A number of handlers are initialized during Server initialization to support heartbeat, business thread pool processing codecs, and response method calls.
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // Key String constructed by service group, version, interface, and port key = serviceKey(URL); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); Export map. Put (key, exporter); // Export map. . OpenServer (url); openServer(url); optimizeSerialization(url); . return exporter; } private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(IS_SERVER_KEY, true); If (isServer) {// Server instances cache ProtocolServer Server = Servermap. get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } } } else { server.reset(url); } }z } private ProtocolServer createServer(URL url) { .... ExchangeServer server; Try {// Create NettyServer and initialize Handler Server = exchangers.bind (url, requestHandler); } catch (RemotingException e) { .... }... return new DubboProtocolServer(server); }Copy the code
Various interceptor filters are triggered when the service is actually called. The framework initializers the interceptor before the service is exposed. ProtocalListenerWrapper is automatically injected when Dubbo loads the Protocol extension point. ProtocolListenerWrapper –> ProtocolFilterWrapper –> DubboProtocol. In the ProtocolListenerWrapper implementation, The corresponding listener method is called back when the service provider is exposed
public class ProtocolListenerWrapper implements Protocol { ... @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (UrlUtils.isRegistry(invoker.getUrl())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY))); }... } public class ProtocolFilterWrapper implements Protocol { private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // Load all interceptor class instances List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (! filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; Last = new Invoker<T>() {@override public Result invoke(Invocation) throws RpcException { Result asyncResult; // Each invocation is passed to the next interceptor asyncResult = filter.invoke(next, Invocation); return asyncResult.whenCompleteWithContext((r, t) -> {... }); . }); }... } } return last; }}Copy the code
2.2 Service Registration
The overall process is shown in the figure below, and there is a more detailed explanation on the official website.
- When the service provider starts up, it writes its own metadata information to the registry and subscribes to configuration metadata information.
- When a consumer starts, it also writes its own metadata information to the registry and subscribes to the service provider, routing, and configuration metadata information.
- When the service governance center is started, it subscribes to all consumer, service provider, routing, and configuration metadata simultaneously;
- When a service provider leaves or a new service provider joins, the registry service catalog will change, and the change information will be dynamically notified to consumers and service governance center.
- When a consumer initiates a service invocation, the invocation and statistics are asynchronously reported to the monitoring center.
Zookeeper is generally used as a service registry. Zookeeperd is a registry in a tree structure. Each node can be divided into persistent nodes, persistent sequential nodes, temporary nodes and temporary sequential nodes.
- The root node of the tree is the registry group, which has multiple service interfaces. The group comes from the group attribute, which defaults to /dubbo.
- Service interfaces contain four subdirectories, namely providers, consumers, Routers, and Configuratiors. The path is a persistent node.
- The service provider directory (/dubbo/service/providers) contains interfaces with multiple service URL metadata information.
- The interface contained under the service consumers directory (/dubbo/service/consumers) has multiple consumer URL metadata information;
- The/Dubbo /service/ Routers directory contains multiple URL metadata information for consumer routing policies.
- Dynamic configuration directory (/ dubbo/service/configurators) the following dynamic configuration URL contains multiple service metadata information.
2.2.1 Implementation principles of the Registry
Service subscription and publication is one of the core functions of the entire registry. When an existing service provider node goes offline or a new service provider node joins the microservice environment, consumers who subscribe to the corresponding interface can be notified by the registry in time and updated local information. The Dubbo framework has a dedicated Registry functionality layer to implement subscription and publishing of services. Dubbo currently supported Zookeeper/Redis/euraka/nacos etc. Various middleware as a registry, we commonly used is a Zookeeper registry. Dubbo’s registry module uses design patterns such as template pattern and factory pattern to ensure extensibility.
Abstract: AbstractRegistryFactory implements the RegistryFactory interface’s getRegistry(URL URL) method to lock and create an implementation by invoking the template method createRegistry(URL URL). And cache it in memory. Registry have several specific factory class, such as zookeeperRegistryFactory/RedisRegistryFactory, framework for zookkeeperRegistryFactory factory class implements by default.
public abstract class AbstractRegistryFactory implements RegistryFactory { @Override public Registry getRegistry(URL url) { url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = createRegistryCacheKey(url); LOCK.lock(); Registry = registries.get (key); Registry = registries.get (key); if (registry ! = null) { return registry; } // If the registry has not already been created, Call the abstract createRegistry(URL) method to create a new //createRegistry method implemented by concrete subclasses such as ZookeeperRegistryFactory registry = createRegistry(URL); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { LOCK.unlock(); } } } public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } @override public Registry createRegistry(URL URL) {return new ZookeeperRegistry(URL, zookeeperTransporter); }}Copy the code
2.2.2 Implementation Principles of Service Publishing
Both service providers and consumers need to register themselves in the registry, and the implementation code published using ZooKeeper is as simple as calling the ZK client to create a directory on the registry.
Create (toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); //zkClient create directory zkClient.create(toUrlPath(url), url.getParameter(Constants. //zkClient deletes path zkclient. delete(toUrlPath(url);Copy the code
AbstractRegistry implements registration, subscription, query, notification, and other methods in the Registry interface, as well as persisting registration information on disk files. In addition, FailbackRegistry inherits AbstractRegistry, rewrites the registration, subscription, query, and notification methods of the parent class, and adds a retry mechanism.
Public abstract void doRegister(URL URL); //FailbackRegistry abstract class void doRegister(URL URL); public abstract void doUnregister(URL url); public abstract void doSubscribe(URL url, NotifyListener listener); public abstract void doUnsubscribe(URL url, NotifyListener listener);Copy the code
Take the subscription logic as an example, FailbackRegistry rewrites the subscribe method, only to realize the general logic of subscription and exception handling and other general things, but the specific implementation is handed to the inheritance of the subclass implementation, take ZookeeperRegistry implementation class as an example, Zk client-related API interface is used in the specific subscription logic for node operation and change event subscription.
public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); // Subclass doSubscribe(url, listener); } catch (Exception e) { ...... }}}Copy the code
2.2.3 Implementation principle of service subscription
There are usually two methods of subscription: pull and push, where the client periodically polls the registry to pull the configuration, and the registry actively pushes data to the client. At present, Dubbo uses the first start pull mode, and then receives events to pull data again. When a service is exposed, a server subscribes to Configurators to listen for dynamic configurations. When a consumer starts up, the consumer subscribes to provicers, Routers, and Configurators to notify application service providers, routes, and dynamic configuration changes.
Zookeeper registry USES is “event notification” + “pull” client, the client connection registry for the first time, will obtain the corresponding directory full amount of data, and register a watch on the subscription of nodes to monitor, maintain long TCP connection between the client and the registry, the follow-up of each node has any data change over time, The registry actively notifies the client based on watcher’s callback, and upon receiving the notification pulls the full amount of data from the corresponding node (client-side pull), as shown in the NotifyListener#notify(List urls) interface.
Client connection registry for the first time, when they subscribe to obtain all the data, the follow-up by the listener event updates, service management center will handle all subscribe to the subscription service layer, the service is set to the special value, in addition, in addition to subscribe to the current node, service management center will also subscribe to all child nodes of the node.
@override public void doSubscribe(final URL URL, Final NotifyListener Listener) {try {// Subscribe all data if (any_value.equals (url.getServiceInterface())) {String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); / / for the first time will create a listener listener ChildListener zkListener = listeners.com puteIfAbsent (listener, k - > (parentPath, For (String child: currentChilds) {child = url.decode (child); if (! Contains (child)) {// If not already subscribed, subscribe anyservices.add (child); subscribe(url.setPath(child) .addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), k); }}}); // Create a persistent node and subscribe to the child node zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (CollectionUtils.isNotEmpty(services)) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service) .addParameters(INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children ! = null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } // Call NotifyListener to update the local cache notify(URL, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); }}Copy the code
2.2.4 Cache mechanism
Caching is space for time, and if every remote call is preceded by a list of callable services from the registry, it can overwhelm the registry with traffic. Each additional network request can degrade the entire service network performance. Therefore, AbstractRegistry encapsulates the local disk and memory cache. The memory cache is stored in the Properties object, and a persistent File on the disk is referenced through the File object. The File File is stored in the /. Dubbo directory under the user home directory and is named after the service interface – Service instance IP: port.
The file content is as follows
Com. Seewo. Demo. Service. Interfaces. DemoService = empty \ : / / 192.168.0.109 / com seewo. Demo. Service. Interfaces. DemoService? Application \ = dubbo - demo - the annotation - consumer&category \ = routers&check \ = false & dubbo \ = 2.0.2 & init \ = false &interface\=com.seewo.demo.service.interfaces.DemoService &methods\=sayHello,sayHelloAsync&pid\=88004 & release \ = 2.7.7 & revision 1.0 SNAPSHOT & side \ \ = = consumer & sticky \ = false×tamp \ = 1613461973362...Copy the code
At service initialization, the AbstractRegistry constructor reads the persistent registration data from the local disk file into the Properties object and loads it into the in-memory cache.
public abstract class AbstractRegistry implements Registry { private final Properties properties = new Properties(); Private final ConcurrentMap<URL, Set<NotifyListener>> Subscribed = new ConcurrentHashMap<>(); // Service cache object in memory private Final ConcurrentMap<URL, Map<String, List<URL>>> NOTIFIED = new ConcurrentHashMap<>(); Private File File; public AbstractRegistry(URL url) { setUrl(url); if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, SyncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false); String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache"; String filename = url.getParameter(FILE_KEY, defaultFilename); File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (! file.exists() && file.getParentFile() ! = null && ! file.getParentFile().exists()) { if (! file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!" ); } } } this.file = file; loadProperties(); notify(url.getBackupUrls()); } private void loadProperties() { if (file ! = null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); properties.load(in); // Read files on disk... } catch (Throwable e) { ... } finally { ... }}}}Copy the code
The cache can be saved synchronously and asynchronously. The asynchronous cache can be saved asynchronously using the thread pool. If an exception occurs during the execution of the thread, the thread pool will be called again and retry continuously.
private void saveProperties(URL url) { ... if (syncSaveFile) { doSaveProperties(version); } else { registryCacheExecutor.execute(new SaveProperties(version)); }}.. }Copy the code
3. Realization principle of service consumption
The Dubbo framework also does service consumption in two parts. The first step is to generate an Invoker by holding a remote service instance. The Invoker is the core remote proxy object on the client side. The second step transforms the Invoker through a dynamic proxy into a dynamic proxy reference that implements the user interface. Here Invoker hosts network connection communication, service invocation, and retry.
The entry point for the Dubbo framework to actually do service references is at ReferenceBean#getObject, inherited from the ReferenceConfig class. After ReferenceAnnotationBeanProcessor annotators after processing, using the @ Reference annotation identifies the object, the Spring container will set the injection ReferenceBean object.
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean { ... @Override public void afterPropertiesSet() throws Exception { .... If (shouldInit()) {// call the referenceconfiggetgetobject (); }}... }Copy the code
The ReferenceBean object is generated and initialized in the ReferenceConfig. Dubbo supports simultaneous consumption of multiple registries. If a service is configured to register multiple registries at the same time, it will be merged into an Invoker in ReferenceConfig#createProxy. In createProxy method, the remote proxy object is created and converted. The client starts pulling service metadata, subscribing to the provider, routing, and configuration changes. Operations such as data pull, subscription, and service Invoker conversion are triggered mainly through RegistryProtocol#refer.
public class ReferenceConfig<T> extends ReferenceConfigBase<T> { .... Public synchronized void init() {ref = createProxy(map); . } private T createProxy(Map<String, String> map) {// Single registry consumes if (urls.size() == 1) {invoker invoker = REF_PROTOCOL. urls.get(0)); } else { List<Invoker<? >> invokers = new ArrayList<Invoker<? > > (); URL registryURL = null; Invokers. add(ref_protocol. refer(interfaceClass, URL)); if (UrlUtils.isRegistry(url)) { registryURL = url; } } if (registryURL ! = null) {/ / by Cluster transform multiple Invoker into a Invoker URL u = registryURL. AddParameterIfAbsent (CLUSTER_KEY, ZoneAwareCluster.NAME); invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { invoker = CLUSTER.join(new StaticDirectory(invokers)); }}... Return (T) proxy_factory.getProxy (invoker, protocolutils.isgeneric (generic)); return (T) proxy_factory.getProxy (invoker, protocolUtils.isgeneric); } } public class JdkProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<? >[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread(). getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); } } public class InvokerInvocationHandler implements InvocationHandler { ..... @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ..... return invoker.invoke(rpcInvocation).recreate(); }}Copy the code
The createProxy method calls the RegistryProtocol#refer method, which triggers data pull, subscription, and service Invoker conversion. The core data structure is RegistryDirector, which holds the mapping between Invoker and Url.
@override public <T> Invoker<T> refer(Class<T> type, URL URL) throws RpcException { For example, zooKeeper URL = getRegistryUrl(URL); / / create a specific Registry instance Registry Registry. = registryFactory getRegistry (url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); }... Invoker return doRefer(Cluster, Registry, type, URL); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, // Hold Invoker and receive subscription notification RegistryDirectory<T> directory = new RegistryDirectory<T>(type, URL); directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(subscribeUrl); / / registered consumer information to consumer center registry. The register (directory. GetRegisteredConsumerUrl ()); } directory.buildRouterChain(subscribeUrl); // Subscribe service provider, routing, and dynamic configuration directory.subscribe(toSubscribeUrl); Invoker<T> invoker = cluster.join(directory); List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); OnRefer (this, registryInvokerWrapper); } return registryInvokerWrapper; }Copy the code
The above logic completes the creation of the registry instance, registers metadata to the registry, subscribes to the registry, transforms the protocol (such as the ZooKeeper protocol) according to the registry specified by the user, and stores the corresponding value using Registry at startup. After the central instance is created, the URL here is actually the registry address, and the real consumer is stored in the refer property. The nofity method is triggered in the doSubscribe method.
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); String queryProtocols = this.queryMap.get(PROTOCOL_KEY); for (URL providerUrl : urls) { ... // Merge provider configuration data, such as server IP address and port URL = mergeUrl(providerUrl); String key = url.toFullString(); If (keys.contains(key)) {// If (keys.contains(key)) { } keys.add(key); Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { try { boolean enabled = true; if (url.hasParameter(DISABLED_KEY)) { enabled = ! url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } if (enabled) {invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { } if (invoker ! = null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }Copy the code
Invoker creation is implemented in DubboProtocol#refer, when the first consumer initiates a subscription, the first pull is performed, and the RegistryDirectory#nofity method is triggered, where the notification data is the full amount of data for a particular category. For example, providers and Routers. When providers data is notified, the Invoker conversion is done in the RegistryDirector #toInvoker method.
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url,
getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
Copy the code
The Dubbo protocol initializes the client connection object before returning the DubboInvoker object. Dubbo supports that whether the client immediately establishes a TCP connection with the remote service depends on whether the lazy parameter is configured. By default, all connections are made. DubboProtocol#refer Is internally called DubboProtocol#initClient is responsible for setting up the client and initializing the Handler.
private ExchangeClient initClient(URL url) { ..... ExchangeClient client; Try {// If lazzy is set, The real call to create a Tcp connection if (url. The getParameter (LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient (url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }}Copy the code
4. To summarize
This chapter first discusses the core parsing process of Dubbo framework and Spring framework combined with annotations, and explains the production and service of Dubbo service, service registration discovery/subscription/local cache and other mechanism processes.