This is the first day of my participation in the August Challenge. For details, see:August is more challenging

The premise

Recently, I was looking at Netty related materials, and it happened that SOFA BOLT is a relatively mature implementation of Netty custom protocol stack, so I decided to read the source code of SOFA BOLT, analyze its protocol composition in detail, and simply analyze its source code implementation of client and server.

  • For fun:SOFA-BOLTCode indent andFastJsonSimilarly, variable names are forced to align, making the source code uncomfortable for the average developer

The source code currently read is the source code for the master branch of the SOFA BOLT repository around 2021-08.

A brief introduction to SOFA BOLT

SOFA-BOLT is a set of network communication framework based on Netty implementation developed by Ant Financial Services Group. In essence, it is a set of Netty proprietary protocol stack encapsulation. The purpose is to enable developers to pay more attention to the implementation of business logic based on network communication. Instead of getting too tangled up in the implementation of NIO at the bottom of the network and dealing with hard-to-debug network issues and Netty secondary development issues. The architectural design and functions of SOFA-BOLT are as follows:

Above is derived from the SOFA – BOLT’s official website www.sofastack.tech/projects/so…

SOFA-BOLT Protocol Perspective

Since the SOFA-BOLT protocol is a custom protocol stack based on the Netty implementation, the implementation of the protocol itself can be quickly found in the implementation of Encoder and Decoder, and further located in the com.alipay. Remoting. RPC package. According to the source code, there are two versions of THE SOFA-BOLT protocol. The protocol is described in detail in the comments at the top of the class of RpcProtocol and RpcProtocolV2. Based on these introductions, the basic components of the two versions of the protocol can be summarized.

This section describes basic components of the V1 protocol

  • V1Version of the protocol requestFrameBasic composition:

  • V1Version of the protocol responseFrameBasic composition:

For the V1 protocol, the properties are expanded as follows:

  • requestFrameAnd the responseFramePublic properties of:
Attribute Code Attribute meaning Java type Size (byte) note
proto Protocol coding byte 1 V1Version,proto = 1.V2Version,proto = 2
type type byte 1 0 => RESPONSE.1 => REQUEST.2 => REQUEST_ONEWAY
cmdcode The command code short 2 1 => rpc request.2 => rpc response
ver2 Command version byte 1 From the source code it is currently fixed to1
requestId Request ID int 4 A requestCMDThe globally unique identifier of
codec Encoding decoder byte 1

In the table above, codec is literally a codec, but is actually a token for the serialization and dissequence implementations. V1 and V2 are currently fixed codec = 1, tracked through the source code to the SerializerManager configuration value of Hessian2 = 1. By default, Hessian2 is used for serialization and deserialization. For details, see HessianSerializer in the source code

  • requestFrameUnique properties:
Attribute Code Attribute meaning Java type Size (byte) note
timeout Request timeout int 4
classLen The length of the name of the request object (parameter) type short 2 value> = 0
headerLen Header length short 2 value> = 0
contentLen Request length int 4 value> = 0
className bytes The name of the request object (parameter) type byte[]
header bytes Request header byte[]
content bytes Request content byte[]
  • The responseFrameUnique properties:
Attribute Code Attribute meaning Java type Size (byte) note
respstatus Response status value short 2 inResponseStatusIs currently built in13States, for example0 => SUCCESS
classLen The length of the name of the response object (parameter) type short 2 value> = 0
headerLen Length of response head short 2 value> = 0
contentLen Length of response content int 4 value> = 0
className bytes The name of the response object (parameter) type byte[]
header bytes Response headers byte[]
content bytes Response content byte[]

The request Frame has a timeout property and the response Frame has a respstatus property. Most of the attributes are multiplexed, and the three lengths and three byte arrays are mutually restricted:

  • classLen <=> className bytes
  • headerLen <=> header bytes
  • contentLen <=> content bytes

This section describes basic components of the V2 protocol

  • V2Version of the protocol requestFrameBasic composition:

  • V2Version of the protocol responseFrameBasic composition:

Compared with protocol V1, protocol V2 has two mandatory public attributes and one optional public attribute.

