In microservices, where any service can be deployed with multiple instances, a policy is required to select the processing node for which instance a request is sent for processing.

The Selector is based on the Registry for node selection and state marking. Different algorithms can be used in the selection process.

The Selector interface is defined as

// Selector builds on the registry as a mechanism to pick nodes // and mark their status. This allows host pools and other things // to be built using various algorithms. type Selector interface { Init(opts ... Option) error Options() Options // Select returns a function, which should return the next node Select(service string, opts... SelectOption) (Next, error) // Mark sets the success/error against a node Mark(service string, node *registry.Node, err error) // Reset returns state back to zero for a service Reset(service string) // Close renders the selector unusable Close() error // Name of the selector String() string }Copy the code

If different selection policies are required, you can customize the selection policies

// Next is a function that returns the next node
// based on the selector's strategy
type Next func() (*registry.Node, error)

// Filter is used to filter a service during the selection process
type Filter func([]*registry.Service) []*registry.Service

// Strategy is a selection strategy e.g random, round robin
type Strategy func([]*registry.Service) Next

Copy the code

The default implementation of selector

func NewSelector(opts ... Option) Selector { sopts := Options{ Strategy: Random, } for _, opt := range opts { opt(&sopts) } if sopts.Registry == nil { sopts.Registry = registry.DefaultRegistry } s := &registrySelector{ so: sopts, } s.rc = s.newCache() return s }Copy the code

In the default implementation, the node selection strategy is random

func init() {
	rand.Seed(time.Now().UnixNano())
}

// Random is a random strategy algorithm for node selection
func Random(services []*registry.Service) Next {
	nodes := make([]*registry.Node, 0, len(services))

	for _, service := range services {
		nodes = append(nodes, service.Nodes...)
	}

	return func() (*registry.Node, error) {
		if len(nodes) == 0 {
			return nil, ErrNoneAvailable
		}

		i := rand.Int() % len(nodes)
		return nodes[i], nil
	}
}

Copy the code

The node selection process is as follows:

  • Gets a list of services based on the specified service name
  • Filter according to the filter in the service options
  • If more than one node still exists, a node is selected to return according to the selection policy
