go-zero is a popular open source project: a full-featured microservice framework that integrates various engineering practices and supports both Web and RPC protocols. Today, we will analyze its RPC part, zRPC.

The bottom layer of zRPC relies on gRPC, with built-in modules such as service registration, load balancing, and interceptors. It also includes microservice governance features such as adaptive load shedding, adaptive circuit breaking, and rate limiting. zRPC is an easy-to-use, enterprise-level RPC framework that can be used directly in production.

ZRPC usage example

zRPC supports both direct connection and etcd-based service discovery. We take etcd-based service discovery as an example to demonstrate the basic use of zRPC:

Configuration

Create the hello.yaml configuration file as follows:

Name: hello.rpc           # the service name
ListenOn: 127.0.0.1:9090  # service listening address
Etcd:
  Hosts:
    - 127.0.0.1:2379      # etcd address
  Key: hello.rpc          # service registration key
Create the proto file

Create a hello.proto file and generate the corresponding Go code:

syntax = "proto3";

package pb;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Generate the Go code:

protoc --go_out=plugins=grpc:. hello.proto
The Server side
package main

import (
    "context"
    "flag"
    "log"

    "example/zrpc/pb"

    "github.com/tal-tech/go-zero/core/conf"
    "github.com/tal-tech/go-zero/zrpc"
    "google.golang.org/grpc"
)

type Config struct {
    zrpc.RpcServerConf
}

var cfgFile = flag.String("f", "./hello.yaml", "cfg file")

func main() {
    flag.Parse()

    var cfg Config
    conf.MustLoad(*cfgFile, &cfg)

    srv, err := zrpc.NewServer(cfg.RpcServerConf, func(s *grpc.Server) {
        pb.RegisterGreeterServer(s, &Hello{})
    })
    if err != nil {
        log.Fatal(err)
    }
    srv.Start()
}

type Hello struct{}

func (h *Hello) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    return &pb.HelloReply{Message: "hello " + in.Name}, nil
}
The Client side
package main

import (
    "context"
    "log"

    "example/zrpc/pb"

    "github.com/tal-tech/go-zero/core/discov"
    "github.com/tal-tech/go-zero/zrpc"
)

func main() {
    client := zrpc.MustNewClient(zrpc.RpcClientConf{
        Etcd: discov.EtcdConf{
            Hosts: []string{"127.0.0.1:2379"},
            Key:   "hello.rpc",
        },
    })

    conn := client.Conn()
    hello := pb.NewGreeterClient(conn)
    reply, err := hello.SayHello(context.Background(), &pb.HelloRequest{Name: "go-zero"})
    if err != nil {
        log.Fatal(err)
    }
    log.Println(reply.Message)
}

Start the service and check whether the service is registered:

ETCDCTL_API=3 etcdctl get hello.rpc --prefix

The output shows that the service is registered:

hello.rpc/7587849401504590084
127.0.0.1:9090

Run the client to see the output:

hello go-zero

This example demonstrates the basic use of zRPC. As you can see, only a few lines of code are needed to build an RPC service with zRPC. Let's continue exploring how it works.

ZRPC principle analysis

The following figure shows the architecture and the major components of zRPC:

ZRPC mainly consists of the following modules:

  • Discov: the service discovery module, implementing service discovery based on etcd
  • Resolver: the service registration module, which implements gRPC's resolver.Builder interface and registers itself with gRPC
  • Interceptor: interceptors that intercept requests and responses
  • Balancer: the load balancing module, which implements the P2C load balancing algorithm and registers itself with gRPC
  • Client: the zRPC client, responsible for initiating requests
  • Server: the zRPC server, responsible for processing requests

The above covers the main components of zRPC and the role of each module. The Resolver and Balancer modules implement gRPC's open interfaces to plug in custom resolution and load balancing, while the interceptor module is the functional core of the whole framework: adaptive load shedding, adaptive circuit breaking, Prometheus metric collection, and other features are all implemented there.

Interceptor module

gRPC provides interceptors, which perform additional processing before and after a request. There are client-side interceptors and server-side interceptors, each further divided into unary interceptors and stream interceptors. Here we mainly explain unary interceptors; stream interceptors work the same way.

The client interceptor is defined as follows:

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

Here method is the RPC method name, req and reply are the request and response messages, cc is the client connection object, and invoker is the handler that actually executes the RPC method; it is invoked inside the interceptor.
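
As an illustration, here is a minimal client interceptor that logs the latency of each call (a sketch of the interceptor pattern, not code from zRPC; such an interceptor can be registered with the AddInterceptor method shown later):

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

// timingInterceptor logs how long each unary call takes.
func timingInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	start := time.Now()
	// invoker performs the actual RPC call
	err := invoker(ctx, method, req, reply, cc, opts...)
	log.Printf("call %s took %v, err: %v", method, time.Since(start), err)
	return err
}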

Server-side interceptors are defined as follows:

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

Here req is the request message, info contains the properties of the method being called, and handler is a wrapper around the server-side method; it is also invoked inside the interceptor.

zRPC has a variety of built-in interceptors, including adaptive load shedding, adaptive circuit breaking, permission verification, Prometheus metric collection, and so on. Since there are too many to analyze them all, we focus on two here: the adaptive circuit breaker and Prometheus service metric collection.

Built-in interceptor analysis

Adaptive circuit breaker

When the client sends requests to the server, it records the errors the server returns. Once the error rate reaches a threshold, the client automatically discards a proportion of requests to protect downstream dependencies, and it recovers automatically as the error rate drops. The adaptive circuit breaker in zRPC follows the overload protection strategy from the Google SRE book. The algorithm uses the following quantities:

Requests: the total number of requests

Accepts: the number of requests accepted (handled normally)

K: a multiplier (Google SRE recommends 2)
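
With these quantities, a new request is rejected with the following probability, which is exactly what the accept method below computes:

max(0, (Requests - K * Accepts) / (Requests + 1))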

The aggressiveness of the circuit breaker can be tuned by changing K: decreasing K makes the adaptive breaker more aggressive, while increasing K makes it less aggressive.

The circuit breaker interceptor is defined as follows:

func BreakerInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// breaker name is target + method name
	breakerName := path.Join(cc.Target(), method)
	return breaker.DoWithAcceptable(breakerName, func() error {
		// actually execute the call
		return invoker(ctx, method, req, reply, cc, opts...)
	}, codes.Acceptable)
}

The accept method implements the Google SRE overload protection algorithm and determines whether to reject the request:

func (b *googleBreaker) accept() error {
	// accepts is the number of normal requests; total is the total number of requests
	accepts, total := b.history()
	weightedAccepts := b.k * float64(accepts)
	// the Google SRE drop-ratio calculation
	dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
	if dropRatio <= 0 {
		return nil
	}
	// randomly reject the request with probability dropRatio
	if b.proba.TrueOnProba(dropRatio) {
		return ErrServiceUnavailable
	}

	return nil
}

The doReq method first calls accept to determine whether the circuit breaker is open. If it is, the error is returned directly (through fallback if one is provided); otherwise the request is executed and the counters are accumulated:

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	if err := b.accept(); err != nil {
		if fallback != nil {
			return fallback(err)
		} else {
			return err
		}
	}

	defer func() {
		if e := recover(); e != nil {
			b.markFailure()
			panic(e)
		}
	}()

	// the RPC request is executed here
	err := req()
	// for a normal request, both total and accepts are incremented by 1
	if acceptable(err) {
		b.markSuccess()
	} else {
		// if the request fails, only total is incremented by 1
		b.markFailure()
	}

	return err
}
Prometheus metric collection

Service monitoring is an important means of understanding the current state of a service and how it is trending, and monitoring depends on collecting metrics. Collecting metrics through Prometheus is the mainstream solution in the industry, and zRPC also uses Prometheus for metric collection.

The Prometheus interceptor mainly collects service monitoring metrics, namely RPC method latency and invocation error codes, using Prometheus's Histogram and Counter data types. It is defined as follows:

func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
		interface{}, error) {
    // Record a time before execution
		startTime := timex.Now()
		resp, err := handler(ctx, req)
    // After execution, Since is used to calculate how long it takes to execute the call
		metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
    // Error code corresponding to the method
		metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
		return resp, err
	}
}

Add a custom interceptor

In addition to the rich built-in interceptors, zRPC also supports adding custom interceptors

The Client adds a unary interceptor using the AddInterceptor method:

func (rc *RpcClient) AddInterceptor(interceptor grpc.UnaryClientInterceptor) {
	rc.client.AddInterceptor(interceptor)
}

The server adds unary interceptors using the AddUnaryInterceptors method:

func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
	rs.server.AddUnaryInterceptors(interceptors...)
}
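Continuing the server example from the beginning of this article, a custom interceptor can then be attached like this (a minimal sketch; the logging body is purely illustrative):

srv.AddUnaryInterceptors(func(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Printf("handling %s", info.FullMethod)
	// invoke the actual server-side handler
	return handler(ctx, req)
})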

Resolver module

ZRPC Service Registration Architecture Diagram:

zRPC customizes the resolver module to register services. The bottom layer of zRPC relies on gRPC, and to customize a resolver in gRPC you need to implement the resolver.Builder interface:

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}

The Build method returns a Resolver, which is defined as follows:

type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}
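To make the contract concrete, a minimal static resolver might look like this (an illustrative sketch, not zRPC's implementation):

import "google.golang.org/grpc/resolver"

type staticBuilder struct {
	addrs []string
}

func (b *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn,
	opts resolver.BuildOptions) (resolver.Resolver, error) {
	var addresses []resolver.Address
	for _, a := range b.addrs {
		addresses = append(addresses, resolver.Address{Addr: a})
	}
	// push the fixed address list into the gRPC client
	cc.UpdateState(resolver.State{Addresses: addresses})
	return &staticResolver{}, nil
}

func (b *staticBuilder) Scheme() string { return "static" }

type staticResolver struct{}

func (r *staticResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *staticResolver) Close()                                {}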

Two kinds of resolver are defined in zRPC, direct and discov; here we mainly analyze discov, which does service discovery based on etcd. The custom resolvers are registered with gRPC through its Register method:

func RegisterResolver() {
	resolver.Register(&dirBuilder)
	resolver.Register(&disBuilder)
}

When the zRPC server starts, the Start method is called, which registers the service address with etcd:

func (ags keepAliveServer) Start(fn RegisterFn) error {
  // Register service address
	if err := ags.registerEtcd(); err != nil {
		return err
	}
	// Start the service
	return ags.Server.Start(fn)
}

When the zRPC client starts, gRPC calls the Build method of our custom resolver. Inside Build, zRPC calls the UpdateState method of resolver.ClientConn, which registers the service addresses with the gRPC client:

func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
		return r == EndpointSepChar
	})
  // Service discovery
	sub, err := discov.NewSubscriber(hosts, target.Endpoint)
	if err != nil {
		return nil, err
	}

	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
    // Register the service address with gRPC
		cc.UpdateState(resolver.State{
			Addresses: addrs,
		})
	}
	// listen for address changes
	sub.AddListener(update)
	update()
	// return a custom resolver.Resolver
	return &nopResolver{cc: cc}, nil
}

In discov, the load method fetches all addresses of the specified service from etcd:

func (c *cluster) load(cli EtcdClient, key string) {
	var resp *clientv3.GetResponse
	for {
		var err error
		ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
    // Get all addresses of the specified service from etcd
		resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
		cancel()
		if err == nil {
			break
		}

		logx.Error(err)
		time.Sleep(coolDownInterval)
	}

	var kvs []KV
	c.lock.Lock()
	for _, ev := range resp.Kvs {
		kvs = append(kvs, KV{
			Key: string(ev.Key),
			Val: string(ev.Value),
		})
	}
	c.lock.Unlock()

	c.handleChanges(key, kvs)
}

Changes to the service addresses are monitored through watch:

func (c *cluster) watch(cli EtcdClient, key string) {
	rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
	for {
		select {
		case wresp, ok := <-rch:
			if !ok {
				logx.Error("etcd monitor chan has been closed")
				return
			}
			if wresp.Canceled {
				logx.Error("etcd monitor chan has been canceled")
				return
			}
			if wresp.Err() != nil {
				logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
				return
			}
			// Listen for change notification updates
			c.handleWatchEvents(key, wresp.Events)
		case <-c.done:
			return
		}
	}
}

This part introduced how zRPC customizes its resolver and how etcd-based service discovery works, which should give you an understanding of the service registration and discovery principle in zRPC. The analysis above only roughly covers the overall flow; if you are interested, you can study the zRPC source code in more depth on your own.

Balancer module

Load balancing schematic diagram:

Avoiding overload is an important goal of a load balancing policy, and a good load balancing algorithm balances server resources effectively. Commonly used load balancing algorithms include round robin, random, hash, and weighted round robin, but simple algorithms often perform poorly in complex scenarios; for example, round robin easily causes load imbalance when some servers' response times grow. Therefore zRPC uses P2C (Power of Two Choices) as its default load balancing algorithm. Similar to the resolver, implementing a custom balancer requires implementing gRPC's balancer.Builder interface and registering it with gRPC.

Note that zRPC performs load balancing on the client side, in contrast to proxy-based schemes that go through an intermediary such as nginx.

The default load balancing algorithm in zRPC framework is P2C. The main ideas of this algorithm are as follows:

  1. Make two random selections from the list of available nodes to get nodes A and B
  2. Compare nodes A and B and select the one with the lower load

The pseudocode is as follows (a reconstruction based on the Pick implementation below):
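
nodeA = random node from the available list
nodeB = random node from the available list, nodeB != nodeA
repeat both picks up to pickTimes times until both nodes are healthy
chosen = whichever of nodeA, nodeB has the lower load
chosen.inflight += 1
chosen.requests += 1
return chosen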

The main algorithm logic is implemented in the Pick method:

func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
	conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
	p.lock.Lock()
	defer p.lock.Unlock()

	var chosen *subConn
	switch len(p.conns) {
	case 0:
		return nil, nil, balancer.ErrNoSubConnAvailable
	case 1:
		chosen = p.choose(p.conns[0], nil)
	case 2:
		chosen = p.choose(p.conns[0], p.conns[1])
	default:
		var node1, node2 *subConn
		for i := 0; i < pickTimes; i++ {
			// pick two random, distinct indexes
			a := p.r.Intn(len(p.conns))
			b := p.r.Intn(len(p.conns) - 1)
			if b >= a {
				b++
			}
      // Randomly get two of all nodes
			node1 = p.conns[a]
			node2 = p.conns[b]
      // Check whether the node is healthy
			if node1.healthy() && node2.healthy() {
				break
			}
		}
		// choose one of the two nodes
		chosen = p.choose(node1, node2)
	}

	atomic.AddInt64(&chosen.inflight, 1)
	atomic.AddInt64(&chosen.requests, 1)
	return chosen.conn, p.buildDoneFunc(chosen), nil
}

The choose method compares the load of randomly selected nodes to determine which node to choose

func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
	start := int64(timex.Now())
	if c2 == nil {
		atomic.StoreInt64(&c1.pick, start)
		return c1
	}

	if c1.load() > c2.load() {
		c1, c2 = c2, c1
	}

	pick := atomic.LoadInt64(&c2.pick)
	if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
		return c2
	} else {
		atomic.StoreInt64(&c1.pick, start)
		return c1
	}
}
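The load() value compared above combines the observed request latency with the number of in-flight requests on each node. Conceptually (a simplified view; see the go-zero source for the exact calculation):

load ≈ latency * (inflight + 1)   // the node with the lower load wins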

After defining the custom balancer, it must be registered with gRPC, just as the resolver was:

func init() {
	balancer.Register(newBuilder())
}

func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}

After the balancer is registered, how does gRPC know which one to use? It is selected through a configuration option: NewClient passes the grpc.WithBalancerName dial option:

func NewClient(target string, opts ...ClientOption) (*client, error) {
	var cli client
	opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
	if err := cli.dial(target, opts...); err != nil {
		return nil, err
	}

	return &cli, nil
}

This part introduced the principle and implementation of zRPC's load balancing algorithm, and then showed how zRPC registers its custom balancer and how gRPC is told to use it. You should now have a deeper understanding of load balancing.

Conclusion

First, we introduced the basic use of zRPC. As you can see, zRPC is very simple to use: only a few lines of code are needed to build a high-performance RPC service with built-in service governance capabilities. Of course, this is not a comprehensive introduction to basic usage; you can check the documentation to learn more.

Then, we introduced several important components of zRPC and their implementation principles, analyzing part of the source code. The interceptor module is the focus of the entire framework, with rich built-in functionality such as circuit breaking, monitoring, and load shedding, which is essential for building highly available microservices. The resolver and balancer modules customize gRPC's resolver and balancer through its open interfaces; from this part you can learn how service registration and discovery work end to end and how to build your own service discovery system, and custom load balancing algorithms are no longer a mystery.

Finally, zRPC is an RPC framework that has been through extensive engineering practice, making it a valuable open source project both for use in production and for studying the design patterns involved. Hopefully this article gives you a better understanding of zRPC.

The project address

Github.com/tal-tech/go…

Wechat communication group

Scan the code to add me, and I will invite you into the go-zero group