Preface

In this final article in the series, we will implement an HTTP Gateway for our RPC framework. This functionality is actually inspired by RPCX, which implements a simple sidecar similar to the Service Mesh. Code address: Github

Principle

The HTTP Gateway receives an HTTP request from a client, converts it into an RPC request, and hands it to the server for processing. It then sends the server's result back to the client in the HTTP response.

The general principle of the HTTP Gateway is to put the header part of our RPC protocol into the HTTP header and the body part of the RPC protocol into the HTTP Body.

Implementation

First we need to define the names of the fields in the HTTP header:

// Names of the HTTP headers that carry the fields of the RPC protocol header
// through the gateway. Requests and responses use the same header names.
const (
	HEADER_SEQ            = "rpc-header-seq"            // Serial number, which uniquely identifies the request or response
	HEADER_MESSAGE_TYPE   = "rpc-header-message_type"   // Message type, which identifies whether a message is a request or a response
	HEADER_COMPRESS_TYPE  = "rpc-header-compress_type"  // Compression type, which identifies how a message is compressed
	HEADER_SERIALIZE_TYPE = "rpc-header-serialize_type" // Serialization type, which identifies how the message body is encoded
	HEADER_STATUS_CODE    = "rpc-header-status_code"    // Status type, which identifies whether a request is normal or abnormal
	HEADER_SERVICE_NAME   = "rpc-header-service_name"   // Service name
	HEADER_METHOD_NAME    = "rpc-header-method_name"    // Method name
	HEADER_ERROR          = "rpc-header-error"          // The exception that occurred on the method call
	HEADER_META_DATA      = "rpc-header-meta_data"      // Other metadata
)
Copy the code

Then we need to start an HTTP server to receive HTTP requests. Here we use the net/http API from Go's standard library. By default the gateway listens on port 5080; if that port is already occupied, the port number is incremented until a free one is found.

func (s *SGServer) startGateway(a) {
	port := 5080
	ln, err := net.Listen("tcp".":" + strconv.Itoa(port))
	forerr ! =nil && strings.Contains(err.Error(), "address already in use") {
		port++
		ln, err = net.Listen("tcp".":" + strconv.Itoa(port))
	}
	iferr ! =nil {
		log.Printf("error listening gateway: %s", err.Error())
	}
	log.Printf("gateway listenning on " + strconv.Itoa(port))
	// Avoid blocking and use a new Goroutine to execute the HTTP server
	go func(a) {
		err := http.Serve(ln, s)
		iferr ! =nil {
			log.Printf("error serving http %s", err.Error())
		}
	}()
}
Copy the code

Next we need to implement the ServeHTTP function:

func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
         // If the URL is incorrect, return it directly
	ifr.URL.Path ! ="/invoke" { 
		rw.WriteHeader(404)
		return
	}
	// If method is not correct, return it directly
	ifr.Method ! ="POST" {
		rw.WriteHeader(405)
		return
	}
	// Construct a new request
	request := protocol.NewMessage(s.Option.ProtocolType)
	// Populate the request header with HTTP headers
	request, err := parseHeader(request, r)
	iferr ! =nil {
	    rw.WriteHeader(400)}// Populate the request data according to the HTTP body
	request, err = parseBody(request, r)
	iferr ! =nil {
	    rw.WriteHeader(400)}/ / the context construction
	ctx := metadata.WithMeta(context.Background(), request.MetaData)
	response := request.Clone()
	response.MessageType = protocol.MessageTypeResponse
	// Process the request
	response = s.process(ctx, request, response)
	// return corresponding
	s.writeHttpResponse(response, rw, r)
}

func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
	data, err := ioutil.ReadAll(request.Body)
	iferr ! =nil {
		return nil, err
	}
	message.Data = data
	return message, nil
}