Attribute Code Attribute meaning Java type Size (byte) note
ver1 Protocol version byte 1 In order to inV2Compatible with the version protocolV1Version protocol
switch The protocol switch byte 1 Based on theBitSetRealizes the switch, at most8a
CRC32 Cyclic redundancy check value int 4 Optional by switchProtocolSwitch.CRC_SWITCH_INDEXWhen you decide whether to enable it or not, when you enable it, it will be based on the wholeFrameTo calculate

Among the new properties, switch represents the byte field that is translated from the BitSet in the ProtocolSwitch implementation. Since byte is only 8 bits, the protocol can transmit the status of a maximum of 8 switches during transmission, with the subscript [0,7]. CRC32 is calculated based on the byte array converted from the entire Frame. There is a native API in the JDK. You can simply build a utility class to calculate the following:

public enum Crc32Utils {

    /** * singleton */
    X;

    /** * Calculate the CRC32 result **@paramThe content content *@return crc32 result
     */
    public long crc32(byte[] content) {
        CRC32 crc32 = new CRC32();
        crc32.update(content, 0, content.length);
        long r = crc32.getValue();
        // crc32.reset();
        returnr; }}Copy the code

The V2 version of the protocol casts the result of CRC32 to an int, so you can think about why there is no overflow.

SOFA – BOLT structure

The source code of SOFA BOLT is not complicated, considering that the analysis of the source code will be lengthy, and if you have experience in developing the Netty custom protocol stack, only the architecture and core component functions of SOFA BOLT are analyzed here. A Protocol is defined by the interface Protocol:

public interface Protocol {
    
    // Command encoder
    CommandEncoder getEncoder(a);

    // Command decoder
    CommandDecoder getDecoder(a);

    // The heartbeat trigger
    HeartbeatTrigger getHeartbeatTrigger(a);

    // Command processor
    CommandHandler getCommandHandler(a);

    // Order the factory
    CommandFactory getCommandFactory(a);
}
Copy the code

By implementing RpcProtocolV2, we can know:

In addition, all frames that need to be sent or received are wrapped as commands, and the Command family is as follows:

That is:

  • RequestCommandDefines all the attributes required by the request command, which are ultimately determined byRpcCommandEncoderV2coding
  • ResponseCommandDefines all the attributes required to respond to the command, and is ultimately determined byRpcCommandDecoderV2decode

After sorting through the above components, you can draw the following diagram of Client => Server interaction based on the SOFA-BOLT protocol:

SOFA – BOLT used

Since THE complete RpcClient and RpcServer have been encapsulated in SOFA – Bolt, using this protocol only needs to reference the dependencies, then initialize the client and server, and write the corresponding UserProcessor implementation. Introducing dependent dependencies:

<dependency>
    <groupId>com.alipay.sofa</groupId>
    <artifactId>bolt</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.65</version>
</dependency>
Copy the code

Create a new request entity class (RequestMessage), a new response entity class (ResponseMessage), and a corresponding processor (RequestMessageProcessor) :

@Data
public class RequestMessage implements Serializable {

    private Long id;

    private String content;
}

@Data
public class ResponseMessage implements Serializable {

    private Long id;

    private String content;

    private Long status;
}

public class RequestMessageProcessor extends SyncUserProcessor<RequestMessage> {

    @Override
    public Object handleRequest(BizContext bizContext, RequestMessage requestMessage) throws Exception {
        ResponseMessage message = new ResponseMessage();
        message.setContent(requestMessage.getContent());
        message.setId(requestMessage.getId());
        message.setStatus(10087L);
        return message;
    }

    @Override
    public String interest(a) {
        returnRequestMessage.class.getName(); }}Copy the code

