1. Introduction

Having analyzed how Dubbo exposes and references services, and then examined the design of its network communication protocol, we can finally turn to the most interesting part: how Dubbo implements service invocation. Because of its length, this article is split into two parts. This part analyzes the invocation process and the handling of the response from the Consumer’s perspective; the next part analyzes how Dubbo handles RPC requests and responses from the Provider’s perspective.

The diagram in the official documentation gives an overview of the process. The proxy object sends the network request through the client, and the request/response objects are encoded and serialized by the Codec before being sent to the peer. After receiving the data, the peer decodes and deserializes it, hands the request to a specific thread through the Dispatcher, and finally returns the result.

2. Source code analysis

Start the analysis with a simple Demo like this:

public interface HelloService {
	R<String> say(String name);
}
public class HelloServiceImpl implements HelloService {
	@Override
	public R<String> say(String name) {
		return R.ok("hello " + name);
	}
}

With only an interface, the Consumer has nothing to instantiate; the object it holds is a proxy generated by Dubbo. What does this proxy look like inside? Decompiling the proxy class with Arthas gives the following source:

public class proxy0 implements ClassGenerator.DC, Destroyable, EchoService, HelloService {
	public static Method[] methods;
	private InvocationHandler handler;

	public R say(String string) {
		Object[] objectArray = new Object[]{string};
		Object object = this.handler.invoke(this, methods[0], objectArray);
		return (R)object;
	}

	public void $destroy() {
		Object[] objectArray = new Object[]{};
		Object object = this.handler.invoke(this, methods[1], objectArray);
	}

	public Object $echo(Object object) {
		Object[] objectArray = new Object[]{object};
		Object object2 = this.handler.invoke(this, methods[2], objectArray);
		return object2;
	}

	public proxy0() {
	}

	public proxy0(InvocationHandler invocationHandler) {
		this.handler = invocationHandler;
	}
}

The proxy class generated by Dubbo automatically implements several interfaces. DC is a marker interface that merely indicates the class was generated dynamically. Destroyable means the proxy can be destroyed: its $destroy() method calls the destroy() method of the corresponding Invoker. EchoService is used for echo testing, which checks whether the service is available.

Calling say() simply forwards the call to the InvocationHandler, which is specified when the proxy object is generated:

public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

In summary, when we call one of the interface’s custom methods, the proxy object triggers InvokerInvocationHandler#invoke(), which is the entry point into Dubbo’s service invocation.

2.1 Sending a Request

An RPC call ultimately comes down to sending a network request, but before that Dubbo does a lot of pre-processing: service degradation, interceptors, filters, cluster fault tolerance, asynchronous-to-synchronous conversion, and so on. In its design Dubbo follows the single responsibility principle: each of these functions is implemented by its own Invoker class, and the decorator pattern is used to wrap the most basic DubboInvoker layer by layer until the full set of features is in place.
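To make the layering concrete, here is a minimal sketch of the decorator idea. It does not use Dubbo’s real types: SimpleInvoker, CoreInvoker and TracingInvoker are hypothetical names invented for this illustration, and each layer only records its name where a real layer would do its own work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Dubbo's Invoker; not the real interface.
interface SimpleInvoker {
    String invoke(String request);
}

// Innermost invoker: plays the role of DubboInvoker, which performs the network call.
class CoreInvoker implements SimpleInvoker {
    @Override
    public String invoke(String request) {
        return "response(" + request + ")";
    }
}

// A decorator: does one job of its own, then delegates to the wrapped invoker.
class TracingInvoker implements SimpleInvoker {
    private final String name;
    private final SimpleInvoker delegate;
    private final List<String> trace;

    TracingInvoker(String name, SimpleInvoker delegate, List<String> trace) {
        this.name = name;
        this.delegate = delegate;
        this.trace = trace;
    }

    @Override
    public String invoke(String request) {
        trace.add(name); // this layer's single responsibility would go here
        return delegate.invoke(request);
    }
}

class DecoratorSketch {
    // Returns the order in which the layers were entered for one call.
    static List<String> callOrder() {
        List<String> trace = new ArrayList<>();
        SimpleInvoker invoker = new CoreInvoker();
        // Wrap from the inside out: the last wrapper added is the first one to run.
        invoker = new TracingInvoker("asyncToSync", invoker, trace);
        invoker = new TracingInvoker("filter", invoker, trace);
        invoker = new TracingInvoker("failover", invoker, trace);
        invoker = new TracingInvoker("mock", invoker, trace);
        invoker.invoke("say");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(callOrder()); // prints [mock, failover, filter, asyncToSync]
    }
}
```

Each wrapper knows only the invoker directly beneath it, which is why a new feature can be added by inserting one more layer without touching the others.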

2.1.1 InvokerInvocationHandler

The logic of InvokerInvocationHandler#invoke() is simple. If the invoked method is declared by Object, it is called on the Invoker itself without triggering an RPC call. If it is a custom method, an RpcInvocation object is created and handed to the Invoker.

RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);

if (consumerModel != null) {
    rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
    rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
return invoker.invoke(rpcInvocation).recreate();
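The same branching can be reproduced with the JDK’s own java.lang.reflect.Proxy. This is only a sketch under assumptions: Dubbo generates its proxies with Javassist rather than the JDK proxy, and GreetingService and LocalOrRemoteHandler are hypothetical names. The "remote" branch just returns a string where Dubbo would build an RpcInvocation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface used only for this illustration.
interface GreetingService {
    String say(String name);
}

class LocalOrRemoteHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Methods declared by Object are answered locally, without any remote call.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    return "proxy-of-GreetingService";
                case "hashCode":
                    return System.identityHashCode(proxy);
                case "equals":
                    return proxy == args[0];
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        }
        // Interface methods are packed into a description of the call.
        // A real handler would pass this description down the invoker chain.
        String invocation = method.getName() + "(" + args[0] + ")";
        return "remote:" + invocation;
    }
}

class ProxyEntrySketch {
    static GreetingService newProxy() {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new LocalOrRemoteHandler());
    }

    public static void main(String[] args) {
        GreetingService service = newProxy();
        System.out.println(service.say("dubbo")); // goes through the "remote" branch
        System.out.println(service.toString());   // handled locally
    }
}
```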

2.1.2 MockClusterInvoker

This class is responsible for service degradation and data mocking. When an interface call fails, Dubbo tries to return mock data instead, which suits non-critical flows. Note that Dubbo does not mock business exceptions, only non-business exceptions such as timeouts. The implementation wraps the invoke logic in a try/catch: if a non-business exception is caught, the mock logic runs and mock data is returned.
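The try/catch described above can be sketched as follows. This is not MockClusterInvoker itself: CallException is a hypothetical stand-in for RpcException, and the remote call and the mock are passed in as plain suppliers to keep the example self-contained.

```java
import java.util.function.Supplier;

// Hypothetical exception standing in for RpcException; the flag mirrors isBiz().
class CallException extends RuntimeException {
    private final boolean biz;

    CallException(String message, boolean biz) {
        super(message);
        this.biz = biz;
    }

    boolean isBiz() {
        return biz;
    }
}

class MockFallbackSketch {
    // Try the real call first; fall back to mock data only for non-business failures.
    static String invokeWithMock(Supplier<String> remoteCall, Supplier<String> mock) {
        try {
            return remoteCall.get();
        } catch (CallException e) {
            if (e.isBiz()) {
                throw e; // business exceptions are never mocked
            }
            return mock.get(); // degrade: e.g. a timeout is replaced by mock data
        }
    }

    public static void main(String[] args) {
        String result = invokeWithMock(
                () -> { throw new CallException("timeout", false); },
                () -> "mock data");
        System.out.println(result); // prints mock data
    }
}
```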

2.1.3 InterceptorInvokerNode

This class implements the Cluster-layer interceptors, which intercept invoke calls through the ClusterInterceptor interface. Dubbo currently offers two implementation classes: ConsumerContextClusterInterceptor and ZoneAwareClusterInterceptor. The former sets up and cleans up the RpcContext, while the latter makes the Invoker zone-aware so that it prefers services in the same data center.

Result asyncResult;
try {
    interceptor.before(next, invocation);
    asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
    if (interceptor instanceof ClusterInterceptor.Listener) {
        ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
        listener.onError(e, clusterInvoker, invocation);
    }
    throw e;
} finally {
    interceptor.after(next, invocation);
}

2.1.4 AbstractClusterInvoker

This class implements the basic logic of ClusterInvoker. It uses the template method pattern to define the skeleton of the algorithm, so subclasses only need to implement their own specific logic. AbstractClusterInvoker#invoke() first copies the attachments of the RpcContext into the RpcInvocation, and then filters the available services through the Directory to obtain a list of invokers. After the LoadBalance is initialized, a subclass uses it to select the final Invoker from this list and call it.

public Result invoke(final Invocation invocation) throws RpcException {
    // Make sure the invoker has not been destroyed
    checkWhetherDestroyed();
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
    // Filter the service list by Directory
    List<Invoker<T>> invokers = list(invocation);
    // Initialize load balancing
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

2.1.5 FailoverClusterInvoker

Cluster is the interface for cluster fault tolerance. The default strategy is FailoverCluster, which is the only one used as an example here. This class implements retry on invocation failure. The doInvoke() method first gets the number of retries, then uses LoadBalance to select an Invoker and calls it; if a non-business exception is caught, it retries with another Invoker.

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // Total number of attempts: the configured retries plus the first call
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    RpcException le = null;
    // Record the invokers that have already been called so that retries avoid them
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    // Record the addresses of the providers that have been called
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // Load balancing
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // Service invocation
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
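        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // All attempts failed: rethrow the last error (the original builds a more detailed message)
    throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " after " + len + " attempts", le);
}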
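Stripped of Dubbo’s types, the retry loop looks roughly like the sketch below. The names are hypothetical: providers are modelled as plain functions, TransientFailure stands in for a non-business RpcException, and select() is a trivial substitute for real load balancing that simply prefers a provider not tried yet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Hypothetical exception standing in for a non-business RpcException (e.g. a timeout).
    static class TransientFailure extends RuntimeException {
        TransientFailure(String message) {
            super(message);
        }
    }

    // Calls providers one after another until one succeeds or the attempts run out.
    // The providers list must not be empty.
    static <T> T callWithFailover(List<Function<String, T>> providers, String request, int retries) {
        int attempts = Math.max(retries + 1, 1); // retries plus the first call
        List<Function<String, T>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, T> candidate = select(providers, invoked);
            invoked.add(candidate);
            try {
                return candidate.apply(request);
            } catch (TransientFailure e) {
                last = e; // remember the failure and try another provider
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    // Trivial stand-in for load balancing: the first provider not tried yet, else the first one.
    static <T> Function<String, T> select(List<Function<String, T>> providers,
                                          List<Function<String, T>> invoked) {
        for (Function<String, T> provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        Function<String, String> broken = r -> { throw new TransientFailure("timeout"); };
        Function<String, String> healthy = r -> "ok:" + r;
        System.out.println(callWithFailover(Arrays.asList(broken, healthy), "say", 2)); // prints ok:say
    }
}
```

Any exception other than TransientFailure propagates immediately, which corresponds to the way business exceptions are rethrown without a retry.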

2.1.6 ProtocolFilterWrapper

This class executes the filter chain. It is a wrapper class: the Dubbo SPI mechanism automatically wraps the Protocol object with it. Its buildInvokerChain() method builds the filter chain by loading the active filters through SPI and arranging them into a singly linked list. The real Invoker sits at the tail of the list, so its invoke method runs only after the request has passed through every filter.

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        asyncResult = filter.invoke(next, invocation);
    } catch (Exception e) {
        throw e;
    }
    // The code is streamlined...
    return asyncResult;
}
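A chain of this shape can be built by walking the filter list from back to front, so that each new node points at the node created before it. The sketch below shows the idea with hypothetical Call and CallFilter interfaces; it is not the code of buildInvokerChain(), and it leaves out SPI loading and activation rules entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Invoker.
interface Call {
    String invoke(String request);
}

// Hypothetical stand-in for Filter: receives the next node and decides whether to continue.
interface CallFilter {
    String invoke(Call next, String request);
}

class FilterChainSketch {
    // Build from tail to head so that filters.get(0) runs first and `last` runs at the very end.
    static Call buildChain(Call last, List<CallFilter> filters) {
        Call head = last;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final CallFilter filter = filters.get(i);
            final Call next = head;
            head = request -> filter.invoke(next, request);
        }
        return head;
    }

    // Runs one call through two filters and returns the order of execution.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CallFilter first = (next, request) -> { log.add("A"); return next.invoke(request); };
        CallFilter second = (next, request) -> { log.add("B"); return next.invoke(request); };
        Call target = request -> { log.add("invoker"); return "done"; };
        buildChain(target, Arrays.asList(first, second)).invoke("say");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A, B, invoker]
    }
}
```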

2.1.7 AsyncToSyncInvoker

After Dubbo sends a network request, the client has no way of knowing when the server will respond, so the invoke call is asynchronous and returns an AsyncRpcResult. When the server returns data, Dubbo writes the result into that AsyncRpcResult.

That is an implementation detail of Dubbo’s lower layers. From the developer’s point of view, an RPC call should behave like a local method call: the program continues only after the response has arrived. Hence AsyncToSyncInvoker. Its logic is simple: call AsyncRpcResult#get() to block the current thread until the server responds.

@Override
public Result invoke(Invocation invocation) throws RpcException {
    // call to get asynchronous results
    Result asyncResult = invoker.invoke(invocation);
    if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
        // call synchronously, block and wait for results
        asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
    return asyncResult;
}
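The asynchronous-to-synchronous step can be tried out with a plain CompletableFuture. In this sketch sendAsync() is a hypothetical stand-in for the lower layers: it completes the future on another thread after a short delay, the way a network response would.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncToSyncSketch {
    // Simulates the asynchronous layer: the result arrives later on another thread.
    static CompletableFuture<String> sendAsync(String request) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // pretend network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "hello " + request;
        });
    }

    // Turns the asynchronous call into a synchronous one by blocking on the future.
    static String callSync(String request, long timeoutMillis) {
        CompletableFuture<String> future = sendAsync(request);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callSync("dubbo", 5000)); // prints hello dubbo
    }
}
```

A caller that wants the asynchronous behaviour simply keeps the future returned by sendAsync() and never calls get(), which is the case the InvokeMode check distinguishes.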

2.1.8 DubboInvoker

This class implements remote calls over the Dubbo protocol. It is the innermost Invoker and wraps no other invoker. Its logic is also simple: send the RpcInvocation to the Provider through an ExchangeClient, obtain a CompletableFuture, wrap it in an AsyncRpcResult and return it.

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    // Pick a client in round-robin order
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // Whether this is a one-way call, i.e. no response is expected from the peer
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // Calculate the timeout
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // Thread pool used for callbacks
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            // Send the request
            CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

2.1.9 HeaderExchangeChannel

This class is responsible for sending network requests. Every Dubbo network request is eventually encapsulated in a Request object, which records the request ID, the protocol version, the request body and so on. With Netty as the transport, channel.writeAndFlush() is ultimately called to send the data to the peer.

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

At this point the request has been sent. The network transport layer then encodes and serializes the Request object, which is not covered here.

2.2 Processing Response

After the request is sent, the client thread blocks until the server responds. How does the client process the response, and how is each response matched to its request?

2.2.1 DefaultFuture

The Request constructor generates the request ID automatically. The ID increments globally and is produced with an AtomicLong, which guarantees that every request sent by the same client has a unique ID. When the server responds, it writes the request ID back unchanged, so the client can tell which request a response belongs to.

In addition, every time the client sends a request it creates a DefaultFuture object, which extends CompletableFuture; a thread calling get() blocks until a result is available. In its constructor, DefaultFuture puts itself into a global Map, and the client then blocks waiting for the result. When the server’s response arrives, the client looks up the corresponding DefaultFuture by request ID and writes the result into it. The client thread stops blocking and the program carries on.
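The matching of responses to requests can be sketched with an AtomicLong and a concurrent map. PendingRequests and its methods are hypothetical names for this illustration, not Dubbo’s DefaultFuture API, and the payload is a plain string instead of a Response object.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    // A request ID together with the future that will receive its response.
    static final class Pending {
        final long id;
        final CompletableFuture<String> future;

        Pending(long id, CompletableFuture<String> future) {
            this.id = id;
            this.future = future;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Sending side: allocate a unique ID and register a future under it.
    Pending newRequest() {
        long id = nextId.getAndIncrement();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        return new Pending(id, future);
    }

    // Receiving side: look up the future by the ID echoed in the response and complete it.
    boolean onResponse(long id, String payload) {
        CompletableFuture<String> future = pending.remove(id);
        if (future == null) {
            return false; // unknown ID, or the request was already completed
        }
        return future.complete(payload);
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        Pending first = requests.newRequest();
        Pending second = requests.newRequest();
        // Responses may arrive in any order; the ID routes each one to its caller.
        requests.onResponse(second.id, "second result");
        requests.onResponse(first.id, "first result");
        System.out.println(first.future.join());  // prints first result
        System.out.println(second.future.join()); // prints second result
    }
}
```

Removing the entry when the response arrives keeps the map from growing; a complete implementation also has to remove entries for requests that time out.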

Of course, what the client actually receives is a sequence of bytes that must first be decoded into a Response object. The data in the Response is the method’s return value, which the client must deserialize to get the final result; that is not detailed here.

3. Summary

Dubbo automatically generates a proxy object for the interface. When a custom method is called, the proxy hands the call to InvokerInvocationHandler, which creates an RpcInvocation object and passes it down a chain of invokers. These invokers cover service degradation, interceptors, cluster fault tolerance, filters, asynchronous-to-synchronous conversion, and so on. At the end of the chain, the Invoker for the specific protocol, such as DubboInvoker, sends the network request.

In Dubbo’s architecture most of the logic lives on the client side, so service invocation on the Consumer is more complex than request handling on the Provider. Dubbo follows the single responsibility principle: there are many Invoker classes, but each has one simple, clearly defined job, and wrapping them layer by layer with the decorator pattern yields the full set of features.