References:

  • Obing-dubbo service invocation procedure

conclusion

The Dubbo service invocation is divided into two main processes: the consumer sends the request and receives the response result, and the provider receives the request.

Consumer side:

  • Send request: the proxy object of the service interface executes the target method, which isInvokerInvocationHandler#invokeMethod interception, after route filtering, load balancing, select a DubboInvoker object, call doInvoke method. Create a Request object and generate a globally unique Request ID, then instantiate a DefaultFuture object,Use the request ID as the key, save DefaultFuture to a ConcurrentHashMap. Finally, the RpcInvocation that encapsulates the target method information is serialized through the NettyClient
  • Receive the response: find the corresponding Future in the cache based on the response ID, that is, the request ID, and execute the doReceived method. Save the result and wake up the corresponding requesting thread to process the response result

Provider side:

After receiving the request, NettyServer obtains the information according to the protocol and deserializes it into objects, which are distributed to the thread pool for processing. The message is sealed to a ChannelEventRunnable object of type RECEIVED. The worker thread will eventually call the DubboProtocol#reply method, build a serviceKey based on port, path, Version, and Group, find the corresponding Exporter from the cache, and run through multiple calls until it finds the actual implementation class and executes the target method to return the result.

Call flow chart

The client makes the local call, which is actually the proxy class, which makes the request through the remote client (NettyClient by default). The protocol header is built, specifying the communication protocol, serializer type, and body length, then the Java object is serialized into the protocol body, and the data is sent.

The server (NettyServer) receives the request and distributes it to the business thread pool for processing. The business thread finds the corresponding implementation class, executes the corresponding method and returns the result.

The application layer communication protocol and serializer are omitted here. See Resources for details.

Client source code analysis

As mentioned in the previous article, when a consumer references a service, ReferenceConfig#init creates a proxy object through a dynamic proxy, so when we invoke a method on the service interface, it is intercepted by InvokerInvocationHandler#invoke

com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<? >[] parameterTypes = method.getParameterTypes();if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    / /... Omit some code
    
    // create the RpcInvocation object and then call MockClusterInvoker#invoke
    The Invocation class basically assigns the Invocation Invocation to another class when the target method is called
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
Copy the code

Let’s follow the code and see how it works.

com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke

 @Override
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Get the mock configuration
    String value = directory.getUrl()
        			.getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        / / call AbstractClusterInvoker
        result = this.invoker.invoke(invocation);
    }
    / /... Omit some code
    return result;
}
Copy the code

com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // Check if it is destroyed
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // binding attachments into invocation.
    // See if the context is attachments, and bind it to the Invocation if it is
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if(contextAttachments ! =null&& contextAttachments.size() ! =0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // Call directory#list, which does route filtering
    List<Invoker<T>> invokers = list(invocation);
    
    if(invokers ! =null && !invokers.isEmpty()) {
        // Get the LoadBalance implementation class via SPI
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(
            invokers.get(0).getUrl().getMethodParameter(
                RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
    // finally call the DubboInvoker#doInvoke method to select an Invoker using a load balancing strategy
    return doInvoke(invocation, invokers, loadbalance);
}


protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}
Copy the code

DubboInvoker#doInvoke

We ignore the routing, the cluster, load balancing of related knowledge, focus on com. Alibaba. The dubbo.. RPC protocol. The dubbo. DubboInvoker# doInvoke

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // Set path and version to attachment
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    // Select a client
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // Call the flag asynchronously
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // oneway send mode flag
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // The timeout period
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

        if (isOneway) {
            // Oneway mode, regardless of the result
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // Send it asynchronously
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            // send synchronously, calling future.get() directly to block and wait for the result
            // The difference between asynchronous and synchronous is whether future.get() is a user call or a component call
            return(Result) currentClient.request(inv, timeout).get(); }}/ /... Omit some code
}
Copy the code

Three ways to call

As you can see from the above code, there are three types of invocation, oneway, asynchronous and synchronous.

  • **oneway: ** When you do not need to care about sending results, you can choose this method, which consumes the least and does not care about anything.
  • ** Asynchronous call: ** The client sends the request and returns a ResponseFuture, which is then plugged into the context. Call to retrieve the Future from the context when the user needs the resultfuture#getGet the request result
  • ** Synchronous calls: ** This is the one we use most often, but the component does it for us manuallyfuture#getMethod to make it feel like a synchronous call to the user

Send the request

Currentclient. request(inv, timeout) is executed by creating a request object request, setting the version number and request content, constructing and caching the Future object, and calling NettyClient to send the request.

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)

@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    / /... Omit some code
    
    // create request.
    // Create a request object and generate a unique request ID
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    
    // Create and cache the Future
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // Finally call NettyChannel#send
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
Copy the code

Build Request

com.alibaba.dubbo.remoting.exchange.Request

public class Request {

    private static final AtomicLong INVOKE_ID = new AtomicLong(0);

    private final long mId;

