Welcome to follow the official account [sharedCode], dedicated to source-code analysis of mainstream middleware. Feel free to contact me directly.
The blogger’s personal website: www.shared-code.com
Preface
The previous post introduced Dubbo's service exposure process. Once a service is exposed, the next question is how consumers reference it. This post explains how a Dubbo consumer references a service.
ReferenceBean
In a consumer project, each reference to an external service produces a ReferenceBean instance. Because ReferenceBean implements FactoryBean, we can go straight to its getObject method to see how the bean is created:
@Override
public Object getObject() throws Exception {
    return get();
}
The get method of the parent ReferenceConfig class is called
// ReferenceConfig.java
public synchronized T get() {
    if (destroyed) {
        // destroyed == true means this config has already been destroyed and can no longer be used.
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        // The reference has not been created yet, so initialize it by calling init().
        init();
    }
    return ref;
}
As shown above, most of the logic lives in init. This also shows that the consumer references the service when the ReferenceBean object is created, that is, on the first call to get().
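The lazy, create-once behavior of get() can be illustrated in isolation. The following is a minimal sketch, not Dubbo code: LazyReference and its Supplier-based factory are hypothetical stand-ins for ReferenceConfig and its init()/createProxy() calls.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for ReferenceConfig: builds the reference lazily, at most once. */
final class LazyReference<T> {
    private final Supplier<T> factory; // assumption: stands in for init()/createProxy()
    private T ref;
    private boolean destroyed;

    LazyReference(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            ref = factory.get(); // the expensive work happens on first access only
        }
        return ref;
    }

    synchronized void destroy() {
        destroyed = true;
        ref = null;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        LazyReference<String> holder = new LazyReference<>(() -> "proxy-" + (++calls[0]));
        System.out.println(holder.get());
        System.out.println(holder.get());
        System.out.println("factory calls: " + calls[0]);
    }
}
```

Because get() is synchronized, two threads asking for the reference at the same time still trigger only one initialization.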
ReferenceConfig.init()
private void init() {
    if (initialized) {
        return;
    }
    // ...
    // About 200 lines of code omitted
    // ...
    // Focus on this method: it creates the proxy class, which is our service reference
    ref = createProxy(map);
    // Create the ConsumerModel and put it in the global map.
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
ReferenceConfig.createProxy()
private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer; // Whether an in-JVM (local) call should be used
    if (isInjvm() == null) {
        if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            // by default, reference local service if there is
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
        }
    } else {
        isJvmRefer = isInjvm().booleanValue();
    }
    // As mentioned earlier, Dubbo exposes a service both locally and remotely. If the consumer and
    // the provider run in the same JVM (for example, in the same Tomcat), Dubbo prefers the local
    // reference, because a direct in-JVM call is much faster.
    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        if (url != null && url.length() > 0) { // A URL was specified directly; it may be a point-to-point address or a registry address.
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // Look up the service provider through the registry.
            // Get the registry addresses
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // Build the registry URL, such as registry://xxxxx
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls == null || urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }
        if (urls.size() == 1) {
            // The usual case: a single registry address. Call the refer method with it.
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            // Multiple registries
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }
    // Decide whether to check provider availability at startup
    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true; // If it is not set, the startup check is enabled by default
    }
    if (c && !invoker.isAvailable()) { // If no service provider is available, report an error
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // Create the proxy object for the referenced service
    return (T) proxyFactory.getProxy(invoker);
}
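Two decisions in createProxy follow the same three-state pattern: an explicit setting wins, and only a missing (null) setting falls through to a default. The sketch below restates both in isolation. TriStateDecisions and the localExported flag are hypothetical; localExported stands in for the result of InjvmProtocol.isInjvmRefer.

```java
/** Hypothetical restatement of the two tri-state decisions in createProxy. */
final class TriStateDecisions {

    /**
     * @param injvm         explicit injvm setting, or null when not configured
     * @param directUrl     the url attribute of the reference, may be null or empty
     * @param localExported assumption: whether a matching service is exported in this JVM
     */
    static boolean isJvmRefer(Boolean injvm, String directUrl, boolean localExported) {
        if (injvm != null) {
            return injvm; // an explicit setting always wins
        }
        if (directUrl != null && !directUrl.isEmpty()) {
            return false; // a direct URL disables the local reference
        }
        return localExported; // otherwise prefer the local service if there is one
    }

    /** The reference-level check wins, then the consumer-level one, then the default (true). */
    static boolean resolveCheck(Boolean check, Boolean consumerCheck) {
        Boolean c = check;
        if (c == null) {
            c = consumerCheck;
        }
        return c == null ? true : c;
    }

    public static void main(String[] args) {
        System.out.println(isJvmRefer(null, null, true));
        System.out.println(isJvmRefer(null, "dubbo://127.0.0.1:20880", true));
        System.out.println(resolveCheck(null, Boolean.FALSE));
    }
}
```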
Description:
The core of the code above is the following line. It connects to the registry, applies the cluster fault-tolerance mode, connects to the Dubbo service provider over a long-lived connection, and produces the service reference object.
invoker = refprotocol.refer(interfaceClass, urls.get(0));
refprotocol is defined as follows:
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
If you have read about the SPI extension mechanism, you will know that the code above returns a Protocol$Adaptive implementation class. When a method of this class runs, the actual implementation is chosen from the protocol of the URL parameter. The default protocol is dubbo, but if the URL carries another protocol, the URL takes precedence.
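The dispatch-by-URL idea can be sketched without Dubbo. This is an illustration only: AdaptiveDispatchSketch, its map of extensions, and the returned strings are hypothetical, and java.net.URI is used in place of Dubbo's own URL class.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of "pick the implementation from the URL's protocol". */
final class AdaptiveDispatchSketch {
    interface Protocol {
        String refer(URI url);
    }

    // Assumption: a plain map stands in for the ExtensionLoader's named extensions.
    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("dubbo", url -> "DubboProtocol handles " + url.getHost());
        EXTENSIONS.put("registry", url -> "RegistryProtocol handles " + url.getHost());
    }

    static String refer(String rawUrl) {
        URI url = URI.create(rawUrl);
        // Fall back to the default name when the URL carries no protocol.
        String name = url.getScheme() == null ? "dubbo" : url.getScheme();
        Protocol protocol = EXTENSIONS.get(name);
        if (protocol == null) {
            throw new IllegalStateException("No Protocol extension named " + name);
        }
        return protocol.refer(url);
    }

    public static void main(String[] args) {
        System.out.println(refer("registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?registry=zookeeper"));
        System.out.println(refer("dubbo://192.168.59.3:20880/com.dubbo.service.user.UserFacadeService"));
    }
}
```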
The following is an example of a URL:
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.6.2&pid=10768&refer=application%3Ddubbo-consumer%26check%3Dfalse%26dubbo%3D2.6.2%26interface%3Dcom.dubbo.service.user.UserFacadeService%26methods%3DgetUser%26pid%3D10768%26register.ip%3D192.168.59.3%26side%3Dconsumer%26timestamp%3D1540205102302&registry=zookeeper&timestamp=1540205144720
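In a registry URL like this, the consumer's own parameters travel inside the single refer parameter as a URL-encoded query string. The sketch below shows how such a value could be decoded and parsed with the JDK alone. ReferParameterSketch and its method names are hypothetical and are not Dubbo's StringUtils.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: decode an encoded "refer" value into key/value pairs. */
final class ReferParameterSketch {

    /** Splits a plain query string such as a=1&b=2 into an ordered map. */
    static Map<String, String> parseQuery(String query) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    /** URL-decodes the value first (%3D becomes '=', %26 becomes '&'), then parses it. */
    static Map<String, String> decodeRefer(String encoded) {
        try {
            return parseQuery(URLDecoder.decode(encoded, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available on the JVM
        }
    }

    public static void main(String[] args) {
        String refer = "application%3Ddubbo-consumer%26check%3Dfalse%26methods%3DgetUser%26side%3Dconsumer";
        decodeRefer(refer).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```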
Related links:
Dubbo series kernel SPI-Protocol extension Class Generation (9)
Dubbo Series in-depth Understanding service Release (10)
When the refer method is called, the call passes through the following chain. The order in which the wrappers run is not guaranteed, but the call always ends in the RegistryProtocol class.
ProtocolListenerWrapper
> ProtocolFilterWrapper
> QosProtocolWrapper
> RegistryProtocol
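The chain above is a decorator pattern: each wrapper holds the next Protocol and delegates to it. A minimal sketch under that assumption follows. WrapperChainSketch, its Wrapper class, and the trace list are hypothetical and exist only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a wrapper (decorator) chain ending in a core protocol. */
final class WrapperChainSketch {
    interface Protocol {
        String refer(String url, List<String> trace);
    }

    /** Stands in for RegistryProtocol: the innermost implementation. */
    static final class Core implements Protocol {
        public String refer(String url, List<String> trace) {
            trace.add("RegistryProtocol");
            return "invoker for " + url;
        }
    }

    /** A wrapper records its name, then delegates to the next protocol in the chain. */
    static final class Wrapper implements Protocol {
        private final String name;
        private final Protocol next;

        Wrapper(String name, Protocol next) {
            this.name = name;
            this.next = next;
        }

        public String refer(String url, List<String> trace) {
            trace.add(name);
            return next.refer(url, trace);
        }
    }

    /** Wraps the core; the last name given becomes the outermost wrapper. */
    static Protocol wrap(Protocol core, String... wrapperNames) {
        Protocol current = core;
        for (String name : wrapperNames) {
            current = new Wrapper(name, current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Protocol chain = wrap(new Core(), "QosProtocolWrapper", "ProtocolFilterWrapper", "ProtocolListenerWrapper");
        System.out.println(chain.refer("registry://localhost:2181", trace));
        System.out.println(trace);
    }
}
```

Whatever order the wrappers are applied in, the core implementation is always the last entry in the trace.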
RegistryProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Switch the URL protocol to the registry type named by the registry parameter (zookeeper in this example)
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // Connect to the registry (ZooKeeper here)
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // Create the service reference.
    return doRefer(cluster, registry, type, url);
}
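The group test in refer decides whether results from several groups must be merged. A small stand-alone sketch of that test follows. GroupClusterChoice is a hypothetical name, and the comma pattern is an assumption about what Constants.COMMA_SPLIT_PATTERN contains.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch: does this group setting require the mergeable cluster? */
final class GroupClusterChoice {
    // Assumption: commas with optional surrounding whitespace separate group names.
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*[,]+\\s*");

    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        // Several groups ("a,b") or the wildcard ("*") both mean "merge across groups".
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        for (String g : new String[] {"a", "a,b", "*"}) {
            System.out.println(g + " -> " + needsMergeableCluster(g));
        }
    }
}
```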
The following is an example of a URL:
zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.6.2&pid=10768&refer=application%3Ddubbo-consumer%26check%3Dfalse%26dubbo%3D2.6.2%26interface%3Dcom.dubbo.service.user.UserFacadeService%26methods%3DgetUser%26pid%3D10768%26register.ip%3D192.168.59.3%26side%3Dconsumer%26timestamp%3D1540205102302&timestamp=1540205144720
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // Build the registry directory, which manages the connection to ZooKeeper and retries after the long connection drops
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // Request parameters.
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // The URL to subscribe with
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    // Example of the subscribed URL:
    // consumer://192.168.59.3/com.dubbo.service.user.UserFacadeService?application=dubbo-consumer&category=
    // providers,configurators,routers&check=false&dubbo=2.6.2&interface=com.dubbo.service.user.UserFaca
    // deService&methods=getUser&pid=10768&side=consumer&timestamp=1540205102302
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    // Subscribe
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    // Apply cluster fault tolerance: a new Invoker is generated according to the cluster fault-tolerance policy. This will be explained in detail later.
    Invoker invoker = cluster.join(directory);
    // Register the consumer, which adds the Invoker to consumerInvokers.
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
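The cluster.join(directory) call in doRefer returns one Invoker that hides several providers behind a fault-tolerance policy. The following is a minimal sketch of a failover-style policy (try the next provider when one fails), written without Dubbo. FailoverSketch, its Invoker interface, and the retries parameter are hypothetical, and real cluster implementations also involve load balancing and routing, which are left out here.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a failover-style cluster invoker. */
final class FailoverSketch {
    interface Invoker {
        String invoke(String request);
    }

    /** Joins several invokers into one that moves on to the next invoker after a failure. */
    static Invoker join(List<Invoker> directory, int retries) {
        return request -> {
            RuntimeException last = null;
            // One initial attempt plus the retries, but never more attempts than providers.
            int attempts = Math.min(retries + 1, directory.size());
            for (int i = 0; i < attempts; i++) {
                try {
                    return directory.get(i).invoke(request);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try the next provider
                }
            }
            throw new IllegalStateException("All " + attempts + " attempts failed", last);
        };
    }

    public static void main(String[] args) {
        Invoker down = request -> { throw new RuntimeException("provider down"); };
        Invoker up = request -> "ok:" + request;
        Invoker cluster = join(Arrays.asList(down, up), 2);
        System.out.println(cluster.invoke("getUser"));
    }
}
```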
The node that each consumer registers on ZooKeeper is named after its subscribeUrl, for example consumer://192.168.59.3/com.dubbo.service.user.UserFacadeService
The following lines subscribe on behalf of consumer://192.168.59.3/com.dubbo.service.user.UserFacadeService. Because of this subscription, the consumer can listen to the connection state and retry when the ZooKeeper connection breaks. The connection to the service provider is monitored as well, and the consumer reconnects when that long connection drops.
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
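The subscribe call follows a listener pattern: the directory hands the registry a callback, and the registry pushes the current provider list whenever it changes. The sketch below shows only that pattern. DirectoryNotifySketch, FakeRegistry, and Directory are hypothetical in-memory stand-ins, and provider addresses are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of subscribe-and-notify between a registry and a directory. */
final class DirectoryNotifySketch {

    /** In-memory registry: keeps one listener and pushes provider lists to it. */
    static final class FakeRegistry {
        private Consumer<List<String>> listener;

        void subscribe(Consumer<List<String>> listener) {
            this.listener = listener;
        }

        void publish(List<String> providers) {
            if (listener != null) {
                listener.accept(providers);
            }
        }
    }

    /** Directory: replaces its whole provider list on every notification. */
    static final class Directory {
        private volatile List<String> providers = Collections.emptyList();

        void notifyChanged(List<String> urls) {
            providers = Collections.unmodifiableList(new ArrayList<>(urls));
        }

        List<String> list() {
            return providers;
        }
    }

    public static void main(String[] args) {
        FakeRegistry registry = new FakeRegistry();
        Directory directory = new Directory();
        registry.subscribe(directory::notifyChanged);
        registry.publish(Arrays.asList("dubbo://192.168.59.3:20880", "dubbo://192.168.59.4:20880"));
        System.out.println(directory.list());
        registry.publish(Collections.<String>emptyList()); // all providers went offline
        System.out.println(directory.list());
    }
}
```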
After executing the above code, you can see the following directory on ZK:
Conclusion:
This post mainly walks through the process without going deep into specific details. The overall flow is as follows:
1. Initialize the ReferenceBean and execute init
2. Parse the consumer's parameters
3. Determine whether a local call, that is, an in-JVM call, should be used
4. Build the registry URL and call the refer method of the Protocol extension implementation class
5. The refer call ends up in the RegistryProtocol class, which connects to ZooKeeper and subscribes to the ZooKeeper nodes for the consumer
6. Create the RegistryDirectory, which listens for ZooKeeper node changes and refreshes the Invokers
7. Wrap the Invoker according to the selected cluster fault-tolerance strategy
There will be a follow-up post on consumer service references. The following posts will focus on the long connection between the consumer and the service provider, the connection to ZooKeeper, and asynchronous notification.