Preface
RPC is short for Remote Procedure Call. With RPC, we can call functions that live in another process or on another machine just as we would call local methods. You have probably seen HTTP API calls more often. Compared with plain HTTP calls, RPC is a higher-level wrapper: callers do not have to serialize and deserialize data by hand, so it is cheaper to use day to day (although the learning cost can be higher).
For learning purposes, the goal this time is to implement an RPC of my own in Go. In the real world, features such as service discovery, load balancing, and circuit breaking and degradation matter more to an RPC tool than the method call itself. I won't cover those for now and will focus on implementing a working method call.
The previous article gave a general overview of Go's RPC framework and mentioned that Go RPC exposes a codec interface, which lets users plug their own serialization protocol into Go RPC. This time, I will try to implement a codec to build my own RPC.
Preparation
Serialization protocol
To implement an RPC, you need roughly the following basic elements: a serialization protocol, a network model, and a threading model. The codec in Go RPC essentially implements the serialization protocol.
I originally intended to use the familiar Thrift protocol, but Thrift implements its own RPC flow, so it is more than a plain serialization protocol. Its serialization logic may not fit well with Go RPC, and it requires writing IDL definitions, which adds complexity. Since the goal is to get familiar with Go, I will start with something easier and choose MessagePack as the serialization protocol.
MessagePack is a relatively lightweight serialization protocol. Its data model is similar to JSON, but it uses a binary format, so serialization is generally faster than JSON and the resulting data is smaller. It can basically be thought of as a binary version of JSON.
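To make the size difference concrete, here is a minimal sketch that hand-encodes a one-entry map in MessagePack and compares it with the JSON encoding. The helper encodeTiny is hypothetical and written only for this illustration: it supports just the smallest MessagePack forms (fixmap, fixstr, positive fixint) and is not how a real MessagePack library is used.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeTiny hand-encodes map[string]int{key: val} using only the smallest
// MessagePack forms: fixmap (0x80|n), fixstr (0xa0|len) and positive fixint.
// It is a teaching sketch, not a general encoder.
func encodeTiny(key string, val int) ([]byte, error) {
	if len(key) > 31 || val < 0 || val > 127 {
		return nil, fmt.Errorf("input outside the range this sketch supports")
	}
	out := []byte{0x80 | 1}                // fixmap header: a map with 1 entry
	out = append(out, 0xa0|byte(len(key))) // fixstr header: string length in the low 5 bits
	out = append(out, key...)              // raw key bytes
	out = append(out, byte(val))           // positive fixint: the value itself
	return out, nil
}

func main() {
	mp, err := encodeTiny("a", 1)
	if err != nil {
		panic(err)
	}
	js, err := json.Marshal(map[string]int{"a": 1})
	if err != nil {
		panic(err)
	}
	fmt.Printf("MessagePack: % x (%d bytes)\n", mp, len(mp))
	fmt.Printf("JSON:        %s (%d bytes)\n", js, len(js))
}
```

For this input the MessagePack form is 4 bytes (81 a1 61 01), while the JSON text {"a":1} is 7 bytes.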
Defining the types
To implement a codec, Go RPC provides two interfaces: ServerCodec and ClientCodec. As the names suggest, they represent the server-side and client-side logic respectively. The two interfaces are defined as follows:
type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	Close() error
}

type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error
	Close() error
}
As can be seen, Go RPC abstracts a request/response into the form header + body. Reading is split into two steps, reading the header and reading the body. Writing is a single step: the write method receives the body together with a header that Go RPC has already filled in for us. Next we define two structures that represent the complete data of a request/response:
type MsgpackReq struct {
	rpc.Request             // head
	Arg         interface{} // body
}

type MsgpackResp struct {
	rpc.Response             // head
	Reply        interface{} // body
}
Here MsgpackReq and MsgpackResp directly embed the built-in Request and Response types from Go RPC, which define the sequence number, the method name and other header information.
Next comes the declaration of the custom codec:
type MessagePackServerCodec struct {
	rwc    io.ReadWriteCloser // Used to read and write data; in practice, a network connection
	req    MsgpackReq         // Caches the parsed request
	closed bool               // Indicates whether the codec is closed
}

type MessagePackClientCodec struct {
	rwc    io.ReadWriteCloser
	resp   MsgpackResp // Caches the parsed response
	closed bool
}

func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
	return &MessagePackServerCodec{conn, MsgpackReq{}, false}
}

func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
	return &MessagePackClientCodec{conn, MsgpackResp{}, false}
}
As mentioned in the previous article, a codec needs to hold a data source for reading and writing data, so here the network connection is passed straight into it.
Implementing the codec methods
Implementation approach
For simplicity, we merge the two deserialization steps into one: all the data is parsed and cached when the head is read, and the cached result is returned when the body is read. The specific idea is:
- When the client sends a request, it wraps the data into a MsgpackReq, serializes it with MessagePack and sends it out
- When the server reads the head of the request, it deserializes the received data into a MsgpackReq using MessagePack and caches the result
- When the server reads the body of the request, it retrieves the Arg field from the cached MsgpackReq and returns it
- When the server sends a response, it wraps the data into a MsgpackResp, serializes it with MessagePack and sends it out
- When the client reads the head of the response, it deserializes the received data into a MsgpackResp using MessagePack and caches the result
- When the client reads the body of the response, it retrieves the Reply or Error field from the cached MsgpackResp and returns it
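The steps above can be sketched as a small round trip. This is only an illustration under stated assumptions: encoding/json from the standard library stands in for MessagePack so that the example runs without third-party packages, and the names envelope, decoder and roundTrip are hypothetical helpers, not part of the codec in this article.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/rpc"
)

// envelope mirrors MsgpackReq: the header and the body travel as one value.
type envelope struct {
	rpc.Request             // head
	Arg         interface{} // body
}

// decoder parses the whole message when the head is read and caches it,
// so that reading the body is just a lookup.
type decoder struct {
	cached envelope
}

// readHeader decodes the full message, fills in the header and caches the rest.
func (d *decoder) readHeader(data []byte, r *rpc.Request) error {
	var e envelope
	if err := json.Unmarshal(data, &e); err != nil {
		return err
	}
	r.ServiceMethod = e.ServiceMethod
	r.Seq = e.Seq
	d.cached = e
	return nil
}

// readBody returns the body cached by readHeader.
func (d *decoder) readBody() interface{} {
	return d.cached.Arg
}

// roundTrip plays both sides: wrap and serialize, then read head and body.
func roundTrip(method string, seq uint64, arg interface{}) (string, uint64, interface{}, error) {
	data, err := json.Marshal(&envelope{
		Request: rpc.Request{ServiceMethod: method, Seq: seq},
		Arg:     arg,
	})
	if err != nil {
		return "", 0, nil, err
	}
	var d decoder
	var head rpc.Request
	if err := d.readHeader(data, &head); err != nil {
		return "", 0, nil, err
	}
	return head.ServiceMethod, head.Seq, d.readBody(), nil
}

func main() {
	method, seq, body, err := roundTrip("EchoService.Echo", 7, "hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(method, seq, body)
}
```

The response direction works the same way with the roles swapped.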
Client implementation
Here’s the code:
func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
	// If the codec is already closed, report it so the caller is not left waiting for a reply
	if c.closed {
		return rpc.ErrShutdown
	}
	// Assemble r and arg into a MsgpackReq and serialize it
	request := &MsgpackReq{*r, arg}
	reqData, err := msgpack.Marshal(request)
	if err != nil {
		return err
	}
	// Send the data length first
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(reqData)))
	if _, err = c.rwc.Write(head); err != nil {
		return err
	}
	// Send the serialized data
	_, err = c.rwc.Write(reqData)
	return err
}
func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
	// If the codec is already closed, report EOF so that the client's read loop stops
	if c.closed {
		return io.EOF
	}
	// Read one framed message
	data, err := readData(c.rwc)
	if err != nil {
		// The client starts reading as soon as it is created, so a closed
		// connection is expected here. Return the error instead of nil or a
		// panic so that net/rpc can end its read loop.
		return err
	}
	// Deserialize the read data into a MsgpackResp
	var response MsgpackResp
	err = msgpack.Unmarshal(data, &response)
	if err != nil {
		return err
	}
	// Set the response header fields based on the data read
	r.ServiceMethod = response.ServiceMethod
	r.Seq = response.Seq
	// Cache the parsed data
	c.resp = response
	return nil
}
func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
	// Work directly from the cached data
	if c.resp.Error != "" { // The server returned an error
		return errors.New(c.resp.Error)
	}
	if reply != nil {
		// Normal return: set the result on reply via reflection. Since reply must be a pointer, there is no need to check CanSet
		reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
	}
	return nil
}
func (c *MessagePackClientCodec) Close() error {
	c.closed = true // Set closed to true when closing
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}
The above is the client implementation. A few points are worth noting:
- Before reading or writing data, check whether the codec has already been closed
- Reading and writing data requires handling TCP packet splitting and sticking, that is, message framing (on the read side this is handled by the readData function)
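One more point concerns the reflection call in ReadResponseBody: reflect.Value.Set panics when the cached value is not assignable to the type reply points to, and a value decoded into an interface{} field gets whatever Go type the decoder picks. Below is a minimal sketch of a more defensive assignment. The helper names setReply and isNumeric are hypothetical, and the rule of converting only between numeric kinds is an assumption chosen for this illustration, not behavior of any MessagePack library.

```go
package main

import (
	"fmt"
	"reflect"
)

// isNumeric reports whether k is an integer or floating-point kind.
func isNumeric(k reflect.Kind) bool {
	switch k {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
		reflect.Float32, reflect.Float64:
		return true
	}
	return false
}

// setReply stores src in the variable dst points to. It returns an error
// where a bare reflect.Value.Set would panic.
func setReply(dst, src interface{}) error {
	dv := reflect.ValueOf(dst)
	if dv.Kind() != reflect.Ptr || dv.IsNil() {
		return fmt.Errorf("dst must be a non-nil pointer, got %T", dst)
	}
	target := dv.Elem()
	sv := reflect.ValueOf(src)
	if !sv.IsValid() {
		// src is a nil interface: reset the target to its zero value
		target.Set(reflect.Zero(target.Type()))
		return nil
	}
	if sv.Type().AssignableTo(target.Type()) {
		target.Set(sv)
		return nil
	}
	// Allow conversions between numeric kinds only (for example int64 to int)
	if isNumeric(sv.Kind()) && isNumeric(target.Kind()) {
		target.Set(sv.Convert(target.Type()))
		return nil
	}
	return fmt.Errorf("cannot assign %T to %s", src, target.Type())
}

func main() {
	var n int
	fmt.Println(setReply(&n, int64(5)), n)
	var s string
	fmt.Println(setReply(&s, 7))
}
```

Whether a codec should convert values or reject them outright is a design choice; the sketch only shows how to turn the panic into an ordinary error.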
Server implementation
Again, straight to the code:
func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
	// If the codec is already closed, report it instead of silently dropping the response
	if c.closed {
		return rpc.ErrShutdown
	}
	// Assemble r and reply into a MsgpackResp and serialize it
	response := &MsgpackResp{*r, reply}
	respData, err := msgpack.Marshal(response)
	if err != nil {
		return err
	}
	// Send the data length first
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(respData)))
	if _, err = c.rwc.Write(head); err != nil {
		return err
	}
	// Send the serialized data
	_, err = c.rwc.Write(respData)
	return err
}
func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
	// If the codec is already closed, report EOF so that the serving loop stops
	if c.closed {
		return io.EOF
	}
	// Read one framed message
	data, err := readData(c.rwc)
	if err != nil {
		// io.EOF and "connection reset by peer" simply mean the client has
		// gone away. Return every read error instead of panicking and let
		// net/rpc end the serving loop for this connection.
		return err
	}
	// Deserialize the read data into a MsgpackReq
	var request MsgpackReq
	err = msgpack.Unmarshal(data, &request)
	if err != nil {
		return err
	}
	// Set the request header fields based on the data read
	r.ServiceMethod = request.ServiceMethod
	r.Seq = request.Seq
	// Cache the parsed data
	c.req = request
	return nil
}
func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
	if arg != nil {
		// The argument is not nil: set the cached value on arg via reflection
		reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
	}
	return nil
}
func (c *MessagePackServerCodec) Close() error {
	c.closed = true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}
In fact, the server-side implementation is almost the same as the client-side logic, except that the roles of request and response are swapped. There are a few things to note:
- When reading data, the server must expect io.EOF and connection-reset errors, which simply mean the client has gone away, and return them rather than panic
- When writing the response, the server does not explicitly handle the error returned by the service method. It just passes reply along, because the error is stored in the Error field of rpc.Response, so the codec needs no special handling for it
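To see the second point in action, here is a minimal sketch in which a service method returns an error and the caller receives it. It makes several assumptions: it uses net/rpc's default gob codec rather than the MessagePack codec from this article, it connects both sides with an in-memory net.Pipe, and FailingService and callFail are hypothetical names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
)

// FailingService is a hypothetical service whose method always fails.
type FailingService struct{}

// Fail returns an error. net/rpc copies its text into rpc.Response.Error.
func (s *FailingService) Fail(arg string, reply *string) error {
	return errors.New("boom: " + arg)
}

// callFail serves FailingService over an in-memory pipe and calls it once.
func callFail(arg string) error {
	server := rpc.NewServer()
	if err := server.Register(&FailingService{}); err != nil {
		return err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn) // default gob codec

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	return client.Call("FailingService.Fail", arg, &reply)
}

func main() {
	err := callFail("x")
	fmt.Println("client saw:", err)
}
```

The service error never passes through the codec as a Go error value. It reaches the client as plain text inside the response header.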
Handling packet splitting and sticking
For the underlying idea, refer to the article on handling TCP packet splitting/sticking in Go. The implementation of readData is attached here:
func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
	const HeadSize = 4 // The length prefix is 4 bytes
	// io.ReadFull keeps reading until the buffer is full, so partial reads are handled for us
	headData := make([]byte, HeadSize)
	if _, err := io.ReadFull(conn, headData); err != nil {
		return nil, err
	}
	// The prefix holds the body length as a big-endian uint32
	bodyLen := int(binary.BigEndian.Uint32(headData))
	bodyData := make([]byte, bodyLen)
	if _, err := io.ReadFull(conn, bodyData); err != nil {
		return nil, err
	}
	return bodyData, nil
}
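The length-prefix idea can be exercised without a network. The sketch below writes several messages back to back into one buffer (so they "stick" together) and reads them through a reader that delivers one byte at a time (so every message is "split"). The helpers writeFrame, readFrame and splitStream are hypothetical names for this illustration; iotest.OneByteReader comes from the standard library's testing/iotest package.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"testing/iotest"
)

// writeFrame writes a 4-byte big-endian length followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	var head [4]byte
	binary.BigEndian.PutUint32(head[:], uint32(len(payload)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads exactly one frame, however the bytes happen to arrive.
func readFrame(r io.Reader) ([]byte, error) {
	var head [4]byte
	if _, err := io.ReadFull(r, head[:]); err != nil {
		return nil, err
	}
	body := make([]byte, binary.BigEndian.Uint32(head[:]))
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

// splitStream frames all messages into one stream and reads them back
// through a reader that returns a single byte per Read call.
func splitStream(msgs ...string) ([]string, error) {
	var stream bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&stream, []byte(m)); err != nil {
			return nil, err
		}
	}
	r := iotest.OneByteReader(&stream)
	var out []string
	for {
		frame, err := readFrame(r)
		if err == io.EOF {
			return out, nil // clean end of stream between frames
		}
		if err != nil {
			return nil, err
		}
		out = append(out, string(frame))
	}
}

func main() {
	got, err := splitStream("hello", "", "world!")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", got)
}
```

Because every frame carries its own length, message boundaries survive no matter how the transport chunks the bytes.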
Test code
Let's test our codec with a simple Echo call:
// Declare the service type
type EchoService struct{}

// Define the method Echo
func (service *EchoService) Echo(arg string, result *string) error {
	*result = arg
	return nil
}

// Start the server logic
func RegisterAndServeOnTcp() {
	err := rpc.Register(&EchoService{}) // What gets registered is an EchoService instance, not an individual method
	if err != nil {
		log.Fatal("error registering: ", err)
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
	if err != nil {
		log.Fatal("error resolving tcp: ", err)
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		log.Fatal("error listening: ", err)
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("error accepting: ", err)
		}
		// Get a codec from NewServerCodec and serve it with rpc.ServeCodec.
		// ServeCodec blocks until the connection is done, so run it in its
		// own goroutine to keep accepting new connections.
		go rpc.ServeCodec(msgpk.NewServerCodec(conn))
	}
}
// The client call logic
func Echo(arg string) (result string, err error) {
	conn, err := net.Dial("tcp", ":1234")
	if err != nil {
		// Check the dial error before wrapping the connection in a client
		return "", err
	}
	client := rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))
	defer client.Close()
	err = client.Call("EchoService.Echo", arg, &result) // Specify the method to call as type name plus method name
	if err != nil {
		return "", err
	}
	return result, nil
}
// The main function
func main() {
	go server.RegisterAndServeOnTcp() // Start the server
	time.Sleep(time.Second)           // Give the server a moment to start listening
	wg := new(sync.WaitGroup)         // The WaitGroup blocks the main goroutine to prevent a premature exit
	callTimes := 10
	wg.Add(callTimes)
	for i := 0; i < callTimes; i++ {
		go func() {
			defer wg.Done()
			// Use hello world with a random number as the argument
			argString := "hello world " + strconv.Itoa(rand.Int())
			resultString, err := client.Echo(argString)
			if err != nil {
				log.Fatal("error calling: ", err)
			}
			if resultString != argString {
				fmt.Println("error")
			} else {
				fmt.Printf("echo:%s\n", resultString)
			}
		}()
	}
	wg.Wait()
}
In the example above, we first start the server with go server.RegisterAndServeOnTcp(), then launch 10 goroutines that send requests. Each client prints the result once it receives the response. Finally, run main, and the console prints the results (the random numbers may differ):
echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320
Conclusion
At this point, a simple custom RPC in Go is complete. Only the serialization protocol is actually custom: other parts such as the threading model are still Go RPC's own logic, and none of the advanced features mentioned in the preface are covered. Later, I may try implementing an RPC from scratch in Go.
Other
Concurrent scenarios
As you may have noticed, the logic implemented here does not consider concurrency at all, and the cached data is stored directly in the codec object. This simple implementation does not cause concurrent calls to fail, because Go RPC reads requests sequentially for each codec object and only then processes them concurrently and returns the results.
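The behavior can be checked with a minimal sketch in which many goroutines share a single client, and therefore a single codec on each side. This is an illustration under stated assumptions: it uses net/rpc's default gob codec instead of the MessagePack codec from this article, it connects both sides with an in-memory net.Pipe, and concurrentEcho is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// EchoService returns its argument unchanged.
type EchoService struct{}

func (s *EchoService) Echo(arg string, result *string) error {
	*result = arg
	return nil
}

// concurrentEcho issues n calls from n goroutines over ONE shared client
// and reports how many replies did not match their request.
func concurrentEcho(n int) (int, error) {
	server := rpc.NewServer()
	if err := server.Register(&EchoService{}); err != nil {
		return 0, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var (
		wg         sync.WaitGroup
		mu         sync.Mutex // protects mismatches and firstErr
		mismatches int
		firstErr   error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			arg := fmt.Sprintf("hello world %d", i)
			var reply string
			err := client.Call("EchoService.Echo", arg, &reply)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			if reply != arg {
				mismatches++
			}
		}(i)
	}
	wg.Wait()
	return mismatches, firstErr
}

func main() {
	mismatches, err := concurrentEcho(20)
	fmt.Println("mismatches:", mismatches, "error:", err)
}
```

Each reply is matched to its caller through the Seq field of the header, which is why a codec only has to preserve that field and does not need its own locking for this usage.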