func (c *registrySelector) Select(service string, opts ... SelectOption) (Next, error) { sopts := SelectOptions{ Strategy: c.so.Strategy, } for _, opt := range opts { opt(&sopts) } // get the service // try the cache first // if that fails go directly to the registry  services, err := c.rc.GetService(service) if err ! = nil { if err == registry.ErrNotFound { return nil, ErrNotFound } return nil, err } // apply the filters for _, filter := range sopts.Filters { services = filter(services) } // if there's nothing left, return if len(services) == 0 { return nil, ErrNoneAvailable } return sopts.Strategy(services), nil }Copy the code

From the client load example shown in the examples, we can see how to use it

func main() {
	cmd.Init()

	client.DefaultClient = client.NewClient(
		client.Selector(FirstNodeSelector()),
	)

	fmt.Println("\n--- Call example ---")
	for i := 0; i < 10; i++ {
		call(i)
	}
}
Copy the code

The return value of FirstNodeSelector is a FirstNodeSelector payload, which is the implementer of the Selector interface, and the logic is very simple, and the effect is always to select the first service in the list

// Built in random hashed node selector type firstNodeSelector struct { opts selector.Options } func (n *firstNodeSelector) Init(opts ... selector.Option) error { for _, o := range opts { o(&n.opts) } return nil } func (n *firstNodeSelector) Options() selector.Options { return n.opts } func (n *firstNodeSelector) Select(service string, opts ... selector.SelectOption) (selector.Next, error) { services, err := n.opts.Registry.GetService(service) if err ! = nil { return nil, err } if len(services) == 0 { return nil, selector.ErrNotFound } var sopts selector.SelectOptions for _, opt := range opts { opt(&sopts) } for _, filter := range sopts.Filters { services = filter(services) } if len(services) == 0 { return nil, selector.ErrNotFound } if len(services[0].Nodes) == 0 { return nil, selector.ErrNotFound } return func() (*registry.Node, error) { return services[0].Nodes[0], nil }, nil } func (n *firstNodeSelector) Mark(service string, node *registry.Node, err error) { return } func (n *firstNodeSelector) Reset(service string) { return } func (n *firstNodeSelector) Close() error { return nil } func (n *firstNodeSelector) String() string { return "first" }Copy the code

So if we look at the client code, when we instantiate the client, we’re initializing our Selector, and this shows us that as long as we’re implementing the Selector interface, it’s okay to define our own Selector implementation.

client.DefaultClient = client.NewClient(
        client.Selector(FirstNodeSelector()),
)
Copy the code

When the service is invoked, you can see how the process works from the following implementation of the client.call method

func call(i int) { // Create new request to service go.micro.srv.example, method Example.Call req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ Name: "John", }) rsp := &example.Response{} // Call service if err := client.Call(context.Background(), req, rsp); err ! = nil { fmt.Println("call err: ", err, rsp) return } fmt.Println("Call:", i, "rsp:", rsp.Msg) }Copy the code

If you use a GRPC,

func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ... client.CallOption) error { if req == nil { return errors.InternalServerError("go.micro.client", "req is nil") } else if rsp == nil { return errors.InternalServerError("go.micro.client", "rsp is nil") } // make a copy of call opts callOpts := g.opts.CallOptions for _, opt := range opts { opt(&callOpts) } next, err := g.next(req, callOpts) if err ! = nil { return err } // check if we already have a deadline d, ok := ctx.Deadline() if ! ok { // no deadline so we create a new one var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) defer cancel() } else { // got a deadline so no need to setup context // but we need to set the  timeout we pass along opt := client.WithRequestTimeout(time.Until(d)) opt(&callOpts) } // should we noop right here? select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) default: } // make copy of call method gcall := g.call // wrap the call in reverse for i := len(callOpts.CallWrappers); i > 0; i-- { gcall = callOpts.CallWrappers[i-1](gcall) } // return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err ! = nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() service := req.Service() if err ! = nil { if err == selector.ErrNotFound { return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) } return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } // make the call err = gcall(ctx, node, req, rsp, callOpts) g.opts.Selector.Mark(service, node, err) if verr, ok := err.(*errors.Error); ok { return verr } return err } ch := make(chan error, callOpts.Retries+1) var gerr error for i := 0; i <= callOpts.Retries; i++ { go func(i int) { ch <- call(i) }(i) select { case <-ctx.Done(): return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) case err := <-ch: // if the call succeeded lets bail early if err == nil { return nil } retry, rerr := callOpts.Retry(ctx, req, i, err) if rerr ! = nil { return rerr } if ! retry { return err } gerr = err } } return gerr }Copy the code

He calls the next method to get the node

func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) { service, address, _ := pnet.Proxy(request.Service(), opts.Address) // return remote address if len(address) > 0 { return func() (*registry.Node, error) { return &registry.Node{ Address: address[0], }, nil }, nil } // get next nodes from the selector next, err := g.opts.Selector.Select(service, opts.SelectOptions...) if err ! = nil { if err == selector.ErrNotFound { return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) } return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } return next, nil }Copy the code

Node selection is made before the actual call

// return errors.New("go.micro.client", "request timeout", 408) call := func(i int) error { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, req, i) if err ! = nil { return errors.InternalServerError("go.micro.client", err.Error()) } // only sleep if greater than 0 if t.Seconds() > 0 { time.Sleep(t) } // select next node node, err := next() service := req.Service() if err ! = nil { if err == selector.ErrNotFound { return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) } return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error()) } // make the call err = gcall(ctx, node, req, rsp, callOpts) g.opts.Selector.Mark(service, node, err) if verr, ok := err.(*errors.Error); ok { return verr } return err }Copy the code

That’s all there is to know about node selectors. If you want to know more about node selectors, you can go to Go-Micro to see the code in more detail.