Introduction
This article walks through how gRPC service discovery and registration are implemented on top of etcd in Jupiter 0.2.7.
How service discovery and registration are implemented
Service registration
Flowchart for service registration:
The etcd service registration code lives in the pkg/registry/etcdv3 package of Jupiter.
Let’s take a look at the actual code
// Registry register/unregister service
// registry impl should control rpc timeout
type Registry interface {
	RegisterService(context.Context, *server.ServiceInfo) error
	UnregisterService(context.Context, *server.ServiceInfo) error
	ListServices(context.Context, string, string) ([]*server.ServiceInfo, error)
	WatchServices(context.Context, string, string) (chan Endpoints, error)
	io.Closer
}
The interface for registering services is defined in pkg/registry/registry.go. Jupiter can use any backend that implements this interface.
First let’s look at the registration method
// RegisterService register service to registry
func (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error {
	err := reg.registerBiz(ctx, info)
	...
}

// Register business information
func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error {
	...
	// Submit information to etcd
	_, err := reg.client.Put(readCtx, key, val, opOptions...)
	...
}
The key call here is reg.client.Put(), which submits the service information to etcd. The lease mechanism will be covered in a separate article; the focus here is on how registration works. There is also a registerMetric() method, which uses the same client.Put() call to write the service information under the Prometheus prefix directory in etcd for service monitoring. That code is not shown here; if you are interested, see the source repository.
Service deregistration
// Delete the service
func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error {
	...
	// Delete service information
	_, err := reg.client.Delete(ctx, key)
	...
}
Here the service information is removed from etcd with the client.Delete() method.
Getting a list of services
// ListServices list service registered in registry with name `name`
func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {
	// Prefix of service information key
	target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)
	// Get all the information about the related prefix
	getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())
	...
}
All service entries that share the same key prefix are fetched with the client.Get() method and the WithPrefix() option.
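The prefix lookup can be illustrated without etcd by scanning a plain map. This is only a sketch: the format string is the one from ListServices above, but the `jupiter` prefix, the sample keys, and the helper names `providerPrefix` and `getWithPrefix` are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// providerPrefix mirrors the format string used in ListServices.
func providerPrefix(prefix, name, scheme string) string {
	return fmt.Sprintf("/%s/%s/providers/%s://", prefix, name, scheme)
}

// getWithPrefix returns the values whose keys start with prefix, in key order.
func getWithPrefix(store map[string]string, prefix string) []string {
	keys := make([]string, 0, len(store))
	for k := range store {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	vals := make([]string, 0, len(keys))
	for _, k := range keys {
		vals = append(vals, store[k])
	}
	return vals
}

func main() {
	// Hypothetical key layout: one key per service instance.
	store := map[string]string{
		"/jupiter/demo/providers/grpc://10.0.0.1:9000": "instance-a",
		"/jupiter/demo/providers/grpc://10.0.0.2:9000": "instance-b",
		"/jupiter/demo/providers/http://10.0.0.1:8080": "instance-c",
		"/jupiter/other/providers/grpc://10.0.0.3:9000": "instance-d",
	}
	for _, v := range getWithPrefix(store, providerPrefix("jupiter", "demo", "grpc")) {
		fmt.Println(v)
	}
}
```

Only the gRPC instances of the `demo` service match; entries for other schemes or other services fall outside the prefix.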
Watching for service changes
// WatchServices watch service change event, then return address list
func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {
	prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)
	// Create a watch channel through the etcd client
	watch, err := reg.client.WatchPrefix(context.Background(), prefix)
	if err != nil {
		return nil, err
	}
	...
	xgo.Go(func() {
		// Keep receiving change events from etcd
		for event := range watch.C() {
			switch event.Type {
			case mvccpb.PUT:
				updateAddrList(al, prefix, scheme, event.Kv)
			case mvccpb.DELETE:
				deleteAddrList(al, prefix, scheme, event.Kv)
			}
			out := al.DeepCopy()
			fmt.Printf("al => %p\n", al.Nodes)
			fmt.Printf("snapshot => %p\n", out.Nodes)
			select {
			// Send the updated service information to the resolver
			case addresses <- *out:
			default:
				xlog.Warnf("invalid")
			}
		}
	})
	// Return an address channel for passing updates
	return addresses, nil
}
The WatchServices() method watches for changes to the service information and passes the updated address list to the resolver. It creates a watch channel with the etcd client's WatchPrefix() method, then starts a goroutine that keeps receiving the events pushed by etcd and uses them to maintain the local service list. The resolver in turn hands the addresses to the gRPC load balancer so that its service address information stays current.
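Two details of the loop above are worth isolating: the address list is deep-copied before it is sent, and the send uses `select` with a `default` branch so that a slow consumer never blocks the watch goroutine. The sketch below reproduces only that pattern. `Endpoints` is a hypothetical simplified type, `trySend` is a helper invented for the example, and the channel buffer size of 1 is an assumption, not Jupiter's actual value.

```go
package main

import "fmt"

// Endpoints is a hypothetical, simplified version of registry.Endpoints.
type Endpoints struct {
	Nodes map[string]string // address -> service name
}

// DeepCopy returns a snapshot that does not share the Nodes map.
func (e *Endpoints) DeepCopy() *Endpoints {
	out := &Endpoints{Nodes: make(map[string]string, len(e.Nodes))}
	for k, v := range e.Nodes {
		out.Nodes[k] = v
	}
	return out
}

// trySend delivers a snapshot without blocking.
// It reports false when the channel is full and the snapshot is dropped.
func trySend(ch chan Endpoints, snapshot Endpoints) bool {
	select {
	case ch <- snapshot:
		return true
	default:
		return false
	}
}

func main() {
	al := &Endpoints{Nodes: map[string]string{}}
	addresses := make(chan Endpoints, 1) // buffer size is an assumption

	al.Nodes["10.0.0.1:9000"] = "demo"
	fmt.Println("first send delivered:", trySend(addresses, *al.DeepCopy()))

	// Nobody has read yet, so the buffer is full and this snapshot is dropped.
	al.Nodes["10.0.0.2:9000"] = "demo"
	fmt.Println("second send delivered:", trySend(addresses, *al.DeepCopy()))

	// The delivered snapshot is unaffected by the later change to al.
	got := <-addresses
	fmt.Println("nodes in received snapshot:", len(got.Nodes))
}
```

The trade-off of the non-blocking send is that an update can be dropped when the consumer falls behind, which is why the original code logs a warning in the `default` branch.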
Service discovery
Flowchart for service discovery:
The resolver package of gRPC defines two interfaces:
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}
First of all, let’s look at the concrete implementation of the Builder interface
type baseBuilder struct {
	name string
	reg  registry.Registry
}

// Build ...
func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")
	if err != nil {
		return nil, err
	}
	var stop = make(chan struct{})
	xgo.Go(func() {
		for {
			select {
			case endpoint := <-endpoints:
				var state = resolver.State{
					Addresses: make([]resolver.Address, 0),
					...
				}
				for _, node := range endpoint.Nodes {
					...
					state.Addresses = append(state.Addresses, address)
				}
				cc.UpdateState(state)
			case <-stop:
				return
			}
		}
	})
	return &baseResolver{
		stop: stop,
	}, nil
}
Here, the Build method obtains the watch channel from the Registry module and pushes each update it receives to the gRPC client connection, so that the service addresses used by the client's load balancer are always up to date.
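The forwarding loop inside Build can be sketched with the standard library alone. The types below (`Endpoints`, `State`, `stateUpdater`, `recordingConn`) are hypothetical simplifications of the Jupiter and gRPC types, and `forward` is a name made up for this example. The sketch shows the shape of the loop: read from the endpoints channel, build a state, hand it to the connection, and exit when the stop channel is closed.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Endpoints is a hypothetical, simplified version of registry.Endpoints.
type Endpoints struct {
	Nodes map[string]string // address -> service name
}

// State is a hypothetical, simplified version of resolver.State.
type State struct {
	Addresses []string
}

// stateUpdater is the single method of the client connection used here.
type stateUpdater interface {
	UpdateState(State)
}

// recordingConn remembers every state it receives.
type recordingConn struct {
	mu     sync.Mutex
	states []State
}

func (c *recordingConn) UpdateState(s State) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.states = append(c.states, s)
}

// forward pushes every endpoint update to cc until stop is closed.
// The returned channel is closed when the goroutine has exited.
func forward(endpoints <-chan Endpoints, cc stateUpdater, stop <-chan struct{}) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case ep := <-endpoints:
				state := State{Addresses: make([]string, 0, len(ep.Nodes))}
				for addr := range ep.Nodes {
					state.Addresses = append(state.Addresses, addr)
				}
				sort.Strings(state.Addresses) // stable order for the example
				cc.UpdateState(state)
			case <-stop:
				return
			}
		}
	}()
	return done
}

func main() {
	endpoints := make(chan Endpoints)
	stop := make(chan struct{})
	conn := &recordingConn{}
	done := forward(endpoints, conn, stop)

	endpoints <- Endpoints{Nodes: map[string]string{"10.0.0.1:9000": "demo"}}
	close(stop) // corresponds to closing the resolver
	<-done

	fmt.Println("states received:", len(conn.states))
}
```

Closing `stop` is what a resolver's Close() method would do in this arrangement, which lets the goroutine end when the client connection goes away.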
How to register the concrete Builder implementation with gRPC
import "google.golang.org/grpc/resolver"

// Register ...
func Register(name string, reg registry.Registry) {
	resolver.Register(&baseBuilder{
		name: name,
		reg:  reg,
	})
}
Inject the Registry module into the Builder object, then register the Builder with gRPC's resolver package. From then on, the gRPC client receives service discovery updates from etcd in real time.
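Conceptually, registering a builder stores it in a table keyed by its scheme, and the scheme part of a dial target later selects the builder. The sketch below models that lookup with the standard library only; it is not the gRPC implementation. `namedBuilder`, `lookup`, the assumption that Scheme() returns the registered name, and the `etcd:///demo` target form are all illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// Builder is reduced to the one method needed for the lookup.
type Builder interface {
	Scheme() string
}

// namedBuilder is a hypothetical builder whose scheme is its name.
type namedBuilder struct {
	name string
}

func (b *namedBuilder) Scheme() string { return b.name }

// builders maps a scheme to the builder registered for it.
var builders = map[string]Builder{}

// Register stores b under its scheme, replacing any earlier builder.
func Register(b Builder) {
	builders[b.Scheme()] = b
}

// lookup splits a target such as "etcd:///demo" into scheme and endpoint
// and returns the builder registered for that scheme.
func lookup(target string) (Builder, string, bool) {
	parts := strings.SplitN(target, "://", 2)
	if len(parts) != 2 {
		return nil, "", false
	}
	b, ok := builders[parts[0]]
	if !ok {
		return nil, "", false
	}
	return b, strings.TrimPrefix(parts[1], "/"), true
}

func main() {
	Register(&namedBuilder{name: "etcd"})
	if b, endpoint, ok := lookup("etcd:///demo"); ok {
		fmt.Println("scheme:", b.Scheme(), "endpoint:", endpoint)
	}
	_, _, ok := lookup("consul:///demo")
	fmt.Println("consul builder found:", ok)
}
```

In this model the endpoint part of the target is what Build would pass to WatchServices as the service name.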
How service registration and discovery are used in practice
Here’s a look at how the Jupiter framework uses service discovery and registration in a real-world project.
Service registration
func (app *Application) startServers() error {
	var eg errgroup.Group
	// start multi servers
	for _, s := range app.servers {
		s := s
		eg.Go(func() (err error) {
			_ = app.registerer.RegisterService(context.TODO(), s.Info())
			defer app.registerer.UnregisterService(context.TODO(), s.Info())
			...
		})
	}
	return eg.Wait()
}

eng := engine.NewEngine()
eng.SetRegistry(compound_registry.New(
	etcdv3_registry.StdConfig("default").Build(),
))
Automatic registration and deregistration of services are already implemented in the framework's Application module, so you generally do not need to do this yourself when using the framework. In a project, you only need to inject the registry when creating the Application object.
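The register-then-defer-unregister pattern in startServers guarantees that deregistration runs when the server's serve call returns, whether it returns normally or with an error. Here is a minimal sketch of that ordering, using `sync.WaitGroup` from the standard library in place of errgroup. The `recorder` type and `runServer` function are hypothetical and exist only to make the order of calls visible.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder collects events in the order they happen.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) add(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

// runServer registers the server, serves, and always unregisters on return.
func runServer(name string, reg *recorder, serve func() error) error {
	reg.add("register " + name)
	defer reg.add("unregister " + name)
	return serve()
}

func main() {
	reg := &recorder{}
	var wg sync.WaitGroup
	for _, name := range []string{"grpc", "http"} {
		name := name // capture the loop variable, as in startServers
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = runServer(name, reg, func() error {
				reg.add("serve " + name)
				return nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("events recorded:", len(reg.events))
}
```

For each server the events always appear in the order register, serve, unregister, although events of different servers may interleave.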
Service discovery
// Service discovery needs initialization to get the service information from etcd
func (eng *Engine) initResolver() error {
	resolver.Register("etcd", etcdv3.StdConfig("default").Build())
	return nil
}
Service discovery is set up in the same way, by injecting the registry.
Article series
- Introduction to the Jupiter framework
- gRPC service discovery and registration through etcd: source code analysis
- Introduction to the Jupiter etcd client