Dubbo is a very widely used RPC framework. Many companies adopt it because its custom protocol makes remote calls more efficient than the HTTP-based remote calls used by Spring Cloud.
Spring container integration
Server startup entry point: ServiceBean
Let’s first look at the inheritance structure of the entry point, ServiceBean.
The first thing that catches the eye is the stack of interfaces it implements:
implements InitializingBean,
DisposableBean,
ApplicationContextAware,
ApplicationListener,
BeanNameAware
This should immediately bring the Spring bean life cycle to mind. Let’s look only at the relevant entry points:
- BeanNameAware
Invoked in the bean initialization phase, after the bean has been instantiated and its properties have been populated.
- ApplicationContextAware
Also invoked in the bean initialization phase; it lets the bean keep its own reference to the Spring context.
- InitializingBean
Executed during the bean initialization phase, after the xxAware callbacks and after the bean post-processors’ before-initialization step.
- DisposableBean
Invoked in the bean destruction phase to clean up the bean.
Here we only need to focus on the ApplicationContextAware#setApplicationContext and InitializingBean#afterPropertiesSet methods.
Spring context hook: setApplicationContext
- Save the Spring context reference in ServiceBean; it is used later.
- Associate the Spring context with Dubbo’s own container.
SpringExtensionFactory.addApplicationContext(applicationContext);
- Set yourself as a Spring container event listener through reflection.
Method method = applicationContext.getClass().getMethod("addApplicationListener", new Class<?>[]{ApplicationListener.class}); // compatible with Spring 2.0.1
method.invoke(applicationContext, new Object[] {this});
The actual service exposure is triggered later, when this listener receives the ContextRefreshedEvent.
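The reflective registration can be illustrated without Spring on the classpath. The following is a minimal sketch under stated assumptions: `DemoListener` and `DemoContext` are hypothetical stand-ins for Spring's listener and context types, and the fallback branch only models a context class that lacks the method.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring's ApplicationListener.
interface DemoListener {
    void onEvent(String event);
}

// Hypothetical stand-in for an application context that supports listeners.
class DemoContext {
    final List<DemoListener> listeners = new ArrayList<>();

    public void addApplicationListener(DemoListener listener) {
        listeners.add(listener);
    }
}

class ReflectiveRegistration {
    // Looks the method up by name so that context types without it still work.
    // Returns true if the listener was registered.
    static boolean register(Object context, DemoListener listener) {
        try {
            Method method = context.getClass()
                    .getMethod("addApplicationListener", DemoListener.class);
            method.invoke(context, listener);
            return true;
        } catch (ReflectiveOperationException e) {
            // The context does not expose the method: no listener support.
            return false;
        }
    }

    public static void main(String[] args) {
        DemoContext context = new DemoContext();
        boolean supported = register(context, event -> System.out.println("got " + event));
        System.out.println("listener supported: " + supported);
    }
}
```

Resolving the method by name avoids a compile-time dependency on it, which is presumably why the original code takes this route for older Spring versions.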
Configuration parsing: afterPropertiesSet
The configuration parsing and initialization related to service exposure are concentrated in the afterPropertiesSet method. Let’s go through the steps:
Provider configuration resolution
Default values for the service provider. The corresponding configuration class is org.apache.dubbo.config.ProviderConfig. It supplies the default values for the dubbo:service and dubbo:protocol tags.
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ?
        null :
        BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
                ProviderConfig.class, false, false);
...
Application configuration resolution
Application information configuration. The corresponding configuration class is org.apache.dubbo.config.ApplicationConfig.
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ?
        null :
        BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
                ApplicationConfig.class, false, false);
...
Module configuration resolution
Module information configuration. The corresponding configuration class is org.apache.dubbo.config.ModuleConfig.
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ?
        null :
        BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
                ModuleConfig.class, false, false);
...
Monitor configuration resolution
Monitoring center configuration. The corresponding configuration class is org.apache.dubbo.config.MonitorConfig.
Protocol configuration resolution
Service provider protocol configuration. The corresponding configuration class is org.apache.dubbo.config.ProtocolConfig. To support multiple protocols, declare multiple dubbo:protocol tags and select the protocol to use in dubbo:service through the protocol attribute.
Set service path
If no path is configured, the bean name is used as the service path, provided it starts with the interface name.
if (getPath() == null || getPath().length() == 0) {
    if (beanName != null && beanName.length() > 0
            && getInterface() != null && getInterface().length() > 0
            && beanName.startsWith(getInterface())) {
        setPath(beanName);
    }
}
Whether to delay exposing services
If the service is not configured for delayed exposure (isDelay() returns false), the service exposure method export() is invoked at this stage.
if (!isDelay()) {
    export();
}
Triggering service exposure: onApplicationEvent
During the Spring bean initialization phase, ServiceBean has already registered itself as a listener for Spring events through reflection. Service exposure is triggered when the ContextRefreshedEvent is received.
public void onApplicationEvent(ApplicationEvent event) {
    if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
        if (isDelay() && !isExported() && !isUnexported()) {
            ...
            export();
        }
    }
}
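The interplay of the two triggers can be modeled in a few lines. This is a simplified sketch, not Dubbo's code: the class `ExportLifecycle` and its fields are hypothetical stand-ins for `isDelay()`, `isExported()`, and `isUnexported()`, and `export()` only counts calls.

```java
// Hypothetical, simplified model of the two export triggers described above.
class ExportLifecycle {
    private final boolean delay;   // stands in for isDelay()
    private boolean exported;      // stands in for isExported()
    private boolean unexported;    // stands in for isUnexported()
    int exportCount;               // how many times export() actually ran

    ExportLifecycle(boolean delay) {
        this.delay = delay;
    }

    // Mirrors afterPropertiesSet: export immediately unless exposure is deferred.
    void afterPropertiesSet() {
        if (!delay) {
            export();
        }
    }

    // Mirrors onApplicationEvent for a ContextRefreshedEvent.
    void onContextRefreshed() {
        if (delay && !exported && !unexported) {
            export();
        }
    }

    private void export() {
        exported = true;
        exportCount++;
    }

    public static void main(String[] args) {
        for (boolean delay : new boolean[] {false, true}) {
            ExportLifecycle bean = new ExportLifecycle(delay);
            bean.afterPropertiesSet();
            bean.onContextRefreshed();
            System.out.println("delay=" + delay + " exportCount=" + bean.exportCount);
        }
    }
}
```

Whichever path is taken, the guard conditions ensure the service is exported exactly once.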
Configuration retrieval and check phase
Check items include but are not limited to:
- Whether the configured methods exist on the interface
- Whether application:name is configured
- Whether the registry address registry:address is configured
- Set the service exposure protocol; the default is dubbo
- Check the stub and mock settings; this is mainly an interface implementation check
Assembling the exposed service URL
Based on the server configuration, the service information to be exposed is assembled into a URL. Common keys are:
side=provider&version=?&timestamp=?&pid=?&methods=?&name=?&port=?...
In addition, if monitor is configured, parameters related to monitor are added to the URL.
URL monitorUrl = loadMonitor(registryURL);
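To make the URL form concrete, here is a small sketch that joins configuration parameters into a query string. The class `ProviderUrlSketch`, the host, port, interface name, and parameter values are all made up for illustration, and no URL encoding is applied.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class ProviderUrlSketch {
    // Joins the parameters as key=value pairs in insertion order.
    // Illustration only: values are not URL-encoded.
    static String buildUrl(String protocol, String host, int port,
                           String path, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        params.forEach((key, value) -> query.add(key + "=" + value));
        return protocol + "://" + host + ":" + port + "/" + path + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("side", "provider");
        params.put("version", "1.0.0");
        params.put("methods", "sayHello");
        // Hypothetical address and interface name.
        System.out.println(buildUrl("dubbo", "127.0.0.1", 20880,
                "com.example.DemoService", params));
    }
}
```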
Create/get invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Dubbo has three proxy factory variants for producing an Invoker: Javassist, StubProxy, and the plain JdkProxy.
An Invoker is essentially a layer of encapsulation around the actual method call, so that every form of method call is wrapped uniformly as an Invoker.
JdkProxyFactory
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            Method method = proxy.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(proxy, arguments);
        }
    };
}
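The same idea can be tried with the JDK alone. The sketch below assumes a much smaller invoker contract than Dubbo's: `SimpleInvoker`, `GreetingService`, and `GreetingServiceImpl` are hypothetical names, and reflection errors are wrapped in an unchecked exception for brevity.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical minimal invoker: one uniform entry point for any method call.
interface SimpleInvoker {
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
}

interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "hello, " + name;
    }
}

class InvokerSketch {
    // Provider side: wrap a concrete object behind the uniform invoker.
    static SimpleInvoker getInvoker(Object target) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                Method method = target.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(target, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot invoke " + methodName, e);
            }
        };
    }

    // Consumer side: a JDK proxy that turns interface calls into invoker calls.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(SimpleInvoker invoker, Class<T> type) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) ->
                        invoker.invoke(method.getName(), method.getParameterTypes(), args));
    }

    public static void main(String[] args) {
        SimpleInvoker invoker = getInvoker(new GreetingServiceImpl());
        GreetingService service = getProxy(invoker, GreetingService.class);
        System.out.println(service.greet("dubbo")); // prints "hello, dubbo"
    }
}
```

In a real RPC call the proxy and the invoker live in different processes; here they are chained directly to show that both sides speak the same "method name, parameter types, arguments" contract.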
Dubbo related
Service exposure
Protocol layer – Protocol
Dubbo actually supports multiple protocols, but let’s just look at the implementation of the Dubbo protocol here.
String key = url.getAddress();
...
ExchangeServer server = serverMap.get(key);
if (server == null) {
    serverMap.put(key, createServer(url));
} else {
    // The server supports reset, which can be used together with override
    server.reset(url);
}
As the code above shows, createServer is executed only once per address. That is, all services exposed by the same process at the same address share a single server instance and listening port.
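The create-once-then-reset behavior can be reproduced with a plain map. This is a sketch under assumptions: `FakeServer` is a hypothetical stand-in for ExchangeServer, and the address strings are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ServerCacheSketch {
    // Hypothetical stand-in for ExchangeServer.
    static class FakeServer {
        final String address;
        int resetCount;

        FakeServer(String address) {
            this.address = address;
        }

        void reset() {
            resetCount++;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    // Mirrors the excerpt: create on first use of an address, reset afterwards.
    FakeServer openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            server = new FakeServer(address);
            created.incrementAndGet();
            serverMap.put(address, server);
        } else {
            server.reset();
        }
        return server;
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        protocol.openServer("192.168.0.1:20880"); // first service: server created
        protocol.openServer("192.168.0.1:20880"); // second service: server reused
        System.out.println("servers created: " + protocol.created.get());
    }
}
```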
Before actually starting the service, add a series of service parameters:
- channel.readonly.sent
Whether to send a read-only event when the server shuts down. Enabled by default.
- heartbeat
The heartbeat interval. The default is 60 * 1000 ms.
- codec
The service codec. The default is dubbo.
Information exchange layer – Exchanger
The call server = Exchangers.bind(url, requestHandler); is what actually starts the service.
At run time, the Exchanger implementation is selected according to the configuration in the URL.
The server implementation provided by Dubbo is HeaderExchangeServer.
It uses a ScheduledExecutorService to implement a periodic heartbeat mechanism.
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels() );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat,TimeUnit.MILLISECONDS);
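A periodic scan of this kind can be sketched with the JDK scheduler alone. The following is a simplified illustration, not Dubbo's HeartBeatTask: `DemoChannel` and `scan` are hypothetical, only idle time is checked, and the timeout handling of the real task is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {
    // Hypothetical channel that only remembers when it last saw traffic.
    static class DemoChannel {
        final String name;
        long lastActiveMillis;
        int heartbeatsSent;

        DemoChannel(String name, long lastActiveMillis) {
            this.name = name;
            this.lastActiveMillis = lastActiveMillis;
        }
    }

    // One scan: ping every channel that has been idle longer than the interval.
    // Returns the number of heartbeats sent in this scan.
    static int scan(List<DemoChannel> channels, long nowMillis, long heartbeatMillis) {
        int sent = 0;
        for (DemoChannel channel : channels) {
            if (nowMillis - channel.lastActiveMillis > heartbeatMillis) {
                channel.heartbeatsSent++;
                channel.lastActiveMillis = nowMillis;
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        List<DemoChannel> channels = new ArrayList<>();
        channels.add(new DemoChannel("idle", System.currentTimeMillis() - 1_000));
        long heartbeat = 100; // interval in milliseconds, shortened for the demo

        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
        scheduled.scheduleWithFixedDelay(
                () -> scan(channels, System.currentTimeMillis(), heartbeat),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);

        Thread.sleep(350);
        scheduled.shutdownNow();
        scheduled.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("heartbeats sent: " + channels.get(0).heartbeatsSent);
    }
}
```

Keeping the scan logic in a method that takes the current time as a parameter makes it easy to check without waiting for the scheduler.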
Finally, Transporters.bind is called to enter the actual network transport layer.
return new HeaderExchangeServer(Transporters.bind(url,
new DecodeHandler(new HeaderExchangeHandler(handler))));
Network transport layer – Transporter
Let’s take the NettyTransporter as an example.
Take a look at the code for starting a service using Netty:
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
        ChannelPipeline pipeline = Channels.pipeline();
        /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/
        pipeline.addLast("decoder", adapter.getDecoder());
        pipeline.addLast("encoder", adapter.getEncoder());
        pipeline.addLast("handler", nettyHandler);
        return pipeline;
    }
});
// bind
channel = bootstrap.bind(getBindAddress());
Conclusion
Please briefly describe Dubbo’s service exposure process
First, the Dubbo server starts from ServiceBean. To integrate with Spring, it hooks into two points of the Spring bean initialization phase:
- ApplicationContextAware obtains the Spring context early and registers ServiceBean as a listener for Spring events.
- InitializingBean performs the Dubbo configuration parsing and the wrapping of Invoker objects. Errors in the configuration file usually surface here.
Then, because the listener has been registered, Dubbo service exposure is formally triggered when Spring publishes a ContextRefreshedEvent.
The main logic of service exposure is implemented in Dubbo’s protocol layer, information exchange layer, and network transport layer.
First, the protocol layer prepares service-related parameters such as heartbeat and codec. Control then passes to the information exchange layer, which adds the heartbeat mechanism, and finally to the network transport layer, which starts the NettyServer.
The requestHandler is installed in the NettyServer pipeline. The handler chain is:
DecodeHandler -> HeaderExchangeHandler -> DubboProtocol$ExchangeHandlerAdapter
- DecodeHandler decodes the raw InputStream.
- HeaderExchangeHandler routes messages by type, such as request/response/echo.
- DubboProtocol$ExchangeHandlerAdapter maps the message (an Invocation) to the matching Invoker and executes the call.
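The chain is an instance of the decorator pattern: each handler wraps the next one. The sketch below shows the shape only. `Handler`, `DecodeStep`, and `RouteStep` are hypothetical names, and the "request:" prefix is an invented stand-in for real message typing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {
    // Hypothetical single-method handler contract.
    interface Handler {
        void received(Object message);
    }

    // Outermost step: turns raw bytes into a message object, then delegates.
    static class DecodeStep implements Handler {
        private final Handler next;

        DecodeStep(Handler next) {
            this.next = next;
        }

        @Override
        public void received(Object message) {
            Object decoded = (message instanceof byte[])
                    ? new String((byte[]) message, StandardCharsets.UTF_8)
                    : message;
            next.received(decoded);
        }
    }

    // Middle step: routes by message kind; only requests reach the business handler.
    static class RouteStep implements Handler {
        private static final String REQUEST_PREFIX = "request:";
        private final Handler next;

        RouteStep(Handler next) {
            this.next = next;
        }

        @Override
        public void received(Object message) {
            if (message instanceof String && ((String) message).startsWith(REQUEST_PREFIX)) {
                next.received(((String) message).substring(REQUEST_PREFIX.length()));
            }
        }
    }

    // Builds DecodeStep -> RouteStep -> business handler, innermost first.
    static Handler build(List<String> sink) {
        Handler business = message -> sink.add("invoked " + message);
        return new DecodeStep(new RouteStep(business));
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        Handler chain = build(sink);
        chain.received("request:sayHello".getBytes(StandardCharsets.UTF_8));
        System.out.println(sink); // prints "[invoked sayHello]"
    }
}
```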
Extension
When NettyServer receives data, how is it converted into a method call?
Let’s take a look at the ChannelPipeline that Dubbo created for us.
ChannelPipeline pipeline = Channels.pipeline();
...
pipeline.addLast("handler", nettyHandler);
The key is in nettyHandler.
Recall the code that calls the network transport interface for binding at the information exchange layer:
new HeaderExchangeServer(Transporters.bind(url,
new DecodeHandler(
new HeaderExchangeHandler(handler))));
DecodeHandler -> HeaderExchangeHandler -> DubboProtocol$ExchangeHandlerAdapter
The first step is to deserialize by DecodeHandler:
if (message instanceof Request) {
decode(((Request)message).getData());
}
if (message instanceof Response) {
decode( ((Response)message).getResult());
}
handler.received(channel, message);
Then, hand over to the HeaderExchangeHandler to do the request/response routing of the method.
if (message instanceof Request) {
    // handle request.
    Request request = (Request) message;
    if (request.isEvent()) {
        handlerEvent(channel, request);
    } else {
        if (request.isTwoWay()) {
            Response response = handleRequest(exchangeChannel, request);
            channel.send(response);
        } else {
            handler.received(exchangeChannel, request.getData());
        }
    }
} else if (message instanceof Response) {
    handleResponse(channel, (Response) message);
} else if (message instanceof String) {
    if (isClientSide(channel)) {
        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
        logger.error(e.getMessage(), e);
    } else {
        String echo = handler.telnet(channel, (String) message);
        if (echo != null && echo.length() > 0) {
            channel.send(echo);
        }
    }
} else {
    handler.received(exchangeChannel, message);
}
Finally, it goes into an ExchangeHandlerAdapter inner class implemented in DubboProtocol:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        ...
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv);
        ...
        return invoker.invoke(inv);
    }
As we’ve seen above, invoker is dubbo’s encapsulation of the actual method call.
So in this case, it’s the process of getting the invoker based on serviceKey and executing the call.
String serviceKey = serviceKey(
port,
path,
inv.getAttachments().get(Constants.VERSION_KEY),
inv.getAttachments().get(Constants.GROUP_KEY));
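The lookup step amounts to building a string key and reading a map. The sketch below makes its assumptions explicit: the key layout `[group/]path[:version]:port` is assumed here for illustration, `EXPORTED` is a hypothetical registry, and invokers are modeled as plain functions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ServiceKeySketch {
    // Assumed key layout for this sketch: [group/]path[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    // Hypothetical registry of exported invokers, keyed by service key.
    static final Map<String, Function<Object[], Object>> EXPORTED = new ConcurrentHashMap<>();

    // Finds the invoker for the incoming call and executes it.
    static Object dispatch(int port, String path, String version, String group, Object[] args) {
        String key = serviceKey(port, path, version, group);
        Function<Object[], Object> invoker = EXPORTED.get(key);
        if (invoker == null) {
            throw new IllegalStateException("No exported service for key " + key);
        }
        return invoker.apply(args);
    }

    public static void main(String[] args) {
        // Hypothetical service name, version, and port.
        EXPORTED.put(serviceKey(20880, "com.example.DemoService", "1.0.0", null),
                callArgs -> "hello, " + callArgs[0]);
        System.out.println(dispatch(20880, "com.example.DemoService", "1.0.0", null,
                new Object[] {"dubbo"}));
    }
}
```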
Exchange – Synchronous/asynchronous conversion in the information exchange layer
When we make a method call through an RPC framework, the caller sees only a synchronous method call, just like a local one.
Inside the RPC framework, however, the request is actually sent asynchronously. When the remote method’s return value arrives asynchronously, it is handed back to the user synchronously.
The code is located in DubboInvoker#doInvoke.
protected Result doInvoke(final Invocation invocation) throws Throwable {
    ...
    if (isOneway) {
        ...
    } else if (isAsync) {
        ...
    } else {
        RpcContext.getContext().setFuture(null);
        return (Result) currentClient.request(inv, timeout).get();
    }
    ...
In currentClient.request, the call is sent to the data transport layer asynchronously, and the caller thread is then blocked by a call to ResponseFuture#get.
while (!isDone()) {
    done.await(timeout, TimeUnit.MILLISECONDS);
    if (isDone() || System.currentTimeMillis() - start > timeout) {
        break;
    }
}
Later, upon receiving data from the server, the caller thread is awakened by an active call to done.signal() to complete the asynchronous to synchronous transition.
private void doReceived(Response res) {
    ...
    response = res;
    if (done != null) {
        done.signal();
    }
    ...
}
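The await/signal handshake can be reproduced with a ReentrantLock and a Condition. This is a minimal sketch, not Dubbo's future implementation: the class `BlockingFutureSketch` is hypothetical, and timeouts and interrupts are reported as unchecked exceptions to keep the example short.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BlockingFutureSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private Object response; // guarded by lock

    // Caller thread: block until a response arrives or the timeout elapses.
    Object get(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new IllegalStateException("no response within " + timeoutMillis + " ms");
                }
                done.awaitNanos(remaining);
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
    }

    // I/O thread: store the response and wake the waiting caller.
    void received(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingFutureSketch future = new BlockingFutureSketch();
        // Simulates the network thread delivering the response later.
        new Thread(() -> future.received("pong")).start();
        System.out.println(future.get(2_000)); // prints "pong"
    }
}
```

Rechecking the condition in a loop guards against spurious wakeups, which is why the original code also wraps its await in a while loop.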