The remote invocation

InvokerInvocationHandler.java

public InvokerInvocationHandler(Invoker
        handler) {
    this.invoker = handler;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // Get the method name of the RPC remote callString methodName = method.getName(); Class<? >[] parameterTypes = method.getParameterTypes();// If the current method is an Object method, it is a local method call
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    // Remote call. The MockClusterInvoker is created here.
    MockClusterInvoker is created from createProxy() in referenceconfig.java in the last source code analysis
    // invoker = CLUSTER.join(new StaticDirectory(u, invokers)); This line of code creates it
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
Copy the code

MockClusterInvoker.java

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Get mock property values
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    // If the mock attribute is not set or the value of the mock attribute is false, the degrade function is not enabled
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // No mock remote call (no degradation)
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock directly
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {  // Remote call
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
            }
            // The service is degradedresult = doMockInvoke(invocation, e); }}return result;
}
Copy the code

AbstractClusterInvoker.java

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if(contextAttachments ! =null&& contextAttachments.size() ! =0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // Routing: Filter out invokers that are not available according to routing rules and return the remaining invokers that are available
    List<Invoker<T>> invokers = list(invocation);
    // Obtain the load balancing policy
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // Invoke doInvoke() with a specific fault-tolerant policy
    return doInvoke(invocation, invokers, loadbalance);
}
Copy the code

FailoverClusterInvoker.java

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            } // Load balancing
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try { // Remote call
                Result result = invoker.invoke(invocation);
                if(le ! =null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + "(" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally{ providers.add(invoker.getUrl().getAddress()); }}throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + "(" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "+ le.getMessage(), le.getCause() ! =null ? le.getCause() : le);
    }
Copy the code

ProtocolFilterWrapper.java

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = filterInvoker.invoke(invocation);  // Returns an asynchronous result

    asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            // onResponse callback
            if (filter instanceof ListenableFilter) {
                Filter.Listener listener = ((ListenableFilter) filter).listener();
                if(listener ! =null) {
                    if (t == null) {
                        listener.onResponse(r, filterInvoker, invocation);
                    } else{ listener.onError(t, filterInvoker, invocation); }}}else{ filter.onResponse(r, filterInvoker, invocation); }}});return asyncResult;
}

 public Result invoke(Invocation invocation) throws RpcException {
     Result asyncResult;
     try {
         // Continue the chase here
         asyncResult = filter.invoke(next, invocation);
     } catch (Exception e) {
         // onError callback
         if (filter instanceof ListenableFilter) {
             Filter.Listener listener = ((ListenableFilter) filter).listener();
             if(listener ! =null) { listener.onError(e, invoker, invocation); }}throw e;
     }
     return asyncResult;
 }
Copy the code

MonitorFilter.java

public Result invoke(Invoker
        invoker, Invocation invocation) throws RpcException {
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}
Copy the code

ListenerInvokerWrapper.java

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
Copy the code

AsyncToSyncInvoker.java

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        if(InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); }}catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: "+ e.getMessage(), e); }}catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}
Copy the code

AbstractInvoker.java

public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }

    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    try {
        / / here
        return doInvoke(invocation);  //
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); }}catch (RpcException e) {
        if (e.isBiz()) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throwe; }}catch (Throwable e) {
        return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); }}Copy the code

DubboInvoker.java

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    // Clients is an ExchangeClient array. If there is only one ExchangeClient, return one; otherwise, polling returns one
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {  // Polling returns an exchangeClient
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {  // If a request does not require a response, isOneway is true, indicating one-way
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            // Send the request through the synchronous conversion object currentClient
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            asyncRpcResult.subscribeTo(responseFuture);
            / / save the for 2.6 x compatibility, for example, TraceFilter in Zipkin USES com. Alibaba. XXX. FutureAdapter
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            returnasyncRpcResult; }}catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

ReferenceCountExchangeClient.java

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    return client.request(request, timeout);
}
Copy the code