The processor needs to inherit from superclass SyncUserProcessor for synchronous processing, and superclass AsyncUserProcessor for asynchronous processing. All entity classes as parameters must implement Serializable interface (if there are nested objects, The class of each nested object must also implement the Serializable interface, or a Serializable exception will occur. Finally, write the client and server code:

@Slf4j
public class BlotApp {

    private static final int PORT = 8081;
    private static final String ADDRESS = "127.0.0.1:" + PORT;

    public static void main(String[] args) throws Exception {
        RequestMessageProcessor processor = new RequestMessageProcessor();
        RpcServer server = new RpcServer(8081.true);
        server.startup();
        server.registerUserProcessor(processor);
        RpcClient client = new RpcClient();
        client.startup();
        RequestMessage request = new RequestMessage();
        request.setId(99L);
        request.setContent("hello bolt");
        ResponseMessage response = (ResponseMessage) client.invokeSync(ADDRESS, request, 2000);
        log.info("Response result :{}", response); }}Copy the code

Run the output:

ResponseMessage(id=99, content=hello bolt, status=10087)Copy the code

Write simple CURD project based on SOFA-BOLT protocol

Local test MySQL service build customer table as follows:

CREATE DATABASE test;

USE test;

CREATE TABLE t_customer
(
    id            BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    customer_name VARCHAR(32)     NOT NULL
);
Copy the code

To simplify JDBC operations, introduce the spring-boot-starter- JDBC (here I borrow only the light encapsulation of JdbcTemplate) dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.20</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.3.0. RELEASE</version>
</dependency>
Copy the code

Writing a core synchronization processor:

/ / create
@Data
public class CreateCustomerReq implements Serializable {

    private String customerName;
}

@Data
public class CreateCustomerResp implements Serializable {

    private Long code;

    private Long customerId;
}

public class CreateCustomerProcessor extends SyncUserProcessor<CreateCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public CreateCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, CreateCustomerReq req) throws Exception {
        KeyHolder keyHolder = new GeneratedKeyHolder();
        jdbcTemplate.update(connection -> {
            PreparedStatement ps = connection.prepareStatement("insert into t_customer(customer_name) VALUES (?) ",
                    Statement.RETURN_GENERATED_KEYS);
            ps.setString(1, req.getCustomerName());
            return ps;
        }, keyHolder);
        CreateCustomerResp resp = new CreateCustomerResp();
        resp.setCustomerId(Objects.requireNonNull(keyHolder.getKey()).longValue());
        resp.setCode(RespCode.SUCCESS);
        return resp;
    }

    @Override
    public String interest(a) {
        returnCreateCustomerReq.class.getName(); }}/ / update
@Data
public class UpdateCustomerReq implements Serializable {

    private Long customerId;

    private String customerName;
}


public class UpdateCustomerProcessor extends SyncUserProcessor<UpdateCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public UpdateCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, UpdateCustomerReq req) throws Exception {
        UpdateCustomerResp resp = new UpdateCustomerResp();
        int updateCount = jdbcTemplate.update("UPDATE t_customer SET customer_name = ? WHERE id = ?", ps -> {
            ps.setString(1, req.getCustomerName());
            ps.setLong(2, req.getCustomerId());
        });
        if (updateCount > 0) {
            resp.setCode(RespCode.SUCCESS);
        }
        return resp;
    }

    @Override
    public String interest(a) {
        returnUpdateCustomerReq.class.getName(); }}/ / delete
@Data
public class DeleteCustomerReq implements Serializable {

    private Long customerId;
}

@Data
public class DeleteCustomerResp implements Serializable {

    private Long code;
}

public class DeleteCustomerProcessor extends SyncUserProcessor<DeleteCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public DeleteCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, DeleteCustomerReq req) throws Exception {
        DeleteCustomerResp resp = new DeleteCustomerResp();
        int updateCount = jdbcTemplate.update("DELETE FROM t_customer WHERE id = ?", ps -> ps.setLong(1,req.getCustomerId()));
        if (updateCount > 0){
            resp.setCode(RespCode.SUCCESS);
        }
        return resp;
    }

    @Override
    public String interest(a) {
        returnDeleteCustomerReq.class.getName(); }}/ / query
@Data
public class SelectCustomerReq implements Serializable {

    private Long customerId;
}

@Data
public class SelectCustomerResp implements Serializable {

    private Long code;

    private Long customerId;

    private String customerName;
}

public class SelectCustomerProcessor extends SyncUserProcessor<SelectCustomerReq> {

    private final JdbcTemplate jdbcTemplate;