func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
	headerSeq := request.Header.Get(HEADER_SEQ)
	seq, err := strconv.ParseUint(headerSeq, 10.64)
	iferr ! =nil {
		return nil, err
	}
	message.Seq = seq

	headerMsgType := request.Header.Get(HEADER_MESSAGE_TYPE)
	msgType, err := protocol.ParseMessageType(headerMsgType)
	iferr ! =nil {
		return nil, err
	}
	message.MessageType = msgType

	headerCompressType := request.Header.Get(HEADER_COMPRESS_TYPE)
	compressType, err := protocol.ParseCompressType(headerCompressType)
	iferr ! =nil {
		return nil. err } message.CompressType = compressType headerSerializeType := request.Header.Get(HEADER_SERIALIZE_TYPE) serializeType, err := codec.ParseSerializeType(headerSerializeType)iferr ! =nil {
		return nil, err
	}
	message.SerializeType = serializeType

	headerStatusCode := request.Header.Get(HEADER_STATUS_CODE)
	statusCode, err := protocol.ParseStatusCode(headerStatusCode)
	iferr ! =nil {
		return nil. err } message.StatusCode = statusCode serviceName := request.Header.Get(HEADER_SERVICE_NAME) message.ServiceName = serviceName methodName := request.Header.Get(HEADER_METHOD_NAME) message.MethodName = methodName errorMsg := request.Header.Get(HEADER_ERROR) message.Error = errorMsg headerMeta := request.Header.Get(HEADER_META_DATA) meta :=make(map[string]interface{})
	err = json.Unmarshal([]byte(headerMeta), &meta)
	iferr ! =nil {
		return nil, err
	}
	message.MetaData = meta

	return message, nil
}

// writeHttpResponse writes an RPC response message to the HTTP client: the
// RPC header fields go into the HEADER_* HTTP headers and message.Data
// becomes the HTTP body. The request r is not used.
func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) {
	// Headers must be set before the first Write, which sends them.
	header := rw.Header()
	// Format the sequence number as decimal text so it matches what
	// parseHeader reads with strconv.ParseUint. string(message.Seq) would
	// instead yield the Unicode code point with that value.
	header.Set(HEADER_SEQ, strconv.FormatUint(message.Seq, 10))
	header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String())
	header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String())
	header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String())
	header.Set(HEADER_STATUS_CODE, message.StatusCode.String())
	header.Set(HEADER_SERVICE_NAME, message.ServiceName)
	header.Set(HEADER_METHOD_NAME, message.MethodName)
	header.Set(HEADER_ERROR, message.Error)
	metaDataJson, err := json.Marshal(message.MetaData)
	if err != nil {
		// Still deliver the response; the metadata header is sent empty.
		log.Printf("error encoding gateway response metadata: %s", err.Error())
	}
	header.Set(HEADER_META_DATA, string(metaDataJson))

	if _, err := rw.Write(message.Data); err != nil {
		// The client is most likely gone; all that is left to do is log.
		log.Printf("error writing gateway response: %s", err.Error())
	}
}
Copy the code

Finally we just need to start the HTTP Server in the Wrapper.

// WrapServe returns a ServeFunc that starts the HTTP gateway of s and then
// delegates to serveFunc with the same arguments.
func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {
	return func(network string, addr string, meta map[string]interface{}) error {
		// The preceding part of the wrapper is omitted here.
		// Start the gateway.
		s.startGateway()
		return serveFunc(network, addr, meta)
	}
}
Copy the code

Client test code:

func MakeHttpCall(a) {
    // Declare the parameters and serialize them in the body of the HTTP request
	arg := service.Args{A: rand.Intn(200), B: rand.Intn(100)}
	data, _ := msgpack.Marshal(arg)
	body := bytes.NewBuffer(data)
	req, err := http.NewRequest("POST"."http://localhost:5080/invoke", body)
	iferr ! =nil {
		log.Println(err)
		return
	}
	req.Header.Set(server.HEADER_SEQ, "1")
	req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String())
	req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String())
	req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String())
	req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String())
	req.Header.Set(server.HEADER_SERVICE_NAME,"Arith")
	req.Header.Set(server.HEADER_METHOD_NAME,"Add")
	req.Header.Set(server.HEADER_ERROR,"")
	meta := map[string]interface{} {"key":"value"}
	metaJson, _ := json.Marshal(meta)
	req.Header.Set(server.HEADER_META_DATA,string(metaJson))
	response, err := http.DefaultClient.Do(req)
	iferr ! =nil {
		log.Println(err)
		return
	}
	ifresponse.StatusCode ! =200 {
		log.Println(response)
	} else ifresponse.Header.Get(server.HEADER_ERROR) ! ="" {
		log.Println(response.Header.Get(server.HEADER_ERROR))
	} else {
		data, err = ioutil.ReadAll(response.Body)
		result := service.Reply{}
		msgpack.Unmarshal(data, &result)
		fmt.Println(result.C)
	}
}
Copy the code

Conclusion

This is the end of the series, but there is still a lot to improve and extend, and probably some errors to correct. Those updates will be published as separate articles.

Previous articles in this series

Implementing an RPC framework from scratch (zero)

Implementing an RPC Framework from Scratch (PART 1)

Implementing an RPC Framework from Scratch (PART 2)

Implementing an RPC Framework from Scratch (part 3)

Implementing an RPC Framework from Scratch (4)