References:
- Obing-dubbo service invocation procedure
Conclusion
The Dubbo service invocation is divided into two main processes: the consumer sends the request and receives the response result, and the provider receives the request.
Consumer side:
- Send request: when the proxy object of the service interface executes the target method, the call is intercepted by InvokerInvocationHandler#invoke. After route filtering and load balancing, a DubboInvoker object is selected and its doInvoke method is called. A Request object is created with a unique request ID, then a DefaultFuture object is instantiated and saved to a ConcurrentHashMap with the request ID as the key. Finally, the RpcInvocation that encapsulates the target method information is serialized and sent through the NettyClient.
- Receive the response: the response ID is the request ID, so the corresponding Future is looked up in the cache by that ID and its doReceived method is executed. This saves the result and wakes up the requesting thread to process the response.
Provider side:
After receiving the request, NettyServer decodes it according to the protocol, deserializes it into objects, and dispatches it to the thread pool, wrapped in a ChannelEventRunnable object whose state is RECEIVED. The worker thread eventually calls the DubboProtocol#reply method, builds a serviceKey from the port, path, version, and group, finds the corresponding Exporter in the cache, and follows the chain of invokers until it reaches the actual implementation class, which executes the target method and returns the result.
Call flow chart
The client makes what looks like a local call, but the call actually goes to the proxy class, which sends the request through the remote client (NettyClient by default). The protocol header is built, specifying the communication protocol, serializer type, and body length; then the Java object is serialized into the protocol body, and the data is sent.
The server (NettyServer) receives the request and distributes it to the business thread pool for processing. The business thread finds the corresponding implementation class, executes the corresponding method and returns the result.
The application layer communication protocol and serializer are omitted here. See References for details.
Client source code analysis
As mentioned in the previous article, when a consumer references a service, ReferenceConfig#init creates a proxy object through a dynamic proxy, so when we invoke a method on the service interface, it is intercepted by InvokerInvocationHandler#invoke
com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    //... Omit some code
    // Create the RpcInvocation object and then call MockClusterInvoker#invoke.
    // RpcInvocation encapsulates the information about the target method call.
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
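The interception step can be tried in isolation with a plain JDK dynamic proxy. The following is a minimal sketch, not Dubbo code: `GreetingService`, `RecordingHandler`, and `ProxyDemo` are hypothetical names, and the handler only describes the call instead of sending it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, for illustration only.
interface GreetingService {
    String greet(String name);
}

// Stand-in for InvokerInvocationHandler: every interface call lands here
// instead of in a local implementation.
class RecordingHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object (toString, hashCode, equals) are handled locally.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
        }
        // An RPC framework would wrap this data in an invocation object and send it.
        return "invoke " + method.getName() + Arrays.toString(args);
    }
}

class ProxyDemo {
    static GreetingService newProxy() {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new RecordingHandler());
    }

    public static void main(String[] args) {
        System.out.println(newProxy().greet("dubbo")); // prints "invoke greet[dubbo]"
    }
}
```

The caller only ever sees `GreetingService`; where the call really goes is decided inside the handler, which is the hook Dubbo uses to start the remote invocation.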
Let’s follow the code and see how it works.
com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Get the mock configuration
    String value = directory.getUrl()
            .getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // no mock
        // call AbstractClusterInvoker
        result = this.invoker.invoke(invocation);
    }
    //... Omit some code
    return result;
}
com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // Check if it is destroyed
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;
    // binding attachments into invocation.
    // If the RpcContext carries attachments, bind them to the invocation
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // Call directory#list, which does route filtering
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        // Get the LoadBalance implementation class via SPI
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(
                invokers.get(0).getUrl().getMethodParameter(
                        RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // doInvoke is implemented by the concrete cluster invoker, which selects an Invoker
    // using the load balancing strategy; the call eventually reaches DubboInvoker#doInvoke
    return doInvoke(invocation, invokers, loadbalance);
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}
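The two steps in this method, route filtering followed by load balancing, can be sketched without any Dubbo classes. This is a simplified illustration under stated assumptions: `Provider`, `route`, and `select` are hypothetical names, the routing rule is a plain predicate, and selection is unweighted random.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

class ProviderSelectionDemo {
    // Hypothetical provider record: an address plus a tag used by the routing rule.
    static final class Provider {
        final String address;
        final String tag;

        Provider(String address, String tag) {
            this.address = address;
            this.tag = tag;
        }
    }

    // Routing: keep only the providers that satisfy the rule.
    static List<Provider> route(List<Provider> all, Predicate<Provider> rule) {
        List<Provider> result = new ArrayList<>();
        for (Provider p : all) {
            if (rule.test(p)) {
                result.add(p);
            }
        }
        return result;
    }

    // Load balancing: unweighted random choice among the routed candidates.
    static Provider select(List<Provider> candidates, Random random) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<Provider> all = Arrays.asList(
                new Provider("10.0.0.1:20880", "gray"),
                new Provider("10.0.0.2:20880", "stable"),
                new Provider("10.0.0.3:20880", "gray"));
        Provider chosen = select(route(all, p -> p.tag.equals("gray")), new Random());
        System.out.println(chosen.address);
    }
}
```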
DubboInvoker#doInvoke
Setting aside routing, clustering, and load balancing, we focus on com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // Set path and version as attachments
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    // Select a client
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // Asynchronous call flag
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // oneway send mode flag
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // The timeout period
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            // Oneway mode, regardless of the result
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // Send asynchronously
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            // Send synchronously, calling future.get() directly to block and wait for the result.
            // The difference between asynchronous and synchronous is whether future.get() is called by the user or by the component.
            return (Result) currentClient.request(inv, timeout).get();
        }
    }
    //... Omit some code
}
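The client selection at the top of doInvoke is a round-robin over a fixed array. A small sketch of the same idea follows; `RoundRobinDemo` is a hypothetical name, and the use of `Math.floorMod` is a choice made for this sketch (it keeps the index non-negative after the counter overflows), not something taken from the Dubbo source.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinDemo {
    private final AtomicInteger index = new AtomicInteger();

    // Pick the next client in rotation. A single client is returned directly,
    // mirroring the shortcut in the code above.
    String next(String[] clients) {
        if (clients.length == 1) {
            return clients[0];
        }
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinDemo selector = new RoundRobinDemo();
        String[] clients = {"client-0", "client-1", "client-2"};
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next(clients));
        }
    }
}
```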
Three ways to call
As you can see from the above code, there are three types of invocation: oneway, asynchronous, and synchronous.
- **oneway:** Use this when you do not care about the result of the send. It has the lowest overhead because nothing is awaited.
- **Asynchronous call:** The client sends the request and gets back a ResponseFuture, which is stored in the context. When the user needs the result, they retrieve the Future from the context and call future#get to obtain it.
- **Synchronous call:** This is the one we use most often. The component calls future#get for us, so it feels like a synchronous call to the user.
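The difference between the three styles comes down to who, if anyone, waits on the future. Here is a minimal sketch using `CompletableFuture` from the JDK rather than Dubbo's ResponseFuture; `CallStylesDemo` and its `send` method are hypothetical stand-ins for the transport.

```java
import java.util.concurrent.CompletableFuture;

class CallStylesDemo {
    // Hypothetical transport: completes the future on another thread, like a remote reply.
    static CompletableFuture<String> send(String payload) {
        return CompletableFuture.supplyAsync(() -> "echo:" + payload);
    }

    // oneway: fire the request and ignore whatever comes back.
    static void oneway(String payload) {
        send(payload);
    }

    // asynchronous: hand the future to the caller, who decides when to block.
    static CompletableFuture<String> async(String payload) {
        return send(payload);
    }

    // synchronous: the framework blocks on the future on the caller's behalf.
    static String sync(String payload) {
        return send(payload).join();
    }

    public static void main(String[] args) {
        oneway("log-event");
        CompletableFuture<String> pending = async("query");
        System.out.println(sync("ping"));   // blocks inside sync
        System.out.println(pending.join()); // blocks here, at a time the caller chose
    }
}
```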
Send the request
currentClient.request(inv, timeout) creates a Request object, sets the version number and request content, constructs and caches the Future object, and calls NettyClient to send the request.
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    //... Omit some code
    // create request.
    // Create a request object and generate a unique request ID
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // Create and cache the Future
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // Finally call NettyChannel#send
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
Build Request
com.alibaba.dubbo.remoting.exchange.Request
public class Request {
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    private final long mId;

    public Request() {
        mId = newId();
    }

    private static long newId() {
        // When getAndIncrement() reaches MAX_VALUE it wraps around to MIN_VALUE; negative values are still valid IDs
        return INVOKE_ID.getAndIncrement();
    }

    public long getId() {
        return mId;
    }
    //... Omit some code
}
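The wrap-around mentioned in the comment is ordinary `long` overflow and can be checked directly. A short sketch, with `IdWrapDemo` as a hypothetical name and the counter started at `Long.MAX_VALUE` purely to reach the boundary immediately:

```java
import java.util.concurrent.atomic.AtomicLong;

class IdWrapDemo {
    // Returns the two IDs handed out when the counter sits at Long.MAX_VALUE.
    static long[] idsAroundOverflow() {
        AtomicLong counter = new AtomicLong(Long.MAX_VALUE);
        long last = counter.getAndIncrement();    // Long.MAX_VALUE
        long wrapped = counter.getAndIncrement(); // Long.MIN_VALUE after overflow
        return new long[] {last, wrapped};
    }

    public static void main(String[] args) {
        long[] ids = idsAroundOverflow();
        System.out.println(ids[0] == Long.MAX_VALUE); // prints "true"
        System.out.println(ids[1] == Long.MIN_VALUE); // prints "true"
    }
}
```

Since the ID is only used as a map key, a negative value works just as well as a positive one.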
To build the Future
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#DefaultFuture
public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    // Use the request ID as the key to retrieve the corresponding Future and Channel from the Map when the response result is received
    this.id = request.getId();
    this.timeout = timeout > 0 ?
            timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

// FUTURES and CHANNELS are both ConcurrentHashMap
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
As you can see, creating a Request object calls AtomicLong#getAndIncrement internally to generate the request ID. Because the counter is a static field, the ID is unique within the consumer process. When the DefaultFuture object is created, it is saved to the Map with the request ID as the key. When the response arrives, DefaultFuture#received is called to receive the result.
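The correlation scheme described here (an ID per request, a map of waiting futures, lookup by the ID echoed in the response) can be sketched in a few lines. This is an illustrative reduction, not Dubbo's DefaultFuture: `PendingRequests` is a hypothetical class and it uses `CompletableFuture<String>` in place of the real future and response types.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    private final AtomicLong nextId = new AtomicLong(0);
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Register a future under a fresh ID before the request is sent.
    long register(CompletableFuture<String> future) {
        long id = nextId.getAndIncrement();
        futures.put(id, future);
        return id;
    }

    // Complete and forget the future whose ID matches the response.
    // Returns false when nobody is waiting any more, e.g. after a timeout.
    boolean complete(long responseId, String result) {
        CompletableFuture<String> future = futures.remove(responseId);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    int pending() {
        return futures.size();
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        long firstId = pending.register(first);
        long secondId = pending.register(second);
        // Responses may arrive in any order; the ID routes each one to its caller.
        pending.complete(secondId, "B");
        pending.complete(firstId, "A");
        System.out.println(first.join() + second.join()); // prints "AB"
    }
}
```

Removing the entry on completion is what keeps the map from growing, and it also makes a late response for an already-removed ID easy to detect.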
Receiving response results
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received
public static void received(Channel channel, Response response) {
    try {
        // The id carried by the response is the id the client sent with the request.
        // Use it to find (and remove) the corresponding Future in the cache.
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    lock.lock();
    try {
        // Receive the response result
        response = res;
        if (done != null) {
            // Wake up the request thread
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
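The signal in doReceived only makes sense together with a thread that is waiting on the same condition. The sketch below shows both halves with a JDK `ReentrantLock` and `Condition`. `SimpleResponseFuture` is a hypothetical class, the response is a plain `String`, and the timeout handling is an assumption of this sketch rather than a copy of Dubbo's implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SimpleResponseFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile String response;

    // Requesting thread: block until a response arrives or the timeout expires.
    String get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Loop to guard against spurious wake-ups.
            while (response == null) {
                if (remaining <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                remaining = done.awaitNanos(remaining);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // I/O thread: store the response and wake the waiting thread.
    void received(String res) {
        lock.lock();
        try {
            response = res;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Demo: a second thread plays the role of the I/O thread delivering the response.
    static String demo() {
        SimpleResponseFuture future = new SimpleResponseFuture();
        new Thread(() -> future.received("pong")).start();
        try {
            return future.get(1000);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "pong"
    }
}
```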
**Using the response ID, which is the request ID, the client finds the corresponding Future object in the cache, calls the doReceived method, saves the response result, and wakes up the corresponding request thread.** The process is shown in the figure below:
Summary
Main process:
1. The proxy object of the service interface executes the target method, which is intercepted by InvokerInvocationHandler#invoke and enters the proxy process. After route filtering and load balancing, a DubboInvoker is selected and its doInvoke method runs. A Request object is created with a unique ID. Next, a DefaultFuture is created and saved to a ConcurrentHashMap with the request ID as the key. Finally, the RpcInvocation that encapsulates the target method information is serialized and sent as a message via the NettyClient.
2. After the response is received, DefaultFuture#received is called to find the corresponding DefaultFuture by the returned request ID. The doReceived method is then called to save the response and wake up the request thread. Finally, the result is parsed and returned.
Server source code analysis
When the server receives the request, it parses the message. There are five dispatch strategies for messages: all, direct, message, execution, and connection.
The default is all, which means that all messages are dispatched to the business thread pool. Let’s take a look at the implementation of AllChannelHandler
com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
@Override
public void received(Channel channel, Object message) throws RemotingException {
    // Get the thread pool
    // Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    ExecutorService cexecutor = getExecutorService();
    try {
        // The ChannelEventRunnable#run method handles multiple event types
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }
    //... Omit some code
}
ChannelEventRunnable
ChannelEventRunnable implements the Runnable interface. Let’s look at its run method.
com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
@Override
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
Putting the code above together: the server receives the request and parses it into a message. It then retrieves the thread pool and wraps the message in a ChannelEventRunnable whose state is ChannelState.RECEIVED. When an idle thread picks up the task, ChannelEventRunnable#run executes handler.received, and HeaderExchangeHandler#handleRequest is finally called.
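The hand-off from the receiving thread to the business thread pool follows a common pattern: wrap the event and its payload in a Runnable, submit it, and let a pool thread branch on the event type. A reduced sketch follows, with `DispatchDemo`, `EventTask`, and the two-value `State` enum as hypothetical names; the handler here only records what it was asked to do.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchDemo {
    enum State { CONNECTED, RECEIVED }

    // Plays the role of ChannelEventRunnable: carries the event type and payload to a worker thread.
    static final class EventTask implements Runnable {
        private final State state;
        private final String message;
        private final List<String> log;

        EventTask(State state, String message, List<String> log) {
            this.state = state;
            this.message = message;
            this.log = log;
        }

        @Override
        public void run() {
            switch (state) {
                case RECEIVED:
                    log.add("received:" + message);
                    break;
                case CONNECTED:
                    log.add("connected");
                    break;
                default:
                    log.add("unknown state: " + state);
            }
        }
    }

    // The receiving thread only submits the task; a pool thread does the handling.
    static List<String> dispatch(String message) {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.execute(new EventTask(State.RECEIVED, message, log));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("hello")); // prints "[received:hello]"
    }
}
```

Keeping the receiving thread free of business logic is the point of the design: a slow service method then delays only its own pool thread, not the reading of further requests.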
HeaderExchangeHandler
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    // Encapsulate a response object, holding the request ID
    Response res = new Response(req.getId(), req.getVersion());
    //... Omit some code
    // find handler by message class.
    // Get the request message, such as method name, parameter type, parameter value. That is, the RpcInvocation that encapsulates the target method information
    Object msg = req.getData();
    try {
        // handle data.
        // Finally call DubboProtocol#reply
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}
DubboProtocol#reply
This is a key method, source code below
com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        // Find the Invoker
        Invoker<?> invoker = getInvoker(channel, inv);
        //... Omit some code
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // Finally find the real implementation class and call the target method
        return invoker.invoke(inv);
    }
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    //... Omit some code
    // Build the key; format: group/serviceName:version:port
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY),
            inv.getAttachments().get(Constants.GROUP_KEY));
    // Use the key to find the corresponding Exporter in the Map of previously exposed services
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    // Return the Invoker wrapped by the Exporter
    return exporter.getInvoker();
}
The key is serviceKey. When a service is exposed, the generated exporter is saved to exporterMap, and its key is also serviceKey.
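Because the same key is computed at export time and at request time, the lookup is a plain map access. The sketch below illustrates the idea with the group/serviceName:version:port format from the code comment. `ExporterRegistryDemo` is a hypothetical class, the exporter is represented by a plain `Object`, and skipping an empty group or version is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExporterRegistryDemo {
    private final Map<String, Object> exporters = new ConcurrentHashMap<>();

    // Format: group/serviceName:version:port; group and version are skipped when empty.
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(serviceName);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    // Export time: store the exporter under its service key.
    void export(int port, String serviceName, String version, String group, Object exporter) {
        exporters.put(serviceKey(port, serviceName, version, group), exporter);
    }

    // Request time: rebuild the key from the request data and look the exporter up.
    Object lookup(int port, String serviceName, String version, String group) {
        return exporters.get(serviceKey(port, serviceName, version, group));
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20880, "com.demo.HelloService", "1.0.0", "g1"));
        // prints "g1/com.demo.HelloService:1.0.0:20880"
    }
}
```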
Summary
Main process: after receiving the request, NettyServer decodes it into a message and dispatches it to the thread pool, wrapped in a ChannelEventRunnable object that waits for a thread to execute it. The worker thread calls the DubboProtocol#reply method, builds the serviceKey, finds the corresponding Exporter in the cache, calls invoke on its Invoker, and finally reaches the real implementation class, which executes the target method and returns the result.