In the next two articles, we will learn about the Server and Client in Go-Micro. This article focuses on the Server.

Like other components in Go-Micro, the Server is abstracted behind an interface type so that it can be customized. The default Server implementation in Go-Micro is rpcServer.

Let’s start by looking at the main method signatures in the Server interface:

type Server interface {
	// Initialise options
	Init(...Option) error
	// Retrieve the options
	Options() Options
	// Register a handler
	Handle(Handler) error
	// Create a new handler
	NewHandler(interface{}, ...HandlerOption) Handler
	// Create a new subscriber
	NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
	// Register a subscriber
	Subscribe(Subscriber) error
	// Start the server
	Start() error
	// Stop the server
	Stop() error
	// Server implementation
	String() string
}

During initialization, newRpcServer is actually called.

// Init initialises the default server with options passed in
func Init(opt ...Option) {
	if DefaultServer == nil {
		DefaultServer = newRpcServer(opt...)
	}
	DefaultServer.Init(opt...)
}
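
The lazy-default-plus-options pattern used by Init can be sketched with the standard library alone. This is a minimal illustration, not the Go-Micro source: toyServer, the reduced Options struct, the Address option and the default values are assumptions made up for the example.

```go
package main

import "fmt"

// Options is a hypothetical, reduced stand-in for the server options.
type Options struct {
	Name    string
	Address string
}

// Option mutates Options (the functional-options pattern).
type Option func(*Options)

// Name and Address are illustrative option constructors.
func Name(n string) Option    { return func(o *Options) { o.Name = n } }
func Address(a string) Option { return func(o *Options) { o.Address = a } }

// toyServer is a hypothetical stand-in for the real server type.
type toyServer struct{ opts Options }

// newToyServer starts from assumed defaults and applies each option in order.
func newToyServer(opts ...Option) *toyServer {
	o := Options{Name: "default.server", Address: ":0"}
	for _, opt := range opts {
		opt(&o)
	}
	return &toyServer{opts: o}
}

// Init re-applies options to an already constructed server.
func (s *toyServer) Init(opts ...Option) {
	for _, opt := range opts {
		opt(&s.opts)
	}
}

var defaultServer *toyServer

// Init creates the default server on first use, then applies the options.
func Init(opts ...Option) {
	if defaultServer == nil {
		defaultServer = newToyServer(opts...)
	}
	defaultServer.Init(opts...)
}

func main() {
	Init(Name("go.micro.srv.example"))
	fmt.Println(defaultServer.opts.Name, defaultServer.opts.Address)
}
```

Because every option is just a function over Options, a later call to Init can override a single field without touching the others.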

Starting the server likewise calls the Start method of the default rpcServer:

// Start starts the default server
func Start() error {
	config := DefaultServer.Options()
	if logger.V(logger.InfoLevel, logger.DefaultLogger) {
		logger.Infof("Starting server %s id %s", config.Name, config.Id)
	}
	return DefaultServer.Start()
}

So let’s take a look at the default Server, which has the following structure:

type rpcServer struct {
	router *router // router
	exit   chan chan error

	sync.RWMutex
	opts        Options
	handlers    map[string]Handler
	subscribers map[Subscriber][]broker.Subscriber
	// marks the serve as started
	started bool
	// used for first registration
	registered bool
	// subscribe to service name
	subscriber broker.Subscriber
	// graceful exit
	wg *sync.WaitGroup

	rsvc *registry.Service
}

The newRpcServer function instantiates an rpcServer:

func newRpcServer(opts ...Option) Server {
	options := newOptions(opts...)
	router := newRpcRouter()
	router.hdlrWrappers = options.HdlrWrappers // handler wrappers
	router.subWrappers = options.SubWrappers   // subscriber wrappers

	return &rpcServer{
		opts:        options,
		router:      router,
		handlers:    make(map[string]Handler),
		subscribers: make(map[Subscriber][]broker.Subscriber),
		exit:        make(chan chan error),
		wg:          wait(options.Context),
	}
}

The Start method has several steps. Since it contains a lot of logging code, we will only look at the relevant parts.

1. Start a listener on the transport and store the listener's address in the Server's options.

ts, err := config.Transport.Listen(config.Address)
if err != nil {
	return err
}

2. Connect the broker.

if err := config.Broker.Connect(); err != nil {
	if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
		logger.Errorf("Broker [%s] connect error: %v", bname, err)
	}
	return err
}

3. Run the pre-registration check and register only if it passes. The check is a function that is called before registration. The default check does nothing and simply returns nil, so if you want real checks you can supply your own function of type func(context.Context) error through the RegisterCheck field of the Server's opts.

if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
	if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
		logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
	}
} else {
	// announce self to the world
	if err = s.Register(); err != nil {
		if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
			logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
		}
	}
}

The Register method collects the handlers and subscribers on our Server and converts them into endpoints, which are stored in an endpoint list. Register then uses that list to instantiate a service:

service := &registry.Service{
		Name:      config.Name,
		Version:   config.Version,
		Nodes:     []*registry.Node{node},
		Endpoints: endpoints,
	}
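
One way to picture the handler-to-endpoint conversion is to list a handler's exported methods with reflection. The sketch below is an assumption-laden illustration rather than the Go-Micro code: endpointNames is a hypothetical helper, the Example type is reduced to empty methods, and the "Type.Method" naming is chosen for the example.

```go
package main

import (
	"fmt"
	"reflect"
)

// Example is a reduced stand-in for a handler type.
type Example struct{}

func (e *Example) Call()   {}
func (e *Example) Stream() {}

// endpointNames lists "Type.Method" for every exported method of handler.
func endpointNames(handler interface{}) []string {
	t := reflect.TypeOf(handler)
	// Use the element type's name so that *Example is reported as "Example".
	name := reflect.Indirect(reflect.ValueOf(handler)).Type().Name()
	var names []string
	for i := 0; i < t.NumMethod(); i++ {
		names = append(names, name+"."+t.Method(i).Name)
	}
	return names
}

func main() {
	for _, ep := range endpointNames(new(Example)) {
		fmt.Println(ep)
	}
}
```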

The service is then written to the registry, where clients can discover it and make business calls to it:

// register the service
if err := regFunc(service); err != nil {
	return err
}

That is the whole registration process. With the registration part of Start covered, let's continue with the rest of the Start method:

go func() {
	for {
		// listen for connections
		err := ts.Accept(s.ServeConn)

		// TODO: listen for messages
		// msg := broker.Exchange(service).Consume()

		select {
		// check if we're supposed to exit
		case <-exit:
			return
		// check the error and backoff
		default:
			if err != nil {
				if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
					logger.Errorf("Accept error: %v", err)
				}
				time.Sleep(time.Second)
				continue
			}
		}

		// no error just exit
		return
	}
}()

In the default branch, a non-nil err is logged and, after a one-second sleep, the loop continues; a nil err lets the goroutine return. To see why, look at what ts.Accept(s.ServeConn) does:

func (t *grpcTransportListener) Accept(fn func(transport.Socket)) error {
	var opts []grpc.ServerOption

	// setup tls if specified
	if t.secure || t.tls != nil {
		config := t.tls
		if config == nil {
			var err error
			addr := t.listener.Addr().String()
			config, err = getTLSConfig(addr)
			if err != nil {
				return err
			}
		}

		creds := credentials.NewTLS(config)
		opts = append(opts, grpc.Creds(creds))
	}

	// new service
	srv := grpc.NewServer(opts...)

	// register service
	pb.RegisterTransportServer(srv, &microTransport{addr: t.listener.Addr().String(), fn: fn})

	// start serving
	return srv.Serve(t.listener)
}

The Accept method creates a new server: srv is a Server from the google.golang.org/grpc library. This Server is the gRPC server that handles RPC requests.

The gRPC Server is started by its Serve method, which accepts incoming connections on the net.Listener passed to it and creates a ServerTransport and a service goroutine for each connection. The service goroutine reads gRPC requests and calls the registered handlers to reply to them. Serve returns when Accept on the net.Listener fails with a fatal error, and the net.Listener is closed when Serve returns.

The Serve method returns a non-nil error unless Stop or GracefulStop is called.

This is why, after err := ts.Accept(s.ServeConn), the default branch of the select checks whether err is nil. If it is not, the loop calls Accept again.

The Start method also runs a goroutine that periodically repeats the registration check and refreshes the registration (deregistering the service if the check fails) until the Server exits. When an exit is detected, it calls Deregister to remove the service from the registry, waits for in-flight requests to complete, closes the transport listener, and disconnects from the broker. That is everything the Start method does.
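
The exit field of type chan chan error suggests a request/reply handshake for shutdown: the caller sends a reply channel and waits for the result on it. The sketch below shows that handshake with the shutdown steps reduced to log entries; toyServer, start, stop and the log field are hypothetical, and the real method does considerably more.

```go
package main

import (
	"fmt"
	"sync"
)

// toyServer is a hypothetical stand-in that only models the shutdown handshake.
type toyServer struct {
	exit chan chan error
	wg   sync.WaitGroup
	log  []string
}

// start launches the goroutine that waits for a stop request.
func (s *toyServer) start() {
	go func() {
		// Block until stop sends a reply channel.
		reply := <-s.exit
		s.log = append(s.log, "deregister")
		// Wait for in-flight requests to finish.
		s.wg.Wait()
		s.log = append(s.log, "close listener", "disconnect broker")
		// Report the shutdown result back to the caller of stop.
		reply <- nil
	}()
}

// stop asks the goroutine to shut down and waits for its result.
func (s *toyServer) stop() error {
	reply := make(chan error)
	s.exit <- reply
	return <-reply
}

func main() {
	s := &toyServer{exit: make(chan chan error)}
	s.start()

	// Simulate one in-flight request that completes on its own.
	s.wg.Add(1)
	go func() { defer s.wg.Done() }()

	err := s.stop()
	fmt.Println(err, s.log)
}
```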

Let’s take a look at the actual service process through an example:

package main

import (
	"log"

	"github.com/asim/go-micro/examples/v3/server/handler"
	"github.com/asim/go-micro/examples/v3/server/subscriber"
	"github.com/asim/go-micro/v3/cmd"
	"github.com/asim/go-micro/v3/server"
)

func main() {
	// optionally setup command line usage
	cmd.Init()

	// initialize Server
	server.Init(
		server.Name("go.micro.srv.example"),
	)

	// register Handlers
	server.Handle(
		server.NewHandler(
			new(handler.Example),
		),
	)

	// register Subscribers
	if err := server.Subscribe(
		server.NewSubscriber(
			"topic.example",
			new(subscriber.Example),
		),
	); err != nil {
		log.Fatal(err)
	}

	if err := server.Subscribe(
		server.NewSubscriber(
			"topic.example",
			subscriber.Handler,
		),
	); err != nil {
		log.Fatal(err)
	}

	// Run server
	if err := server.Run(); err != nil {
		log.Fatal(err)
	}
}

Now, how is a Handler defined? However you define it, it is converted for us when it is registered:

server.NewHandler(
			new(handler.Example),
		),

The Handle function then registers it with the Server:

func Handle(h Handler) error {
	return DefaultServer.Handle(h)
}

Now look at the Handler we implemented:

package handler

import (
	"context"
	"log"

	example "github.com/asim/go-micro/examples/v3/server/proto/example"
	"github.com/asim/go-micro/v3/metadata"
	"github.com/asim/go-micro/v3/server"
)

type Example struct{}

func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
	md, _ := metadata.FromContext(ctx)
	log.Printf("Received Example.Call request with metadata: %v", md)
	rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name
	return nil
}

func (e *Example) Stream(ctx context.Context, stream server.Stream) error {
	log.Print("Executing streaming handler")
	req := &example.StreamingRequest{}

	// We just want to receive 1 request and then process here
	if err := stream.Recv(req); err != nil {
		log.Printf("Error receiving streaming request: %v", err)
		return err
	}

	log.Printf("Received Example.Stream request with count: %d", req.Count)
	for i := 0; i < int(req.Count); i++ {
		log.Printf("Responding: %d", i)

		if err := stream.Send(&example.StreamingResponse{
			Count: int64(i),
		}); err != nil {
			return err
		}
	}

	return nil
}

func (e *Example) PingPong(ctx context.Context, stream server.Stream) error {
	for {
		req := &example.Ping{}
		if err := stream.Recv(req); err != nil {
			return err
		}
		log.Printf("Got ping %v", req.Stroke)
		if err := stream.Send(&example.Pong{Stroke: req.Stroke}); err != nil {
			return err
		}
	}
}

The proto file contains the following:

syntax = "proto3";

package go.micro.srv.example;

service Example {
	rpc Call(Request) returns (Response) {}
	rpc Stream(StreamingRequest) returns (stream StreamingResponse) {}
	rpc PingPong(stream Ping) returns (stream Pong) {}
}

message Message {
	string say = 1;
}

message Request {
	string name = 1;
}

message Response {
	string msg = 1;
}

message StreamingRequest {
	int64 count = 1;
}

message StreamingResponse {
	int64 count = 1;
}

message Ping {
	int64 stroke = 1;
}

message Pong {
	int64 stroke = 1;
}
