Git address: github.com/sofastack/s…

SOFABolt is a network communication framework developed by Ant Financial Services Group and built on Netty. Compared with using Netty directly, SOFABolt is like driving an automatic instead of a manual transmission. It provides the following basic communication models for reuse:

  • oneway: one-way call
  • invokeSync: synchronous call
  • invokeWithFuture: asynchronous call that returns a future
  • invokeWithCallback: asynchronous call with a callback

Let's take a closer look at how each model is implemented.

1. oneway

This is the simplest mode. The request is sent one way and the caller does not wait for a response from the server. It is usually used to send heartbeats.

protected void oneway(final Connection conn, RemotingCommand request) {
    try {
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    BaseRemoting.logger.error("Invoke send failed. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), f.cause());
                }
            }
        });
    } catch (Exception var4) {
    }
}

2. invokeSync

Netty is an asynchronous communication framework, so SOFABolt turns the asynchronous send into a synchronous call by blocking the caller on a CountDownLatch.

com.alipay.remoting.BaseRemoting#invokeSync

protected RemotingCommand invokeSync(final Connection conn, RemotingCommand request, int timeoutMillis) throws RemotingException, InterruptedException {
        final InvokeFuture future = this.createInvokeFuture(request, request.getInvokeContext());
        conn.addInvokeFuture(future);
        final int requestId = request.getId();
        try {
            conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { // @1
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (!f.isSuccess()) {
                        conn.removeInvokeFuture(requestId);
                        future.putResponse(BaseRemoting.this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), f.cause())); //@2
                        BaseRemoting.logger.error("Invoke send failed, id={}", requestId, f.cause());
                    }

                }
            });
        } catch (Exception var7) {
            conn.removeInvokeFuture(requestId);
            future.putResponse(this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), var7)); // @3
            logger.error("Exception caught when sending invocation, id={}", requestId, var7);
        }

        RemotingCommand response = future.waitResponse((long)timeoutMillis); // @4
        if (response == null) {
            conn.removeInvokeFuture(requestId);
            response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
            logger.warn("Wait response, request id={} timeout!", requestId);
        }
        return response;
    }

Code @1: Send the request asynchronously.

Code @2, @3: If the send fails or throws an exception, future.putResponse stores a send-failed response and calls countDownLatch.countDown().

Code @4: The calling thread blocks in countDownLatch.await for up to timeoutMillis, waiting for the server's response.
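The latch-based blocking described above can be sketched with plain JDK classes. This is a minimal illustration, not SOFABolt's implementation: LatchFuture and SyncOverAsyncSketch are hypothetical names, and a sleeping background thread stands in for the Netty I/O thread and the remote server.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an invoke future; not SOFABolt's DefaultInvokeFuture.
class LatchFuture<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    // Called from the I/O side: store the result, then release any waiter.
    void putResponse(T value) {
        this.response = value;
        latch.countDown();
    }

    // Blocks the caller; returns null if no response arrived in time.
    T waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }
}

class SyncOverAsyncSketch {
    // Mimics the shape of invokeSync: start asynchronous work, then block on the future.
    static String invokeSync(String request, long serverDelayMillis, long timeoutMillis) {
        LatchFuture<String> future = new LatchFuture<>();
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(serverDelayMillis); // simulated network round trip
                future.putResponse("echo:" + request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        io.setDaemon(true); // a late "server" must not keep the JVM alive
        io.start();
        try {
            String response = future.waitResponse(timeoutMillis);
            return response != null ? response : "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeSync("ping", 10, 2000)); // response arrives in time
        System.out.println(invokeSync("ping", 2000, 50)); // caller gives up first
    }
}
```

A null return from waitResponse plays the same role as the `response == null` check in invokeSync: the caller cannot tell a slow server from a lost request, so it substitutes a timeout result.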

When the server responds normally, where is putResponse triggered?

com.alipay.remoting.rpc.protocol.RpcResponseProcessor#doProcess

public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
    Connection conn = (Connection)ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
    InvokeFuture future = conn.removeInvokeFuture(cmd.getId()); // @1
    ClassLoader oldClassLoader = null;
    try {
        if (future != null) {
            if (future.getAppClassLoader() != null) {
                oldClassLoader = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(future.getAppClassLoader());
            }
            future.putResponse(cmd); // @2
            future.cancelTimeout();
            try {
                future.executeInvokeCallback(); // @3
            } catch (Exception var10) {
                logger.error("Exception caught when executing invoke callback, id={}", cmd.getId(), var10);
            }
        } else {
            logger.warn("Cannot find InvokeFuture, maybe already timeout, id={}, from={} ", cmd.getId(), RemotingUtil.parseRemoteAddress(ctx.getChannelContext().channel()));
        }
    } finally {
        // Restore the context class loader that was swapped out above
        if (null != oldClassLoader) {
            Thread.currentThread().setContextClassLoader(oldClassLoader);
        }
    }
}

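Code @1: Remove the InvokeFuture for this request id from invokeFutureMap.

Code @2: putResponse stores the response and calls countDownLatch.countDown(), which wakes the thread blocked in waitResponse.

Code @3: Invoke the callback, if one was registered.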
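The lookup by request id is what ties a response back to its waiting caller. A minimal sketch of that correlation, assuming a ConcurrentHashMap keyed by request id and using CompletableFuture as a stand-in for InvokeFuture (PendingRequestsSketch and its methods are hypothetical names, not SOFABolt API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PendingRequestsSketch {
    // Plays the role of invokeFutureMap: request id -> future awaiting the response.
    private final ConcurrentHashMap<Integer, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    // Caller side: register the future before the request is written.
    CompletableFuture<String> register(int requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // I/O side: remove() is atomic, so at most one party ever completes a given future.
    boolean onResponse(int requestId, String payload) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false; // unknown id, or the entry was already removed (e.g. timed out)
        }
        future.complete(payload);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    // Walks through one request: first response accepted, duplicate rejected.
    static String demo() {
        PendingRequestsSketch conn = new PendingRequestsSketch();
        CompletableFuture<String> future = conn.register(7);
        boolean first = conn.onResponse(7, "pong");
        boolean second = conn.onResponse(7, "duplicate");
        return future.join() + "," + first + "," + second + "," + conn.pendingCount();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "pong,true,false,0"
    }
}
```

The rejected duplicate corresponds to the "Cannot find InvokeFuture, maybe already timeout" branch in doProcess.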

3. invokeWithFuture

public RpcResponseFuture invokeWithFuture(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) throws RemotingException {
    RemotingCommand requestCommand = this.toRemotingCommand(request, conn, invokeContext, timeoutMillis);
    this.preProcessInvokeContext(invokeContext, requestCommand, conn);
    InvokeFuture future = super.invokeWithFuture(conn, requestCommand, timeoutMillis);
    return new RpcResponseFuture(RemotingUtil.parseRemoteAddress(conn.getChannel()), future); // @1
}

Code @1: Wrap the InvokeFuture in an RpcResponseFuture. The InvokeFuture itself is created by the parent class method:

protected InvokeFuture invokeWithFuture(final Connection conn, RemotingCommand request, int timeoutMillis) {
    InvokeFuture future = this.createInvokeFuture(request, request.getInvokeContext());
    conn.addInvokeFuture(future); // @1
    final int requestId = request.getId();
    try {
        Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() { // @2
            public void run(Timeout timeout) throws Exception {
                InvokeFuture future = conn.removeInvokeFuture(requestId);
                if (future != null) {
                    future.putResponse(BaseRemoting.this.commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                }
            }
        }, (long)timeoutMillis, TimeUnit.MILLISECONDS);
        future.addTimeout(timeout);
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { // @3
            public void operationComplete(ChannelFuture cf) throws Exception {
                if (!cf.isSuccess()) {
                    InvokeFuture f = conn.removeInvokeFuture(requestId);
                    if (f != null) {
                        f.cancelTimeout();
                        f.putResponse(BaseRemoting.this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), cf.cause()));
                    }
                    BaseRemoting.logger.error("Invoke send failed. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), cf.cause());
                }
            }
        });
    } catch (Exception var8) {
        InvokeFuture f = conn.removeInvokeFuture(requestId);
        if (f != null) {
            f.cancelTimeout();
            f.putResponse(this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), var8));
        }
        logger.error("Exception caught when sending invocation. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), var8);
    }
    return future;
}

Code @1: Create the InvokeFuture for the request and register it in invokeFutureMap.

Code @2: Schedule a delayed task on the HashedWheelTimer. If the timeout fires, the task removes the InvokeFuture from invokeFutureMap and puts a timeout response, which releases the countDownLatch.

Code @3: Send the request asynchronously.
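The timeout task and the response path both start by removing the future from the map, so whichever gets there first decides the outcome. A minimal sketch of that race, assuming a JDK ScheduledExecutorService as a stand-in for Netty's HashedWheelTimer and CompletableFuture as a stand-in for InvokeFuture (TimeoutRaceSketch, TimedFuture and the demo methods are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimeoutRaceSketch {
    static final String TIMEOUT = "TIMEOUT";

    // Pairs the result with its timeout handle, like addTimeout/cancelTimeout in the listing.
    static final class TimedFuture {
        final CompletableFuture<String> result = new CompletableFuture<>();
        volatile ScheduledFuture<?> timeout;

        void cancelTimeout() {
            ScheduledFuture<?> t = timeout;
            if (t != null) {
                t.cancel(false);
            }
        }
    }

    private final ConcurrentHashMap<Integer, TimedFuture> pending = new ConcurrentHashMap<>();
    // Stand-in for the timer; daemon thread so it never blocks JVM exit.
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "timeout-timer");
        t.setDaemon(true);
        return t;
    });

    CompletableFuture<String> invokeWithFuture(int requestId, long timeoutMillis) {
        TimedFuture tf = new TimedFuture();
        pending.put(requestId, tf);
        tf.timeout = timer.schedule(() -> {
            TimedFuture f = pending.remove(requestId); // only the winner of remove() completes
            if (f != null) {
                f.result.complete(TIMEOUT);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // A real client would write the request to the channel here.
        return tf.result;
    }

    boolean onResponse(int requestId, String payload) {
        TimedFuture tf = pending.remove(requestId);
        if (tf == null) {
            return false; // the timeout task got there first
        }
        tf.cancelTimeout();
        tf.result.complete(payload);
        return true;
    }

    static String demoTimeout() {
        TimeoutRaceSketch conn = new TimeoutRaceSketch();
        String result = conn.invokeWithFuture(1, 50).join(); // no response ever arrives
        return result + "," + conn.onResponse(1, "late");
    }

    static String demoResponse() {
        TimeoutRaceSketch conn = new TimeoutRaceSketch();
        CompletableFuture<String> future = conn.invokeWithFuture(2, 5000);
        boolean accepted = conn.onResponse(2, "pong");
        return future.join() + "," + accepted;
    }

    public static void main(String[] args) {
        System.out.println(demoTimeout());
        System.out.println(demoResponse());
    }
}
```

Because the map removal is atomic, the future is completed exactly once, whether the response or the timeout wins.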

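RpcResponseFuture.get(timeoutMillis) calls waitResponse, which blocks until a response arrives or the timeout elapses. If the future is still not done after the wait, it throws InvokeTimeoutException.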

public Object get(int timeoutMillis) throws InvokeTimeoutException, RemotingException, InterruptedException {
    this.future.waitResponse((long)timeoutMillis);
    if (!this.isDone()) {
        throw new InvokeTimeoutException("Future get result timeout!");
    } else {
        ResponseCommand responseCommand = (ResponseCommand)this.future.waitResponse();
        responseCommand.setInvokeContext(this.future.getInvokeContext());
        return RpcResponseResolver.resolveResponseObject(responseCommand, this.addr);
    }
}

4. invokeWithCallback

protected void invokeWithCallback(final Connection conn, RemotingCommand request, InvokeCallback invokeCallback, int timeoutMillis) {
    InvokeFuture future = this.createInvokeFuture(conn, request, request.getInvokeContext(), invokeCallback);
    conn.addInvokeFuture(future);
    final int requestId = request.getId();
    try {
        Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                InvokeFuture future = conn.removeInvokeFuture(requestId);
                if (future != null) {
                    future.putResponse(BaseRemoting.this.commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                    future.tryAsyncExecuteInvokeCallbackAbnormally(); // @1
                }
            }
        }, (long)timeoutMillis, TimeUnit.MILLISECONDS);
        future.addTimeout(timeout);
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture cf) throws Exception {
                if (!cf.isSuccess()) {
                    InvokeFuture f = conn.removeInvokeFuture(requestId);
                    if (f != null) {
                        f.cancelTimeout();
                        f.putResponse(BaseRemoting.this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), cf.cause()));
                        f.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
                    BaseRemoting.logger.error("Invoke send failed. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), cf.cause());
                }
            }
        });
    } catch (Exception var9) {
        InvokeFuture f = conn.removeInvokeFuture(requestId);
        if (f != null) {
            f.cancelTimeout();
            f.putResponse(this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), var9));
            f.tryAsyncExecuteInvokeCallbackAbnormally();
        }
        logger.error("Exception caught when sending invocation. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), var9);
    }
}

Code @1: After a timeout, the future runs the callback asynchronously on the protocol's default executor, as shown below. The send-failure paths call the same method.

public void tryAsyncExecuteInvokeCallbackAbnormally() {
    try {
        Protocol protocol = ProtocolManager.getProtocol(ProtocolCode.fromBytes(new byte[]{this.protocol}));
        if (null != protocol) {
            CommandHandler commandHandler = protocol.getCommandHandler();
            if (null != commandHandler) {
                ExecutorService executor = commandHandler.getDefaultExecutor();
                if (null != executor) {
                    executor.execute(new Runnable() {
                        public void run() {
                            ClassLoader oldClassLoader = null;
                            try {
                                if (DefaultInvokeFuture.this.getAppClassLoader() != null) {
                                    oldClassLoader = Thread.currentThread().getContextClassLoader();
                                    Thread.currentThread().setContextClassLoader(DefaultInvokeFuture.this.getAppClassLoader());
                                }
                                DefaultInvokeFuture.this.executeInvokeCallback();
                            } finally {
                                if (null != oldClassLoader) {
                                    Thread.currentThread().setContextClassLoader(oldClassLoader);
                                }
                            }
                        }
                    });
                }
            } else {
                logger.error("Executor null in commandHandler of protocolCode [{}].", this.protocol);
            }
        } else {
            logger.error("protocolCode [{}] not registered!", this.protocol);
        }
    } catch (Exception var4) {
        logger.error("Exception caught when executing invoke callback abnormally.", var4);
    }
}
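The core of the method above is "run the callback on an executor, under the application's class loader, then put the old loader back". A minimal sketch of just that step using JDK classes only; CallbackDispatchSketch, dispatch and demo are hypothetical names, and an empty URLClassLoader stands in for the application class loader. Unlike the listing, this sketch tracks the swap with a boolean flag, so a null previous loader is restored as well.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class CallbackDispatchSketch {
    // Runs the callback on the executor with appLoader as the thread's context class loader.
    static void dispatch(ExecutorService executor, ClassLoader appLoader, Runnable callback) {
        executor.execute(() -> {
            Thread current = Thread.currentThread();
            ClassLoader old = null;
            boolean swapped = false;
            try {
                if (appLoader != null) {
                    old = current.getContextClassLoader();
                    current.setContextClassLoader(appLoader);
                    swapped = true;
                }
                callback.run();
            } finally {
                if (swapped) {
                    current.setContextClassLoader(old); // pooled threads must not leak the loader
                }
            }
        });
    }

    // Returns {callback saw appLoader, loader was restored afterwards}.
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            ClassLoader appLoader = new URLClassLoader(new URL[0]);
            Callable<ClassLoader> readLoader = () -> Thread.currentThread().getContextClassLoader();
            ClassLoader before = executor.submit(readLoader).get();
            AtomicReference<ClassLoader> seen = new AtomicReference<>();
            dispatch(executor, appLoader,
                    () -> seen.set(Thread.currentThread().getContextClassLoader()));
            // Same single worker thread, so this task runs after the callback has finished.
            ClassLoader after = executor.submit(readLoader).get();
            return new boolean[] { seen.get() == appLoader, after == before };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("callback saw app loader: " + r[0] + ", restored: " + r[1]);
    }
}
```

Handing the callback to an executor keeps user code off the timer and I/O threads, so a slow callback delays neither other timeouts nor other channel events.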

Conclusion:

These are the four communication models in SOFABolt. The same models also appear in Alibaba's open-source components such as RocketMQ and Seata. With SOFABolt, users can rely on the built-in RPC communication protocol without worrying about the details of implementing a private protocol.