Preface

RPC is short for Remote Procedure Call. With RPC, we can call functions located elsewhere (for example, on another machine) just as we call local methods. You're probably more used to seeing HTTP API calls. Compared with them, RPC offers a higher level of encapsulation: callers don't have to handle serialization and deserialization manually, so it is cheaper to use (although the learning curve can be steeper).

For learning purposes, my goal this time is to implement my own RPC in Go. In real-world RPC tools, features such as service discovery, load balancing, and circuit breaking/degradation matter more than the method call itself. I won't cover those for now and will focus on implementing a working method call.

In the previous article, I took a general look at Go's built-in RPC framework (net/rpc). I mentioned there that Go RPC exposes codec interfaces, which let users plug their own serialization protocol into Go RPC. This time, I will try to implement such a codec to build my own RPC.

Preparatory work

Serialization protocol

An RPC implementation roughly consists of three basic elements: a serialization protocol, a network model, and a threading model. The codec in Go RPC essentially covers the serialization protocol.

I originally intended to use Thrift, a protocol I'm familiar with. However, Thrift implements the whole RPC flow itself, so it is not a pure serialization protocol. Its serialization logic may not fit well with Go RPC, and it requires writing IDL definitions, which adds complexity. Since I'm still getting familiar with Go, I'll start with something simpler and use MessagePack as the serialization protocol.

MessagePack is a relatively lightweight serialization protocol. Its data model is similar to JSON's, but it uses a binary encoding. As a result, serialization is usually faster than JSON and the resulting data is smaller. It is often described as a binary version of JSON.

Defining the types

To support custom codecs, Go RPC (the net/rpc package) provides two interfaces, ServerCodec and ClientCodec. As their names suggest, they hold the server-side and client-side logic respectively. They are defined as follows:

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error
	Close() error
}
type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error
	Close() error
}

As you can see, Go RPC abstracts each request/response into a header plus a body. Reading is split into two calls, one for the header and one for the body. Writing is a single call. net/rpc fills in the header (method name, sequence number and, for responses, the error string) and passes it to the codec together with the body. The codec then serializes both. Next, we define two structures that represent the complete data of a request and a response:
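The call order described above can be observed with a small in-memory client codec. The sketch below uses a hypothetical `loopbackCodec`, which is not part of this article's code. It answers each request itself by echoing the argument back, and it records which ClientCodec methods net/rpc calls. It only handles string replies.

```go
package main

import (
	"fmt"
	"io"
	"net/rpc"
	"sync"
)

// frame is one queued response: header fields plus body.
type frame struct {
	method string
	seq    uint64
	body   any
}

// loopbackCodec is a hypothetical in-memory rpc.ClientCodec.
type loopbackCodec struct {
	mu    sync.Mutex
	calls []string      // codec methods, in the order net/rpc called them
	queue chan frame    // responses waiting to be read
	done  chan struct{} // closed by Close to unblock the reader
	body  any           // cached by ReadResponseHeader for ReadResponseBody
}

var _ rpc.ClientCodec = (*loopbackCodec)(nil) // compile-time interface check

func (c *loopbackCodec) record(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.calls = append(c.calls, name)
}

// WriteRequest receives the header (filled in by net/rpc) and the body in one call.
func (c *loopbackCodec) WriteRequest(r *rpc.Request, body any) error {
	c.record("WriteRequest")
	c.queue <- frame{method: r.ServiceMethod, seq: r.Seq, body: body}
	return nil
}

func (c *loopbackCodec) ReadResponseHeader(r *rpc.Response) error {
	select {
	case f := <-c.queue:
		r.ServiceMethod, r.Seq, r.Error = f.method, f.seq, ""
		c.body = f.body
		c.record("ReadResponseHeader")
		return nil
	case <-c.done:
		return io.EOF // tells net/rpc the connection has shut down
	}
}

func (c *loopbackCodec) ReadResponseBody(reply any) error {
	if reply != nil {
		*reply.(*string) = c.body.(string) // this sketch only handles strings
	}
	c.record("ReadResponseBody")
	return nil
}

func (c *loopbackCodec) Close() error {
	close(c.done)
	return nil
}

// callOnce performs one Call and returns the reply and the recorded call order.
func callOnce(arg string) (string, []string, error) {
	codec := &loopbackCodec{queue: make(chan frame, 1), done: make(chan struct{})}
	client := rpc.NewClientWithCodec(codec)
	defer client.Close()

	var reply string
	err := client.Call("Echo.Echo", arg, &reply)
	codec.mu.Lock()
	calls := append([]string(nil), codec.calls...)
	codec.mu.Unlock()
	return reply, calls, err
}

func main() {
	reply, calls, err := callOnce("hi")
	fmt.Println(reply, calls, err)
}
```

For one Call, the recorded order is WriteRequest, then ReadResponseHeader, then ReadResponseBody. Both reads run on the goroutine that rpc.NewClientWithCodec starts, and that goroutine is already waiting in ReadResponseHeader before any request is sent.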

type MsgpackReq struct {
	rpc.Request  //head
	Arg interface{} //body
}

type MsgpackResp struct {
	rpc.Response  //head
	Reply interface{}  //body
}

Here MsgpackReq and MsgpackResp embed Go RPC's built-in Request and Response types. These carry the sequence number, the method name and, for responses, the error string.

Next come the declarations of the custom codecs:

type MessagePackServerCodec struct {
	rwc    io.ReadWriteCloser // Used to read and write data; in practice a network connection
	req    MsgpackReq         // Caches the parsed request
	closed bool               // Whether the codec has been closed
}

type MessagePackClientCodec struct {
	rwc    io.ReadWriteCloser
	resp   MsgpackResp // Caches the parsed response
	closed bool
}

func NewServerCodec(conn net.Conn) *MessagePackServerCodec {
	return &MessagePackServerCodec{rwc: conn}
}

func NewClientCodec(conn net.Conn) *MessagePackClientCodec {
	return &MessagePackClientCodec{rwc: conn}
}

As mentioned in the previous article, a codec needs a data source to read from and write to. Here we simply pass in the network connection directly.

Implementing the codec methods

Implementation approach

For simplicity, we merge the two deserialization steps into one. When the header is read, we parse and cache the entire message. When the body is read, we simply return the cached result. The steps are:

  1. When the client sends a request, it wraps the data into a MsgpackReq, serializes it with MessagePack, and sends it out prefixed with its length
  2. When the server reads the request header, it deserializes the received data into a MsgpackReq using MessagePack and caches the result
  3. When the server reads the request body, it takes the Arg field from the cached MsgpackReq and returns it
  4. When the server sends the response, it wraps the data into a MsgpackResp, serializes it with MessagePack, and sends it out prefixed with its length
  5. When the client reads the response header, it deserializes the received data into a MsgpackResp using MessagePack and caches the result
  6. When the client reads the response body, it takes the Reply or Error field from the cached MsgpackResp and returns it
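Steps 2 and 3 (decode everything when the header is requested, then serve the body from the cache) can be tried without a network or a MessagePack dependency. In this sketch, encoding/json stands in for MessagePack, and the hypothetical `envelope` type mirrors MsgpackReq. Because json.Decoder does its own message framing, there is no length prefix here. The names `cachingReader` and `drain` are illustrative and not part of net/rpc.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/rpc"
	"reflect"
	"strings"
)

// envelope mirrors MsgpackReq: header fields promoted from the embedded
// rpc.Request, plus the body, all in one message.
type envelope struct {
	rpc.Request
	Arg any
}

// cachingReader is a hypothetical server-side reader (JSON instead of MessagePack).
type cachingReader struct {
	dec    *json.Decoder
	cached envelope
}

// ReadRequestHeader decodes the whole message and caches it (step 2).
func (c *cachingReader) ReadRequestHeader(r *rpc.Request) error {
	c.cached = envelope{} // drop the previous request
	if err := c.dec.Decode(&c.cached); err != nil {
		return err
	}
	r.ServiceMethod = c.cached.ServiceMethod
	r.Seq = c.cached.Seq
	return nil
}

// ReadRequestBody only copies the cached argument (step 3).
func (c *cachingReader) ReadRequestBody(arg any) error {
	if arg == nil {
		return nil // net/rpc passes nil when it wants the body discarded
	}
	reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.cached.Arg))
	return nil
}

// drain mimics net/rpc's server loop: header, then body, for each request.
func drain(stream string) ([]string, error) {
	c := &cachingReader{dec: json.NewDecoder(strings.NewReader(stream))}
	var out []string
	for {
		var req rpc.Request
		if err := c.ReadRequestHeader(&req); err != nil {
			if err == io.EOF {
				return out, nil // no more requests
			}
			return out, err
		}
		var arg string
		if err := c.ReadRequestBody(&arg); err != nil {
			return out, err
		}
		out = append(out, fmt.Sprintf("%s #%d %s", req.ServiceMethod, req.Seq, arg))
	}
}

const sample = `{"ServiceMethod":"EchoService.Echo","Seq":0,"Arg":"hello"}
{"ServiceMethod":"EchoService.Echo","Seq":1,"Arg":"world"}`

func main() {
	lines, err := drain(sample)
	if err != nil {
		panic(err)
	}
	for _, l := range lines {
		fmt.Println(l)
	}
}
```

Because the body is always read before the next header, the single cached envelope is never overwritten while it is still needed.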

Client implementation

Here’s the code:

func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
	// Refuse to write once the codec has been closed; returning an error
	// makes net/rpc fail the call instead of leaving it pending forever
	if c.closed {
		return rpc.ErrShutdown
	}
	// Assemble r and arg into a MsgpackReq and serialize it
	request := &MsgpackReq{Request: *r, Arg: arg}
	reqData, err := msgpack.Marshal(request)
	if err != nil {
		return err
	}
	// Send the 4-byte big-endian data length first
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(reqData)))
	if _, err = c.rwc.Write(head); err != nil {
		return err
	}
	// Then send the serialized data
	_, err = c.rwc.Write(reqData)
	return err
}

func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
	// Once closed, report EOF so net/rpc's reading goroutine stops
	// (returning nil here would make it spin forever)
	if c.closed {
		return io.EOF
	}
	data, err := readData(c.rwc)
	if err != nil {
		// net/rpc starts reading responses as soon as the client is created,
		// so reading from a connection we closed ourselves is expected.
		// Map it to io.EOF, which net/rpc treats as a normal shutdown
		if errors.Is(err, net.ErrClosed) {
			return io.EOF
		}
		return err
	}

	// Deserialize the read data into a MsgpackResp
	var response MsgpackResp
	if err = msgpack.Unmarshal(data, &response); err != nil {
		return err
	}

	// Copy the header fields, including the error string: net/rpc uses
	// r.Error to report server-side errors to the caller
	r.ServiceMethod = response.ServiceMethod
	r.Seq = response.Seq
	r.Error = response.Error
	// Cache the whole response for ReadResponseBody
	c.resp = response
	return nil
}

func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
	// net/rpc passes nil when the body should be discarded, including when
	// the header carried an error (already reported through r.Error)
	if reply == nil || c.resp.Error != "" {
		return nil
	}
	// Return the cached data: set it into reply by reflection. reply is a
	// pointer supplied by the caller, so Elem() is settable, but the decoded
	// value's type must still match the target type
	dst := reflect.ValueOf(reply).Elem()
	src := reflect.ValueOf(c.resp.Reply)
	if !src.IsValid() || !src.Type().AssignableTo(dst.Type()) {
		return fmt.Errorf("msgpk: cannot assign %T to %s", c.resp.Reply, dst.Type())
	}
	dst.Set(src)
	return nil
}

func (c *MessagePackClientCodec) Close() error {
	c.closed = true // Mark the codec as closed before closing the connection
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}

That is the client side. A few points are worth noting:

  1. Before reading or writing data, check whether the codec has already been closed
  2. TCP is a byte stream, so each message is framed with a 4-byte length prefix. This lets the reader separate messages that arrive glued together and reassemble messages that arrive split up (handled by the readData function)

Server implementation

Here is the code as well:

func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
	// Refuse to write once the codec has been closed
	if c.closed {
		return rpc.ErrShutdown
	}
	// Assemble r and reply into a MsgpackResp and serialize it;
	// r.Error carries any error returned by the service method
	response := &MsgpackResp{Response: *r, Reply: reply}
	respData, err := msgpack.Marshal(response)
	if err != nil {
		return err
	}
	// Send the 4-byte big-endian data length first
	head := make([]byte, 4)
	binary.BigEndian.PutUint32(head, uint32(len(respData)))
	if _, err = c.rwc.Write(head); err != nil {
		return err
	}
	// Then send the serialized data
	_, err = c.rwc.Write(respData)
	return err
}

func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
	// Once closed, report EOF so rpc.ServeCodec stops reading
	if c.closed {
		return io.EOF
	}
	data, err := readData(c.rwc)
	if err != nil {
		// Don't panic: io.EOF and "connection reset by peer" are the normal
		// ways a client goes away. Returning the error makes rpc.ServeCodec
		// stop serving this connection and close the codec
		return err
	}
	// Deserialize the read data into a MsgpackReq
	var request MsgpackReq
	if err = msgpack.Unmarshal(data, &request); err != nil {
		return err
	}

	// Set the request properties based on the data read
	r.ServiceMethod = request.ServiceMethod
	r.Seq = request.Seq
	// Cache the parsed request for ReadRequestBody
	c.req = request
	return nil
}

func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
	// net/rpc passes nil when the body should be discarded (e.g. unknown method)
	if arg == nil {
		return nil
	}
	// arg is a pointer allocated by net/rpc; copy the cached argument into it,
	// checking types first so a mismatch becomes an error instead of a panic
	dst := reflect.ValueOf(arg).Elem()
	src := reflect.ValueOf(c.req.Arg)
	if !src.IsValid() || !src.Type().AssignableTo(dst.Type()) {
		return fmt.Errorf("msgpk: cannot assign %T to %s", c.req.Arg, dst.Type())
	}
	dst.Set(src)
	return nil
}

func (c *MessagePackServerCodec) Close() error {
	c.closed = true
	if c.rwc != nil {
		return c.rwc.Close()
	}
	return nil
}

The server-side implementation mirrors the client side, with the roles of request and response swapped. A few points to note:

  1. When reading, the server must hand EOF and connection-reset errors back to net/rpc instead of panicking. These errors are the normal ways a client disconnects.
  2. WriteResponse does not handle errors returned by the service method explicitly and simply serializes reply. net/rpc stores the method's error string in rpc.Response.Error, which travels in the MsgpackResp header, so the codec does not need to treat it specially.

Handling packet splitting and sticking

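TCP is a byte stream. A single Read may return only part of a message (splitting), or the end of one message glued to the start of the next (sticking). For the general approach, refer to the article on handling TCP unpacking and sticky packets in Go. Here is the implementation of readData, which reads a 4-byte length prefix and then exactly that many bytes of body: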

func readData(conn io.Reader) ([]byte, error) {
	const HeadSize = 4 // The length prefix is 4 bytes (big-endian uint32)
	// io.ReadFull keeps calling Read until the buffer is full, so a header or
	// body split across several TCP segments is reassembled here
	head := make([]byte, HeadSize)
	if _, err := io.ReadFull(conn, head); err != nil {
		return nil, err // io.EOF if the peer closed cleanly between messages
	}
	bodyLen := binary.BigEndian.Uint32(head)
	// Read exactly bodyLen bytes; any bytes of the next message stay in the
	// stream for the next call
	body := make([]byte, bodyLen)
	if _, err := io.ReadFull(conn, body); err != nil {
		return nil, err
	}
	return body, nil
}
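The framing can be exercised without a network. In this sketch, `writeFrame` and `readFrame` are hypothetical helpers that use the same layout as the codec: a 4-byte big-endian length followed by the payload. Two frames are written back to back into one buffer (sticking), and testing/iotest.OneByteReader forces every Read to return at most one byte (splitting).

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"testing/iotest"
)

// writeFrame (hypothetical) writes a 4-byte big-endian length, then the payload.
func writeFrame(w io.Writer, payload []byte) error {
	var head [4]byte
	binary.BigEndian.PutUint32(head[:], uint32(len(payload)))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame (hypothetical) does the same job as readData: read the 4-byte
// length, then exactly that many payload bytes.
func readFrame(r io.Reader) ([]byte, error) {
	var head [4]byte
	if _, err := io.ReadFull(r, head[:]); err != nil {
		return nil, err
	}
	body := make([]byte, binary.BigEndian.Uint32(head[:]))
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}
	return body, nil
}

// roundTrip writes all messages into one buffer, then reads them back
// through a reader that returns one byte per Read call.
func roundTrip(msgs []string) ([]string, error) {
	var stream bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&stream, []byte(m)); err != nil {
			return nil, err
		}
	}
	r := iotest.OneByteReader(&stream)
	var out []string
	for {
		frame, err := readFrame(r)
		if err == io.EOF {
			return out, nil // clean end of stream between frames
		}
		if err != nil {
			return out, err
		}
		out = append(out, string(frame))
	}
}

func main() {
	got, err := roundTrip([]string{"hello", "world!"})
	fmt.Println(got, err)
}
```

Even though each Read returns a single byte, both messages come back intact and in order. io.EOF before a new frame is treated as a clean end of stream.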

Test code

Let’s test our codec with a simple Echo call:

// Declare the service type
type EchoService struct{}

// Echo copies the argument into the result
func (service *EchoService) Echo(arg string, result *string) error {
	*result = arg
	return nil
}

// Start the server logic
func RegisterAndServeOnTcp() {
	// We register an instance of EchoService, not an individual method
	err := rpc.Register(&EchoService{})
	if err != nil {
		log.Fatal("error registering: ", err)
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
	if err != nil {
		log.Fatal("error resolving tcp: ", err)
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		log.Fatal("error listening: ", err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("error accepting: ", err)
		}
		// Wrap the connection with NewServerCodec and serve it in its own
		// goroutine, because rpc.ServeCodec blocks until the client hangs up
		go rpc.ServeCodec(msgpk.NewServerCodec(conn))
	}
}

// The client invocation logic
func Echo(arg string) (result string, err error) {
	conn, err := net.Dial("tcp", ":1234")
	if err != nil {
		return "", err
	}
	client := rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))
	defer client.Close()

	// Specify the method to call as type name plus method name
	err = client.Call("EchoService.Echo", arg, &result)
	if err != nil {
		return "", err
	}
	return result, nil
}

// The main function
func main() {
	go server.RegisterAndServeOnTcp() // Start the server
	time.Sleep(time.Second)            // Crude wait for the listener to come up
	wg := new(sync.WaitGroup)          // Blocks main until every call has finished
	callTimes := 10
	wg.Add(callTimes)
	for i := 0; i < callTimes; i++ {
		go func() {
			defer wg.Done()
			// Use "hello world" plus a random number as the argument
			argString := "hello world " + strconv.Itoa(rand.Int())
			resultString, err := client.Echo(argString)
			if err != nil {
				log.Fatal("error calling: ", err)
			}
			if resultString != argString {
				fmt.Println("error")
			} else {
				fmt.Printf("echo:%s\n", resultString)
			}
		}()
	}
	wg.Wait()
}

In the example above, main first starts the server with server.RegisterAndServeOnTcp(). It then launches 10 goroutines that send requests, and each goroutine prints its result after receiving the response. Running main prints output like the following (the random numbers will likely differ on your machine):

echo:hello world 8674665223082153551
echo:hello world 6129484611666145821
echo:hello world 5577006791947779410
echo:hello world 605394647632969758
echo:hello world 4037200794235010051
echo:hello world 3916589616287113937
echo:hello world 894385949183117216
echo:hello world 1443635317331776148
echo:hello world 2775422040480279449
echo:hello world 6334824724549167320

Conclusion

At this point, a simple custom Go RPC is complete. Only the serialization protocol is custom, though. Other parts, such as the threading model, are still Go RPC's own logic, and none of the advanced features mentioned in the preface are implemented. Later, I may try to implement an RPC in Go from scratch.

Other

Concurrent scenarios

As you may have noticed, the logic implemented here does not consider concurrency at all, and the cached data is stored directly in the codec objects. This simple implementation still works under concurrent calls. Go RPC reads requests from each codec sequentially on a single goroutine (header, then body, then the next request). Only after that does it process the requests concurrently and return the results. Writes from concurrent handlers are serialized by net/rpc's own locks.