The 9:30 horse race is a long way, but the future will be better

Dubbo supports synchronous and asynchronous invocation of services.

Dubbo 2.6 and earlier versions have some shortcomings in how they implement asynchronous calls: the calls are in fact pseudo-asynchronous.

Here is an example of an asynchronous call in Dubbo 2.6.

// This method should return a Foo, but in asynchronous mode it returns null immediately
fooService.findFoo(fooId);
// Immediately get the Future of the current call; it is overwritten when a new call is made
Future<Foo> fooFuture = RpcContext.getContext().getFuture();

// Call a method of another service
barService.findBar(barId);
// Immediately get the Future of the current call
Future<Bar> barFuture = RpcContext.getContext().getFuture();

// At this point, both service methods are executing concurrently.
// Wait for the first call to complete; the thread blocks here and is woken up when the call completes.
Foo foo = fooFuture.get();
// Same as above
Bar bar = barFuture.get();
// If the first call takes 5 seconds and the second takes 6 seconds, the whole sequence completes in about 6 seconds.


When a service method is called, Dubbo creates a DefaultFuture and stores it in the RpcContext. To obtain the result, the user thread retrieves the Future from the RpcContext and calls its get method. If the call has not finished at that point, get blocks until the result is returned or the call times out, and the subsequent steps of the method are not executed in the meantime. For an asynchronous call this is clearly unreasonable. Ideally, if the service has not finished yet, the user thread continues with its subsequent work instead of blocking, and the result is handled when it arrives.
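
The difference between blocking on a Future and registering a callback can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not Dubbo code: slowCall, blockingStyle, callbackStyle and the class name are hypothetical, and slowCall stands in for a remote service.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BlockingVersusCallback {

    // Hypothetical stand-in for a slow remote call; not a Dubbo API.
    static String slowCall(String name) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "hello " + name;
    }

    // Future style: the only way to obtain the value is to block in get().
    static String blockingStyle(ExecutorService pool) {
        Callable<String> task = () -> slowCall("foo");
        Future<String> future = pool.submit(task);
        try {
            return future.get(); // the calling thread waits here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // CompletableFuture style: attach a callback and return at once.
    static CompletableFuture<String> callbackStyle(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> slowCall("bar"), pool)
                .thenApply(String::toUpperCase); // runs once the result is ready
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(blockingStyle(pool));
            CompletableFuture<String> pending = callbackStyle(pool);
            System.out.println("caller keeps running while the call is in flight");
            System.out.println(pending.join()); // join only so the demo prints before exit
        } finally {
            pool.shutdown();
        }
    }
}
```

In the callback style the caller decides whether it ever waits; in the Future style, reading the value and waiting for it are the same operation.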

Starting with Dubbo 2.7, asynchronous calls are implemented on top of CompletableFuture.

DubboInvoker is the invoker through which remote calls are made. In Dubbo 2.6, the relevant part of its remote-call code looks like this (most of the code is omitted):

DubboInvoker class

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    // Some code omitted
    boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    // Some code omitted
    if (isOneway) {
        // One-way call with no return value
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return new RpcResult();
    } else if (isAsync) {
        // Asynchronous call
        ResponseFuture future = currentClient.request(inv, timeout);
        RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
        return new RpcResult();
    } else {
        // Synchronous call
        RpcContext.getContext().setFuture(null);
        return (Result) currentClient.request(inv, timeout).get();
    }
}


In Dubbo 2.6 and earlier, both asynchronous and synchronous calls go through the HeaderExchangeClient.request method, which returns a DefaultFuture object. The difference is that an asynchronous call stores the future in the RpcContext and immediately returns an empty RpcResult, whereas a synchronous call does not put the future into the RpcContext but calls its get method directly, blocking until the result of the call is available.

HeaderExchangeChannel class

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    channel.send(req);
    // Some code omitted
    return future;
}

DefaultFuture class (some code ignored)

public Object get(int timeout) throws RemotingException {
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
    return returnFromResponse();
}




When the server has finished processing and sent back its Response, the HeaderExchangeHandler handles it: it looks up the corresponding DefaultFuture by request ID and finally assigns the result by calling the doReceived method. The blocked thread is woken up through the Condition of a lock, a mechanism built on AQS (AbstractQueuedSynchronizer).

DefaultFuture class

private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
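
The wait-and-wake mechanism described above (a future looked up by request ID, a waiting thread parked on a Condition, and a receiver that signals it) can be sketched in isolation. This is a simplified illustration, not Dubbo's DefaultFuture: the class name, the PENDING map, and the use of an unchecked exception for timeouts are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConditionFutureSketch {

    // Pending calls keyed by request id, so a response can find its waiting future.
    private static final Map<Long, ConditionFutureSketch> PENDING = new ConcurrentHashMap<>();

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Object response;

    ConditionFutureSketch(long requestId) {
        PENDING.put(requestId, this);
    }

    boolean isDone() {
        return response != null;
    }

    // Caller side: park on the condition until a response arrives or the timeout expires.
    Object get(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new IllegalStateException("no response within " + timeoutMillis + " ms");
                }
                done.awaitNanos(remaining); // releases the lock while waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return response;
    }

    // Receiver side: find the future by request id, store the value, wake the waiter.
    static void received(long requestId, Object value) {
        ConditionFutureSketch future = PENDING.remove(requestId);
        if (future == null) {
            return; // unknown request id, nothing to wake
        }
        future.lock.lock();
        try {
            future.response = value;
            future.done.signalAll();
        } finally {
            future.lock.unlock();
        }
    }

    public static void main(String[] args) {
        ConditionFutureSketch future = new ConditionFutureSketch(1L);
        new Thread(() -> received(1L, "pong")).start();
        System.out.println(future.get(1000));
    }
}
```

Note that the caller has no way to react to the result other than calling get, which is exactly the limitation of the 2.6 design.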


In version 2.7 of Dubbo, asynchronous calls have been improved to use CompletableFuture.

An example of an asynchronous call in Dubbo 2.7:

// This call returns null immediately
asyncService.sayHello("world");
// Get the CompletableFuture of the call; it is completed when the result comes back
CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture();
// Add a callback to the future
helloFuture.whenComplete((retValue, exception) -> {
    if (exception == null) {
        System.out.println(retValue);
    } else {
        exception.printStackTrace();
    }
});


Remote calls are still initiated by DubboInvoker, but the doInvoke method has been reworked:

DubboInvoker class (version 2.7.9)

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    if (isOneway) {
        // One-way call
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        // Synchronous and asynchronous calls
        ExecutorService executor = getCallbackExecutor(getUrl(), inv);
        CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
        FutureContext.getContext().setCompatibleFuture(appResponseFuture);
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
        result.setExecutor(executor);
        return result;
    }
}


In Dubbo 2.7, DubboInvoker handles synchronous and asynchronous calls uniformly: it wraps the call in a CompletableFuture and returns it as an AsyncRpcResult.

The HeaderExchangeChannel.request method in Dubbo 2.7 differs little from the 2.6 version; the main difference is the DefaultFuture object, which now extends the CompletableFuture class.

The distinction between synchronous and asynchronous calls is handled by the AsyncToSyncInvoker class.

public Result invoke(Invocation invocation) throws RpcException {
    // Call DubboInvoker (or another Invoker), which returns the pending result
    Result asyncResult = invoker.invoke(invocation);
    try {
        // If the call is synchronous, wait for it to complete
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // Use get(long, TimeUnit) here rather than CompletableFuture#get(), which causes a serious performance drop.
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    }
    // Some code omitted
    return asyncResult;
}
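
The idea of running every call asynchronously and waiting only when the caller asked for a synchronous call can be sketched as follows. This is a minimal illustration, not the AsyncToSyncInvoker source: Mode, asyncInvoke and invoke are hypothetical names, and the "remote call" is simulated with supplyAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncToSyncSketch {

    enum Mode { SYNC, ASYNC }

    // Hypothetical async transport: always hands back a future, never blocks.
    static CompletableFuture<String> asyncInvoke(String request) {
        return CompletableFuture.supplyAsync(() -> "echo:" + request);
    }

    // Wrapper: in SYNC mode, wait for completion before returning the same future.
    static CompletableFuture<String> invoke(String request, Mode mode) {
        CompletableFuture<String> result = asyncInvoke(request);
        if (mode == Mode.SYNC) {
            try {
                result.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("call failed", e);
            }
        }
        return result; // already completed in SYNC mode, possibly pending in ASYNC mode
    }

    public static void main(String[] args) {
        CompletableFuture<String> sync = invoke("a", Mode.SYNC);
        System.out.println("sync done on return: " + sync.isDone());

        CompletableFuture<String> async = invoke("b", Mode.ASYNC);
        async.thenAccept(value -> System.out.println("async result: " + value)).join();
    }
}
```

Both modes return the same type, so the layers above the wrapper do not need separate code paths for synchronous and asynchronous calls.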


Unlike Dubbo 2.6, Dubbo 2.7 abandons the AQS-based condition mechanism and uses the complete method of the CompletableFuture class instead.


DefaultFuture class

private void doReceived(Response res) {
    // Some code omitted
    if (res.getStatus() == Response.OK) {
        // Assign the result to the CompletableFuture
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
    // Some code omitted
}
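
A future that extends CompletableFuture and is completed by the receiving side, as described above, might look roughly like this. It is a sketch under stated assumptions rather than Dubbo's implementation: the class name, the PENDING map, and the status constants OK and TIMEOUT (with made-up values) are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

class CompletableResponseFuture extends CompletableFuture<Object> {

    // Illustrative status codes; the values are made up for this sketch.
    static final int OK = 0;
    static final int TIMEOUT = 1;

    // Pending calls keyed by request id.
    private static final Map<Long, CompletableResponseFuture> PENDING = new ConcurrentHashMap<>();

    static CompletableResponseFuture newFuture(long requestId) {
        CompletableResponseFuture future = new CompletableResponseFuture();
        PENDING.put(requestId, future);
        return future;
    }

    // Receiver side: no lock or condition, just complete the future.
    static void received(long requestId, int status, Object payload) {
        CompletableResponseFuture future = PENDING.remove(requestId);
        if (future == null) {
            return; // unknown request id
        }
        if (status == OK) {
            future.complete(payload);
        } else if (status == TIMEOUT) {
            future.completeExceptionally(new TimeoutException(String.valueOf(payload)));
        } else {
            future.completeExceptionally(new IllegalStateException(String.valueOf(payload)));
        }
    }

    public static void main(String[] args) {
        CompletableResponseFuture future = newFuture(1L);
        future.whenComplete((value, error) ->
                System.out.println(error == null ? "result: " + value : "failed: " + error));
        received(1L, OK, "pong"); // completing the future triggers the callback
    }
}
```

Because completion runs every registered callback, a caller that never calls get still sees the result.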


The Result interface above has two implementations. Here is a brief comparison.

AsyncRpcResult

This class represents an unfinished RPC call and retains some context information for the call, such as the RpcContext and the Invocation. When the call completes and the result is returned, this ensures that the context is restored to what it was when the call was made, before any callback is invoked.

While Result implements the CompletionStage, AsyncRpcResult allows you to easily build chains of asynchronous filters whose state will be driven entirely by the state of the underlying RPC call.

AsyncRpcResult does not contain any concrete values (except the underlying value carried by the CompletableFuture) and should be treated as a state-transfer node. #getValue() and #getException() are both inherited from the Result interface and are implemented primarily for compatibility, since many older Filter implementations probably call getValue directly.

AppResponse

AsyncRpcResult was introduced in Dubbo 3.0.0 to replace RpcResult, and RpcResult was replaced with AppResponse: AsyncRpcResult is the object actually passed along the invocation chain, while AppResponse represents only the business result.

AsyncRpcResult represents the future of an unfinished RPC call, and AppResponse is the actual return value of that call. In theory, AppResponse does not have to implement the Result interface; it does so mainly for compatibility.

During service exposure, the ProtocolFilterWrapper builds the chain of Filter interceptors. Before the actual DubboInvoker is called, the filters in the chain are invoked, for example ExecuteLimitFilter, which limits the maximum number of concurrent requests per method of each service. Here is the logic Dubbo 2.6 uses to build the interceptor chain:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                // Some code omitted
                @Override
                public Result invoke(Invocation invocation) {
                    return filter.invoke(next, invocation);
                }
            };
        }
    }
    return last;
}
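
The wrapping loop above can be tried out in isolation. The following sketch uses two hypothetical functional interfaces, Invoker and Filter, that only mimic the shape of the Dubbo types; strings stand in for invocations and results.

```java
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {

    // Hypothetical, simplified counterparts of the Invoker and Filter roles.
    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wrap the target from the last filter to the first, so the first filter ends up outermost.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    public static void main(String[] args) {
        Filter a = (next, request) -> "A(" + next.invoke(request) + ")";
        Filter b = (next, request) -> "B(" + next.invoke(request) + ")";
        Invoker chain = buildChain(request -> "target:" + request, Arrays.asList(a, b));
        System.out.println(chain.invoke("x")); // prints A(B(target:x))
    }
}
```

Iterating in reverse is what makes the filters run in list order: each filter sees the return value of everything after it, which is why an empty placeholder result leaves filters with nothing to inspect.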


However, asynchronous calls are a problem for this chain in Dubbo 2.6, because an asynchronous call returns an empty RpcResult object. A Filter that needs to process the returned result has nothing to work with at that point. Dubbo 2.7 improves on this.

Dubbo 2.7 builds the interceptor chain as follows:

ProtocolFilterWrapper class

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (Filter filter : filters) {
            last = new FilterNode<T>(invoker, last, filter);
        }
    }
    return last;
}


Next, look at the invoke method of FilterNode:

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    asyncResult = filter.invoke(next, invocation);
    // Some code omitted
    return asyncResult.whenCompleteWithContext((r, t) -> {
        // Some code omitted
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener listener = (Filter.Listener) filter;
            if (t == null) {
                listener.onResponse(r, invoker, invocation);
            } else {
                listener.onError(t, invoker, invocation);
            }
        }
    });
}
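
The listener pattern in this method, where result handling is deferred to a callback instead of being done on the return value, can be sketched with a plain CompletableFuture. The Listener interface and the attach method below are hypothetical and only loosely modelled on the idea of Filter.Listener.

```java
import java.util.concurrent.CompletableFuture;

class ListenerFilterSketch {

    // Hypothetical callback interface for this sketch.
    interface Listener {
        void onResponse(String value);

        void onError(Throwable error);
    }

    // Attach the listener to the pending result instead of inspecting it right away.
    static CompletableFuture<String> attach(CompletableFuture<String> pending, Listener listener) {
        return pending.whenComplete((value, error) -> {
            if (error == null) {
                listener.onResponse(value);
            } else {
                listener.onError(error);
            }
        });
    }

    public static void main(String[] args) {
        Listener printing = new Listener() {
            @Override
            public void onResponse(String value) {
                System.out.println("response: " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("error: " + error.getMessage());
            }
        };
        CompletableFuture<String> call = new CompletableFuture<>();
        attach(call, printing);  // nothing is printed yet: the call is still pending
        call.complete("hello");  // now the listener runs
    }
}
```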


For an asynchronous call, the result is passed along the chain as an AsyncRpcResult object, and the post-processing logic is attached with CompletableFuture#whenComplete so that it runs once the call completes.

public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
    // responseFuture is a CompletableFuture
    this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
        beforeContext.accept(v, t);
        fn.accept(v, t);
        afterContext.accept(v, t);
    });
    return this;
}
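
Why a callback needs the context restored around it can be shown with a ThreadLocal: the callback usually runs on a different thread from the one that made the call. The sketch below is an assumption-laden illustration, not AsyncRpcResult itself: CONTEXT is a hypothetical stand-in for RpcContext, and the before/after steps are inlined rather than kept as separate consumers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class ContextRestoringResult {

    // Hypothetical per-thread call context, standing in for RpcContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final String storedContext; // captured on the calling thread
    private CompletableFuture<String> responseFuture;

    ContextRestoringResult(CompletableFuture<String> responseFuture) {
        this.responseFuture = responseFuture;
        this.storedContext = CONTEXT.get();
    }

    ContextRestoringResult whenCompleteWithContext(BiConsumer<String, Throwable> fn) {
        this.responseFuture = this.responseFuture.whenComplete((value, error) -> {
            String previous = CONTEXT.get();
            CONTEXT.set(storedContext); // "before": restore the caller's context
            try {
                fn.accept(value, error);
            } finally {
                CONTEXT.set(previous);  // "after": put back what the callback thread had
            }
        });
        return this;
    }

    public static void main(String[] args) {
        CONTEXT.set("caller-context");
        CompletableFuture<String> call = new CompletableFuture<>();
        new ContextRestoringResult(call).whenCompleteWithContext((value, error) ->
                System.out.println(value + " seen with " + CONTEXT.get()));
        // Complete on another thread, where CONTEXT would otherwise be empty.
        CompletableFuture.runAsync(() -> call.complete("hello")).join();
    }
}
```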


This concludes the Dubbo asynchronous analysis. Thank you for reading.