Preface

SOFABolt is a network communication framework developed by Ant Financial Services Group and built on top of Netty.

  • Netty came into being so that Java programmers can concentrate on the business logic built on network communication, instead of getting entangled in low-level NIO details and hard-to-debug network problems.
  • SOFABolt was created so that middleware developers can focus on product features instead of reinventing a communication framework over and over again.

Bolt takes its name from the Disney animated film Bolt. It is a lightweight, easy-to-use, high-performance, and easily extensible communication framework based on Netty best practices. Over the years, Ant Financial Services Group has solved many network communication problems in microservices and messaging middleware and has accumulated a lot of experience. It keeps optimizing and improving these solutions and folds them into SOFABolt as a base component, so that more scenarios that rely on network communication can benefit. Bolt is currently used in many Ant middleware products, such as SOFARPC, the message center, distributed transactions, the distributed switch, and the configuration center.

Debugging Environment Setup

Required tools

  • Maven
  • Git
  • JDK
  • IntelliJ IDEA

Pulling the source

Fork the official repository, https://github.com/alipay/sofa-bolt, into your own account. Why fork? Once you start reading and debugging the source code you will probably want to write comments, and with your own repository you can commit them freely.

Clone the code from your fork using IntelliJ IDEA.

Example module

The official project provides several Bolt usage examples in the test module.

Bolt provides an RpcServer and an RpcClient. They are ready to use after a little necessary initialization or after turning on the relevant function switches.

RpcServer

Run the com.alipay.remoting.demo.RpcServerDemoByMain#main(args) method to start the server. The following logs are displayed:

Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j2 ]
server start ok!

RpcClient

Run the com.alipay.remoting.demo.RpcClientDemoByMain#main(args) method to start the client. The following logs are displayed:

Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j2 ]
invoke sync result = [HELLO WORLD! I'm server return]

With this in place, we can debug Bolt and Netty comfortably. When reading source code, be sure to debug it as well. This is very important!

Private communication protocol design

Figure 1 – Proprietary protocols and necessary functional modules

Protocol

Field name               Size                  Note
proto                    1 byte                Magic number of the protocol
ver1                     1 byte                Protocol version
type                     1 byte                (1) request (2) response (3) request oneway
cmdcode                  2 bytes               Remote command code
ver2                     1 byte                Remote command version
requestId                4 bytes               Request ID
codec                    1 byte                Serialization code
switch                   1 byte                Protocol function switch
timeout or respStatus    4 bytes or 2 bytes    Request timeout or response status
classLen                 2 bytes               Length of the request or response class name
headerLen                2 bytes               Length of the protocol header
contentLen               4 bytes               Length of the content
content                  N bytes               Content
CRC32 (optional)         4 bytes               CRC32 of the frame (present when ver1 > 1)

The Bolt communication framework has two protocol specifications. Because of design flaws, the first version, RpcProtocol, has been abandoned. The explanation below covers RpcProtocolV2.

  1. The first field is the magic number, which usually has a fixed number of bytes (1 byte in our case). Why is this field needed, and why is it fixed? Suppose the server opens a port, say port 80. Without a magic number, the server would process every packet sent to that port according to the custom protocol. For example, if we visit http://<server IP address> directly (port 80 by default), the server receives a standard HTTP packet but still parses it according to the custom protocol, which obviously fails. With a magic number, the server first compares the leading magic byte and can immediately tell that the packet does not follow the custom protocol, that is, it is an invalid packet. For security reasons it can then close the connection right away to save resources. In Bolt's V2 protocol the magic number is (byte) 2. Java class files use the same idea: they begin with the 4-byte magic number 0xCAFEBABE to identify themselves as bytecode files.
  2. The next byte is the protocol version, which is reserved for protocol upgrades. It is similar to the version field in an IP header that distinguishes IPv4 from IPv6. The first version is (byte) 1 and the second version is (byte) 2.
  3. The third part, type, indicates whether the RPC command is a request or a response. Request commands are divided into request_oneway and request. request_oneway is one-way: the request is sent and no response is expected. request is the usual request-response model.
  4. The fourth part is the remote command code. It identifies a specific kind of remote command, and each command has its own number. In Bolt, (short) 0 is occupied by the heartbeat and cannot be used by other commands.
  5. The fifth part is the remote command version. It plays the same role as the protocol version and is reserved for upgrading remote commands.
  6. The sixth part is the request ID, a 4-byte integer.
  7. The seventh part is the serialization code. Although the field is labeled codec, it actually refers to the Serializer, which is a different concept. A Serializer converts objects into bytes and bytes back into objects. Hessian, JSON, Protobuf, and others can be used. The default serializer is Hessian2.
  8. The eighth part is the protocol function switch. Individual features of the communication protocol can be turned on or off, and the encoder and decoder act on the corresponding bit. For example, whether the CRC function is enabled decides whether a cyclic redundancy check is performed.
  9. The ninth part is timeout or respStatus: it is the timeout in an RPC request command and the respStatus (response status) in an RPC response command. The response statuses include SUCCESS, ERROR, SERVER_EXCEPTION, TIMEOUT, and so on.
  10. The tenth part consists of classLen, headerLen, and contentLen. These fields give the lengths of the class name, the header, and the content in the payload.
  11. CRC32 (optional) is the last field. The protocol function switch decides whether the cyclic redundancy check is performed.
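The field widths above add up to a fixed header of 24 bytes for a request and 22 bytes for a response. The following is a minimal sketch of that arithmetic and of the magic-number check, written with the JDK's java.nio.ByteBuffer rather than Netty. The class name, method names, and the placeholder values for ver2 and codec are assumptions made for illustration; they are not taken from the Bolt source.

```java
import java.nio.ByteBuffer;

/** Illustrative only: widths follow the field table; all names here are hypothetical. */
class BoltV2HeaderSketch {
    static final byte MAGIC = 2; // proto: (byte) 2 for the V2 protocol

    // Widths shared by requests and responses: proto, ver1, type, cmdcode,
    // ver2, requestId, codec, switch, classLen, headerLen, contentLen.
    static final int COMMON = 1 + 1 + 1 + 2 + 1 + 4 + 1 + 1 + 2 + 2 + 4;

    static int requestHeaderLength()  { return COMMON + 4; } // plus timeout (int)
    static int responseHeaderLength() { return COMMON + 2; } // plus respStatus (short)

    /** Packs the fixed request header with no class name, header, or content bytes. */
    static byte[] emptyRequestHeader(byte ver1, byte type, short cmdCode,
                                     int requestId, int timeout) {
        ByteBuffer buf = ByteBuffer.allocate(requestHeaderLength()); // big-endian by default
        buf.put(MAGIC).put(ver1).put(type);
        buf.putShort(cmdCode);
        buf.put((byte) 1);       // ver2: command version (placeholder value)
        buf.putInt(requestId);
        buf.put((byte) 1);       // codec: serializer code (placeholder value)
        buf.put((byte) 0);       // switch: every function turned off
        buf.putInt(timeout);
        buf.putShort((short) 0); // classLen
        buf.putShort((short) 0); // headerLen
        buf.putInt(0);           // contentLen
        return buf.array();
    }

    /** A receiver can reject foreign traffic by looking at the first byte only. */
    static boolean looksLikeBoltV2(byte[] packet) {
        return packet.length > 0 && packet[0] == MAGIC;
    }
}
```

An HTTP request starts with an ASCII letter such as G, so looksLikeBoltV2 rejects it after inspecting a single byte.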

Encoder and Decoder

Protocol-specific encoding and decoding: a private protocol needs its own core encode and decode processes and must support different serialization and deserialization mechanisms for the business payload. Because different private protocols have different fields, their core encode and decode processes differ, so each protocol has to be handled separately.

Encoder

First, let's look at the implementation. The source code path is com.alipay.remoting.rpc.protocol.RpcCommandEncoderV2, and the code is as follows:

/**
 * Encode remoting command into ByteBuf v2.
 *
 * @author jiangping
 * @version $Id: RpcCommandEncoderV2.java, v 0.1 2017-05-27 PM8:11:27 tao Exp $
 */
public class RpcCommandEncoderV2 implements CommandEncoder {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger("RpcRemoting");

    /**
     * @see CommandEncoder#encode(ChannelHandlerContext, Serializable, ByteBuf)
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        try {
            if (msg instanceof RpcCommand) {
                /*
                 * proto: magic code for protocol
                 * ver: version for protocol
                 * type: request/response/request oneway
                 * cmdcode: code for remoting command
                 * ver2: version for remoting command
                 * requestId: id of request
                 * codec: code for codec (serializer)
                 * switch: function switch
                 * (req)timeout: request timeout, 4 bytes, present when the command is a request
                 * (resp)respStatus: response status, 2 bytes, present when the command is a response
                 * classLen: length of request or response class name
                 * headerLen: length of header
                 * contentLen: length of content
                 * className
                 * header
                 * content
                 * crc (optional): CRC32 of the frame (present when ver1 > 1)
                 */
                int index = out.writerIndex();
                RpcCommand cmd = (RpcCommand) msg;
                // Write the magic number, (byte) 2
                out.writeByte(RpcProtocolV2.PROTOCOL_CODE);
                // Read the protocol version from the connection attribute
                Attribute<Byte> version = ctx.channel().attr(Connection.VERSION);
                byte ver = RpcProtocolV2.PROTOCOL_VERSION_1;
                if (version != null && version.get() != null) {
                    ver = version.get();
                }
                // Write the protocol version
                out.writeByte(ver);
                // Write the RPC command type
                out.writeByte(cmd.getType());
                // Write the remote command code
                out.writeShort(((RpcCommand) msg).getCmdCode().value());
                // Write the remote command version
                out.writeByte(cmd.getVersion());
                // Write the request ID
                out.writeInt(cmd.getId());
                // Write the serializer code
                out.writeByte(cmd.getSerializer());
                // Write the protocol function switch
                out.writeByte(cmd.getProtocolSwitch().toByte());
                // A request carries a timeout, a response carries a status
                if (cmd instanceof RequestCommand) {
                    //timeout
                    out.writeInt(((RequestCommand) cmd).getTimeout());
                }
                if (cmd instanceof ResponseCommand) {
                    //response status
                    ResponseCommand response = (ResponseCommand) cmd;
                    out.writeShort(response.getResponseStatus().getValue());
                }
                // Write the class name length
                out.writeShort(cmd.getClazzLength());
                // Write the header length
                out.writeShort(cmd.getHeaderLength());
                // Write the content length
                out.writeInt(cmd.getContentLength());
                // Write the class name
                if (cmd.getClazzLength() > 0) {
                    out.writeBytes(cmd.getClazz());
                }
                // Write the header
                if (cmd.getHeaderLength() > 0) {
                    out.writeBytes(cmd.getHeader());
                }
                // Write the content
                if (cmd.getContentLength() > 0) {
                    out.writeBytes(cmd.getContent());
                }
                // Append the CRC only for protocol version 2 with the CRC switch on
                if (ver == RpcProtocolV2.PROTOCOL_VERSION_2
                    && cmd.getProtocolSwitch().isOn(ProtocolSwitch.CRC_SWITCH_INDEX)) {
                    // compute the crc32 and write to out
                    byte[] frame = new byte[out.readableBytes()];
                    out.getBytes(index, frame);
                    out.writeInt(CrcUtil.crc32(frame));
                }
            } else {
                // Not an RpcCommand: log a warning
                String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of RpcCommand";
                logger.warn(warnMsg);
            }
        } catch (Exception e) {
            logger.error("Exception caught!", e);
            throw e;
        }
    }
}
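The last step of the encoder, computing a CRC32 over the frame and appending it as a 4-byte integer, can be sketched with the JDK alone. This assumes the checksum is the standard CRC-32 provided by java.util.zip.CRC32; whether Bolt's CrcUtil is built the same way is not confirmed by the text above, and the class and method names below are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Sketch of "frame + 4-byte CRC32"; not Bolt's CrcUtil. */
class CrcFrameSketch {
    /** Returns the frame followed by the CRC32 of the frame as a big-endian int. */
    static byte[] appendCrc(byte[] frame) {
        CRC32 crc = new CRC32();
        crc.update(frame, 0, frame.length);
        return ByteBuffer.allocate(frame.length + 4)
                .put(frame)
                .putInt((int) crc.getValue())
                .array();
    }

    /** Recomputes the CRC over everything except the last 4 bytes and compares. */
    static boolean verify(byte[] frameWithCrc) {
        if (frameWithCrc.length < 4) {
            return false;
        }
        int bodyLen = frameWithCrc.length - 4;
        CRC32 crc = new CRC32();
        crc.update(frameWithCrc, 0, bodyLen);
        int expected = ByteBuffer.wrap(frameWithCrc, bodyLen, 4).getInt();
        return (int) crc.getValue() == expected;
    }
}
```

Flipping any single bit of the frame changes the CRC, so verify returns false for a corrupted frame.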

From the code we can see that Netty reads and writes data in units of ByteBuf. Let's take a brief look at ByteBuf.

ByteBuf structure

The structure diagram of ByteBuf shows the following:

  1. ByteBuf is a byte container whose data is divided into three parts. The first part is the discarded bytes, which have already been read and are no longer valid. The second part is the readable bytes, the main body of ByteBuf's data; everything read from ByteBuf comes from this part. The last part is the writable bytes; everything written to ByteBuf goes into this part. The final dotted section shows how much further the ByteBuf can still be expanded.
  2. These three parts are delimited by two pointers, from left to right the read pointer (readerIndex) and the write pointer (writerIndex). A further variable, capacity, is the total capacity of ByteBuf's underlying memory.
  3. Each byte read from ByteBuf increments readerIndex by 1. When readerIndex equals writerIndex, ByteBuf is not readable.
  4. Data is written starting at the position writerIndex points to. Each byte written increments writerIndex by 1 until it reaches capacity, at which point ByteBuf is not writable.
  5. ByteBuf also has a maxCapacity parameter. When a write finds the current capacity insufficient, the buffer can be expanded up to maxCapacity. A write that would exceed maxCapacity raises an error.

Netty uses the ByteBuf data structure to separate readable data from writable data cleanly, so reads and writes do not conflict. ByteBuf itself is just an abstraction over binary data. The key point is that Netty reads and writes data only through ByteBuf.
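To make the two-pointer layout concrete, here is a toy model in plain Java. It is not Netty's implementation: the class name is hypothetical and expansion up to maxCapacity is left out. It only shows how readerIndex and writerIndex split one array into discarded, readable, and writable regions.

```java
/** Toy model of the three regions of a buffer; not Netty's ByteBuf. */
class TwoPointerBuffer {
    private final byte[] data;
    private int readerIndex; // bytes before this index have been read ("discarded")
    private int writerIndex; // bytes in [readerIndex, writerIndex) are readable

    TwoPointerBuffer(int capacity) {
        data = new byte[capacity];
    }

    void writeByte(int b) {
        if (writerIndex == data.length) {
            throw new IndexOutOfBoundsException("buffer is full");
        }
        data[writerIndex++] = (byte) b; // each write advances writerIndex by 1
    }

    byte readByte() {
        if (readerIndex == writerIndex) {
            throw new IndexOutOfBoundsException("nothing to read");
        }
        return data[readerIndex++];     // each read advances readerIndex by 1
    }

    int readableBytes() { return writerIndex - readerIndex; }
    int writableBytes() { return data.length - writerIndex; }
    boolean isReadable() { return readableBytes() > 0; }
}
```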

Capacity APIs

capacity()

Returns the number of bytes of underlying memory that the ByteBuf occupies (discarded bytes, readable bytes, and writable bytes together).

maxCapacity()

Returns the maximum number of bytes of underlying memory that the ByteBuf may occupy. When a write finds the current capacity insufficient, the buffer is expanded until it reaches maxCapacity. A write that would exceed this limit throws an exception.

readableBytes() and isReadable()

readableBytes() returns the number of bytes currently readable, which equals writerIndex - readerIndex. If the two indices are equal, the buffer is not readable and isReadable() returns false.

writableBytes(), isWritable(), and maxWritableBytes()

writableBytes() returns the number of bytes currently writable, which equals capacity - writerIndex. If the two are equal, the buffer is not writable and isWritable() returns false. This does not mean that no more data can be written, though: when a write finds no room, Netty automatically expands the ByteBuf until its underlying memory reaches maxCapacity. maxWritableBytes() returns the maximum number of bytes that can still be written, which equals maxCapacity - writerIndex.
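The relationship between capacity, maxCapacity, writableBytes(), and maxWritableBytes() can be sketched as follows. The doubling growth policy and the class name are assumptions made for this sketch; Netty's actual expansion policy is not described in the text above and may differ.

```java
import java.util.Arrays;

/** Sketch of a buffer that grows on demand up to maxCapacity; not Netty's ByteBuf. */
class GrowableBufferSketch {
    private byte[] data;
    private final int maxCapacity;
    private int writerIndex;

    GrowableBufferSketch(int initialCapacity, int maxCapacity) {
        this.data = new byte[initialCapacity];
        this.maxCapacity = maxCapacity;
    }

    int capacity() { return data.length; }
    int writableBytes() { return data.length - writerIndex; }
    int maxWritableBytes() { return maxCapacity - writerIndex; }

    void writeBytes(byte[] src) {
        if (src.length > maxWritableBytes()) {
            throw new IndexOutOfBoundsException("write exceeds maxCapacity " + maxCapacity);
        }
        if (src.length > writableBytes()) {
            // Assumed policy: double the capacity until the data fits, capped at maxCapacity.
            int needed = writerIndex + src.length;
            int newCapacity = Math.max(1, data.length);
            while (newCapacity < needed) {
                newCapacity *= 2;
            }
            data = Arrays.copyOf(data, Math.min(newCapacity, maxCapacity));
        }
        System.arraycopy(src, 0, data, writerIndex, src.length);
        writerIndex += src.length;
    }
}
```

With an initial capacity of 4 and a maxCapacity of 16, the buffer reports writableBytes() of 0 after four bytes are written but still accepts more data, because maxWritableBytes() is 12.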

Read and write APIs

Essentially, reading from and writing to a ByteBuf means reading or writing data starting at the corresponding pointer.

writeBytes(byte[] src) and buffer.readBytes(byte[] dst)

writeBytes() writes all of src into the ByteBuf, while readBytes() fills dst with data read from the ByteBuf. The dst array is usually sized to readableBytes(), and the length of src is usually less than or equal to writableBytes().

writeByte(byte b) and buffer.readByte()

writeByte() writes one byte into the ByteBuf, and buffer.readByte() reads one byte from it. Similar APIs include writeBoolean(), writeChar(), writeShort(), writeInt(), writeLong(), writeFloat(), and writeDouble(), together with readBoolean(), readChar(), readShort(), readInt(), readLong(), readFloat(), and readDouble(). They should be easy to understand.

The getBytes(), getByte(), setBytes(), and setByte() families are similar to the read/write APIs. The only difference is that get/set does not move the read or write pointer, while read/write does.
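The same distinction exists in the JDK's java.nio.ByteBuffer, which is used here as a stand-in for Netty's ByteBuf: an absolute get(index) leaves the position untouched, while a relative get() advances it. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** JDK analogue of "get does not move the pointer, read does". */
class AbsoluteVsRelativeSketch {
    /** Returns {position after an absolute get, position after a relative get}. */
    static int[] positions() {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {10, 20, 30});
        buf.get(1);                      // absolute read, comparable to getByte(index)
        int afterAbsolute = buf.position();
        buf.get();                       // relative read, comparable to readByte()
        int afterRelative = buf.position();
        return new int[] {afterAbsolute, afterRelative};
    }
}
```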

Decoder

Next, let's look at the decoding process. The source code path is com.alipay.remoting.rpc.protocol.RpcCommandDecoderV2.

First, the decoder checks whether the number of readable bytes is at least the smaller of the request header length and the response header length. It then validates the magic number in the ByteBuf and throws an exception if the protocol is not recognized.

The code is as follows:

private int lessLen;

{
    // The smaller of the response header length and the request header length
    lessLen = RpcProtocolV2.getResponseHeaderLength() < RpcProtocolV2.getRequestHeaderLength()
        ? RpcProtocolV2.getResponseHeaderLength()
        : RpcProtocolV2.getRequestHeaderLength();
}

// the less length between response header and request header
if (in.readableBytes() >= lessLen) {
    // Save the current read pointer
    in.markReaderIndex();
    byte protocol = in.readByte();
    // Restore the read pointer to the position saved by markReaderIndex()
    in.resetReaderIndex();
    if (protocol == RpcProtocolV2.PROTOCOL_CODE) {
        ......
    } else {
        // Unexpected magic number: report an unknown protocol
        String emsg = "Unknown protocol: " + protocol;
        logger.error(emsg);
        throw new RuntimeException(emsg);
    }
}

Read and write pointer APIs

readerIndex() and readerIndex(int)

The former returns the current read pointer readerIndex, and the latter sets the read pointer.

writerIndex() and writerIndex(int)

The former returns the current write pointer writerIndex, and the latter sets the write pointer.

markReaderIndex() and resetReaderIndex()

The former saves the current read pointer, and the latter restores the read pointer to the saved value. The following two snippets are equivalent:

// Snippet 1
int readerIndex = buffer.readerIndex();
// .. other operations
buffer.readerIndex(readerIndex);

// Snippet 2
buffer.markReaderIndex();
// .. other operations
buffer.resetReaderIndex();

The second form is preferable: no extra variable is needed, and wherever the buffer is passed as a parameter, calling resetReaderIndex() restores the previously marked state. This pattern is very common when parsing packets of a custom protocol.

markWriterIndex() and resetWriterIndex()

This pair works in the same way as the pair above, but for the write pointer.

Decoding a request command and decoding a response command are similar. The following walks through request decoding as an example:

if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
    // decode request: three bytes have been read before this point, so subtract 3
    if (in.readableBytes() >= RpcProtocolV2.getRequestHeaderLength() - 3) {
        short cmdCode = in.readShort();
        byte ver2 = in.readByte();
        int requestId = in.readInt();
        byte serializer = in.readByte();
        byte protocolSwitchValue = in.readByte();
        int timeout = in.readInt();
        short classLen = in.readShort();
        short headerLen = in.readShort();
        int contentLen = in.readInt();
        byte[] clazz = null;
        byte[] header = null;
        byte[] content = null;

        // decide the at-least bytes length for each version
        int lengthAtLeastForV1 = classLen + headerLen + contentLen;
        // If the CRC switch is on, the minimum length grows by 4 bytes
        boolean crcSwitchOn = ProtocolSwitch.isOn(
            ProtocolSwitch.CRC_SWITCH_INDEX, protocolSwitchValue);
        int lengthAtLeastForV2 = classLen + headerLen + contentLen;
        if (crcSwitchOn) {
            lengthAtLeastForV2 += 4; // crc int
        }

        // continue read only if the rest of the frame has fully arrived
        if ((version == RpcProtocolV2.PROTOCOL_VERSION_1 && in.readableBytes() >= lengthAtLeastForV1)
            || (version == RpcProtocolV2.PROTOCOL_VERSION_2 && in.readableBytes() >= lengthAtLeastForV2)) {
            // Read the class name
            if (classLen > 0) {
                clazz = new byte[classLen];
                in.readBytes(clazz);
            }
            // Read the header
            if (headerLen > 0) {
                header = new byte[headerLen];
                in.readBytes(header);
            }
            // Read the content
            if (contentLen > 0) {
                content = new byte[contentLen];
                in.readBytes(content);
            }
            // Verify the CRC for protocol version 2 with the CRC switch on
            if (version == RpcProtocolV2.PROTOCOL_VERSION_2 && crcSwitchOn) {
                checkCRC(in, startIndex);
            }
        } else {
            // Not enough data: reset the read pointer and wait for more
            in.resetReaderIndex();
            return;
        }

        RequestCommand command;
        // Heartbeat command or ordinary request command
        if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
            command = new HeartbeatCommand();
        } else {
            command = createRequestCommand(cmdCode);
        }
        // Populate the command object
        command.setType(type);
        command.setVersion(ver2);
        command.setId(requestId);
        command.setSerializer(serializer);
        command.setProtocolSwitch(ProtocolSwitch.create(protocolSwitchValue));
        command.setTimeout(timeout);
        command.setClazz(clazz);
        command.setHeader(header);
        command.setContent(content);

        out.add(command);
    } else {
        in.resetReaderIndex();
    }
}
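The decoder's central idea, returning without consuming anything when the frame has not fully arrived, can be isolated in a much smaller example. The sketch below assumes a hypothetical frame made of a 4-byte length followed by that many content bytes, which is far simpler than the Bolt header, and uses java.nio.ByteBuffer in place of Netty's ByteBuf.

```java
import java.nio.ByteBuffer;

/** Decodes "4-byte length + content" frames; a simplified stand-in for the Bolt decoder. */
class LengthPrefixedDecoderSketch {
    /** Returns the content once it has fully arrived, or null if more bytes are needed. */
    static byte[] tryDecode(ByteBuffer in) {
        in.mark();
        if (in.remaining() < 4) {
            in.reset();              // the length field itself is incomplete
            return null;
        }
        int contentLen = in.getInt();
        if (contentLen < 0) {
            throw new IllegalArgumentException("negative content length: " + contentLen);
        }
        if (in.remaining() < contentLen) {
            in.reset();              // not enough data: restore the position and wait
            return null;
        }
        byte[] content = new byte[contentLen];
        in.get(content);
        return content;
    }
}
```

Because the position is restored on every early return, the caller can invoke tryDecode again once more bytes have been appended, and decoding starts from the beginning of the frame.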

Heartbeat

Protocol-specific heartbeat triggering and handling: different protocols may need different heartbeat logic, so the logic that triggers a heartbeat and the logic that handles one must be considered separately. The source code path is com.alipay.remoting.rpc.protocol.RpcHeartbeatTrigger.

    /** max trigger times */
    public static final Integer maxCount = ConfigManager.tcp_idle_maxtimes();

    private static final long heartbeatTimeoutMillis = 1000;

    @Override
    public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
        // Number of heartbeats already counted on this connection
        Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
        final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
        // Close the connection once the count reaches maxCount
        if (heartbeatTimes >= maxCount) {
            try {
                conn.close();
                logger.error("Heartbeat failed for {} times, close the connection from client side: {} ",
                    heartbeatTimes, RemotingUtil.parseRemoteAddress(ctx.channel()));
            } catch (Exception e) {
                logger.warn("Exception caught when closing connection in SharableHandler.", e);
            }
        } else {
            boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
            if (!heartbeatSwitch) {
                return;
            }
            final HeartbeatCommand heartbeat = new HeartbeatCommand();

            final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
                new InvokeCallbackListener() {
                    @Override
                    public void onResponse(InvokeFuture future) {
                        ResponseCommand response;
                        ......
                            // Increase the heartbeat count by one
                            Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                            ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                        }
                    }

                    @Override
                    public String getRemoteAddress() {
                        return ctx.channel().remoteAddress().toString();
                    }
                }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
            final int heartbeatId = heartbeat.getId();
            conn.addInvokeFuture(future);
            if (logger.isDebugEnabled()) {
                logger.debug("Send heartbeat, successive count={}, Id={}, to remoteAddr={}",
                    heartbeatTimes, heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
            }
            // Send the heartbeat; the listener is called back asynchronously with the result
            ctx.writeAndFlush(heartbeat).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    ......
                }
            });
            // TimerHolder holds a timer built on Netty's time wheel utility class
            TimerHolder.getTimer().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                    if (future != null) {
                        future.putResponse(commandFactory.createTimeoutResponse(conn
                            .getRemoteAddress()));
                        future.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
                }
            }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }
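Stripped of the networking, the trigger above is a small state machine: count failed heartbeats and close the connection once the count reaches the maximum. The sketch below models only that bookkeeping. Resetting the counter to zero on a successful heartbeat is an assumption, because the excerpt elides that branch, and all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Bookkeeping behind a heartbeat trigger; no networking involved. */
class HeartbeatCounterSketch {
    private final int maxCount;
    private final AtomicInteger failures = new AtomicInteger();
    private volatile boolean closed;

    HeartbeatCounterSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Called on every idle event; returns true if a heartbeat should be sent. */
    boolean onIdle() {
        if (failures.get() >= maxCount) {
            closed = true;            // too many failures: give up on the connection
            return false;
        }
        return true;
    }

    /** Called when a heartbeat response arrives or its timeout fires. */
    void onHeartbeatResult(boolean success) {
        if (success) {
            failures.set(0);          // assumed behaviour: a success clears the count
        } else {
            failures.incrementAndGet();
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```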

If you are interested in HashedWheelTimer, see the following articles:

  • Netty Tool Class HashedWheelTimer Source Code Walkthrough (1)
  • Netty Tool Class HashedWheelTimer Source Code Walkthrough (2)
  • Netty Tool Class HashedWheelTimer Source Code Walkthrough (3)
  • Zacard, "Netty Source Code Interpretation: the Time Wheel Algorithm Implementation, HashedWheelTimer"

Command and Command Handler

  • Extensible command and command processor management

Figure 2 – Example of communication command design

  • Payload commands: these generally carry specific business data, such as request parameters and response results.

  • Control commands: function management commands, heartbeat commands, and so on. They usually carry out complex cross-node coordination in a distributed system and keep the payload command communication stable, so they are an essential part of the protocol.

  • During protocol communication there are many kinds of command definitions. Logically, a request object that carries a specific business payload is called a payload command. The other kind of request object, such as a function management command or a heartbeat command, is called a control command.

  • After defining the communication commands, we also need to define command processors, which hold the business logic for each command. We must also keep the mapping between commands and command processors so that each command reaches the right processor during the processing phase.
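The mapping between commands and command processors can be sketched as a lookup table keyed by command code. This is a simplified illustration, not Bolt's implementation: processors are modelled as plain functions from a String to a String, and the class name and fallback behaviour are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Maps command codes to processors and dispatches incoming commands. */
class ProcessorRegistrySketch {
    private final Map<Short, Function<String, String>> processors = new ConcurrentHashMap<>();
    private final Function<String, String> fallback = msg -> "no processor";

    void registerProcessor(short cmdCode, Function<String, String> processor) {
        processors.put(cmdCode, processor);
    }

    /** Looks up the processor for the command code and runs it on the message. */
    String handleCommand(short cmdCode, String msg) {
        return processors.getOrDefault(cmdCode, fallback).apply(msg);
    }
}
```

Registering `s -> "echo:" + s` under code 1 makes `handleCommand((short) 1, "hi")` return "echo:hi", while an unregistered code falls through to the fallback.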

    CommandFactory and RpcCommandFactory

    These two classes are command factories. They create request commands from request objects and create response commands that carry response results or a response status. The response statuses include SUCCESS, ERROR, SERVER_EXCEPTION, TIMEOUT, and so on.

    CommandHandler and RpcCommandHandler

    /**
     * Command handler.
     *
     * @author jiangping
     * @version $Id: CommandHandler.java, v 0.1 2015-12-14 PM4:03:55 tao Exp $
     */
    public interface CommandHandler {
        /**
         * Handle the command.
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        void handleCommand(RemotingContext ctx, Object msg) throws Exception;

        /**
         * Register processor for command.
         *
         * @param cmd
         * @param processor
         */
        void registerProcessor(CommandCode cmd, RemotingProcessor<?> processor);

        /**
         * Register default executor for the handler.
         *
         * @param executor
         */
        void registerDefaultExecutor(ExecutorService executor);

        /**
         * Get default executor for the handler.
         */
        ExecutorService getDefaultExecutor();
    }

    Command processing can be handed to a thread pool: create an ExecutorService and register it as the handler's default executor. If no thread pool is registered for the handler, Bolt creates one by default with the following parameters:

    • corePoolSize (core size of the thread pool): 20

    • maximumPoolSize (maximum size of the thread pool): 400

    • keepAliveTime: 60s

    • Task queue: ArrayBlockingQueue with a capacity of 6000

    • ThreadFactory: a naming factory that creates threads with the "bolt-default-executor" prefix.
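A thread pool with the parameters listed above can be built with the JDK's ThreadPoolExecutor. This is a sketch under stated assumptions, not Bolt's own code: the numeric suffix in the thread names, the daemon flag, and the class name are all illustrative choices.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Builds an executor with the default parameters described in the text. */
class DefaultExecutorSketch {
    static ThreadPoolExecutor create() {
        ThreadFactory named = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                // Prefix from the text; the numeric suffix is an assumption.
                Thread t = new Thread(r, "bolt-default-executor-" + seq.getAndIncrement());
                t.setDaemon(true); // assumption: do not keep the JVM alive
                return t;
            }
        };
        return new ThreadPoolExecutor(
                20,                                    // corePoolSize
                400,                                   // maximumPoolSize
                60, TimeUnit.SECONDS,                  // keepAliveTime
                new ArrayBlockingQueue<Runnable>(6000), // bounded task queue
                named);
    }
}
```

With a bounded queue, ThreadPoolExecutor starts threads beyond the core size only after the queue is full, so the pool stays at 20 threads until 6000 tasks are waiting.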

    If you are not familiar with thread pools, see the following article:

    Understand the thread pool to walk into Dubbo source code

Other

This concludes the detailed walk-through of SOFABolt's private communication protocol design. All of the comments shown above have been uploaded to my annotated Bolt repository on GitHub.

Link: github.com/sanshengshu…

Writing original articles is not easy. If you found this one useful, please recommend it. Your support is my biggest motivation to write!

Copyright notice:

  • Author: Mu Shuwei

  • github.com/sanshengshu…

  • sanshengshui.github.io/