    public SelectCustomerProcessor(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public Object handleRequest(BizContext bizContext, SelectCustomerReq req) throws Exception {
        SelectCustomerResp resp = new SelectCustomerResp();
        Customer result = jdbcTemplate.query("SELECT * FROM t_customer WHERE id = ?", ps -> ps.setLong(1, req.getCustomerId()), rs -> {
            Customer customer = null;
            if (rs.next()) {
                customer = new Customer();
                customer.setId(rs.getLong("id"));
                customer.setCustomerName(rs.getString("customer_name"));
            }
            return customer;
        });
        if (Objects.nonNull(result)) {
            resp.setCustomerId(result.getId());
            resp.setCustomerName(result.getCustomerName());
            resp.setCode(RespCode.SUCCESS);
        }
        return resp;
    }

    @Override
    public String interest(a) {
        return SelectCustomerReq.class.getName();
    }

    @Data
    public static class Customer {

        private Long id;
        privateString customerName; }}Copy the code

Write data source, client, and server code:

public class CurdApp {

    private static final int PORT = 8081;
    private static final String ADDRESS = "127.0.0.1:" + PORT;

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test? useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        CreateCustomerProcessor createCustomerProcessor = new CreateCustomerProcessor(jdbcTemplate);
        UpdateCustomerProcessor updateCustomerProcessor = new UpdateCustomerProcessor(jdbcTemplate);
        DeleteCustomerProcessor deleteCustomerProcessor = new DeleteCustomerProcessor(jdbcTemplate);
        SelectCustomerProcessor selectCustomerProcessor = new SelectCustomerProcessor(jdbcTemplate);
        RpcServer server = new RpcServer(PORT, true);
        server.registerUserProcessor(createCustomerProcessor);
        server.registerUserProcessor(updateCustomerProcessor);
        server.registerUserProcessor(deleteCustomerProcessor);
        server.registerUserProcessor(selectCustomerProcessor);
        server.startup();
        RpcClient client = new RpcClient();
        client.startup();
        CreateCustomerReq createCustomerReq = new CreateCustomerReq();
        createCustomerReq.setCustomerName("throwable.club");
        CreateCustomerResp createCustomerResp = (CreateCustomerResp)
                client.invokeSync(ADDRESS, createCustomerReq, 5000);
        System.out.println(Create user [throwable. Club] result: + createCustomerResp);
        SelectCustomerReq selectCustomerReq = new SelectCustomerReq();
        selectCustomerReq.setCustomerId(createCustomerResp.getCustomerId());
        SelectCustomerResp selectCustomerResp = (SelectCustomerResp)
                client.invokeSync(ADDRESS, selectCustomerReq, 5000);
        System.out.println(String.format(Query user [id=%d] result :%s", selectCustomerReq.getCustomerId(),
                selectCustomerResp));
        UpdateCustomerReq updateCustomerReq = new UpdateCustomerReq();
        updateCustomerReq.setCustomerId(selectCustomerReq.getCustomerId());
        updateCustomerReq.setCustomerName("throwx.cn");
        UpdateCustomerResp updateCustomerResp = (UpdateCustomerResp)
                client.invokeSync(ADDRESS, updateCustomerReq, 5000);
        System.out.println(String.format(Update user [id=%d] result :%s, updateCustomerReq.getCustomerId(),
                updateCustomerResp));
        selectCustomerReq.setCustomerId(updateCustomerReq.getCustomerId());
        selectCustomerResp = (SelectCustomerResp)
                client.invokeSync(ADDRESS, selectCustomerReq, 5000);
        System.out.println(String.format(Select * from user [id=%d] where id=% s, selectCustomerReq.getCustomerId(),
                selectCustomerResp));
        DeleteCustomerReq deleteCustomerReq = new DeleteCustomerReq();
        deleteCustomerReq.setCustomerId(selectCustomerResp.getCustomerId());
        DeleteCustomerResp deleteCustomerResp = (DeleteCustomerResp)
                client.invokeSync(ADDRESS, deleteCustomerReq, 5000);
        System.out.println(String.format(Delete user [id=%d] result :%s", deleteCustomerReq.getCustomerId(), deleteCustomerResp)); }}Copy the code

The execution result is as follows:

SelectCustomerResp(code=0, customerId=1) SelectCustomerResp(code=0, customerId=1) SelectCustomerResp(code=0) SelectCustomerResp(code=0) SelectCustomerResp(code=0) CustomerId =1, customerName=throwx.cn) update user [id=1]Copy the code

Verify that the database table is empty after the final deletion.

Write SOFA-BOLT protocol client based on GO language

This is an attempt to use GO language to write a SOFA-BOLT protocol client. Considering that the implementation of a complete version will be more complex, this is simplified to only implement Encode and command invocation parts, without handling the response and Decode temporarily. Write the RequestCommand structure as follows:

// RequestCommand sofa-bolt v2 req cmd
type RequestCommand struct {
	ProtocolCode    uint8
	ProtocolVersion uint8
	Type            uint8
	CommandCode     uint16
	CommandVersion  uint8
	RequestId       uint32
	Codec           uint8
	Switch          uint8
	Timeout         uint32
	ClassLength     uint16
	HeaderLength    uint16
	ContentLength   uint32
	ClassName       []byte
	Header          []byte
	Content         []byte
}
Copy the code

Note that all integer types must use a specific type. For example, uint must use uint32. Otherwise, there will be an exception to write Buffer. Then write a code method:

// encode req => slice
func encode(cmd *RequestCommand) []byte {
	container := make([]byte.0)
	buf := bytes.NewBuffer(container)
	buf.WriteByte(cmd.ProtocolCode)
	buf.WriteByte(cmd.ProtocolVersion)
	buf.WriteByte(cmd.Type)
	binary.Write(buf, binary.BigEndian, cmd.CommandCode)
	buf.WriteByte(cmd.CommandVersion)
	binary.Write(buf, binary.BigEndian, cmd.RequestId)
	buf.WriteByte(cmd.Codec)
	buf.WriteByte(cmd.Switch)
	binary.Write(buf, binary.BigEndian, cmd.Timeout)
	binary.Write(buf, binary.BigEndian, cmd.ClassLength)
	binary.Write(buf, binary.BigEndian, cmd.HeaderLength)
	binary.Write(buf, binary.BigEndian, cmd.ContentLength)
	buf.Write(cmd.ClassName)
	buf.Write(cmd.Header)
	buf.Write(cmd.Content)
	return buf.Bytes()
}
Copy the code

Finally, write the TCP client:

type Req struct {
	Id   int64  `json:"id"`
	Name string `json:"name"`
}

package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net"
)

func main(a) {
	con, err := net.Dial("tcp"."127.0.0.1:9999")
	iferr ! =nil {
		fmt.Println("err:", err)
		return
	}
	defer con.Close()
	req := &Req{
		Id:   8080,
		Name: "throwx.cn",
	}
	content, err := json.Marshal(req)
	iferr ! =nil {
		fmt.Println("err:", err)
		return
	}
	var header []byte
	className := []byte("com.alipay.remoting.Req")
	cmd := &RequestCommand{
		ProtocolCode:    2,
		ProtocolVersion: 2,
		Type:            1,
		CommandCode:     1,
		CommandVersion:  1,
		RequestId:       10087,
		Codec:           1,
		Switch:          0,
		Timeout:         5000,
		ClassLength:     uint16(len(className)),
		HeaderLength:    0,
		ContentLength:   uint32(len(content)),
		ClassName:       className,
		Header:          header,
		Content:         content,
	}
	pkg := encode(cmd)
	_, err = con.Write(pkg)
	iferr ! =nil {
		fmt.Println("err:", err)
		return}}Copy the code

The Crc32 attribute for the V2 version of the protocol is optional and is ignored here for the sake of processing simplicity

In order to simplify processing, JSON is used for serialization, so we need to change the source code of SOFA-BOLT slightly, introduce FastJson and FastJsonSerializer, as shown in the following figure:

First start BoltApp (SOFA-BOLT server), then execute the GO written client, the result is as follows:

summary

SOFA-BOLT is a high performance mature and extensible Netty proprietary protocol encapsulation, compared to native Netty programming, provides convenient synchronous, asynchronous calls, basic heartbeat support and reconnection features. SyncUserProcessor and AsyncUserProcessor are introduced to facilitate service development. The SOFA-BOLT protocol is also a compact and high performance RPC protocol. When considering the introduction of Netty for low-level communication, THE USE of SOFA-BOLT can be given priority or considered as one of the candidate solutions, because SOFA-BOLT is lightweight, has a gentle learning curve, and basically has no other middleware dependencies.

Demo warehouse:

  • framework-mesh/sofa-bolt-demo

(C-5-D E-A-20210806)