HeaderExchangeClient.java

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    return channel.request(request, timeout);
}
Copy the code

HeaderExchangeChannel.java

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null."Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);   // Send the request
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
Copy the code

AbstractPeer.java

public void send(Object message) throws RemotingException {
    send(message, url.getParameter(Constants.SENT_KEY, false));
}
Copy the code

AbstractClient.java

public void send(Object message, boolean sent) throws RemotingException {
    if(needReconnect && ! isConnected()) { connect();/ / Server connection
    }
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null| |! channel.isConnected()) {throw new RemotingException(this."message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);  // Send the request
}
Copy the code

NettyChannel.java

public void send(Object message, boolean sent) throws RemotingException {
    // whether the channel is closed
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);   // Send data to the Server
        if (sent) {
            // wait timeout ms
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if(cause ! =null) {
            throwcause; }}catch (Throwable e) {
        throw new RemotingException(this."Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if(! success) {throw new RemotingException(this."Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit"); }}Copy the code

The provider processes the consumer request

NettyServer.java

protected void  doOpen(a) throws Throwable {
    bootstrap = new ServerBootstrap();
    // Process the Group of connections
    bossGroup = new NioEventLoopGroup(1.new DefaultThreadFactory("NettyServerBoss".true));
    // Group that processes IO services
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker".true));
    // The provider URL
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            / / decoder
                            .addLast("decoder", adapter.getDecoder())
                            / / encoder
                            .addLast("encoder", adapter.getEncoder())
                            // Read/write idle monitor
                            .addLast("server-idle-handler".new IdleStateHandler(0.0, idleTimeout, MILLISECONDS))
                            // Dubbo's custom processor
                            .addLast("handler", nettyServerHandler); }});// bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
Copy the code

NettyServerHandler.java

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
         // This is the Netty layer.
        handler.received(channel, msg);
    } finally{ NettyChannel.removeChannelIfDisconnected(ctx.channel()); }}Copy the code

AllChannelHandler.java

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getExecutorService();  / / thread pool
    try {  // Wrap the data sent by the Client as a task to be completed using threads from the thread pool
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
       if(message instanceof Request && t instanceof RejectedExecutionException){
          Request request = (Request)message;
          if(request.isTwoWay()){
             String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
             Response response = new Response(request.getId(), request.getVersion());
             response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
             response.setErrorMessage(msg);
             channel.send(response);
             return; }}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); }}Copy the code

ChannelEventRunnable.java

public void run(a) {
    // The processing task is received
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is "+ message, e); }}... }Copy the code

DecodeHandler.java

// The data is then decoded, parsed and processed by the consumer
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);  // Decode the message
    }

    if (message instanceof Request) {
        decode(((Request) message).getData());   / / decoding RpcInvocation
    }

    if (message instanceof Response) {
        decode(((Response) message).getResult());  // Decode the response result
    }
    // The message is decoded
    handler.received(channel, message);
}
Copy the code

HeaderExchangeHandler.java

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {  // Handle the case where message is a request
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);  // Process the request
                } else{ handler.received(exchangeChannel, request.getData()); }}}else if (message instanceof Response) {   // Handle the case where message is the response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if(echo ! =null && echo.length() > 0) { channel.send(echo); }}}else{ handler.received(exchangeChannel, message); }}finally{ HeaderExchangeChannel.removeChannelIfDisconnected(channel); }}void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {  // If an exception occurs during the request, it is handled here
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            channel.send(res);  // Return a response containing exception information
            return;
        }
        // find handler by message class.
        Object msg = req.getData();  / / get RpcInvocation
        try {
            CompletionStage<Object> future = handler.reply(channel, msg);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);  // Return the actual response result
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}); }catch(Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); }}Copy the code

