This is the first day of my participation in the August Challenge. For details, see:August is more challenging
The premise
Recently, I was looking at Netty related materials, and it happened that SOFA BOLT is a relatively mature implementation of Netty custom protocol stack, so I decided to read the source code of SOFA BOLT, analyze its protocol composition in detail, and simply analyze its source code implementation of client and server.
- For fun:
SOFA-BOLT
Code indent andFastJson
Similarly, variable names are forced to align, making the source code uncomfortable for the average developer
The source code currently read is the source code for the master branch of the SOFA BOLT repository around 2021-08.
A brief introduction to SOFA BOLT
SOFA-BOLT is a set of network communication framework based on Netty implementation developed by Ant Financial Services Group. In essence, it is a set of Netty proprietary protocol stack encapsulation. The purpose is to enable developers to pay more attention to the implementation of business logic based on network communication. Instead of getting too tangled up in the implementation of NIO at the bottom of the network and dealing with hard-to-debug network issues and Netty secondary development issues. The architectural design and functions of SOFA-BOLT are as follows:
Above is derived from the SOFA – BOLT’s official website www.sofastack.tech/projects/so…
SOFA-BOLT Protocol Perspective
Since the SOFA-BOLT protocol is a custom protocol stack based on the Netty implementation, the implementation of the protocol itself can be quickly found in the implementation of Encoder and Decoder, and further located in the com.alipay. Remoting. RPC package. According to the source code, there are two versions of THE SOFA-BOLT protocol. The protocol is described in detail in the comments at the top of the class of RpcProtocol and RpcProtocolV2. Based on these introductions, the basic components of the two versions of the protocol can be summarized.
This section describes basic components of the V1 protocol
V1
Version of the protocol requestFrame
Basic composition:
V1
Version of the protocol responseFrame
Basic composition:
For the V1 protocol, the properties are expanded as follows:
- request
Frame
And the responseFrame
Public properties of:
Attribute Code | Attribute meaning | Java type | Size (byte) | note |
---|---|---|---|---|
proto | Protocol coding | byte | 1 | V1 Version,proto = 1 .V2 Version,proto = 2 |
type | type | byte | 1 | 0 => RESPONSE .1 => REQUEST .2 => REQUEST_ONEWAY |
cmdcode | The command code | short | 2 | 1 => rpc request .2 => rpc response |
ver2 | Command version | byte | 1 | From the source code it is currently fixed to1 |
requestId | Request ID | int | 4 | A requestCMD The globally unique identifier of |
codec | Encoding decoder | byte | 1 | – |
In the table above, codec is literally a codec, but is actually a token for the serialization and dissequence implementations. V1 and V2 are currently fixed codec = 1, tracked through the source code to the SerializerManager configuration value of Hessian2 = 1. By default, Hessian2 is used for serialization and deserialization. For details, see HessianSerializer in the source code
- request
Frame
Unique properties:
Attribute Code | Attribute meaning | Java type | Size (byte) | note |
---|---|---|---|---|
timeout | Request timeout | int | 4 | |
classLen | The length of the name of the request object (parameter) type | short | 2 | value> = 0 |
headerLen | Header length | short | 2 | value> = 0 |
contentLen | Request length | int | 4 | value> = 0 |
className bytes | The name of the request object (parameter) type | byte[] |
– | |
header bytes | Request header | byte[] |
– | |
content bytes | Request content | byte[] |
– |
- The response
Frame
Unique properties:
Attribute Code | Attribute meaning | Java type | Size (byte) | note |
---|---|---|---|---|
respstatus | Response status value | short | 2 | inResponseStatus Is currently built in13 States, for example0 => SUCCESS |
classLen | The length of the name of the response object (parameter) type | short | 2 | value> = 0 |
headerLen | Length of response head | short | 2 | value> = 0 |
contentLen | Length of response content | int | 4 | value> = 0 |
className bytes | The name of the response object (parameter) type | byte[] |
– | |
header bytes | Response headers | byte[] |
– | |
content bytes | Response content | byte[] |
– |
The request Frame has a timeout property and the response Frame has a respstatus property. Most of the attributes are multiplexed, and the three lengths and three byte arrays are mutually restricted:
classLen <=> className bytes
headerLen <=> header bytes
contentLen <=> content bytes
This section describes basic components of the V2 protocol
V2
Version of the protocol requestFrame
Basic composition:
V2
Version of the protocol responseFrame
Basic composition:
Compared with protocol V1, protocol V2 has two mandatory public attributes and one optional public attribute.
Attribute Code | Attribute meaning | Java type | Size (byte) | note |
---|---|---|---|---|
ver1 | Protocol version | byte | 1 | In order to inV2 Compatible with the version protocolV1 Version protocol |
switch | The protocol switch | byte | 1 | Based on theBitSet Realizes the switch, at most8 a |
CRC32 | Cyclic redundancy check value | int | 4 | Optional by switchProtocolSwitch.CRC_SWITCH_INDEX When you decide whether to enable it or not, when you enable it, it will be based on the wholeFrame To calculate |
Among the new properties, switch represents the byte field that is translated from the BitSet in the ProtocolSwitch implementation. Since byte is only 8 bits, the protocol can transmit the status of a maximum of 8 switches during transmission, with the subscript [0,7]. CRC32 is calculated based on the byte array converted from the entire Frame. There is a native API in the JDK. You can simply build a utility class to calculate the following:
public enum Crc32Utils {
/** * singleton */
X;
/** * Calculate the CRC32 result **@paramThe content content *@return crc32 result
*/
public long crc32(byte[] content) {
CRC32 crc32 = new CRC32();
crc32.update(content, 0, content.length);
long r = crc32.getValue();
// crc32.reset();
returnr; }}Copy the code
The V2 version of the protocol casts the result of CRC32 to an int, so you can think about why there is no overflow.
SOFA – BOLT structure
The source code of SOFA BOLT is not complicated, considering that the analysis of the source code will be lengthy, and if you have experience in developing the Netty custom protocol stack, only the architecture and core component functions of SOFA BOLT are analyzed here. A Protocol is defined by the interface Protocol:
public interface Protocol {
// Command encoder
CommandEncoder getEncoder(a);
// Command decoder
CommandDecoder getDecoder(a);
// The heartbeat trigger
HeartbeatTrigger getHeartbeatTrigger(a);
// Command processor
CommandHandler getCommandHandler(a);
// Order the factory
CommandFactory getCommandFactory(a);
}
Copy the code
By implementing RpcProtocolV2, we can know:
In addition, all frames that need to be sent or received are wrapped as commands, and the Command family is as follows:
That is:
RequestCommand
Defines all the attributes required by the request command, which are ultimately determined byRpcCommandEncoderV2
codingResponseCommand
Defines all the attributes required to respond to the command, and is ultimately determined byRpcCommandDecoderV2
decode
After sorting through the above components, you can draw the following diagram of Client => Server interaction based on the SOFA-BOLT protocol:
SOFA – BOLT used
Since THE complete RpcClient and RpcServer have been encapsulated in SOFA – Bolt, using this protocol only needs to reference the dependencies, then initialize the client and server, and write the corresponding UserProcessor implementation. Introducing dependent dependencies:
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.65</version>
</dependency>
Copy the code
Create a new request entity class (RequestMessage), a new response entity class (ResponseMessage), and a corresponding processor (RequestMessageProcessor) :
@Data
public class RequestMessage implements Serializable {
private Long id;
private String content;
}
@Data
public class ResponseMessage implements Serializable {
private Long id;
private String content;
private Long status;
}
public class RequestMessageProcessor extends SyncUserProcessor<RequestMessage> {
@Override
public Object handleRequest(BizContext bizContext, RequestMessage requestMessage) throws Exception {
ResponseMessage message = new ResponseMessage();
message.setContent(requestMessage.getContent());
message.setId(requestMessage.getId());
message.setStatus(10087L);
return message;
}
@Override
public String interest(a) {
returnRequestMessage.class.getName(); }}Copy the code
The processor needs to inherit from superclass SyncUserProcessor for synchronous processing, and superclass AsyncUserProcessor for asynchronous processing. All entity classes as parameters must implement Serializable interface (if there are nested objects, The class of each nested object must also implement the Serializable interface, or a Serializable exception will occur. Finally, write the client and server code:
@Slf4j
public class BlotApp {
private static final int PORT = 8081;
private static final String ADDRESS = "127.0.0.1:" + PORT;
public static void main(String[] args) throws Exception {
RequestMessageProcessor processor = new RequestMessageProcessor();
RpcServer server = new RpcServer(8081.true);
server.startup();
server.registerUserProcessor(processor);
RpcClient client = new RpcClient();
client.startup();
RequestMessage request = new RequestMessage();
request.setId(99L);
request.setContent("hello bolt");
ResponseMessage response = (ResponseMessage) client.invokeSync(ADDRESS, request, 2000);
log.info("Response result :{}", response); }}Copy the code
Run the output:
ResponseMessage(id=99, content=hello bolt, status=10087)Copy the code
Write simple CURD project based on SOFA-BOLT protocol
Local test MySQL service build customer table as follows:
CREATE DATABASE test;
USE test;
CREATE TABLE t_customer
(
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
customer_name VARCHAR(32) NOT NULL
);
Copy the code
To simplify JDBC operations, introduce the spring-boot-starter- JDBC (here I borrow only the light encapsulation of JdbcTemplate) dependency:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.3.0. RELEASE</version>
</dependency>
Copy the code
Writing a core synchronization processor:
/ / create
@Data
public class CreateCustomerReq implements Serializable {
private String customerName;
}
@Data
public class CreateCustomerResp implements Serializable {
private Long code;
private Long customerId;
}
public class CreateCustomerProcessor extends SyncUserProcessor<CreateCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public CreateCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, CreateCustomerReq req) throws Exception {
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(connection -> {
PreparedStatement ps = connection.prepareStatement("insert into t_customer(customer_name) VALUES (?) ",
Statement.RETURN_GENERATED_KEYS);
ps.setString(1, req.getCustomerName());
return ps;
}, keyHolder);
CreateCustomerResp resp = new CreateCustomerResp();
resp.setCustomerId(Objects.requireNonNull(keyHolder.getKey()).longValue());
resp.setCode(RespCode.SUCCESS);
return resp;
}
@Override
public String interest(a) {
returnCreateCustomerReq.class.getName(); }}/ / update
@Data
public class UpdateCustomerReq implements Serializable {
private Long customerId;
private String customerName;
}
public class UpdateCustomerProcessor extends SyncUserProcessor<UpdateCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public UpdateCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, UpdateCustomerReq req) throws Exception {
UpdateCustomerResp resp = new UpdateCustomerResp();
int updateCount = jdbcTemplate.update("UPDATE t_customer SET customer_name = ? WHERE id = ?", ps -> {
ps.setString(1, req.getCustomerName());
ps.setLong(2, req.getCustomerId());
});
if (updateCount > 0) {
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest(a) {
returnUpdateCustomerReq.class.getName(); }}/ / delete
@Data
public class DeleteCustomerReq implements Serializable {
private Long customerId;
}
@Data
public class DeleteCustomerResp implements Serializable {
private Long code;
}
public class DeleteCustomerProcessor extends SyncUserProcessor<DeleteCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public DeleteCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, DeleteCustomerReq req) throws Exception {
DeleteCustomerResp resp = new DeleteCustomerResp();
int updateCount = jdbcTemplate.update("DELETE FROM t_customer WHERE id = ?", ps -> ps.setLong(1,req.getCustomerId()));
if (updateCount > 0){
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest(a) {
returnDeleteCustomerReq.class.getName(); }}/ / query
@Data
public class SelectCustomerReq implements Serializable {
private Long customerId;
}
@Data
public class SelectCustomerResp implements Serializable {
private Long code;
private Long customerId;
private String customerName;
}
public class SelectCustomerProcessor extends SyncUserProcessor<SelectCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public SelectCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, SelectCustomerReq req) throws Exception {
SelectCustomerResp resp = new SelectCustomerResp();
Customer result = jdbcTemplate.query("SELECT * FROM t_customer WHERE id = ?", ps -> ps.setLong(1, req.getCustomerId()), rs -> {
Customer customer = null;
if (rs.next()) {
customer = new Customer();
customer.setId(rs.getLong("id"));
customer.setCustomerName(rs.getString("customer_name"));
}
return customer;
});
if (Objects.nonNull(result)) {
resp.setCustomerId(result.getId());
resp.setCustomerName(result.getCustomerName());
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest(a) {
return SelectCustomerReq.class.getName();
}
@Data
public static class Customer {
private Long id;
privateString customerName; }}Copy the code
Write data source, client, and server code:
public class CurdApp {
private static final int PORT = 8081;
private static final String ADDRESS = "127.0.0.1:" + PORT;
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/test? useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
CreateCustomerProcessor createCustomerProcessor = new CreateCustomerProcessor(jdbcTemplate);
UpdateCustomerProcessor updateCustomerProcessor = new UpdateCustomerProcessor(jdbcTemplate);
DeleteCustomerProcessor deleteCustomerProcessor = new DeleteCustomerProcessor(jdbcTemplate);
SelectCustomerProcessor selectCustomerProcessor = new SelectCustomerProcessor(jdbcTemplate);
RpcServer server = new RpcServer(PORT, true);
server.registerUserProcessor(createCustomerProcessor);
server.registerUserProcessor(updateCustomerProcessor);
server.registerUserProcessor(deleteCustomerProcessor);
server.registerUserProcessor(selectCustomerProcessor);
server.startup();
RpcClient client = new RpcClient();
client.startup();
CreateCustomerReq createCustomerReq = new CreateCustomerReq();
createCustomerReq.setCustomerName("throwable.club");
CreateCustomerResp createCustomerResp = (CreateCustomerResp)
client.invokeSync(ADDRESS, createCustomerReq, 5000);
System.out.println(Create user [throwable. Club] result: + createCustomerResp);
SelectCustomerReq selectCustomerReq = new SelectCustomerReq();
selectCustomerReq.setCustomerId(createCustomerResp.getCustomerId());
SelectCustomerResp selectCustomerResp = (SelectCustomerResp)
client.invokeSync(ADDRESS, selectCustomerReq, 5000);
System.out.println(String.format(Query user [id=%d] result :%s", selectCustomerReq.getCustomerId(),
selectCustomerResp));
UpdateCustomerReq updateCustomerReq = new UpdateCustomerReq();
updateCustomerReq.setCustomerId(selectCustomerReq.getCustomerId());
updateCustomerReq.setCustomerName("throwx.cn");
UpdateCustomerResp updateCustomerResp = (UpdateCustomerResp)
client.invokeSync(ADDRESS, updateCustomerReq, 5000);
System.out.println(String.format(Update user [id=%d] result :%s, updateCustomerReq.getCustomerId(),
updateCustomerResp));
selectCustomerReq.setCustomerId(updateCustomerReq.getCustomerId());
selectCustomerResp = (SelectCustomerResp)
client.invokeSync(ADDRESS, selectCustomerReq, 5000);
System.out.println(String.format(Select * from user [id=%d] where id=% s, selectCustomerReq.getCustomerId(),
selectCustomerResp));
DeleteCustomerReq deleteCustomerReq = new DeleteCustomerReq();
deleteCustomerReq.setCustomerId(selectCustomerResp.getCustomerId());
DeleteCustomerResp deleteCustomerResp = (DeleteCustomerResp)
client.invokeSync(ADDRESS, deleteCustomerReq, 5000);
System.out.println(String.format(Delete user [id=%d] result :%s", deleteCustomerReq.getCustomerId(), deleteCustomerResp)); }}Copy the code
The execution result is as follows:
SelectCustomerResp(code=0, customerId=1) SelectCustomerResp(code=0, customerId=1) SelectCustomerResp(code=0) SelectCustomerResp(code=0) SelectCustomerResp(code=0) CustomerId =1, customerName=throwx.cn) update user [id=1]Copy the code
Verify that the database table is empty after the final deletion.
Write SOFA-BOLT protocol client based on GO language
This is an attempt to use GO language to write a SOFA-BOLT protocol client. Considering that the implementation of a complete version will be more complex, this is simplified to only implement Encode and command invocation parts, without handling the response and Decode temporarily. Write the RequestCommand structure as follows:
// RequestCommand sofa-bolt v2 req cmd
type RequestCommand struct {
ProtocolCode uint8
ProtocolVersion uint8
Type uint8
CommandCode uint16
CommandVersion uint8
RequestId uint32
Codec uint8
Switch uint8
Timeout uint32
ClassLength uint16
HeaderLength uint16
ContentLength uint32
ClassName []byte
Header []byte
Content []byte
}
Copy the code
Note that all integer types must use a specific type. For example, uint must use uint32. Otherwise, there will be an exception to write Buffer. Then write a code method:
// encode req => slice
func encode(cmd *RequestCommand) []byte {
container := make([]byte.0)
buf := bytes.NewBuffer(container)
buf.WriteByte(cmd.ProtocolCode)
buf.WriteByte(cmd.ProtocolVersion)
buf.WriteByte(cmd.Type)
binary.Write(buf, binary.BigEndian, cmd.CommandCode)
buf.WriteByte(cmd.CommandVersion)
binary.Write(buf, binary.BigEndian, cmd.RequestId)
buf.WriteByte(cmd.Codec)
buf.WriteByte(cmd.Switch)
binary.Write(buf, binary.BigEndian, cmd.Timeout)
binary.Write(buf, binary.BigEndian, cmd.ClassLength)
binary.Write(buf, binary.BigEndian, cmd.HeaderLength)
binary.Write(buf, binary.BigEndian, cmd.ContentLength)
buf.Write(cmd.ClassName)
buf.Write(cmd.Header)
buf.Write(cmd.Content)
return buf.Bytes()
}
Copy the code
Finally, write the TCP client:
type Req struct {
Id int64 `json:"id"`
Name string `json:"name"`
}
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
)
func main(a) {
con, err := net.Dial("tcp"."127.0.0.1:9999")
iferr ! =nil {
fmt.Println("err:", err)
return
}
defer con.Close()
req := &Req{
Id: 8080,
Name: "throwx.cn",
}
content, err := json.Marshal(req)
iferr ! =nil {
fmt.Println("err:", err)
return
}
var header []byte
className := []byte("com.alipay.remoting.Req")
cmd := &RequestCommand{
ProtocolCode: 2,
ProtocolVersion: 2,
Type: 1,
CommandCode: 1,
CommandVersion: 1,
RequestId: 10087,
Codec: 1,
Switch: 0,
Timeout: 5000,
ClassLength: uint16(len(className)),
HeaderLength: 0,
ContentLength: uint32(len(content)),
ClassName: className,
Header: header,
Content: content,
}
pkg := encode(cmd)
_, err = con.Write(pkg)
iferr ! =nil {
fmt.Println("err:", err)
return}}Copy the code
The Crc32 attribute for the V2 version of the protocol is optional and is ignored here for the sake of processing simplicity
In order to simplify processing, JSON is used for serialization, so we need to change the source code of SOFA-BOLT slightly, introduce FastJson and FastJsonSerializer, as shown in the following figure:
First start BoltApp (SOFA-BOLT server), then execute the GO written client, the result is as follows:
summary
SOFA-BOLT is a high performance mature and extensible Netty proprietary protocol encapsulation, compared to native Netty programming, provides convenient synchronous, asynchronous calls, basic heartbeat support and reconnection features. SyncUserProcessor and AsyncUserProcessor are introduced to facilitate service development. The SOFA-BOLT protocol is also a compact and high performance RPC protocol. When considering the introduction of Netty for low-level communication, THE USE of SOFA-BOLT can be given priority or considered as one of the candidate solutions, because SOFA-BOLT is lightweight, has a gentle learning curve, and basically has no other middleware dependencies.
Demo warehouse:
- framework-mesh/sofa-bolt-demo
(C-5-D E-A-20210806)