SOFARPC source code analysis series:

1. Source code analysis -SOFARPC extensible mechanism SPI

2. Source code analysis –SOFARPC client service references

3. Source code analysis –SOFARPC client service call

4. Source code analysis -SOFARPC server exposed

5. Source code analysis –SOFARPC calls service

6. Source code analysis – how does SOFARPC achieve load balancing compared to Dubbo?

7. Source code analysis -SOFARPC is how to achieve connection management and heartbeat?

8. Source code analysis — Looking at EventBus in SOFARPC from design mode?


Let’s first look at the diagram of BoltClientProxyInvoker

So when we invoke BoltClientProxyInvoker#invoke we actually invoke the parent invoke method ClientProxyInvoker#invoke

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        Throwable throwable = null;
        try {
            RpcInternalContext.pushContext();
            RpcInternalContext context = RpcInternalContext.getContext();
            context.setProviderSide(false);
            // Wrap the request
            decorateRequest(request);
            try {
                // Generate the start call event
                if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
                    EventBus.post(new ClientStartInvokeEvent(request));
                }
                // Get the result
                response = cluster.invoke(request);
            } catch (SofaRpcException e) {
                throwable = e;
                throw e;
            } finally {
                // Generates an end-of-call event
                if(! request.isAsync()) {if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
                        EventBus.post(newClientEndInvokeEvent(request, response, throwable)); }}}// Wrap the response
            decorateResponse(response);
            return response;
        } finally{ RpcInternalContext.removeContext(); RpcInternalContext.popContext(); }}Copy the code

This approach does a few things:

  1. Wrap the request request and set the necessary parameters
  2. Invoke the FailOverCluster invoke method to send reques requests and get a response
  3. Package response response

When we call FailOverCluster, we actually call the invoker method of its parent class AbstractCluster. The FailOverCluster diagram is as follows:

So let’s go to AbstractCluster’s invoker method:

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        try {
            // Do some initialization checks, such as unconnected can connect
            checkClusterState();
            // start the call
            countOfInvoke.incrementAndGet(); / / count + 1
            response = doInvoke(request);
            return response;
        } catch (SofaRpcException e) {
            // The client receives an exception (its own exception)
            throw e;
        } finally {
            countOfInvoke.decrementAndGet(); / / count - 1}}Copy the code

The checkClusterState method is used to check whether it has been destroyed or initialized by calling init. It will then remember the number before calling it. Then we go to the doInvoke method:

    public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
        String methodName = request.getMethodName();
        int retries = consumerConfig.getMethodRetries(methodName);
        int time = 0;
        SofaRpcException throwable = null;// Exception log
        List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
        do {
            // Load balancing
            ProviderInfo providerInfo = select(request, invokedProviderInfos);
            try {
                // Call the filter chain
                SofaResponse response = filterChain(providerInfo, request);
                if(response ! =null) {
                    if(throwable ! =null) {
                        if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
                            LOGGER.warnWithApp(consumerConfig.getAppName(),
                                LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,
                                    throwable.getClass() + ":"+ throwable.getMessage(), invokedProviderInfos)); }}return response;
                } else {
                    throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                        "Failed to call " + request.getInterfaceName() + "." + methodName
                            + " on remote server " + providerInfo + ", return null"); time++; }}catch (SofaRpcException e) { // RPC exception retry is initiated only when server exception + timeout exception occurs
                if (e.getErrorType() == RpcErrorType.SERVER_BUSY
                    || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
                    throwable = e;
                    time++;
                } else {
                    throwe; }}catch (Exception e) { // Do not retry other exceptions
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                    "Failed to call " + request.getInterfaceName() + "." + request.getMethodName()
                        + " on remote server: " + providerInfo + ", cause by unknown exception: "
                        + e.getClass().getName() + ", message is: " + e.getMessage(), e);
            } finally {
                if (RpcInternalContext.isAttachmentEnable()) {
                    RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,
                        time + 1); // Number of retries
                }
            }
            invokedProviderInfos.add(providerInfo);
        } while (time <= retries);

        throw throwable;
    }
Copy the code

This method mainly does this:

  1. Loop if it fails
  2. Select Provider for load balancing
  3. The server is called through the filter chain and the result is returned
  4. Exception handling

Next we go to the filterChain method, and then jump to the invoke method in ConsumerInvoker based on the filterChain

    @Override
    public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
        // Set the server application
        ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
        String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
        if (StringUtils.isNotEmpty(appName)) {
            sofaRequest.setTargetAppName(appName);
        }

        // Currently it is only sent to the server through the client
        return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
    }
Copy the code

ConsumerBootstrap. GetCluster () will return FailOverCluster instance, and then call the superclass AbstractCluster sendMsg method

    public SofaResponse sendMsg(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
        ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
        if(clientTransport ! =null && clientTransport.isAvailable()) {
            return doSendMsg(providerInfo, clientTransport, request);
        } else {
            throwunavailableProviderException(request.getTargetServiceUniqueName(), providerInfo.getOriginUrl()); }}protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport, SofaRequest request) throws SofaRpcException {
        RpcInternalContext context = RpcInternalContext.getContext();
        // Add the remote address of the calling server
        RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort());
        try {
            checkProviderVersion(providerInfo, request); // Special processing according to the server version
            String invokeType = request.getInvokeType();
            int timeout = resolveTimeout(request, consumerConfig, providerInfo);

            SofaResponse response = null;
            // Synchronous call
            if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    response = transport.syncSend(request, timeout);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        longelapsed = RpcRuntimeContext.now() - start; context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed); }}}// one-way call
            else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    transport.oneWaySend(request, timeout);
                    response = buildEmptyResponse(request);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        longelapsed = RpcRuntimeContext.now() - start; context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed); }}}/ / Callback invocation
            else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
                // Call level callback listener
                SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
                if (sofaResponseCallback == null) {
                    SofaResponseCallback methodResponseCallback = consumerConfig
                        .getMethodOnreturn(request.getMethodName());
                    if(methodResponseCallback ! =null) { // The Callback of the methodrequest.setSofaResponseCallback(methodResponseCallback); }}// Record the start time of sending
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                // start the call
                transport.asyncSend(request, timeout);
                response = buildEmptyResponse(request);
            }
            / / the Future calls
            else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
                // Record the start time of sending
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                // start the call
                ResponseFuture future = transport.asyncSend(request, timeout);
                // Place the thread context
                RpcInternalContext.getContext().setFuture(future);
                response = buildEmptyResponse(request);
            } else {
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
            }
            return response;
        } catch (SofaRpcException e) {
            throw e;
        } catch (Throwable e) { // Other exceptions occur on the client
            throw newSofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e); }}Copy the code

The sendMsg method ends up calling doSendMsg. SoSendMsg does the following things:

  1. If the call is synchronous, the wrapped arguments are returned directly
  2. If the call is one-way, the buildEmptyResponse method is called and returns an empty response
  3. If callback calls asyncSend, RPC automatically executes the callback implementation when it gets the result from the server.
  4. The response result returned by the server is cached by RPC. When the client needs the response result, it needs to actively obtain the result, and the process of obtaining the result blocks the thread.