Introduction

This article walks through how gRPC service registration and discovery are implemented on top of etcd in Jupiter 0.2.7.

How service discovery and registration are implemented

Service registration

Flowchart for service registration:

The etcd service registration code lives in jupiter/pkg/registry/etcdv3.

Let’s take a look at the actual code

// Registry register/unregister service
// registry impl should control rpc timeout
type Registry interface {
	RegisterService(context.Context, *server.ServiceInfo) error
	UnregisterService(context.Context, *server.ServiceInfo) error
	ListServices(context.Context, string, string) ([]*server.ServiceInfo, error)
	WatchServices(context.Context, string, string) (chan Endpoints, error)
	io.Closer
}

The interface for registering services is defined in pkg/registry/registry.go. Jupiter can use any backend that implements this interface.

First let’s look at the registration method

// RegisterService register service to registry
func (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error {
	err := reg.registerBiz(ctx, info)
	...
}

// Register business information
func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error {
	...
	// Submit the service information to etcd
	_, err := reg.client.Put(readCtx, key, val, opOptions...)
	...
}

The key call here is reg.client.Put(), which writes the service information to etcd. The lease mechanism deserves its own article, so this one focuses only on how registration works. There is also a registerMetric() method, which uses the same client.Put() call to write service information under the Prometheus prefix in etcd for service monitoring. Its code is not shown here; if you are interested, see the source repository.

Unregistering a service

// Delete the service
func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error {
	...
	// Delete service information
	_, err := reg.client.Delete(ctx, key)
	...
}

Here the service information is removed from etcd with the client.Delete() method.

Getting a list of services

// ListServices list service registered in registry with name `name`
func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {
	// Prefix of service information key
	target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)
	// Get all the information about the related prefix
	getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())
	...
}

All service information stored under the same key prefix is fetched with the client.Get() method and the clientv3.WithPrefix() option.
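The prefix lookup can be sketched without an etcd server. The example below reuses the format string from ListServices and imitates a prefix read over an in-memory map; the sample keys, the "jupiter" prefix, and the assumption that each registered key extends the prefix with the instance address are illustrative only.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// providerPrefix builds a key prefix with the same format string as ListServices.
func providerPrefix(prefix, name, scheme string) string {
	return fmt.Sprintf("/%s/%s/providers/%s://", prefix, name, scheme)
}

// getWithPrefix imitates a prefix read over an in-memory key/value map
// and returns the matching keys in sorted order.
func getWithPrefix(store map[string]string, prefix string) []string {
	var keys []string
	for k := range store {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	// Hypothetical keys; real values would hold serialized service information.
	store := map[string]string{
		"/jupiter/demo/providers/grpc://10.0.0.1:9000":  "{}",
		"/jupiter/demo/providers/grpc://10.0.0.2:9000":  "{}",
		"/jupiter/demo/providers/http://10.0.0.1:8080":  "{}",
		"/jupiter/other/providers/grpc://10.0.0.3:9000": "{}",
	}
	target := providerPrefix("jupiter", "demo", "grpc")
	for _, k := range getWithPrefix(store, target) {
		fmt.Println(k)
	}
}
```

Only the two grpc instances of the demo service match, because the scheme and the service name are both part of the prefix.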

Watching for service changes

// WatchServices watch service change event, then return address list
func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {
	prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)
	// Create a watch channel through the etcd client
	watch, err := reg.client.WatchPrefix(context.Background(), prefix)
	if err != nil {
		return nil, err
	}
	...
	xgo.Go(func() {
		// Keep receiving change events from etcd
		for event := range watch.C() {
			switch event.Type {
			case mvccpb.PUT:
				updateAddrList(al, prefix, scheme, event.Kv)
			case mvccpb.DELETE:
				deleteAddrList(al, prefix, scheme, event.Kv)
			}

			out := al.DeepCopy()
			fmt.Printf("al => %p\n", al.Nodes)
			fmt.Printf("snapshot => %p\n", out.Nodes)
			select {
			// Send the updated service information to the resolver
			case addresses <- *out:
			default:
				xlog.Warnf("invalid")
			}
		}
	})
	// Return the address channel to the caller
	return addresses, nil
}

The WatchServices() method watches for service changes and passes the updated service information to the resolver. It creates a watch channel with reg.client.WatchPrefix(), then starts a goroutine that continuously receives the events pushed by etcd and keeps the local service list up to date. The resolver in turn hands the updated addresses to the gRPC load balancer.
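The event loop described here can be sketched with plain channels. In this simplified version, `event`, `eventType`, and `endpoints` are hypothetical stand-ins for the etcd event types and registry.Endpoints; the loop applies each event to a local map, takes a deep copy, and publishes the snapshot with a non-blocking send.

```go
package main

import "fmt"

// eventType and event are hypothetical stand-ins for etcd's watch events.
type eventType int

const (
	put eventType = iota
	del
)

type event struct {
	typ   eventType
	key   string
	value string
}

// endpoints is a simplified stand-in for registry.Endpoints.
type endpoints struct {
	Nodes map[string]string
}

// deepCopy returns a snapshot that shares no map with the original,
// so the consumer never observes later mutations.
func (e *endpoints) deepCopy() *endpoints {
	out := &endpoints{Nodes: make(map[string]string, len(e.Nodes))}
	for k, v := range e.Nodes {
		out.Nodes[k] = v
	}
	return out
}

// watch applies each event to the local list and publishes a snapshot.
func watch(events <-chan event) <-chan endpoints {
	addresses := make(chan endpoints, 10)
	go func() {
		defer close(addresses)
		al := &endpoints{Nodes: make(map[string]string)}
		for ev := range events {
			switch ev.typ {
			case put:
				al.Nodes[ev.key] = ev.value
			case del:
				delete(al.Nodes, ev.key)
			}
			select {
			case addresses <- *al.deepCopy():
			default:
				// The consumer is not keeping up; drop this snapshot.
			}
		}
	}()
	return addresses
}

func main() {
	events := make(chan event, 3)
	events <- event{put, "10.0.0.1:9000", "a"}
	events <- event{put, "10.0.0.2:9000", "b"}
	events <- event{del, "10.0.0.1:9000", ""}
	close(events)

	// Prints the node count after each event: 1, 2, 1.
	for snapshot := range watch(events) {
		fmt.Println(len(snapshot.Nodes))
	}
}
```

The non-blocking send trades completeness for liveness: if the consumer stalls and the buffer fills, a snapshot is dropped instead of blocking the watch loop, and the next event publishes the full current state again.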

Service discovery

Flowchart for service discovery:

gRPC's resolver package defines two interfaces:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}

First of all, let’s look at the concrete implementation of the Builder interface

type baseBuilder struct {
	name string
	reg  registry.Registry
}

// Build ...
func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")
	if err != nil {
		return nil, err
	}

	var stop = make(chan struct{})
	xgo.Go(func() {
		for {
			select {
			case endpoint := <-endpoints:
				var state = resolver.State{
					Addresses: make([]resolver.Address, 0),
					...
				}
				for _, node := range endpoint.Nodes {
					...
					state.Addresses = append(state.Addresses, address)
				}
				cc.UpdateState(state)
			case <-stop:
				return
			}
		}
	})

	return &baseResolver{
		stop: stop,
	}, nil
}

Here, the Build method obtains the watch channel from the registry module, then pushes every update to the gRPC client through cc.UpdateState(), so the addresses used by the client's load balancer are always up to date.
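The forwarding loop inside Build can be sketched in isolation. The `state`, `clientConn`, and `recordingConn` types below are hypothetical stand-ins for resolver.State and resolver.ClientConn, used so the sketch runs without the gRPC dependency; unlike the original, the stop function here also waits for the goroutine to exit.

```go
package main

import (
	"fmt"
	"sync"
)

// state and clientConn are hypothetical stand-ins for resolver.State and
// resolver.ClientConn.
type state struct{ Addresses []string }

type clientConn interface{ UpdateState(state) }

// recordingConn remembers the last state it was given.
type recordingConn struct {
	mu   sync.Mutex
	last state
}

func (c *recordingConn) UpdateState(s state) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.last = s
}

// build forwards every endpoint update to cc until stop is called.
func build(endpoints <-chan []string, cc clientConn) (stop func()) {
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case nodes := <-endpoints:
				// Rebuild the address list from scratch on every update.
				st := state{Addresses: make([]string, 0, len(nodes))}
				st.Addresses = append(st.Addresses, nodes...)
				cc.UpdateState(st)
			case <-stopCh:
				return
			}
		}
	}()
	// stop signals the goroutine and waits for it to exit; call it once.
	return func() {
		close(stopCh)
		<-done
	}
}

func main() {
	endpoints := make(chan []string)
	cc := &recordingConn{}
	stop := build(endpoints, cc)

	endpoints <- []string{"10.0.0.1:9000", "10.0.0.2:9000"}
	stop()
	fmt.Println(cc.last.Addresses) // prints [10.0.0.1:9000 10.0.0.2:9000]
}
```

The stop channel plays the same role as the `stop` field of baseResolver: closing it is what lets a resolver's Close() end the goroutine.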

Registering the Builder implementation with gRPC

import "google.golang.org/grpc/resolver"

// Register ...
func Register(name string, reg registry.Registry) {
	resolver.Register(&baseBuilder{
		name: name,
		reg:  reg,
	})
}

The registry is injected into the Builder object, which is then registered with gRPC's resolver package. From then on, the gRPC client receives service discovery updates from etcd in real time.
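Conceptually, registering a builder means storing it under its scheme so that a client can later find it from the scheme of its dial target. The sketch below imitates that lookup with a plain map; `builder`, `register`, and `lookup` are hypothetical names, and it assumes that Scheme() returns the name passed to Register, which the excerpt above does not show.

```go
package main

import (
	"fmt"
	"strings"
)

// builder is a hypothetical stand-in for the Scheme part of resolver.Builder.
type builder interface{ Scheme() string }

type baseBuilder struct{ name string }

// Scheme returns the builder's name (an assumption of this sketch).
func (b *baseBuilder) Scheme() string { return b.name }

// builders maps a scheme to the builder registered for it.
var builders = map[string]builder{}

func register(b builder) { builders[b.Scheme()] = b }

// lookup picks the builder whose scheme matches the target, e.g. "etcd:///demo".
func lookup(target string) (builder, bool) {
	scheme, _, ok := strings.Cut(target, "://")
	if !ok {
		return nil, false
	}
	b, found := builders[scheme]
	return b, found
}

func main() {
	register(&baseBuilder{name: "etcd"})

	if b, ok := lookup("etcd:///demo"); ok {
		fmt.Println("resolved with scheme:", b.Scheme())
	}
	if _, ok := lookup("consul:///demo"); !ok {
		fmt.Println("no builder registered for consul")
	}
}
```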

How gRPC service registration and discovery are used: source code analysis

Here’s a look at how the Jupiter framework uses service discovery and registration in a real-world project.

Service registration


func (app *Application) startServers() error {
	var eg errgroup.Group
	// start multi servers
	for _, s := range app.servers {
		s := s
		eg.Go(func() (err error) {
			_ = app.registerer.RegisterService(context.TODO(), s.Info())
			defer app.registerer.UnregisterService(context.TODO(), s.Info())
			...
		})
	}
	return eg.Wait()
}

eng := engine.NewEngine()
eng.SetRegistry(compound_registry.New(
	etcdv3_registry.StdConfig("default").Build(),
))	

The framework's Application module already registers and unregisters services automatically, so you generally do not need to do this yourself when using the framework. In a project, you only need to inject the registry when creating the Application object.
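The register-then-defer-unregister pattern used by startServers can be sketched on its own. Here `registry` is a hypothetical recorder standing in for app.registerer, and `serve` is an illustrative helper rather than a Jupiter function; the point is that the deferred call runs however the server function returns.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical recorder standing in for app.registerer.
type registry struct {
	mu  sync.Mutex
	log []string
}

func (r *registry) record(action, name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.log = append(r.log, action+" "+name)
}

// serve registers the server, runs it, and always unregisters on return,
// whether run succeeds or fails.
func serve(r *registry, name string, run func() error) error {
	r.record("register", name)
	defer r.record("unregister", name)
	return run()
}

func main() {
	r := &registry{}
	err := serve(r, "demo", func() error {
		return fmt.Errorf("listener closed")
	})
	fmt.Println(err)   // prints: listener closed
	fmt.Println(r.log) // prints: [register demo unregister demo]
}
```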

Service discovery

// Service discovery must be initialized so that service information can be read from etcd
func (eng *Engine) initResolver() error {
	resolver.Register("etcd", etcdv3.StdConfig("default").Build())
	return nil
}

Service discovery is set up the same way, by injecting the registry.

Article series

  • Jupiter framework entry introduction
  • gRPC service discovery and registration through etcd: source code analysis
  • Jupiter etcd client introduction