Highly available, high-performance communication services are usually built on service registration and discovery, load balancing, and fault tolerance. Depending on where the load balancing happens, there are three main solutions:
1. Centralized LB (Proxy Model)
A standalone LB sits between service consumers and service providers. It is usually a dedicated hardware device such as F5, or a software implementation such as LVS or HAProxy. The LB holds an address mapping table of all services, typically maintained through O&M configuration. When a service consumer invokes a target service, it sends the request to the LB, which forwards it to one of the target service instances according to a load balancing policy such as round-robin. The LB also performs health checks and automatically removes unhealthy instances. (A minimal sketch of this forwarding model follows the problem list below.) Main problems of this solution:
1. Single point of failure: all service invocation traffic passes through the LB. When the number of services and the call volume are large, the LB easily becomes a bottleneck, and if the LB fails the whole system is affected.
2. An extra hop is added between service consumers and providers, which introduces performance overhead.
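As a rough illustration of the forwarding idea only (not a production LB), a round-robin HTTP reverse proxy could be sketched in Go as follows; the listening port and backend addresses are made-up examples:

package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"sync/atomic"
)

// backends plays the role of the LB's address mapping table (made-up addresses).
var backends = []string{
	"http://127.0.0.1:50001",
	"http://127.0.0.1:50002",
	"http://127.0.0.1:50003",
}

var counter uint64

func main() {
	proxy := &httputil.ReverseProxy{
		// Director rewrites each incoming request to the next backend in round-robin order.
		Director: func(req *http.Request) {
			i := atomic.AddUint64(&counter, 1) % uint64(len(backends))
			target, _ := url.Parse(backends[i])
			req.URL.Scheme = target.Scheme
			req.URL.Host = target.Host
		},
	}
	// Every consumer sends its requests to this single entry point.
	log.Fatal(http.ListenAndServe(":8080", proxy))
}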
2. In-process LB (Balance-Aware Client)
To address the shortcomings of the first solution, this one integrates the LB functionality into the service consumer's process; it is also known as soft load balancing or client-side load balancing. When a service provider starts, it first registers its address with the service registry and then periodically sends heartbeats to the registry to report that it is still alive, which serves as the health check. When a service consumer wants to access a service, its built-in LB component queries the registry, caches the target service's address list, and refreshes that cache periodically; it then selects a target address according to some load balancing policy and finally sends the request directly to that instance. LB and service discovery capabilities are thus spread across the process of every service consumer, and consumers call providers directly, so there is no extra hop and performance is better. (A rough sketch of this flow appears after the problem list below.) Main problems of this solution:
1. Development cost: the LB and service discovery logic must be integrated into the service caller's process, which means a client library has to be developed and maintained for every programming language the consumers use.
2. In production, if the client library needs to be upgraded later, every service caller has to modify code and be re-released, so upgrades are complicated.
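To make the flow concrete, here is a rough sketch of what such a client-side balancer does; the interface and type names are hypothetical and for illustration only, and the etcd-based gRPC implementation later in this article is the real example:

package lbexample

import "errors"

// registryClient abstracts a service registry lookup (hypothetical interface).
type registryClient interface {
	Lookup(service string) ([]string, error)
}

// inProcessLB lives inside the consumer process: it caches the address list
// from the registry and picks one address per call.
type inProcessLB struct {
	registry registryClient
	cached   []string // locally cached address list; a real implementation refreshes it periodically
	next     int
}

// Pick queries the registry on first use, then returns addresses in round-robin order;
// the consumer calls the chosen address directly, with no intermediate proxy.
func (lb *inProcessLB) Pick(service string) (string, error) {
	if len(lb.cached) == 0 {
		addrs, err := lb.registry.Lookup(service)
		if err != nil {
			return "", err
		}
		lb.cached = addrs
	}
	if len(lb.cached) == 0 {
		return "", errors.New("no available instances")
	}
	addr := lb.cached[lb.next%len(lb.cached)]
	lb.next++
	return addr, nil
}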
3. Independent LB Process (External Load Balancing Service)
This solution is a compromise that addresses the shortcomings of the second one, and its principle is basically the same. The difference is that the LB and service discovery functions are moved out of the consumer process into a separate process on the same host. When one or more services on a host need to access a target service, they go through this independent LB process on the same host for service discovery and load balancing. This scheme also avoids the single point problem and is distributed in nature: the failure of one LB process only affects the callers on that host. Since the caller and the LB communicate within the same host, performance is good. It also simplifies the callers: no client library needs to be developed for each language, and upgrading the LB does not require changes to the service invocation code. The main problems are a more complicated deployment, more moving parts, and less convenient debugging and troubleshooting.
gRPC service discovery and load balancing implementation
The gRPC open-source components do not directly provide service registration and discovery functionality, but the design documents describe the intended approach, and the gRPC APIs in the various languages expose name resolution and load balancing interfaces for extension.
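For Go, the extension point at the time of writing is the (since-deprecated) google.golang.org/grpc/naming package; the resolver.go and watcher.go files below implement exactly these interfaces (signatures abridged from that package):

// Abridged from google.golang.org/grpc/naming:
type Resolver interface {
	// Resolve creates a Watcher for target.
	Resolve(target string) (Watcher, error)
}

type Watcher interface {
	// Next blocks until an update or error happens and returns the address updates.
	Next() ([]*Update, error)
	// Close closes the Watcher.
	Close()
}

type Update struct {
	Op   Operation // naming.Add or naming.Delete
	Addr string
	// other fields omitted
}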
Its basic working principle:
1. After startup, the gRPC client issues a name resolution request for the server name; the name is resolved into one or more IP addresses.
2. The client instantiates a load balancing policy; if the addresses returned by the resolver are load balancer addresses, the client uses the grpclb policy.
3. The load balancing policy creates a sub-channel to each server address.
4. For every RPC request, the load balancing policy decides which sub-channel (i.e. which gRPC server) receives it; when no server is available, the request blocks.

Following the official design, a feasible gRPC service discovery and load balancing solution can be built on the in-process LB approach (the second solution above; Alibaba's open-source service framework Dubbo uses a similar mechanism) combined with a distributed, consistent store such as ZooKeeper, Consul or Etcd. The rest of this article uses Go as an example to walk through the key code of an implementation based on Etcd3.
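Concretely, in the implementation below each live server instance registers one lease-bound key under the service's key prefix in etcd, and clients watch that prefix. With the default prefix etcd3_naming and the hello_service example used later, the registry contents would look roughly like this (the extra ports are assumptions for the multi-instance test at the end):

/etcd3_naming/hello_service/127.0.0.1:50001  ->  127.0.0.1:50001   (bound to a 15s-TTL lease)
/etcd3_naming/hello_service/127.0.0.1:50002  ->  127.0.0.1:50002   (bound to a 15s-TTL lease)
/etcd3_naming/hello_service/127.0.0.1:50003  ->  127.0.0.1:50003   (bound to a 15s-TTL lease)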
1) Name resolution implementation: resolver.go
package etcdv3

import (
	"errors"
	"fmt"
	"strings"

	etcd3 "github.com/coreos/etcd/clientv3"
	"google.golang.org/grpc/naming"
)

// resolver is the implementation of grpc.naming.Resolver
type resolver struct {
	serviceName string // service name to resolve
}

// NewResolver returns a resolver with the given service name
func NewResolver(serviceName string) *resolver {
	return &resolver{serviceName: serviceName}
}

// Resolve creates a watcher for the service in etcd; target is the dial address of etcd,
// e.g. "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
	if re.serviceName == "" {
		return nil, errors.New("grpclb: no service name provided")
	}

	// generate etcd client
	client, err := etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return nil, fmt.Errorf("grpclb: create etcd3 client failed: %s", err.Error())
	}

	// return watcher
	return &watcher{re: re, client: *client}, nil
}
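One thing that is easy to miss with this legacy API: the dial target handed to gRPC is the etcd endpoint list (it is what arrives in Resolve above), while the service name travels inside the resolver. The wiring, shown in full in HelloWorldClient.go later (where the package is imported under the alias grpclb), is essentially:

r := grpclb.NewResolver("hello_service")          // service name lives in the resolver
b := grpc.RoundRobin(r)                           // legacy balancer driven by the resolver's watcher
conn, err := grpc.Dial("http://127.0.0.1:2379",   // dial target = etcd address, not the service address
	grpc.WithInsecure(), grpc.WithBalancer(b))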
2) Service discovery implementation: watcher.go
package etcdv3

import (
	"fmt"

	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"golang.org/x/net/context"
	"google.golang.org/grpc/naming"
)

// watcher is the implementation of grpc.naming.Watcher
type watcher struct {
	re            *resolver // re: etcd resolver
	client        etcd3.Client
	isInitialized bool
}

// Close does nothing
func (w *watcher) Close() {
}

// Next returns the address updates
func (w *watcher) Next() ([]*naming.Update, error) {
	// prefix is the etcd key prefix to watch
	prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)

	// check if is initialized
	if !w.isInitialized {
		// query addresses from etcd
		resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
		w.isInitialized = true
		if err == nil {
			addrs := extractAddrs(resp)
			// if not empty, return the updates; otherwise fall through to watch the prefix
			if l := len(addrs); l != 0 {
				updates := make([]*naming.Update, l)
				for i := range addrs {
					updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
				}
				return updates, nil
			}
		}
	}

	// generate etcd watcher
	rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
			case mvccpb.DELETE:
				return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
			}
		}
	}
	return nil, nil
}

func extractAddrs(resp *etcd3.GetResponse) []string {
	addrs := []string{}
	if resp == nil || resp.Kvs == nil {
		return addrs
	}
	for i := range resp.Kvs {
		if v := resp.Kvs[i].Value; v != nil {
			addrs = append(addrs, string(v))
		}
	}
	return addrs
}
3) Service registration implementation: register.go
package etcdv3

import (
	"fmt"
	"log"
	"strings"
	"time"

	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"golang.org/x/net/context"
)

// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client *etcd3.Client
var serviceKey string

var stopSignal = make(chan bool, 1)

// Register registers the service address in etcd and keeps refreshing it with a TTL lease
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
	serviceValue := fmt.Sprintf("%s:%d", host, port)
	serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)

	// get endpoints for register dial address
	var err error
	client, err = etcd3.New(etcd3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
	}

	go func() {
		// invoke self-register with ticker
		ticker := time.NewTicker(interval)
		for {
			// minimum lease TTL is ttl-second
			resp, _ := client.Grant(context.TODO(), int64(ttl))
			// should get first, if not exist, set it
			_, err := client.Get(context.Background(), serviceKey)
			if err != nil {
				if err == rpctypes.ErrKeyNotFound {
					if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
						log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
					}
				} else {
					log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
				}
			} else {
				// refresh the key with a new lease so the watcher is not notified
				if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
					log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
				}
			}
			select {
			case <-stopSignal:
				return
			case <-ticker.C:
			}
		}
	}()

	return nil
}

// UnRegister deletes the registered service key from etcd
func UnRegister() error {
	stopSignal <- true
	stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
	var err error
	if _, err = client.Delete(context.Background(), serviceKey); err != nil {
		log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
	} else {
		log.Printf("grpclb: deregister '%s' ok.", serviceKey)
	}
	return err
}
4) Interface description file: HelloWorld.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {
}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
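The Go bindings imported as pb in the next two files can be generated from this proto with the classic protoc-gen-go gRPC plugin; the output path below is an assumption, so adjust it to your own package layout:

protoc --go_out=plugins=grpc:. HelloWorld.proto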
5) Implement the server interface: HelloWorldServer.go
package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"

	"com.midea/jr/grpclb/example/pb"
	grpclb "com.midea/jr/grpclb/naming/etcd/v3"
)

var (
	serv = flag.String("service", "hello_service", "service name")
	port = flag.Int("port", 50001, "listening port")
	reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
	flag.Parse()

	lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
	if err != nil {
		panic(err)
	}

	// register the service address in etcd with a 10s refresh interval and a 15s TTL
	err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
	if err != nil {
		panic(err)
	}

	// deregister from etcd on exit
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		log.Printf("receive signal '%v'", s)
		grpclb.UnRegister()
		os.Exit(1)
	}()

	log.Printf("starting hello service at %d", *port)
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	s.Serve(lis)
}

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
	return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
6) Implement the client interface: HelloWorldClient.go
package main

import (
	"flag"
	"fmt"
	"strconv"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"

	"com.midea/jr/grpclb/example/pb"
	grpclb "com.midea/jr/grpclb/naming/etcd/v3"
)

var (
	serv = flag.String("service", "hello_service", "service name")
	reg  = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
	flag.Parse()

	// the resolver watches the service addresses in etcd, RoundRobin balances across them
	r := grpclb.NewResolver(*serv)
	b := grpc.RoundRobin(r)

	// the dial target is the etcd address, which gRPC passes to the resolver's Resolve
	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
	conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
	if err != nil {
		panic(err)
	}

	ticker := time.NewTicker(1 * time.Second)
	for t := range ticker.C {
		client := pb.NewGreeterClient(conn)
		resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
		if err == nil {
			fmt.Printf("%v: Reply is %s\n", t, resp.Message)
		}
	}
}
7) Run tests
Run three servers S1, S2 and S3 and one client C (example startup commands appear after this list), then check whether each server receives the same number of requests.
Shut down one server, S1, and check whether requests are diverted to the other two servers.
Restart server S1 and check whether requests are once again distributed evenly across all three servers.
Shut down the Etcd3 server and check whether communication between the client and servers is still normal. Existing communication remains normal, but new servers cannot register and servers that go down cannot be removed from the address list. After the Etcd3 server is restarted, registration and removal recover automatically. If all servers are shut down, client requests block.
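For reference, one possible way to start the processes for the tests above (the file names, ports and local etcd address are taken from, or assumed consistent with, the code in this article):

etcd                                      # a local etcd3 serving http://127.0.0.1:2379
go run HelloWorldServer.go -port 50001    # S1
go run HelloWorldServer.go -port 50002    # S2
go run HelloWorldServer.go -port 50003    # S3
go run HelloWorldClient.go                # C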
This article comes from the cloud community partner "Golang Language Community". For more related information, follow "Golang Language Community".