Git address: github.com/sofastack/s…
SOFABolt is a network communication framework developed by Ant Financial Services Group and built on top of Netty. If Netty is a car with a manual transmission, SOFABolt is the automatic: it hides much of the low-level work. SOFABolt provides four basic communication models for reuse:
- oneway: one-way call
- invokeSync: synchronous call
- invokeWithFuture: asynchronous call that returns a future
- invokeWithCallback: asynchronous call with a callback
Let’s take a closer look at how each model is implemented.
1. oneway
This is the simplest model. The call is one-way: the client sends the request and does not wait for a response from the server. It is typically used for sending heartbeats.
protected void oneway(final Connection conn, RemotingCommand request) {
    try {
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    BaseRemoting.logger.error("Invoke send failed. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), f.cause());
                }
            }
        });
    } catch (Exception var4) {
    }
}
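2. invokeSync
Netty is an asynchronous communication framework, so a synchronous call has to be built on top of it. SOFABolt turns the asynchronous send into a synchronous call with a CountDownLatch: the calling thread waits on the latch until the response arrives.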
com.alipay.remoting.BaseRemoting#invokeSync
protected RemotingCommand invokeSync(final Connection conn, RemotingCommand request, int timeoutMillis) throws RemotingException, InterruptedException {
    final InvokeFuture future = this.createInvokeFuture(request, request.getInvokeContext());
    conn.addInvokeFuture(future);
    final int requestId = request.getId();
    try {
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { // @1
            public void operationComplete(ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    conn.removeInvokeFuture(requestId);
                    future.putResponse(BaseRemoting.this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), f.cause())); // @2
                    BaseRemoting.logger.error("Invoke send failed, id={}", requestId, f.cause());
                }
            }
        });
    } catch (Exception var7) {
        conn.removeInvokeFuture(requestId);
        future.putResponse(this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), var7)); // @3
        logger.error("Exception caught when sending invocation, id={}", requestId, var7);
    }
    RemotingCommand response = future.waitResponse((long)timeoutMillis); // @4
    if (response == null) {
        conn.removeInvokeFuture(requestId);
        response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
        logger.warn("Wait response, request id={} timeout!", requestId);
    }
    return response;
}
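Code @1: Send the request asynchronously.
Code @2, @3: If the send fails or an exception is thrown while sending, future.putResponse is called with a send-failed response, which executes countDownLatch.countDown().
Code @4: The calling thread blocks in countDownLatch.await until a response is put or the timeout expires. A null result means the call timed out.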
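The latch mechanism described above can be sketched in isolation. This is a minimal illustration using only the JDK, not SOFABolt's actual InvokeFuture implementation; the class name LatchFuture and its generic type are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for an invoke future; not the SOFABolt class. */
public class LatchFuture<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    /** Called by the I/O thread (or on send failure) to publish the result. */
    public void putResponse(T value) {
        this.response = value;
        latch.countDown(); // wakes up the thread blocked in waitResponse
    }

    /** Blocks the caller; returns null if nothing arrived within the timeout. */
    public T waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchFuture<String> future = new LatchFuture<>();
        // Simulate the network thread delivering a response a little later.
        new Thread(() -> future.putResponse("pong")).start();
        System.out.println(future.waitResponse(1000));
        // A future that never receives a response yields null after the timeout.
        System.out.println(new LatchFuture<String>().waitResponse(50));
    }
}
```

The null return on timeout mirrors the `response == null` check in invokeSync, where the caller then substitutes a timeout response.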
When the server responds normally, where is putResponse triggered? In the response processor:
com.alipay.remoting.rpc.protocol.RpcResponseProcessor#doProcess
public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
    Connection conn = (Connection)ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
    InvokeFuture future = conn.removeInvokeFuture(cmd.getId()); // @1
    ClassLoader oldClassLoader = null;
    try {
        if (future != null) {
            if (future.getAppClassLoader() != null) {
                oldClassLoader = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(future.getAppClassLoader());
            }
            future.putResponse(cmd); // @2
            future.cancelTimeout();
            try {
                future.executeInvokeCallback(); // @3
            } catch (Exception var10) {
                logger.error("Exception caught when executing invoke callback, id={}", cmd.getId(), var10);
            }
        } else {
            logger.warn("Cannot find InvokeFuture, maybe already timeout, id={}, from={} ", cmd.getId(), RemotingUtil.parseRemoteAddress(ctx.getChannelContext().channel()));
        }
    } finally {
    }
}
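Code @1: Remove the InvokeFuture for this request id from invokeFutureMap. If it is no longer there, the request has probably already timed out.
Code @2: putResponse stores the response and calls countDownLatch.countDown(), which wakes the waiting caller.
Code @3: Invoke the callback function, if one was registered.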
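The request-id lookup in the steps above can be illustrated with a small table of pending futures. This is a sketch under assumptions: PendingRequests, register and onResponse are hypothetical names, and a JDK CompletableFuture stands in for SOFABolt's InvokeFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical pending-request table keyed by request id; not SOFABolt code. */
public class PendingRequests {
    private final ConcurrentHashMap<Integer, CompletableFuture<String>> futures =
            new ConcurrentHashMap<>();

    /** Called before sending: remember the future under its request id. */
    public CompletableFuture<String> register(int requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    /**
     * Called when a response arrives. Returns false when no future is waiting,
     * for example because the request already timed out and was removed.
     */
    public boolean onResponse(int requestId, String body) {
        CompletableFuture<String> future = futures.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<String> future = pending.register(1);
        System.out.println(pending.onResponse(1, "ok"));
        System.out.println(future.getNow(null));
        // A second response for the same id finds nothing to complete.
        System.out.println(pending.onResponse(1, "duplicate"));
    }
}
```

Because remove is atomic, only one party (the response path, the timeout path, or the send-failure path) can obtain the future and complete it.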
3. invokeWithFuture
public RpcResponseFuture invokeWithFuture(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) throws RemotingException {
    RemotingCommand requestCommand = this.toRemotingCommand(request, conn, invokeContext, timeoutMillis);
    this.preProcessInvokeContext(invokeContext, requestCommand, conn);
    InvokeFuture future = super.invokeWithFuture(conn, requestCommand, timeoutMillis);
    return new RpcResponseFuture(RemotingUtil.parseRemoteAddress(conn.getChannel()), future); // @1
}
Code @1: Encapsulates the Future as an RpcResponseFuture
protected InvokeFuture invokeWithFuture(final Connection conn, RemotingCommand request, int timeoutMillis) {
    InvokeFuture future = this.createInvokeFuture(request, request.getInvokeContext());
    conn.addInvokeFuture(future); // @1
    final int requestId = request.getId();
    try {
        Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() { // @2
            public void run(Timeout timeout) throws Exception {
                InvokeFuture future = conn.removeInvokeFuture(requestId);
                if (future != null) {
                    future.putResponse(BaseRemoting.this.commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                }
            }
        }, (long)timeoutMillis, TimeUnit.MILLISECONDS);
        future.addTimeout(timeout);
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() { // @3
            public void operationComplete(ChannelFuture cf) throws Exception {
                if (!cf.isSuccess()) {
                    InvokeFuture f = conn.removeInvokeFuture(requestId);
                    if (f != null) {
                        f.cancelTimeout();
                        f.putResponse(BaseRemoting.this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), cf.cause()));
                    }
                    BaseRemoting.logger.error("Invoke send failed. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), cf.cause());
                }
            }
        });
    } catch (Exception var8) {
        InvokeFuture f = conn.removeInvokeFuture(requestId);
        if (f != null) {
            f.cancelTimeout();
            f.putResponse(this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), var8));
        }
        logger.error("Exception caught when sending invocation. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), var8);
    }
    return future;
}
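Code @1: Build the InvokeFuture from the request and put it into invokeFutureMap.
Code @2: Schedule a delayed task on the HashedWheelTimer. If the task fires, it removes the InvokeFuture from invokeFutureMap and puts a timeout response, which releases the countDownLatch.
Code @3: Send the request asynchronously. If the send fails, the timeout task is cancelled and a send-failed response is put instead.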
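The race between the timeout task and the real response can be sketched with JDK classes only. This is an illustration under assumptions: a ScheduledExecutorService is swapped in for the HashedWheelTimer, CompletableFuture stands in for InvokeFuture, and the names TimeoutRace, send and onResponse are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a timeout task racing a response; not SOFABolt code. */
public class TimeoutRace {
    public static final String TIMEOUT = "TIMEOUT";

    private final ConcurrentHashMap<Integer, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();
    // Stand-in for the wheel timer: a single scheduler thread.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    /** Registers the future and arms a timeout for it. */
    public CompletableFuture<String> send(int requestId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        ScheduledFuture<?> timeout = timer.schedule(() -> {
            // Whoever removes the future from the map gets to complete it.
            CompletableFuture<String> f = pending.remove(requestId);
            if (f != null) {
                f.complete(TIMEOUT);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Once the future completes for any reason, the timer is no longer needed.
        future.whenComplete((result, error) -> timeout.cancel(false));
        return future;
    }

    /** Response path: completes the future unless the timeout won the race. */
    public void onResponse(int requestId, String body) {
        CompletableFuture<String> f = pending.remove(requestId);
        if (f != null) {
            f.complete(body);
        }
    }

    public void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        TimeoutRace race = new TimeoutRace();
        CompletableFuture<String> answered = race.send(1, 5000);
        race.onResponse(1, "ok");
        System.out.println(answered.get(1, TimeUnit.SECONDS));
        CompletableFuture<String> unanswered = race.send(2, 50);
        System.out.println(unanswered.get(1, TimeUnit.SECONDS));
        race.shutdown();
    }
}
```

The design point is the same as in the listing above: removal from the map arbitrates between the competing paths, so a late response after a timeout finds no future and is simply dropped.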
RpcResponseFuture.get() simply calls waitResponse and blocks until the response arrives. The overload with a timeout throws InvokeTimeoutException if no response arrives in time:
public Object get(int timeoutMillis) throws InvokeTimeoutException, RemotingException, InterruptedException {
    this.future.waitResponse((long)timeoutMillis);
    if (!this.isDone()) {
        throw new InvokeTimeoutException("Future get result timeout!");
    } else {
        ResponseCommand responseCommand = (ResponseCommand)this.future.waitResponse();
        responseCommand.setInvokeContext(this.future.getInvokeContext());
        return RpcResponseResolver.resolveResponseObject(responseCommand, this.addr);
    }
}
4. invokeWithCallback
protected void invokeWithCallback(final Connection conn, RemotingCommand request, InvokeCallback invokeCallback, int timeoutMillis) {
    InvokeFuture future = this.createInvokeFuture(conn, request, request.getInvokeContext(), invokeCallback);
    conn.addInvokeFuture(future);
    final int requestId = request.getId();
    try {
        Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                InvokeFuture future = conn.removeInvokeFuture(requestId);
                if (future != null) {
                    future.putResponse(BaseRemoting.this.commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                    future.tryAsyncExecuteInvokeCallbackAbnormally(); // @1
                }
            }
        }, (long)timeoutMillis, TimeUnit.MILLISECONDS);
        future.addTimeout(timeout);
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture cf) throws Exception {
                if (!cf.isSuccess()) {
                    InvokeFuture f = conn.removeInvokeFuture(requestId);
                    if (f != null) {
                        f.cancelTimeout();
                        f.putResponse(BaseRemoting.this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), cf.cause()));
                        f.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
                    BaseRemoting.logger.error("Invoke send failed. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), cf.cause());
                }
            }
        });
    } catch (Exception var9) {
        InvokeFuture f = conn.removeInvokeFuture(requestId);
        if (f != null) {
            f.cancelTimeout();
            f.putResponse(this.commandFactory.createSendFailedResponse(conn.getRemoteAddress(), var9));
            f.tryAsyncExecuteInvokeCallbackAbnormally();
        }
        logger.error("Exception caught when sending invocation. The address is {}", RemotingUtil.parseRemoteAddress(conn.getChannel()), var9);
    }
}
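Code @1: After a timeout (and likewise after a send failure), the future hands the callback to the command handler's default executor so that it runs asynchronously, as the following method shows: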
public void tryAsyncExecuteInvokeCallbackAbnormally() {
    try {
        Protocol protocol = ProtocolManager.getProtocol(ProtocolCode.fromBytes(new byte[]{this.protocol}));
        if (null != protocol) {
            CommandHandler commandHandler = protocol.getCommandHandler();
            if (null != commandHandler) {
                ExecutorService executor = commandHandler.getDefaultExecutor();
                if (null != executor) {
                    executor.execute(new Runnable() {
                        public void run() {
                            ClassLoader oldClassLoader = null;
                            try {
                                if (DefaultInvokeFuture.this.getAppClassLoader() != null) {
                                    oldClassLoader = Thread.currentThread().getContextClassLoader();
                                    Thread.currentThread().setContextClassLoader(DefaultInvokeFuture.this.getAppClassLoader());
                                }
                                DefaultInvokeFuture.this.executeInvokeCallback();
                            } finally {
                                if (null != oldClassLoader) {
                                    Thread.currentThread().setContextClassLoader(oldClassLoader);
                                }
                            }
                        }
                    });
                }
            } else {
                logger.error("Executor null in commandHandler of protocolCode [{}].", this.protocol);
            }
        } else {
            logger.error("protocolCode [{}] not registered!", this.protocol);
        }
    } catch (Exception var4) {
        logger.error("Exception caught when executing invoke callback abnormally.", var4);
    }
}
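The core of the method above is running a callback on an executor thread with the application's class loader installed and then restored. A minimal sketch of that pattern, using only the JDK, might look as follows; CallbackRunner and runWithClassLoader are hypothetical names, and the anonymous ClassLoader is just a placeholder for an application class loader.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing the class loader swap; not SOFABolt code. */
public class CallbackRunner {

    /**
     * Runs the callback on the executor with appClassLoader as the thread's
     * context class loader, then restores whatever loader was set before.
     */
    public static void runWithClassLoader(ExecutorService executor,
                                          ClassLoader appClassLoader,
                                          Runnable callback) {
        executor.execute(() -> {
            Thread current = Thread.currentThread();
            ClassLoader old = current.getContextClassLoader();
            if (appClassLoader != null) {
                current.setContextClassLoader(appClassLoader);
            }
            try {
                callback.run();
            } finally {
                // Pool threads are reused, so the previous loader must come back.
                current.setContextClassLoader(old);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ClassLoader appLoader = new ClassLoader() {}; // placeholder application loader
        runWithClassLoader(executor, appLoader, () ->
                System.out.println(Thread.currentThread().getContextClassLoader() == appLoader));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Restoring the loader in a finally block matters because executor threads are shared across callbacks that may belong to different applications.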
Conclusion:
These are the four communication models in SOFABolt. Similar models also appear in Alibaba’s open source components such as RocketMQ and Seata. With SOFABolt, users can rely on the built-in RPC communication protocol without worrying about the details of implementing a private protocol.