Preface
In this final article in the series, we will implement an HTTP Gateway for our RPC framework. This functionality is actually inspired by RPCX, which implements a simple sidecar similar to the Service Mesh. Code address: Github
Principle
The HTTP Gateway receives an HTTP request from a client, converts it into an RPC request and sends it to the server for processing, and then sends the server's processing result back to the client in the HTTP response.
The general principle of the HTTP Gateway is to put the header part of our RPC protocol into the HTTP header and the body part of the RPC protocol into the HTTP Body.
Implementation
First we need to define the names of the fields in the HTTP header:
// Names of the HTTP headers that carry the fields of the RPC protocol header
// when a message travels through the HTTP gateway.
const (
	HEADER_SEQ            = "rpc-header-seq"            // Serial number, which uniquely identifies the request or response
	HEADER_MESSAGE_TYPE   = "rpc-header-message_type"   // Message type, which identifies whether a message is a request or a response
	HEADER_COMPRESS_TYPE  = "rpc-header-compress_type"  // Compression type, which identifies how a message is compressed
	HEADER_SERIALIZE_TYPE = "rpc-header-serialize_type" // Serialization type, which identifies how the message body is encoded
	HEADER_STATUS_CODE    = "rpc-header-status_code"    // Status code, which identifies whether a request is normal or abnormal
	HEADER_SERVICE_NAME   = "rpc-header-service_name"   // Service name
	HEADER_METHOD_NAME    = "rpc-header-method_name"    // Method name
	HEADER_ERROR          = "rpc-header-error"          // Error that occurred during the method call
	HEADER_META_DATA      = "rpc-header-meta_data"      // Other metadata
)
Copy the code
Then we need to start an HTTP server to receive HTTP requests. Here we use the net/http package from Go's standard library. The gateway listens on port 5080 by default; if that port is already in use, the port number is incremented until a free one is found.
func (s *SGServer) startGateway(a) {
port := 5080
ln, err := net.Listen("tcp".":" + strconv.Itoa(port))
forerr ! =nil && strings.Contains(err.Error(), "address already in use") {
port++
ln, err = net.Listen("tcp".":" + strconv.Itoa(port))
}
iferr ! =nil {
log.Printf("error listening gateway: %s", err.Error())
}
log.Printf("gateway listenning on " + strconv.Itoa(port))
// Avoid blocking and use a new Goroutine to execute the HTTP server
go func(a) {
err := http.Serve(ln, s)
iferr ! =nil {
log.Printf("error serving http %s", err.Error())
}
}()
}
Copy the code
Next we need to implement the ServeHTTP function:
func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
// If the URL is incorrect, return it directly
ifr.URL.Path ! ="/invoke" {
rw.WriteHeader(404)
return
}
// If method is not correct, return it directly
ifr.Method ! ="POST" {
rw.WriteHeader(405)
return
}
// Construct a new request
request := protocol.NewMessage(s.Option.ProtocolType)
// Populate the request header with HTTP headers
request, err := parseHeader(request, r)
iferr ! =nil {
rw.WriteHeader(400)}// Populate the request data according to the HTTP body
request, err = parseBody(request, r)
iferr ! =nil {
rw.WriteHeader(400)}/ / the context construction
ctx := metadata.WithMeta(context.Background(), request.MetaData)
response := request.Clone()
response.MessageType = protocol.MessageTypeResponse
// Process the request
response = s.process(ctx, request, response)
// return corresponding
s.writeHttpResponse(response, rw, r)
}
func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
data, err := ioutil.ReadAll(request.Body)
iferr ! =nil {
return nil, err
}
message.Data = data
return message, nil
}
func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
headerSeq := request.Header.Get(HEADER_SEQ)
seq, err := strconv.ParseUint(headerSeq, 10.64)
iferr ! =nil {
return nil, err
}
message.Seq = seq
headerMsgType := request.Header.Get(HEADER_MESSAGE_TYPE)
msgType, err := protocol.ParseMessageType(headerMsgType)
iferr ! =nil {
return nil, err
}
message.MessageType = msgType
headerCompressType := request.Header.Get(HEADER_COMPRESS_TYPE)
compressType, err := protocol.ParseCompressType(headerCompressType)
iferr ! =nil {
return nil. err } message.CompressType = compressType headerSerializeType := request.Header.Get(HEADER_SERIALIZE_TYPE) serializeType, err := codec.ParseSerializeType(headerSerializeType)iferr ! =nil {
return nil, err
}
message.SerializeType = serializeType
headerStatusCode := request.Header.Get(HEADER_STATUS_CODE)
statusCode, err := protocol.ParseStatusCode(headerStatusCode)
iferr ! =nil {
return nil. err } message.StatusCode = statusCode serviceName := request.Header.Get(HEADER_SERVICE_NAME) message.ServiceName = serviceName methodName := request.Header.Get(HEADER_METHOD_NAME) message.MethodName = methodName errorMsg := request.Header.Get(HEADER_ERROR) message.Error = errorMsg headerMeta := request.Header.Get(HEADER_META_DATA) meta :=make(map[string]interface{})
err = json.Unmarshal([]byte(headerMeta), &meta)
iferr ! =nil {
return nil, err
}
message.MetaData = meta
return message, nil
}
// writeHttpResponse writes message to rw: every RPC header field is emitted
// as the corresponding rpc-header-* HTTP header and message.Data becomes the
// HTTP body. The metadata map is JSON-encoded into its header. Failures to
// encode the metadata or to write the body are logged; r is not used.
func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) {
	header := rw.Header()
	// Format the sequence number as decimal text. A plain string(...)
	// conversion of an integer yields the UTF-8 encoding of that code point,
	// not its digits.
	header.Set(HEADER_SEQ, strconv.FormatUint(message.Seq, 10))
	header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String())
	header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String())
	header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String())
	header.Set(HEADER_STATUS_CODE, message.StatusCode.String())
	header.Set(HEADER_SERVICE_NAME, message.ServiceName)
	header.Set(HEADER_METHOD_NAME, message.MethodName)
	header.Set(HEADER_ERROR, message.Error)

	metaDataJson, err := json.Marshal(message.MetaData)
	if err != nil {
		log.Printf("error marshaling gateway response metadata: %s", err.Error())
	}
	header.Set(HEADER_META_DATA, string(metaDataJson))

	// Headers must be set before the first Write, which sends them.
	if _, err := rw.Write(message.Data); err != nil {
		log.Printf("error writing gateway response: %s", err.Error())
	}
}
Copy the code
Finally we just need to start the HTTP Server in the Wrapper.
// WrapServe returns a ServeFunc that starts the HTTP gateway of s and then
// delegates to serveFunc with the same network, addr and meta arguments,
// returning whatever serveFunc returns.
func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {
	wrapped := func(network string, addr string, meta map[string]interface{}) error {
		// ... (the preceding part is omitted here) ...

		// Start the gateway before handing control to the wrapped serve function.
		s.startGateway()
		err := serveFunc(network, addr, meta)
		return err
	}
	return wrapped
}
Copy the code
Client test code:
func MakeHttpCall(a) {
// Declare the parameters and serialize them in the body of the HTTP request
arg := service.Args{A: rand.Intn(200), B: rand.Intn(100)}
data, _ := msgpack.Marshal(arg)
body := bytes.NewBuffer(data)
req, err := http.NewRequest("POST"."http://localhost:5080/invoke", body)
iferr ! =nil {
log.Println(err)
return
}
req.Header.Set(server.HEADER_SEQ, "1")
req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String())
req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String())
req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String())
req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String())
req.Header.Set(server.HEADER_SERVICE_NAME,"Arith")
req.Header.Set(server.HEADER_METHOD_NAME,"Add")
req.Header.Set(server.HEADER_ERROR,"")
meta := map[string]interface{} {"key":"value"}
metaJson, _ := json.Marshal(meta)
req.Header.Set(server.HEADER_META_DATA,string(metaJson))
response, err := http.DefaultClient.Do(req)
iferr ! =nil {
log.Println(err)
return
}
ifresponse.StatusCode ! =200 {
log.Println(response)
} else ifresponse.Header.Get(server.HEADER_ERROR) ! ="" {
log.Println(response.Header.Get(server.HEADER_ERROR))
} else {
data, err = ioutil.ReadAll(response.Body)
result := service.Reply{}
msgpack.Unmarshal(data, &result)
fmt.Println(result.C)
}
}
Copy the code
Conclusion
This is the end of the series, but there is still a lot to improve and enrich, and there may even be errors to correct; these will be covered in separate follow-up articles.
Previous articles in this series
Implementing an RPC framework from scratch (zero)
Implementing an RPC Framework from Scratch (PART 1)
Implementing an RPC Framework from Scratch (PART 2)
Implementing an RPC Framework from Scratch (part 3)
Implementing an RPC Framework from Scratch (4)