Preface

Dubbo is an RPC framework, so its core is sending and handling requests over the network. In this section we look at how a service call is passed down layer by layer and sent to the server through Netty 4.

Analysis

As we saw in the previous section, business code calls the bean through a proxy object generated by Javassist, and the call ends up in InvokerInvocationHandler#invoke().

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // Methods declared on Object are invoked locally, not remotely
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    //....
    // Everything else is wrapped in an RpcInvocation and passed down the invoker chain
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

As analyzed before, the invoker object is ultimately the chain MockClusterInvoker(FailoverClusterInvoker(directory(invokers(RegistryDirectory$InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(Filters(AsyncToSyncInvoker(DubboInvoker(clients[])))))))))). Let's work through it layer by layer.

Mock

MockClusterInvoker lets you plug in custom logic when a call fails. Its three usages were introduced earlier. Let's look at the implementation.

//org..... support.wrapper.MockClusterInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) { // 1
        // no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) { // 2
        result = doMockInvoke(invocation, null);
    } else {
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            result = doMockInvoke(invocation, e); // 3
        }
    }
    return result;
}

Comment 1 is the case where no mock is configured. Comment 2 is a forced mock: the remote call is skipped entirely, which is useful downstream when an upstream problem cannot be fixed. Comment 3 runs the mock logic after the invocation fails and returns the mock's result. A mock typically returns static data; I have used it to return data stored in advance in OSS. The effect is similar to service degradation.
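The three branches can be reproduced outside Dubbo with a minimal sketch. Everything here is hypothetical and only mirrors the control flow above: MockDecisionSketch, RemoteCallException (standing in for a non-business RpcException) and the two Supplier arguments are illustrative names, and the mock values passed in main are just sample strings.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the mock decision; not Dubbo's API.
class MockDecisionSketch {

    // Plays the role of a non-business RpcException (timeout, network error).
    static class RemoteCallException extends RuntimeException {
        RemoteCallException(String message) {
            super(message);
        }
    }

    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remoteCall.get();      // 1: no mock configured
        } else if (value.startsWith("force")) {
            return mockCall.get();        // 2: forced mock, the remote call is skipped
        }
        try {
            return remoteCall.get();
        } catch (RemoteCallException e) {
            return mockCall.get();        // 3: fall back to the mock after a failure
        }
    }

    public static void main(String[] args) {
        Supplier<String> healthy = () -> "remote";
        Supplier<String> broken = () -> {
            throw new RemoteCallException("timeout");
        };
        Supplier<String> mock = () -> "mock";

        System.out.println(invoke("false", healthy, mock)); // prints "remote"
        System.out.println(invoke("force", healthy, mock)); // prints "mock"
        System.out.println(invoke("fail", broken, mock));   // prints "mock"
    }
}
```

Note that the forced branch never touches the remote supplier, which is what makes it usable when the provider is known to be broken.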

Failover

FailoverClusterInvoker is a cluster fault-tolerance policy: if a call to a provider fails, it retries up to n times.

//org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; // 1
    RpcException le = null;
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // Pick an invoker through the load balancer, given the ones already tried
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        try {
            Result result = invoker.invoke(invocation); // 2
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method"); // 3
}

The code is simple. Comment 1 reads the configured retry count from retries=x; the default is 2 retries, so a call is attempted at most three times. Comment 2 initiates the remote call. I kept the invoker selection step in the excerpt: Dubbo excludes invokers that have already been tried, to lower the chance of failing again. Comment 3 throws an RpcException once every attempt has failed. Dubbo also provides FailfastClusterInvoker, BroadcastClusterInvoker, FailsafeClusterInvoker, and so on.
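The retry loop can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Dubbo's implementation: FailoverSketch, invokeWithFailover and select are hypothetical names, providers are modeled as Function<String, T>, the provider list is assumed to be non-empty, and the selection simply takes the first provider not yet tried instead of going through a load balancer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical failover loop; it mirrors the shape of doInvoke only.
class FailoverSketch {

    static <T> T invokeWithFailover(List<Function<String, T>> providers, String request, int retries) {
        int attempts = retries + 1; // the first call plus the retries
        List<Function<String, T>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, T> provider = select(providers, invoked);
            invoked.add(provider);
            try {
                return provider.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("Failed to invoke after " + attempts + " attempts", last);
    }

    // Prefer a provider that has not been tried yet; fall back to the first one.
    static <T> Function<String, T> select(List<Function<String, T>> providers,
                                          List<Function<String, T>> invoked) {
        for (Function<String, T> provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        Function<String, String> a = r -> { throw new IllegalStateException("a is down"); };
        Function<String, String> b = r -> { throw new IllegalStateException("b is down"); };
        Function<String, String> c = r -> "c:" + r;

        // Two retries allow three attempts, so the call reaches provider c.
        System.out.println(invokeWithFailover(Arrays.asList(a, b, c), "ping", 2)); // prints "c:ping"
    }
}
```

With retries set to 1 the same provider list would exhaust its attempts on a and b and throw instead.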

Listener (CallbackRegistrationInvoker)

CallbackRegistrationInvoker's main job is to run callbacks after the invoker has executed. In Dubbo, these callbacks are implemented by filters that extend ListenableFilter.

//org.xx.protocol.ProtocolFilterWrapper.CallbackRegistrationInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = filterInvoker.invoke(invocation);
    asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
        for (int i = filters.size() - 1; i >= 0; i--) { // 1
            Filter filter = filters.get(i);
            if (filter instanceof ListenableFilter) {
                Filter.Listener listener = ((ListenableFilter) filter).listener();
                if (listener != null) {
                    if (t == null) {
                        listener.onResponse(r, filterInvoker, invocation);
                    } else {
                        listener.onError(t, filterInvoker, invocation);
                    }
                }
            } else {
                filter.onResponse(r, filterInvoker, invocation);
            }
        }
    });
    return asyncResult;
}

At comment 1, once the result is available, the filters are traversed in reverse order and each listener processes it: onResponse on success, onError on failure.
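The same pattern can be sketched with the JDK's CompletableFuture. This is an illustration under assumptions rather than Dubbo's code: CallbackSketch, its Listener interface and the recording helper are hypothetical, and whenComplete plays the role of whenCompleteWithContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical listener chain attached to an asynchronous result.
class CallbackSketch {

    interface Listener {
        void onResponse(String result);
        void onError(Throwable error);
    }

    // Registers a completion callback that notifies the listeners in reverse order.
    static CompletableFuture<String> withListeners(CompletableFuture<String> future, List<Listener> listeners) {
        return future.whenComplete((r, t) -> {
            for (int i = listeners.size() - 1; i >= 0; i--) {
                Listener listener = listeners.get(i);
                if (t == null) {
                    listener.onResponse(r);
                } else {
                    listener.onError(t);
                }
            }
        });
    }

    // A listener that records what it saw, so the call order can be inspected.
    static Listener recording(String name, List<String> log) {
        return new Listener() {
            @Override
            public void onResponse(String result) {
                log.add(name + ":" + result);
            }

            @Override
            public void onError(Throwable error) {
                log.add(name + ":error");
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> result =
                withListeners(future, Arrays.asList(recording("first", log), recording("second", log)));

        future.complete("ok"); // the callbacks run only once the result is available
        result.join();
        System.out.println(log); // prints [second:ok, first:ok]
    }
}
```

Nothing is called at registration time; the listeners fire only when the future completes, which is why the invoker can return asyncResult immediately.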

AsyncToSync

AsyncToSyncInvoker, as its name suggests, turns an asynchronous call into a synchronous one.

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation); // 1
    try {
        if (InvokeMode.SYNC == invocation.getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); // 2
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiti", e);
    }
    return asyncResult;
}

At comment 1, invoke() returns asyncResult, an asynchronous result that the server may not have answered yet. At comment 2, if the call is synchronous, the calling thread blocks on asyncResult.get() until the result arrives. The timeout implementation is analyzed later in this article.
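Blocking on an asynchronous result can be sketched with a plain CompletableFuture. The names below (AsyncToSyncSketch, Mode, delayedReply) are hypothetical, and the exception handling is an assumption made so that the example compiles on its own; it is not taken from Dubbo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sync-over-async wrapper.
class AsyncToSyncSketch {

    enum Mode { SYNC, ASYNC }

    static CompletableFuture<String> invoke(CompletableFuture<String> asyncResult, Mode mode, long timeoutMs) {
        if (mode == Mode.SYNC) {
            try {
                // The calling thread is parked here until the reply arrives or the timeout expires
                asyncResult.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the reply", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Remote call failed", e);
            }
        }
        return asyncResult; // in ASYNC mode the caller receives the pending future as is
    }

    // Simulates a server that answers after delayMs milliseconds.
    static CompletableFuture<String> delayedReply(String value, long delayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = invoke(delayedReply("pong", 100), Mode.SYNC, 5000);
        System.out.println(reply.isDone()); // prints true: SYNC mode returned only after completion
    }
}
```

Either way the caller gets the same future back; the only difference is whether it is already complete when the method returns.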

Making a remote call (DubboInvoker)

DubboInvoker obtains the communication client and initiates the TCP request.

//org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
    // ...
    ExchangeClient currentClient;
    try {
        int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
        if (isOneway) { // 1: no return value is required
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); // 2: initiate the request
            asyncRpcResult.subscribeTo(responseFuture);
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            return asyncRpcResult;
        }
    }
    // ... (exception handling elided in this excerpt)
}

Comment 1 handles the case where no return value is required: the request is simply sent. Comment 2 initiates the request and returns a Future that will hold the result of the remote call. Let's step into currentClient.request(inv, timeout).

//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request); // set the request data
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req); // 1
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

At comment 1, the channel is ultimately the Netty channel, and its send method is where the data is written to the stream:

public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // Wait for the write to finish only when sent=true is configured
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, 1000);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    }
    // ... (exception handling elided in this excerpt)
}

At this point the data has been sent. Nothing more happens on the client until the server returns data and Netty receives the IO event channelRead, after which the response is decoded. The thread that is then unparked is the one we originally suspended in AsyncToSyncInvoker.
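How a response finds its way back to the waiting caller can be illustrated with the general request/response correlation technique: each request gets an id and a pending future, and the receiving side completes the future with the matching id. The sketch below is an assumption-laden illustration of that technique, not Dubbo's DefaultFuture; PendingRequestsSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry of in-flight requests, keyed by request id.
class PendingRequestsSketch {

    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Sending side: allocate an id and register a future before writing to the channel.
    long newRequest() {
        long id = nextId.incrementAndGet();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    CompletableFuture<String> futureOf(long id) {
        return pending.get(id);
    }

    // Receiving side (the IO thread): complete the matching future,
    // which wakes up any thread blocked on it.
    boolean received(long id, String payload) {
        CompletableFuture<String> future = pending.remove(id);
        if (future == null) {
            return false; // unknown id, or the request was already answered
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        PendingRequestsSketch registry = new PendingRequestsSketch();
        long id = registry.newRequest();
        CompletableFuture<String> future = registry.futureOf(id);

        // Simulate the IO thread delivering the response
        new Thread(() -> registry.received(id, "pong")).start();

        System.out.println(future.join()); // prints "pong"
    }
}
```

Removing the entry on arrival keeps the map from growing and makes a duplicate response for the same id a no-op.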