Analysis of Nacos communication module

Server

Overview

In version 2.0, Nacos uses gRPC for its communication module. BaseRpcServer defines the basic interface for starting and shutting down the service, and BaseGrpcServer implements the basic server functionality with gRPC. GrpcClusterServer handles the interaction between cluster nodes, while GrpcSdkServer handles the interaction between clients and the server; each defines its own execution thread pool.
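This split works like a template method. The base class owns the start/stop lifecycle, and the subclasses supply the transport and the thread pool. The following plain-Java sketch illustrates that structure. The class names SketchRpcServer, SketchGrpcServer and SketchSdkServer and the startServer/stopServer hooks are hypothetical. getServicePort and getRpcExecutor borrow the names used in the server builder code below, but they are simplified here and do not match the real Nacos signatures.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical base class: owns the start/stop lifecycle and delegates the details to hooks.
abstract class SketchRpcServer {
    private volatile boolean started;

    public final void start() {
        startServer(); // transport-specific startup, provided by a subclass
        started = true;
    }

    public final void stop() {
        stopServer();
        started = false;
    }

    public boolean isStarted() {
        return started;
    }

    public abstract int getServicePort();

    protected abstract void startServer();

    protected abstract void stopServer();
}

// Hypothetical transport layer: implements the hooks, leaves the thread pool to subclasses.
abstract class SketchGrpcServer extends SketchRpcServer {
    private ExecutorService executor;

    @Override
    protected void startServer() {
        executor = getRpcExecutor();
        // A real server would bind a gRPC listener on getServicePort() using this executor.
    }

    @Override
    protected void stopServer() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    protected abstract ExecutorService getRpcExecutor();
}

// Hypothetical client-facing server that supplies its own port and pool.
final class SketchSdkServer extends SketchGrpcServer {
    private final int port;

    SketchSdkServer(int port) {
        this.port = port;
    }

    @Override
    public int getServicePort() {
        return port;
    }

    @Override
    protected ExecutorService getRpcExecutor() {
        return Executors.newFixedThreadPool(2);
    }
}
```

With this shape, adding another server type (for example, one for cluster traffic) only requires another subclass that returns its own port and executor.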

BaseGrpcServer

BaseGrpcServer uses gRPC to implement the remote server endpoint. First, it creates a server call interceptor. The interceptor copies the attributes of the incoming call (connection ID, remote IP, remote port and local port) into the gRPC Context, so the code handling the call can read them later. Note that getInternalChannel uses reflection to obtain the Netty Channel that gRPC wraps internally.

ServerInterceptor serverInterceptor = new ServerInterceptor() {
    @Override
    public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
            ServerCallHandler<T, S> next) {
        Context ctx = Context.current()
                .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
        if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
            Channel internalChannel = getInternalChannel(call);
            ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
        }
        return Contexts.interceptCall(ctx, call, headers, next);
    }
};
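The interceptor's job is to copy per-connection attributes into a context that the handler can read without receiving them as arguments. That idea can be shown without gRPC. The sketch below is a plain-Java analogue that uses a ThreadLocal in place of io.grpc.Context. The CallContext class, its methods and the attribute key names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for io.grpc.Context, backed by a ThreadLocal.
final class CallContext {
    private static final ThreadLocal<Map<String, Object>> CURRENT =
            ThreadLocal.withInitial(() -> Collections.emptyMap());

    private CallContext() {
    }

    // Reads a value bound by the enclosing interceptCall, or null outside a call.
    static Object get(String key) {
        return CURRENT.get().get(key);
    }

    // Binds the transport attributes, runs the handler, then restores the previous context.
    static <R> R interceptCall(Map<String, Object> transportAttrs, Supplier<R> handler) {
        Map<String, Object> previous = CURRENT.get();
        Map<String, Object> ctx = new HashMap<>(previous);
        ctx.putAll(transportAttrs);
        CURRENT.set(ctx);
        try {
            return handler.get();
        } finally {
            CURRENT.set(previous);
        }
    }
}
```

Restoring the previous context in finally means the values are visible only while the call runs. gRPC's Contexts.interceptCall gives the same scoping for the call's listener callbacks.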

Next, the service methods are registered. Instead of the usual approach of defining the services in a .proto file and generating stubs, Nacos registers them manually. It supports both unary calls (a single request/response round trip) and bidirectional streaming calls. Both the request body and the response body are of type Payload. Payload is a message generated from a protobuf definition, and it uses the Any type to carry multiple data formats.

private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
    
    // unary common call register.
    // Define the method descriptor
    final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            .setType(MethodDescriptor.MethodType.UNARY)
            .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
    
    // Define the service handling method callback
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
            .asyncUnaryCall((request, responseObserver) -> {
                grpcCommonRequestAcceptor.request(request, responseObserver);
            });
    
    // Define the service
    final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
            .addMethod(unaryPayloadMethod, payloadHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
    
    // bi stream register.
    final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
            (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
    
    final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                    .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
    
    final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
            .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
    
}
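Because the services are registered by hand, dispatch comes down to a lookup by full method name. gRPC's MethodDescriptor.generateFullMethodName joins the service and method names as "service/method". The sketch below imitates a mutable handler registry in plain Java, with String standing in for Payload. The SketchHandlerRegistry class and the service/method names used with it are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical registry: maps "service/method" to a handler, like a fallback handler registry.
final class SketchHandlerRegistry {
    private final Map<String, UnaryOperator<String>> handlers = new ConcurrentHashMap<>();

    // Same "service/method" format as gRPC's MethodDescriptor.generateFullMethodName.
    static String fullMethodName(String service, String method) {
        return service + "/" + method;
    }

    void addMethod(String service, String method, UnaryOperator<String> handler) {
        handlers.put(fullMethodName(service, method), handler);
    }

    // Looks up the handler for an incoming call; unknown methods are rejected.
    String call(String fullMethodName, String payload) {
        UnaryOperator<String> handler = handlers.get(fullMethodName);
        if (handler == null) {
            throw new IllegalArgumentException("UNIMPLEMENTED: " + fullMethodName);
        }
        return handler.apply(payload);
    }
}
```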

The final step is to build the server, which sets up its attributes and manages connections. When a connection is established, transportReady builds a new Attributes object from the attributes of the current connection. Its main job is to assign a connectionId to each connection, in the form timestamp_remoteIp_remotePort. This is the value the interceptor above reads. When the connection is closed, transportTerminated removes it from connectionManager.

server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
        .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
        .compressorRegistry(CompressorRegistry.getDefaultInstance())
        .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
        .addTransportFilter(new ServerTransportFilter() {
            @Override
            public Attributes transportReady(Attributes transportAttrs) {
                InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                        .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                        .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                int remotePort = remoteAddress.getPort();
                int localPort = localAddress.getPort();
                String remoteIp = remoteAddress.getAddress().getHostAddress();
                Attributes attrWrapper = transportAttrs.toBuilder()
                        .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                        .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                        .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                return attrWrapper;
                
            }
            
            @Override
            public void transportTerminated(Attributes transportAttrs) {
                String connectionId = null;
                try {
                    connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                } catch (Exception e) {
                    // Ignore
                }
                if (StringUtils.isNotBlank(connectionId)) {
                    Loggers.REMOTE_DIGEST
                            .info("Connection transportTerminated,connectionId = {} ", connectionId);
                    connectionManager.unregister(connectionId);
                }
            }
        }).build();
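The lifecycle handled by the transport filter can be modeled in plain Java. When the transport becomes ready, an ID is built from the current time, the remote IP and the remote port. The same ID is later used to unregister the connection when the transport terminates. The SketchTransportFilter class below is hypothetical: plain Map attributes replace io.grpc.Attributes, and a Set replaces ConnectionManager. Registration is a separate step because, in Nacos, it happens later when the bidirectional stream is set up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the transportReady/transportTerminated pair.
final class SketchTransportFilter {
    // Stand-in for ConnectionManager: IDs of registered connections.
    private final Set<String> registered = ConcurrentHashMap.newKeySet();

    // Same "timestamp_remoteIp_remotePort" shape as the ID built in transportReady.
    static String connectionId(long nowMillis, String remoteIp, int remotePort) {
        return nowMillis + "_" + remoteIp + "_" + remotePort;
    }

    // Returns the attributes that the call interceptor will later read.
    Map<String, Object> transportReady(String remoteIp, int remotePort, int localPort) {
        Map<String, Object> attrs = new HashMap<>();
        attrs.put("connId", connectionId(System.currentTimeMillis(), remoteIp, remotePort));
        attrs.put("remoteIp", remoteIp);
        attrs.put("remotePort", remotePort);
        attrs.put("localPort", localPort);
        return attrs;
    }

    // Registration happens later, when the bidirectional stream is set up.
    void register(String connectionId) {
        registered.add(connectionId);
    }

    boolean isRegistered(String connectionId) {
        return registered.contains(connectionId);
    }

    // Unregisters only when a non-blank ID was assigned.
    void transportTerminated(Map<String, Object> attrs) {
        Object id = attrs.get("connId");
        if (id instanceof String && !((String) id).trim().isEmpty()) {
            registered.remove(id);
        }
    }
}
```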

ConnectionManager

ConnectionManager manages the long-lived connections to the server. In Nacos, only connections established over the bidirectional stream are managed, and Nacos also uses that bidirectional stream for server-side push. As in Sentinel and Seata, Connection objects are stored in a ConcurrentHashMap. When a connection is registered, ConnectionManager does three things. It checks the current connection limit, increments the count of connections for that client IP, and fires an event callback to notify listeners that a connection has been established. Note the synchronized keyword: it means registration, and therefore the listener callbacks, run serially. For example, RpcAckCallbackInitorOrCleaner uses these callbacks to create the corresponding Map entry in RpcAckCallbackSynchronizer when a connection is established, and to clean it up when the connection is closed.

public synchronized boolean register(String connectionId, Connection connection) {
    
    if (connection.isConnected()) {
        if (connections.containsKey(connectionId)) {
            return true;
        }
        if (!checkLimit(connection)) {
            return false;
        }
        if (traced(connection.getMetaInfo().clientIp)) {
            connection.setTraced(true);
        }
        connections.put(connectionId, connection);
        connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement();
        
        clientConnectionEventListenerRegistry.notifyClientConnected(connection);
        Loggers.REMOTE_DIGEST
                .info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
                        connection);
        return true;
        
    }
    return false;
    
}
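The registration rules can be modeled with plain collections. One synchronized method performs an idempotency check on the ID, a limit check, a per-IP counter update and a listener notification. The sketch below is hypothetical. In particular, the single global maxConnections limit stands in for Nacos's checkLimit, whose actual rules are not shown here.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class SketchConnectionManager {
    private final Map<String, String> connections = new ConcurrentHashMap<>(); // connectionId -> clientIp
    private final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final int maxConnections; // hypothetical global limit

    SketchConnectionManager(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // synchronized: registrations, and the callbacks they trigger, run one at a time.
    synchronized boolean register(String connectionId, String clientIp) {
        if (connections.containsKey(connectionId)) {
            return true; // already registered
        }
        if (connections.size() >= maxConnections) {
            return false; // over the limit
        }
        connections.put(connectionId, clientIp);
        connectionForClientIp.computeIfAbsent(clientIp, ip -> new AtomicInteger()).incrementAndGet();
        for (Consumer<String> listener : listeners) {
            listener.accept(connectionId); // "client connected" event
        }
        return true;
    }

    synchronized void unregister(String connectionId) {
        String clientIp = connections.remove(connectionId);
        if (clientIp != null) {
            connectionForClientIp.get(clientIp).decrementAndGet();
        }
    }

    int countForIp(String clientIp) {
        AtomicInteger count = connectionForClientIp.get(clientIp);
        return count == null ? 0 : count.get();
    }
}
```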

ConnectionManager also starts a scheduled task. It first runs after 1 second and then repeats with a fixed delay of 3 seconds. The task has two jobs. It enforces the connection limits, sending a ConnectResetRequest to tell clients to close connections that exceed the limit. It also periodically scans for idle connections and checks whether they are still alive. In this idle-connection probe, a ClientDetectionRequest is sent asynchronously to each outdated connection with a 1-second timeout, and a CountDownLatch is counted down whenever a probe gets a response or fails. The checking thread blocks on the latch for at most 3 seconds, waiting for the asynchronous probes. It then unregisters every connection that did not respond successfully.

RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        try {
            //...
            
            //4.client active detection.
            Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
            if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                Set<String> successConnections = new HashSet<>();
                final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                for (String outDateConnectionId : outDatedConnections) {
                    try {
                        Connection connection = getConnection(outDateConnectionId);
                        if (connection != null) {
                            ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                            connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                @Override
                                public Executor getExecutor() {
                                    return null;
                                }
                                
                                @Override
                                public long getTimeout() {
                                    return 1000L;
                                }
                                
                                @Override
                                public void onResponse(Response response) {
                                    latch.countDown();
                                    if (response != null && response.isSuccess()) {
                                        connection.freshActiveTime();
                                        successConnections.add(outDateConnectionId);
                                    }
                                }
                                
                                @Override
                                public void onException(Throwable e) {
                                    latch.countDown();
                                }
                            });
                            Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);
                        } else {
                            latch.countDown();
                        }
                    } catch (ConnectionAlreadyClosedException e) {
                        latch.countDown();
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST
                                .error("[{}]Error occurs when check client active detection ,error={}",
                                        outDateConnectionId, e);
                        latch.countDown();
                    }
                }
                
                latch.await(3000L, TimeUnit.MILLISECONDS);
                Loggers.REMOTE_DIGEST
                        .info("Out dated connection check successCount={}", successConnections.size());
                
                for (String outDateConnectionId : outDatedConnections) {
                    if (!successConnections.contains(outDateConnectionId)) {
                        Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                        unregister(outDateConnectionId);
                    }
                }
            }
            
            //reset loader client
            if (isLoaderClient) {
                loadClient = -1;
                redirectAddress = null;
            }
            
            Loggers.REMOTE_DIGEST.info("Connection check task end");
            
        } catch (Throwable e) {
            Loggers.REMOTE.error("Error occurs during connection check... ", e);
        }
    }
}, 1000L, 3000L, TimeUnit.MILLISECONDS);
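The probe pattern works as follows. Fire one asynchronous check per connection, count the latch down on every outcome (success, failure or an immediate send error), wait with a bound, and evict whatever did not answer. Below is a self-contained sketch in which each client's answer is simulated by a CompletableFuture. The findDeadConnections helper and its probe function are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class SketchIdleProbe {
    // Returns the IDs that should be unregistered because they did not answer successfully in time.
    static Set<String> findDeadConnections(List<String> outdated,
            Function<String, CompletableFuture<Boolean>> probe, long waitMillis) {
        Set<String> alive = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(outdated.size());
        for (String id : outdated) {
            try {
                probe.apply(id).whenComplete((ok, error) -> {
                    if (error == null && Boolean.TRUE.equals(ok)) {
                        alive.add(id); // record before countDown so it is visible after await()
                    }
                    latch.countDown();
                });
            } catch (RuntimeException e) {
                latch.countDown(); // sending failed immediately (e.g. connection already closed)
            }
        }
        try {
            latch.await(waitMillis, TimeUnit.MILLISECONDS); // bounded wait for the async probes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Set<String> dead = ConcurrentHashMap.newKeySet();
        for (String id : outdated) {
            if (!alive.contains(id)) {
                dead.add(id);
            }
        }
        return dead;
    }
}
```

The sketch records a success before calling countDown(). Because countDown() happens-before the corresponding return from await(), every success that was counted down is visible when the final set is built.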

GrpcBiStreamRequestAcceptor

GrpcBiStreamRequestAcceptor is the message handler for the bidirectional stream mentioned above. The connection between client and server is established through the bidirectional streaming service, and only a client that has established such a connection can issue unary requests. The message handler shows how the stream is used. The client sends only the connection setup request over it, plus its responses to server requests. The server uses the stream to push messages. All other client requests go through the separate unary call. When a connection is registered, the responseObserver of the current stream is injected into the GrpcConnection. The server can then look up the connection through ConnectionManager and push data proactively.

@Override
public void onNext(Payload payload) {
    
    clientIp = payload.getMetadata().getClientIp();
    traceDetailIfNecessary(payload);
    
    Object parseObj;
    try {
        parseObj = GrpcUtils.parse(payload);
    } catch (Throwable throwable) {
        Loggers.REMOTE_DIGEST
                .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
        return;
    }
    
    if (parseObj == null) {
        Loggers.REMOTE_DIGEST
                .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                        payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
        return;
    }
    if (parseObj instanceof ConnectionSetupRequest) {
        ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
        Map<String, String> labels = setUpRequest.getLabels();
        String appName = "-";
        if (labels != null && labels.containsKey(Constants.APPNAME)) {
            appName = labels.get(Constants.APPNAME);
        }
        
        ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
        metaInfo.setTenant(setUpRequest.getTenant());
        Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
        connection.setAbilities(setUpRequest.getAbilities());
        boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
        if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
            //Not register to the connection manager if current server is over limit or server is starting.
            try {
                Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                        rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                connection.request(new ConnectResetRequest(), 3000L);
                connection.close();
            } catch (Exception e) {
                //Do nothing.
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Send connect reset request error,error={}", connectionId, e);
                }
            }
        }
    } else if (parseObj instanceof Response) {
        Response response = (Response) parseObj;
        if (connectionManager.traced(clientIp)) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}]Receive response of server request ,response={}", connectionId, response);
        }
        RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
        connectionManager.refreshActiveTime(connectionId);
    } else {
        Loggers.REMOTE_DIGEST
                .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId, parseObj);
    }
}
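The stream handler follows a simple rule. A setup message registers the connection together with its outbound sink (the role responseObserver plays). A response message acknowledges an earlier server request. Anything else is logged and dropped. The plain-Java sketch below models that dispatch. The message classes, the acceptor class and the Consumer&lt;String&gt; sink are hypothetical simplifications of Payload and StreamObserver.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical message types carried on the stream.
abstract class StreamMessage {
}

final class SetupMessage extends StreamMessage {
    final String clientVersion;

    SetupMessage(String clientVersion) {
        this.clientVersion = clientVersion;
    }
}

final class ResponseMessage extends StreamMessage {
    final String requestId;

    ResponseMessage(String requestId) {
        this.requestId = requestId;
    }
}

final class SketchBiStreamAcceptor {
    // connectionId -> outbound sink, playing the role of the stored responseObserver.
    private final Map<String, Consumer<String>> sinks = new ConcurrentHashMap<>();
    private final Map<String, String> lastAcks = new ConcurrentHashMap<>();

    // Called for every message a client sends on its stream.
    void onNext(String connectionId, StreamMessage message, Consumer<String> outbound) {
        if (message instanceof SetupMessage) {
            sinks.put(connectionId, outbound); // connection becomes addressable for pushes
        } else if (message instanceof ResponseMessage) {
            lastAcks.put(connectionId, ((ResponseMessage) message).requestId);
        }
        // Other message types would be logged and dropped.
    }

    // Server-initiated push: find the connection's sink and write to it.
    boolean push(String connectionId, String data) {
        Consumer<String> sink = sinks.get(connectionId);
        if (sink == null) {
            return false;
        }
        sink.accept(data);
        return true;
    }

    String lastAck(String connectionId) {
        return lastAcks.get(connectionId);
    }
}
```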

If the received data is a response from the client, the active time of the corresponding connection is refreshed. Note that RpcAckCallbackSynchronizer processes the client's response here. The principle is very similar to the synchronization mechanism used in Sentinel and Seata. A map stores the requests that are waiting for a result, and when a response arrives, the matching pending future is looked up and its result is set. The difference is that in Nacos this map is a ConcurrentLinkedHashMap, and its EvictionListener handles the expired (evicted) tasks.

public static void ackNotify(String connectionId, Response response) {
    
    Map<String, DefaultRequestFuture> stringDefaultPushFutureMap = CALLBACK_CONTEXT.get(connectionId);
    if (stringDefaultPushFutureMap == null) {
        
        Loggers.REMOTE_DIGEST
                .warn("Ack receive on a outdated connection ,connection id={},requestId={} ", connectionId,
                        response.getRequestId());
        return;
    }
    
    DefaultRequestFuture currentCallback = stringDefaultPushFutureMap.remove(response.getRequestId());
    if (currentCallback == null) {
        
        Loggers.REMOTE_DIGEST
                .warn("Ack receive on a outdated request ,connection id={},requestId={} ", connectionId,
                        response.getRequestId());
        return;
    }
    
    if (response.isSuccess()) {
        currentCallback.setResponse(response);
    } else {
        currentCallback.setFailResult(new NacosException(response.getErrorCode(), response.getMessage()));
    }
}
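The request/acknowledgement matching can be reproduced with a two-level map: connection ID to request ID to pending future. The sketch below uses CompletableFuture in place of DefaultRequestFuture and a plain ConcurrentHashMap without an eviction listener. SketchAckSynchronizer and its methods are hypothetical. initContextIfNecessary and clearContext stand for the per-connection setup and cleanup that the connection listener performs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class SketchAckSynchronizer {
    // connectionId -> (requestId -> pending future)
    private static final Map<String, Map<String, CompletableFuture<String>>> CALLBACK_CONTEXT =
            new ConcurrentHashMap<>();

    private SketchAckSynchronizer() {
    }

    // On connect: create the per-connection map.
    static void initContextIfNecessary(String connectionId) {
        CALLBACK_CONTEXT.computeIfAbsent(connectionId, id -> new ConcurrentHashMap<>());
    }

    // On disconnect: drop everything still pending for that connection.
    static void clearContext(String connectionId) {
        CALLBACK_CONTEXT.remove(connectionId);
    }

    // Before sending: remember the request so the ack can find it.
    static CompletableFuture<String> syncCallback(String connectionId, String requestId) {
        Map<String, CompletableFuture<String>> pending = CALLBACK_CONTEXT.get(connectionId);
        if (pending == null) {
            throw new IllegalStateException("connection not registered: " + connectionId);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // On response: complete the matching future; false for outdated connections or requests.
    static boolean ackNotify(String connectionId, String requestId, boolean success, String body) {
        Map<String, CompletableFuture<String>> pending = CALLBACK_CONTEXT.get(connectionId);
        if (pending == null) {
            return false;
        }
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        if (success) {
            future.complete(body);
        } else {
            future.completeExceptionally(new IllegalStateException(body));
        }
        return true;
    }
}
```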

GrpcRequestAcceptor

As mentioned above, all requests initiated by the client are handled by the unary method. The core logic looks up the RequestHandler by the type field in the request's Metadata and checks that the request comes from a registered connection. Note that after the RequestHandler has processed the request, responseObserver.onCompleted() is called. This is a unary call, so exactly one response is sent per request and the call is then closed.

@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
    
    traceIfNecessary(grpcRequest, true);
    String type = grpcRequest.getMetadata().getType();
    //...
    
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    //...
    
    //check connection status.
    String connectionId = CONTEXT_KEY_CONN_ID.get();
    boolean requestValid = connectionManager.checkValid(connectionId);
    if (!requestValid) {
        //...
        return;
    }
    
    //...
    
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Loggers.REMOTE_DIGEST
                .error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
                        e);
        Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
                (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
}
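The unary path works as follows. Look up a handler by request type, reject calls that do not come from a registered connection, and always finish with exactly one onNext followed by onCompleted, even on failure. The sketch below models that contract in plain Java. The ResponseSink interface stands in for StreamObserver, and the class name, request type and error strings are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for StreamObserver<Payload>.
interface ResponseSink {
    void onNext(String response);

    void onCompleted();
}

final class SketchUnaryAcceptor {
    private final Map<String, Function<String, String>> handlersByType = new ConcurrentHashMap<>();
    private final Set<String> validConnections = ConcurrentHashMap.newKeySet();

    void registerHandler(String type, Function<String, String> handler) {
        handlersByType.put(type, handler);
    }

    void registerConnection(String connectionId) {
        validConnections.add(connectionId);
    }

    void request(String connectionId, String type, String body, ResponseSink sink) {
        String response;
        Function<String, String> handler = handlersByType.get(type);
        if (handler == null) {
            response = "ERROR: no handler for " + type;
        } else if (!validConnections.contains(connectionId)) {
            response = "ERROR: connection not registered";
        } else {
            try {
                response = handler.apply(body);
            } catch (RuntimeException e) {
                response = "ERROR: " + e.getMessage();
            }
        }
        // Unary call: exactly one response, then complete the call.
        sink.onNext(response);
        sink.onCompleted();
    }
}
```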

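RequestHandler is an abstract class. Before invoking the actual handle method, handleRequest iterates over all request filters. If any filter returns a failed response, that response is returned immediately; a filter that throws is only logged. For example, Nacos adds a TpsControlRequestFilter to rate-limit method requests.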

public Response handleRequest(T request, RequestMeta meta) throws NacosException {
    for (AbstractRequestFilter filter : requestFilters.filters) {
        try {
            Response filterResult = filter.filter(request, meta, this.getClass());
            if (filterResult != null && !filterResult.isSuccess()) {
                return filterResult;
            }
        } catch (Throwable throwable) {
            Loggers.REMOTE.error("filter error", throwable);
        }
    }
    return handle(request, meta);
}
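The filter chain is a short-circuiting loop. The first filter that rejects the request wins, and a filter that throws is skipped rather than blocking the request. The sketch below mirrors that loop with a hypothetical count-per-window limiter. The limiter is only an illustration and is not how Nacos's TpsControlRequestFilter works. All class names here are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical filter: returns a rejection message to short-circuit, or null to pass.
interface SketchRequestFilter {
    String filter(String request);
}

// Hypothetical limiter: at most maxPerWindow requests until resetWindow() is called.
final class SketchTpsFilter implements SketchRequestFilter {
    private final int maxPerWindow;
    private final AtomicInteger count = new AtomicInteger();

    SketchTpsFilter(int maxPerWindow) {
        this.maxPerWindow = maxPerWindow;
    }

    void resetWindow() {
        count.set(0);
    }

    @Override
    public String filter(String request) {
        return count.incrementAndGet() > maxPerWindow ? "OVER_THRESHOLD" : null;
    }
}

final class SketchRequestHandler {
    private final List<SketchRequestFilter> filters = new CopyOnWriteArrayList<>();
    private final Function<String, String> handle;

    SketchRequestHandler(Function<String, String> handle) {
        this.handle = handle;
    }

    void addFilter(SketchRequestFilter filter) {
        filters.add(filter);
    }

    String handleRequest(String request) {
        for (SketchRequestFilter filter : filters) {
            try {
                String rejection = filter.filter(request);
                if (rejection != null) {
                    return rejection; // short-circuit: the handler is never called
                }
            } catch (RuntimeException e) {
                // A broken filter is ignored; the real code logs "filter error" here.
            }
        }
        return handle.apply(request);
    }
}
```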

RpcPushService

RpcPushService is used by the server to push messages to clients. It builds on the ConnectionManager and GrpcBiStreamRequestAcceptor described earlier. The server finds the corresponding GrpcConnection by its connectionId and calls the GrpcConnection's send method. When the response arrives, RpcAckCallbackSynchronizer sets the result, and the callback registered through DefaultRequestFuture is invoked. Note that if no connection exists for the ID, or the connection turns out to be already closed, the push is reported as a success. In the closed case, the connection is also unregistered.

public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) {
    Connection connection = connectionManager.getConnection(connectionId);
    if (connection != null) {
        try {
            connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
                
                @Override
                public Executor getExecutor() {
                    return executor;
                }
                
                @Override
                public void onResponse(Response response) {
                    if (response.isSuccess()) {
                        requestCallBack.onSuccess();
                    } else {
                        requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                    }
                }
                
                @Override
                public void onException(Throwable e) {
                    requestCallBack.onFail(e);
                }
            });
        } catch (ConnectionAlreadyClosedException e) {
            connectionManager.unregister(connectionId);
            requestCallBack.onSuccess();
        } catch (Exception e) {
            Loggers.REMOTE_DIGEST
                    .error("error to send push response to connectionId ={},push response={}", connectionId, request, e);
            requestCallBack.onFail(e);
        }
    } else {
        requestCallBack.onSuccess();
    }
}
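The outcome rules of pushWithCallback are worth spelling out, because two of them report success without anything reaching the client. The first is a missing connection. The second is a connection that turns out to be closed, which is also unregistered. Any other send error is reported as a failure. The sketch below encodes those rules in plain Java. The SketchPushService class, the PushCallback and SketchConnection interfaces and the ClosedException type are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical push callback, mirroring onSuccess/onFail.
interface PushCallback {
    void onSuccess();

    void onFail(Throwable e);
}

// Hypothetical stand-in for ConnectionAlreadyClosedException.
final class ClosedException extends RuntimeException {
    ClosedException(String message) {
        super(message);
    }
}

// Hypothetical connection whose asyncRequest may throw synchronously.
interface SketchConnection {
    void asyncRequest(String request, PushCallback callback);
}

final class SketchPushService {
    final Map<String, SketchConnection> connections = new ConcurrentHashMap<>();

    void pushWithCallback(String connectionId, String request, PushCallback callback) {
        SketchConnection connection = connections.get(connectionId);
        if (connection == null) {
            callback.onSuccess(); // nobody to push to: treated as done
            return;
        }
        try {
            connection.asyncRequest(request, callback);
        } catch (ClosedException e) {
            connections.remove(connectionId); // unregister the dead connection
            callback.onSuccess();
        } catch (RuntimeException e) {
            callback.onFail(e); // any other send error is a real failure
        }
    }
}
```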

Both synchronous and asynchronous sending eventually call sendRequestInner. This method assigns a new request ID, constructs a DefaultRequestFuture and registers it with RpcAckCallbackSynchronizer. DefaultRequestFuture holds two callbacks: one for the business logic and one for timeouts. The timeout callback passed here removes the future from RpcAckCallbackSynchronizer. Finally, sendRequestNoAck pushes the request to the client through the StreamObserver of the bidirectional stream.

private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException {
    final String requestId = String.valueOf(PushAckIdGenerator.getNextId());
    request.setRequestId(requestId);
    
    DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId,
            callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId));
    
    RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture);
    sendRequestNoAck(request);
    return defaultPushFuture;
}
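sendRequestInner registers a future under a fresh ID before sending and lets a timeout callback remove it. That pattern can be sketched with a ScheduledExecutorService. This is a hypothetical model: the AtomicLong counter stands in for PushAckIdGenerator, and a scheduled task that fails the CompletableFuture stands in for DefaultRequestFuture's timeout callback.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

final class SketchRequestSender {
    private final AtomicLong idGenerator = new AtomicLong();
    final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "sketch-timeout");
        t.setDaemon(true);
        return t;
    });
    private final BiConsumer<String, String> transport; // (requestId, request) -> send without waiting

    SketchRequestSender(BiConsumer<String, String> transport) {
        this.transport = transport;
    }

    CompletableFuture<String> sendRequestInner(String request, long timeoutMillis) {
        String requestId = String.valueOf(idGenerator.incrementAndGet());
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future); // register before sending so a fast ack is not lost
        timer.schedule(() -> {
            // Timeout callback: drop the pending entry and fail the future.
            if (pending.remove(requestId) != null) {
                future.completeExceptionally(new TimeoutException("request " + requestId));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        transport.accept(requestId, request);
        return future;
    }

    // Called when the client's response for requestId arrives on the stream.
    void onAck(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

Whichever side removes the entry first (the ack or the timeout) decides the outcome, so a late ack after a timeout is silently ignored.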

Client

Overview

The client-side structure is similar to the server side. The difference is that RpcClient does more than define interfaces: it also implements much of the functionality itself, such as sending messages and switching servers. GrpcClient uses gRPC to implement connecting to the server, handling messages and related functions.

RpcClient

When RpcClient starts, it creates a clientEventExecutor to run client tasks and submits two loops to it. The first is a connection event consumer that invokes the connection listeners' callbacks. The second is a health-check loop that periodically checks connectivity to the remote server and handles reconnection. RpcClient then connects to a remote server through the template method connectToServer, which is implemented by the concrete class GrpcClient. It tries synchronously up to RETRY_TIMES times and switches servers asynchronously if all attempts fail. Finally, it registers two handlers for requests from the server: one for ConnectResetRequest and one for ClientDetectionRequest.

The server list is maintained by ServerListManager, which stores the servers in a List and tracks the current position with an atomic currentIndex variable. If an endpoint for fetching the server list remotely is configured, ServerListManager refreshes the list periodically.
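A List plus an atomic index amounts to round-robin selection. The sketch below is a hypothetical plain-Java version. SketchServerList, genNextServer and refresh are illustrative names, and the real ServerListManager's method names and refresh logic may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin server list: an immutable snapshot plus an atomic cursor.
final class SketchServerList {
    private volatile List<String> serverList;
    private final AtomicInteger currentIndex = new AtomicInteger();

    SketchServerList(List<String> servers) {
        refresh(servers);
    }

    // Replaces the snapshot, e.g. after a periodic fetch from a remote address endpoint.
    void refresh(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("server list must not be empty");
        }
        serverList = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    // Each call advances the shared cursor; floorMod keeps the index valid after int overflow.
    String genNextServer() {
        List<String> servers = serverList; // read once so a concurrent refresh cannot change the size mid-call
        int index = Math.floorMod(currentIndex.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```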

public final void start() throws NacosException {
    
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }
    
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        }
    });
    
    // connection event consumer.
    clientEventExecutor.submit(new Runnable() {
        @Override
        public void run() {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = null;
                try {
                    take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
                } catch (Throwable e) {
                    //Do nothing
                }
            }
        }
    });
    
    // Health check
    clientEventExecutor.submit(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    if (isShutdown()) {
                        break;
                    }
                    ReconnectContext reconnectContext = reconnectionSignal
                            .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                    if (reconnectContext == null) {
                        //check alive time.
                        if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                            boolean isHealthy = healthCheck();
                            if (!isHealthy) {
                                if (currentConnection == null) {
                                    continue;
                                }
                                LoggerUtils.printIfInfoEnabled(LOGGER,
                                        "[{}]Server healthy check fail,currentConnection={}", name,
                                        currentConnection.getConnectionId());
                                
                                RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                                if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                    break;
                                }
                                
                                boolean success = RpcClient.this.rpcClientStatus
                                        .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                                if (success) {
                                    reconnectContext = new ReconnectContext(null, false);
                                } else {
                                    continue;
                                }
                            } else {
                                lastActiveTimeStamp = System.currentTimeMillis();
                                continue;
                            }
                        } else {
                            continue;
                        }
                    }
                    
                    if (reconnectContext.serverInfo != null) {
                        //clear recommend server if server is not in server list.
                        boolean serverExist = false;
                        for (String server : getServerListFactory().getServerList()) {
                            ServerInfo serverInfo = resolveServerInfo(server);
                            if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                                serverExist = true;
                                reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                                break;
                            }
                        }
                        if (!serverExist) {
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Recommend server is not in server list ,ignore recommend server {}", name,
                                    reconnectContext.serverInfo.getAddress());
                            
                            reconnectContext.serverInfo = null;
                            
                        }
                    }
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                } catch (Throwable throwable) {
                    //Do nothing
                }
            }
        }
    });
    
    //connect to server ,try to connect to server sync once, async starting if fail.
    Connection connectToServer = null;
    rpcClientStatus.set(RpcClientStatus.STARTING);
    
    int startUpRetryTimes = RETRY_TIMES;
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();
            
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name,
                    serverInfo);
            // Connect to remote server
            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(LOGGER,
                    "[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
                    name, e.getMessage(), startUpRetryTimes);
        }
    }
    
    if (connectToServer != null) {
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up,connectionId={}",
                name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
        this.currentConnection = connectToServer;
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        switchServerAsync();
    }
    
    registerServerRequestHandler(new ConnectResetRequestHandler());
    
    //register client detection request.
    registerServerRequestHandler(new ServerRequestHandler() {
        @Override
        public Response requestReply(Request request) {
            if (request instanceof ClientDetectionRequest) {
                return new ClientDetectionResponse();
            }
            
            return null;
        }
    });
}
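Both background loops in start() have the same shape: a worker thread blocks on a queue and reacts to what it takes. The health-check loop polls with a timeout, so "no reconnect signal for keepAliveTime" itself triggers the activity check. The plain-Java sketch below models one iteration of that loop. SketchKeepAlive, the healthCheck supplier, the injectable clock and the returned action strings are hypothetical, and the sketch omits the status transitions of the real code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class SketchKeepAlive {
    final BlockingQueue<String> reconnectionSignal = new LinkedBlockingQueue<>();
    private final long keepAliveMillis;
    private final BooleanSupplier healthCheck;
    private final LongSupplier clock; // injectable so the loop can be tested deterministically
    private volatile long lastActiveTimeStamp;

    SketchKeepAlive(long keepAliveMillis, BooleanSupplier healthCheck, LongSupplier clock) {
        this.keepAliveMillis = keepAliveMillis;
        this.healthCheck = healthCheck;
        this.clock = clock;
        this.lastActiveTimeStamp = clock.getAsLong();
    }

    // Would be called after every successful request, refreshing the activity time.
    void markActive() {
        lastActiveTimeStamp = clock.getAsLong();
    }

    // One iteration of the loop; returns what the loop decided to do.
    String step() {
        String signal;
        try {
            signal = reconnectionSignal.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        if (signal != null) {
            return "reconnect:" + signal; // explicit request, e.g. a recommended server
        }
        if (clock.getAsLong() - lastActiveTimeStamp < keepAliveMillis) {
            return "idle"; // recent traffic already proves the connection is alive
        }
        if (healthCheck.getAsBoolean()) {
            lastActiveTimeStamp = clock.getAsLong();
            return "healthy";
        }
        return "reconnect:any"; // failed check: pick a new server
    }
}
```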

In the startup code, the health-check loop calls healthCheck, which sends a probe request to the server through the current Connection object. Connection is an abstract class that declares the Requester interface but does not implement it; all of the actual functionality lives in the concrete class GrpcConnection. RpcClient's request method shows the same division of labor. RpcClient adds fault-tolerant retries around sending a remote request, while the Connection implementation only handles the actual communication.

public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response = null;
    Exception exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected,current status:" + rpcClientStatus.get());
            }
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server,connectionId={},request={}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
            
        } catch (Exception e) {
            if (waitReconnect) {
                try {
                    //wait client to re connect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    //Do nothing.
                }
            }
            
            LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request={}, retryTimes={},errorMessage={}",
                    request, retryTimes, e.getMessage());
            
            exceptionThrow = e;
            
        }
        retryTimes++;
        
    }
    
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }
    
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail,Unknown Error");
    }
}
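The split described above can be sketched in plain Java: the connection only sends requests, and a wrapper layers retries on top. This is a minimal, hypothetical illustration, not Nacos code. `RetryingRequester` and its functional `Requester` interface are made-up names. The reconnect wait, the UN_REGISTER handling and the server switch are all left out. The sketch keeps the two stopping conditions of the original loop: a maximum number of attempts and an overall deadline.

```java
// Hypothetical sketch: retry a request until it succeeds, the attempt budget is
// used up, or the overall deadline passes (mirrors RETRY_TIMES + timeoutMills).
final class RetryingRequester {

    /** Minimal stand-in for the Requester/Connection abstraction (hypothetical). */
    interface Requester<T> {
        T request(long timeoutMillis) throws Exception;
    }

    private final int maxAttempts;

    RetryingRequester(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    <R> R request(Requester<R> connection, long timeoutMillis) throws Exception {
        long start = System.currentTimeMillis();
        Exception last = null;
        int attempts = 0;
        while (attempts < maxAttempts && System.currentTimeMillis() < start + timeoutMillis) {
            try {
                R response = connection.request(timeoutMillis);
                if (response == null) {
                    // a null response counts as a failed attempt, as in RpcClient
                    throw new IllegalStateException("Unknown Exception.");
                }
                return response;
            } catch (Exception e) {
                last = e; // remember the latest failure, then try again
            }
            attempts++;
        }
        // rethrow the last failure, or report an unknown error if nothing was attempted
        throw last != null ? last : new IllegalStateException("Request fail, unknown error");
    }
}
```

Keeping the retry policy in the wrapper means each concrete connection (GrpcConnection in Nacos) only has to implement a single send.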

RpcClient registers two server request handlers, and all of them are stored in a List. The server side looks up the handler that matches the Request type. The client does not: it iterates over the handlers in order and stops as soon as one of them returns a non-null response.

protected Response handleServerRequest(final Request request) {
    
    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name,
            request.getClass().getSimpleName(), request.getRequestId());
    lastActiveTimeStamp = System.currentTimeMillis();
    for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
        try {
            Response response = serverRequestHandler.requestReply(request);
            
            if (response != null) {
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ack server push request,request={},requestId={}", name,
                        request.getClass().getSimpleName(), request.getRequestId());
                return response;
            }
        } catch (Exception e) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]handleServerRequest:{}, errorMessage={}", name,
                    serverRequestHandler.getClass().getName(), e.getMessage());
        }
    }
    return null;
}
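The first-non-null dispatch used by handleServerRequest is essentially a chain of responsibility. The following is a hypothetical sketch, not the Nacos class. `HandlerChain` and `Handler` are illustrative names, and requests and responses are simplified to `Object`. It shows two things: registration order decides which handler answers, and a handler that throws is skipped rather than aborting the scan.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of a first-non-null handler chain, modeled on handleServerRequest.
final class HandlerChain {

    interface Handler {
        /** Returns a response, or null if this handler does not handle the request. */
        Object requestReply(Object request);
    }

    // a thread-safe list is a choice made for this sketch, so handlers can be registered at any time
    private final List<Handler> handlers = new CopyOnWriteArrayList<>();

    void register(Handler handler) {
        handlers.add(handler);
    }

    Object handle(Object request) {
        for (Handler handler : handlers) {
            try {
                Object response = handler.requestReply(request);
                if (response != null) {
                    return response; // first non-null response wins; later handlers are not called
                }
            } catch (RuntimeException e) {
                // a failing handler is skipped (RpcClient logs it), so the next one still gets a chance
            }
        }
        return null; // no handler recognized the request
    }
}
```

A type-keyed lookup finds its handler in one step. This linear scan instead costs one call per registered handler, which is cheap for the two handlers RpcClient registers. The trade-off is that every handler must return null for requests it does not recognize.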

GrpcClient/GrpcConnection

RpcClient defines the abstract functionality a client needs to communicate with the remote server, while concrete subclasses are responsible for implementing the actual communication.

GrpcClient is responsible for establishing the connection to the remote server. It initializes the gRPC stub for unary requests and the StreamObserver for the bidirectional stream, creates a GrpcConnection object, and injects the initialized Channel into it. It then sends a connection setup request to register the connection on the server side. Note that the BiRequestStreamStub is built on the same Channel as the RequestFutureStub. Because HTTP/2 multiplexes streams over a single connection, only one connection is established between the client and the server.

@Override
public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            int threadNumber = ThreadUtils.getSuitableThreadCount(8);
            grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(10000),
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
                            .build());
            grpcExecutor.allowCoreThreadTimeOut(true);
            
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp != null) {
            
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                return null;
            }
            
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    .newStub(newChannelStubTemp.getChannel());
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
            
            //create stream request and bind connection event to this connection.
            StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
            
            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
            //send a setup request.
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            //wait to register connection setup
            Thread.sleep(100L);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {
        LOGGER.error("[{}]Fail to connect to server! ,error={}", GrpcClient.this.getName(), e);
    }
    return null;
}
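The executor built at the top of connectToServer is a fixed-size pool with a bounded queue, and even its core threads may time out. A JDK-only equivalent might look like the sketch below. It is an assumption for illustration, with three substitutions. Guava's ThreadFactoryBuilder is replaced by a hand-written ThreadFactory. The thread count, which Nacos computes with ThreadUtils.getSuitableThreadCount, is passed in by the caller. `GrpcExecutors` is a hypothetical name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of the grpcExecutor configuration (hypothetical helper, not Nacos code).
final class GrpcExecutors {

    static ThreadPoolExecutor newClientExecutor(int threadNumber) {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            // same naming pattern as "nacos-grpc-client-executor-%d", numbered from 0
            Thread t = new Thread(runnable, "nacos-grpc-client-executor-" + seq.getAndIncrement());
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        };
        // fixed size (core == max), bounded queue of 10000 tasks, 10 s keep-alive
        ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10000), factory);
        // allow core threads to exit after 10 s idle, so an unused client holds no threads
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

No RejectedExecutionHandler is passed here, so the JDK default (AbortPolicy) applies. Once all threads are busy and 10000 tasks are queued, further submissions fail with RejectedExecutionException.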

GrpcConnection uses the gRPC stub for unary requests and the StreamObserver for bidirectional streaming, which is fairly standard gRPC code. Here we briefly look at how GrpcConnection wraps asynchronous requests. It uses Guava's Futures utility for three things. First, it attaches the user-defined callback to the call. Second, it runs that callback on a thread pool: the callback's own executor if one is provided, otherwise the connection's executor. Third, it applies a timeout.

@Override
public void asyncRequest(Request request, final RequestCallBack requestCallBack) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    
    //set callback .
    Futures.addCallback(requestFuture, new FutureCallback<Payload>() {
        @Override
        public void onSuccess(@Nullable Payload grpcResponse) {
            Response response = (Response) GrpcUtils.parse(grpcResponse);
            
            if (response != null) {
                if (response instanceof ErrorResponse) {
                    requestCallBack.onException(new NacosException(response.getErrorCode(), response.getMessage()));
                } else {
                    requestCallBack.onResponse(response);
                }
            } else {
                requestCallBack.onException(new NacosException(ResponseCode.FAIL.getCode(), "response is null"));
            }
        }
        
        @Override
        public void onFailure(Throwable throwable) {
            if (throwable instanceof CancellationException) {
                requestCallBack.onException(
                        new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " milliseconds."));
            } else {
                requestCallBack.onException(throwable);
            }
        }
    }, requestCallBack.getExecutor() != null ? requestCallBack.getExecutor() : this.executor);
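    // set timeout future: when the timeout elapses, Futures.withTimeout cancels requestFuture,
    // which reaches the callback above as a CancellationException and is reported as a TimeoutException.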
    ListenableFuture<Payload> payloadListenableFuture = Futures
            .withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS,
                    RpcScheduledExecutor.TIMEOUT_SCHEDULER);
    
}
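Readers more familiar with the JDK than with Guava can express the same callback-plus-timeout pattern with CompletableFuture (Java 9+). This swaps in a different technique from the one GrpcConnection uses. Here, orTimeout completes the future with a TimeoutException directly, so no CancellationException-to-TimeoutException translation is needed. As in the original, the caller's future itself is the one that fails on timeout. `AsyncRequests` and its `Callback` interface are hypothetical stand-ins for the Nacos types, and the ErrorResponse check is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Sketch of the asyncRequest pattern using CompletableFuture instead of Guava (hypothetical names).
final class AsyncRequests {

    /** Hypothetical counterpart of RequestCallBack. */
    interface Callback<R> {
        void onResponse(R response);
        void onException(Throwable t);
        long getTimeout();
        Executor getExecutor(); // may be null: fall back to the connection's executor
    }

    static <R> void asyncRequest(CompletableFuture<R> call, Callback<R> cb, Executor defaultExecutor) {
        Executor executor = cb.getExecutor() != null ? cb.getExecutor() : defaultExecutor;
        call.orTimeout(cb.getTimeout(), TimeUnit.MILLISECONDS) // completes `call` with TimeoutException on expiry
            .whenCompleteAsync((response, error) -> {          // callback runs on the chosen executor
                if (error != null) {
                    // unwrap CompletionException so the callback sees the original cause
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                            ? error.getCause() : error;
                    cb.onException(cause);
                } else if (response == null) {
                    cb.onException(new IllegalStateException("response is null"));
                } else {
                    cb.onResponse(response);
                }
            }, executor);
    }
}
```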