DubboProtocol.java

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if(! (messageinstanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ":" + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: "+ channel.getLocalAddress()); } Invocation inv = (Invocation) message; Invoker<? > invoker = getInvoker(channel, inv);// Get invoker
    // need to consider backward-compatibility if it's a callback
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null| |! methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break; }}}if(! hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);  // Make a local call and get a Result
    return result.completionFuture().thenApply(Function.identity());
}
Copy the code

The consumer handles the provider response

NettyClientHandler.java

// The consumer receives the provider's response and first creates a NettyClinet. Once created, the channelRead method is executed, which is where we start
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally{ NettyChannel.removeChannelIfDisconnected(ctx.channel()); }}Copy the code

AbstractPeer.java

public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}
Copy the code

MultiMessageHandler.java

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for(Object obj : list) { handler.received(channel, obj); }}else {
        handler.received(channel, message);  //}}Copy the code

HeartbeatHandler.java

public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    if (isHeartbeatRequest(message)) {  // Check whether message is the heartbeat of the client
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ":" + heartbeat + "ms" : "")); }}}return;
    }
    if (isHeartbeatResponse(message)) {  // Determine whether the current message is a heartbeat response from the Server
        if (logger.isDebugEnabled()) {
            logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
        }
        return;
    }
    handler.received(channel, message);  // Message is plain data
}
Copy the code

AllChannelHandler.java

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService executor = getExecutorService();  / / thread pool
    try {  // Wrap the data sent by the Client as a task to be completed using threads from the thread pool
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
       if(message instanceof Request && t instanceof RejectedExecutionException){
          Request request = (Request)message;
          if(request.isTwoWay()){
             String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
             Response response = new Response(request.getId(), request.getVersion());
             response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
             response.setErrorMessage(msg);
             channel.send(response);
             return; }}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); }}Copy the code

ChannelEventRunnable.java

public void run(a) {
    // The processing task is received
    if (state == ChannelState.RECEIVED) {
        try {
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is "+ message, e); }}... }Copy the code

DecodeHandler.java

// Do data decoding
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);  // Decode the message
    }

    if (message instanceof Request) {
        decode(((Request) message).getData());   / / decoding RpcInvocation
    }

    if (message instanceof Response) {
        decode(((Response) message).getResult());  // Decode the response result
    }
    // The message is decoded
    handler.received(channel, message);
}
Copy the code

HeaderExchangeHandler.java

@Override
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {  // Handle the case where message is a request
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);  // Process the request
                } else{ handler.received(exchangeChannel, request.getData()); }}}else if (message instanceof Response) {   // Handle the case where message is the response
            // Trace here
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if(echo ! =null && echo.length() > 0) { channel.send(echo); }}}else{ handler.received(exchangeChannel, message); }}finally{ HeaderExchangeChannel.removeChannelIfDisconnected(channel); }}static void handleResponse(Channel channel, Response response) throws RemotingException {
    if(response ! =null&&! response.isHeartbeat()) { DefaultFuture.received(channel, response); }}Copy the code

DefaultFuture.java

public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

 public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if(future ! =null) {
                Timeout t = future.timeoutCheckTask;
                if(! timeout) {// decrease Time
                    t.cancel();
                }
                / / here
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + "- >"+ channel.getRemoteAddress())); }}finally{ CHANNELS.remove(response.getId()); }}private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        // Get the response result and return it layer by layer
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(newRemotingException(channel, res.getErrorMessage())); }}Copy the code

CompletableFuture.java

public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}
Copy the code

ProtocolFilterWrapper.java

// Finally come back here
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = filterInvoker.invoke(invocation);  // Returns an asynchronous result

    asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            // onResponse callback
            if (filter instanceof ListenableFilter) {
                Filter.Listener listener = ((ListenableFilter) filter).listener();
                if(listener ! =null) {
                    if (t == null) {
                        listener.onResponse(r, filterInvoker, invocation);
                    } else{ listener.onError(t, filterInvoker, invocation); }}}else{ filter.onResponse(r, filterInvoker, invocation); }}});return asyncResult;
}
Copy the code