    public Request(a) {
        mId = newId();
    }

    private static long newId(a) {
        // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
        return INVOKE_ID.getAndIncrement();
    }
    
    public long getId(a) {
        return mId;
    }

	/ /... Omit some code
}
Copy the code

To build the Future

com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#DefaultFuture

public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    // Generate the request ID as the key to retrieve the corresponding Future and Channel from the Map when the response result is received
    this.id = request.getId();
    this.timeout = timeout > 0 ?
        			timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

// FUTURES and CHANNELS are both ConcurrentHashMap
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

Copy the code

As you can see, AtomicLong#getAndIncrement is called inside the Request to generate a globally unique ID when we create the Request object. When creating the DefaultFuture object, the request ID is used as the key and the Future is saved to the Map. When the response result is received, the final call is DefaultFuture# Received to receive the result

Receiving response results

com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received

public static void received(Channel channel, Response response) {
    try {
        // The id returned by the response is the id sent by the server when the request is made
        // Find the corresponding Future based on the request ID into the cache
        DefaultFuture future = FUTURES.remove(response.getId());
        if(future ! =null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                           + "- >"+ channel.getRemoteAddress())); }}finally{ CHANNELS.remove(response.getId()); }}private void doReceived(Response res) {
    lock.lock();
    try {
        // Receive the response result
        response = res;
        if(done ! =null) {
            // Wake up the request threaddone.signal(); }}finally {
        lock.unlock();
    }
    if(callback ! =null) { invokeCallback(callback); }}Copy the code

** According to the response result ID, which is the request ID, the client finds the corresponding Future object in the cache, calls the doReceived method, saves the response result, and wakes up the corresponding request thread. ** process is shown in the figure below:

summary

Main process:

1. The proxy object of the service interface executes the target method, which is intercepted by the InvokerInvocationHandler#invoke method to enter the proxy process. After route filtering and load balancing, select a DubboInvoker and run the doInvoker method. A Request object is created and a globally unique ID is generated. Next, create DefaultFuture and save it to ConcurrentHashMap with key being the request ID. Finally, the RpcInvocation that encapsulates the target method information is serialized as a message via the NettyClient

2. After receiving the returned response result, call DefaultFuture# Received to find the corresponding DefaultFuture based on the returned request ID. The doReceived method is then called to save the response and wake up the request thread. Finally, the results are parsed and returned

Server source code analysis

When the server receives the request, it parses the message. There are five dispatch strategies for messages:

The default is all, which means that all messages are dispatched to the business thread pool. Let’s take a look at the implementation of AllChannelHandler

com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // Get the thread pool
    // Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    ExecutorService cexecutor = getExecutorService();
    try {
        // the ChannelEventRunnable#run method contains multiple types of processing
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }
    / /... Omit some code
}
Copy the code

ChannelEventRunnable

ChannelEventRunnable implements the Runnable interface. Let’s look at its run method.

com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run

@Override
public void run(a) {
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is "+ message, e); }}else {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                }
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is "+ message); }}}Copy the code

In combination with the above code, the server receives the request and parses it into a message. It then retrits the thread pool and encapsulates the message as channelstate.received. When an idle thread processes the request, the ChannelEventRunnable#received method is executed and HeaderExchangeHandler#handleRequest is finally called

HeaderExchangeHandler

com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    // Encapsulate a response object, holding the request ID
    Response res = new Response(req.getId(), req.getVersion());
   	
    / /... Omit some code
    
    // find handler by message class.
    // Get the request message, such as method name, parameter type, parameter value. That is, the RpcInvocation that encapsulates the target method information
    Object msg = req.getData();
    try {
        // handle data.
        // Finally call DubboProtocol#reply
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}
Copy the code

DubboProtocol#reply

This is a key method, source code below

com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply

@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        / / find the InvokerInvoker<? > invoker = getInvoker(channel, inv);/ /... Omit some code
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // Finally find the real implementation class and call the target method
        returninvoker.invoke(inv); } } Invoker<? > getInvoker(Channel channel, Invocation inv)throws RemotingException {
    / /... Omit some code
    
    / / build key, generate rules: group/serviceName: version: the port
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), 
                                   											inv.getAttachments().get(Constants.GROUP_KEY));
    // According to key, find the corresponding Exporter from the Map of the previously exposed serviceDubboExporter<? > exporter = (DubboExporter<? >) exporterMap.get(serviceKey);// Return to the Invoker that my Exporter encapsulated
    return exporter.getInvoker();
}

Copy the code

The key is serviceKey. When a service is exposed, the generated exporter is saved to exporterMap, and its key is also serviceKey.

summary

The main process: After receiving the request, The NettyServer parses it into information and sends it all to the thread pool, which is sealed into a ChannelEventRunnable object and waits for the thread to execute it. The worker thread calls the DubboProtocol#reply method, builds serviceKey, finds the corresponding Exporter from the cache, executes the Invoke method, and finally finds the true implementation class, executes the target method, and returns the result.