Preface

Our framework already has some service governance capabilities; this time we build on them. Space is limited, so only partial implementations appear here; the complete code is on GitHub.

The ZooKeeper Registry

Instead of using the ZK client directly, we use libkv (an approach borrowed from RPCX), which encapsulates operations on several storage backends: Consul, Etcd, ZooKeeper, and BoltDB. If you want to support another kind of storage, you have to write your own client. A ZK-based registry is defined as follows:

type ZookeeperRegistry struct {
	AppKey         string // A ZookeeperRegistry instance is associated with an AppKey
	ServicePath    string // The basic path location of the data store, such as /service/providers
	UpdateInterval time.Duration // Indicates the interval at which data is pulled
	kv store.Store // Packaged ZK client
	providersMu sync.RWMutex
	providers   []registry.Provider // A list of local caches
	watchersMu sync.Mutex
	watchers   []*Watcher // watcher list
}
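One prerequisite the initialization code below relies on: docker/libkv can only create stores for backends that have been registered with it, so somewhere (typically an init function) the ZK backend has to be registered before libkv.NewStore(store.ZK, ...) is called. A minimal sketch (package clause omitted, as in the other snippets):

import (
	// Importing the backend package and calling Register() makes store.ZK
	// available to libkv.NewStore; otherwise libkv reports the backend as unsupported.
	"github.com/docker/libkv/store/zookeeper"
)

func init() {
	zookeeper.Register()
}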

The initialization logic is as follows:

func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string,
	updateInterval time.Duration, cfg *store.Config) registry.Registry {
	zk := new(ZookeeperRegistry)
	zk.AppKey = AppKey
	zk.ServicePath = ServicePath
	zk.UpdateInterval = updateInterval

	kv, err := libkv.NewStore(store.ZK, zkAddrs, cfg)
	if err != nil {
		log.Fatalf("cannot create zk registry: %v", err)
	}
	zk.kv = kv

	basePath := zk.ServicePath
	if basePath[0] == '/' { // the path must not begin with a slash
		basePath = basePath[1:]
		zk.ServicePath = basePath
	}
	// Create the base path first
	err = zk.kv.Put(basePath, []byte("base path"), &store.WriteOptions{IsDir: true})
	if err != nil {
		log.Fatalf("cannot create zk path %s: %v", zk.ServicePath, err)
	}

	// Explicitly pull the data once at startup
	zk.doGetServiceList()
	go func() {
		// Pull data periodically
		t := time.NewTicker(updateInterval)
		for range t.C {
			zk.doGetServiceList()
		}
	}()
	go func() {
		// Watch the data in the background
		zk.watch()
	}()
	return zk
}

When initializing the registry we start two background tasks: a periodic pull and a watch on the data, combining push and pull. The watch delivers the full data set each time because that is easier to implement; if the service list grows large, you may later want a version-number mechanism or incremental transfers instead. A few additional points:

  1. A background task periodically pulls the data and caches it
  2. Queries return the cached data directly
  3. Registration adds nodes in ZK; deregistration deletes them
  4. Instead of watching every service provider individually, we watch their parent directory and pull the full provider list when it changes; this reduces the number of watchers and keeps the logic simpler (a sketch follows this list)
  5. Because of point 4, registration and deregistration must also change the contents of the parent directory (a lastUpdate value) to trigger the watch
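
To make point 4 concrete, here is a rough sketch of what the background watch could look like, built on libkv's WatchTree, which pushes the full child list of a directory on every change. The JSON encoding of registry.Provider, the node layout, and the omission of notifying the registry's own watchers are all simplifications here; the real implementation is in the repo.

// A sketch only: watch the parent directory and refresh the full provider list on change.
func (zk *ZookeeperRegistry) watch() {
	path := zk.ServicePath + "/" + zk.AppKey
	// WatchTree delivers the complete list of children whenever anything under path changes
	ch, err := zk.kv.WatchTree(path, nil)
	if err != nil {
		log.Printf("cannot watch %s: %v", path, err)
		return
	}
	for kvPairs := range ch {
		providers := make([]registry.Provider, 0, len(kvPairs))
		for _, pair := range kvPairs {
			var p registry.Provider
			// Assumes providers are stored as JSON; skip entries (such as lastUpdate) that are not
			if err := json.Unmarshal(pair.Value, &p); err != nil {
				continue
			}
			providers = append(providers, p)
		}
		zk.providersMu.Lock()
		zk.providers = providers
		zk.providersMu.Unlock()
	}
}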

The specific registration and deregistration logic is not listed here; for details, see GitHub.
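
Still, to make points 3 and 5 concrete, here is a rough sketch of what registration might look like: write each provider under the AppKey directory as a persistent node, then touch the parent directory with a lastUpdate timestamp so the watch above fires. The node layout, the JSON encoding, and the exact Register signature are assumptions; consult the repo for the real code.

// A sketch only: register providers as persistent nodes and bump the parent's lastUpdate.
func (zk *ZookeeperRegistry) Register(providers ...registry.Provider) {
	dir := zk.ServicePath + "/" + zk.AppKey + "/"
	for _, p := range providers {
		data, err := json.Marshal(p)
		if err != nil {
			log.Printf("cannot marshal provider %s: %v", p.ProviderKey, err)
			continue
		}
		// Point 3: add a node for the provider
		if err := zk.kv.Put(dir+p.ProviderKey, data, &store.WriteOptions{IsDir: false}); err != nil {
			log.Printf("cannot register provider %s: %v", p.ProviderKey, err)
			continue
		}
		// Point 5: change the parent directory's contents (lastUpdate) to trigger the watch
		lastUpdate := []byte(strconv.FormatInt(time.Now().UnixNano(), 10))
		if err := zk.kv.Put(dir, lastUpdate, &store.WriteOptions{IsDir: true}); err != nil {
			log.Printf("cannot update lastUpdate for %s: %v", dir, err)
		}
	}
}

Deregistration would do the mirror image: delete the provider node and bump lastUpdate the same way.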

Client Heartbeat

If ZK were our only registry, it might be simpler to register service providers directly as ephemeral nodes, so we could rely on the properties of ephemeral nodes for dynamic service discovery. However, the libkv library we use does not support ephemeral nodes, and other storage backends such as Etcd may not support them either, so we register persistent nodes instead. As a result, a service provider that becomes unreachable under unusual circumstances may fail to deregister itself from the registry in time, so the client needs an additional way to judge whether a provider is available instead of relying entirely on the registry.

Therefore we add support for client-side heartbeats. The client periodically sends heartbeat requests to the server, and the server simply echoes them back to signal that it is still available. Based on a failure threshold, the client degrades providers whose heartbeats fail, until either the heartbeat recovers or the provider is deregistered. The client-side heartbeat logic looks like this:

func (c *sgClient) heartbeat() {
	if c.option.HeartbeatInterval <= 0 {
		return
	}
	// Send heartbeats at the specified interval
	t := time.NewTicker(c.option.HeartbeatInterval)
	for range t.C {
		if c.shutdown {
			t.Stop()
			return
		}
		// Iterate over every RPCClient and perform a heartbeat check
		c.clients.Range(func(k, v interface{}) bool {
			err := v.(RPCClient).Call(context.Background(), "", "", nil)
			c.mu.Lock()
			if err != nil {
				// Count the heartbeat failure
				if fail, ok := c.clientsHeartbeatFail[k.(string)]; ok {
					fail++
					c.clientsHeartbeatFail[k.(string)] = fail
				} else {
					c.clientsHeartbeatFail[k.(string)] = 1
				}
			} else {
				// The heartbeat succeeded: reset the counter and clear any degrade mark
				c.clientsHeartbeatFail[k.(string)] = 0
				c.serversMu.Lock()
				for i, p := range c.servers {
					if p.ProviderKey == k {
						delete(c.servers[i].Meta, protocol.ProviderDegradeKey)
					}
				}
				c.serversMu.Unlock()
			}
			c.mu.Unlock()
			// If the number of failures exceeds the threshold, mark the provider as degraded
			if c.clientsHeartbeatFail[k.(string)] > c.option.HeartbeatDegradeThreshold {
				c.serversMu.Lock()
				for i, p := range c.servers {
					if p.ProviderKey == k {
						c.servers[i].Meta[protocol.ProviderDegradeKey] = true
					}
				}
				c.serversMu.Unlock()
			}
			return true
		})
	}
}
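
The degrade mark only has an effect if provider selection honors it. A hedged sketch of a helper the selection logic could use (isDegraded is hypothetical, and Meta is assumed to be a map[string]interface{} as the code above implies):

// isDegraded reports whether the heartbeat check has marked this provider as degraded.
// Selection/load-balancing code can filter out such providers before picking one.
func isDegraded(p registry.Provider) bool {
	degraded, ok := p.Meta[protocol.ProviderDegradeKey]
	return ok && degraded == true
}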

Authentication

Authentication is easy to implement. The client can carry authentication information in metadata, and the server can authenticate through a specified Wrapper. The code for the server-side Wrapper is as follows:

type AuthFunc func(key string) bool
type ServerAuthInterceptor struct {
	authFunc AuthFunc
}
func NewAuthInterceptor(authFunc AuthFunc) Wrapper {
	return &ServerAuthInterceptor{authFunc}
}
func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {
	return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {
		if auth, ok := ctx.Value(protocol.AuthKey).(string); ok {
			// If authentication succeeds, execute the actual request handler
			if sai.authFunc(auth) {
				requestFunc(ctx, request, response, tr)
				return
			}
		}
		// Return an error response if authentication fails
		s.writeErrorResponse(response, tr, "auth failed")
	}
}
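
What counts as valid authentication information is entirely up to the AuthFunc you pass in. A purely illustrative example, checking the key against a static whitelist:

// Illustrative only: accept requests whose auth key appears in a whitelist.
var whitelist = map[string]bool{
	"abcdefg-1234567": true,
}

func whitelistAuth(key string) bool {
	return whitelist[key]
}

// The server is then wrapped with NewAuthInterceptor(whitelistAuth), and the
// client must carry the matching key in its request metadata.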

Circuit Breaking and Degradation

For now we implement a simple circuit breaker based on a time window:

type CircuitBreaker interface {
	AllowRequest() bool
	Success()
	Fail(err error)
}
type DefaultCircuitBreaker struct {
	lastFail  time.Time
	fails     uint64
	threshold uint64
	window    time.Duration
}
func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {
	return &DefaultCircuitBreaker{
		threshold: threshold,
		window:    window,
	}
}
func (cb *DefaultCircuitBreaker) AllowRequest() bool {
	if time.Since(cb.lastFail) > cb.window {
		cb.reset()
		return true
	}

	failures := atomic.LoadUint64(&cb.fails)
	return failures < cb.threshold
}

func (cb *DefaultCircuitBreaker) Success() {
	cb.reset()
}

func (cb *DefaultCircuitBreaker) Fail(err error) {
	atomic.AddUint64(&cb.fails, 1)
	cb.lastFail = time.Now()
}

func (cb *DefaultCircuitBreaker) reset() {
	atomic.StoreUint64(&cb.fails, 0)
	cb.lastFail = time.Now()
}
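The intended usage on the client side is to check AllowRequest before issuing a call and report the outcome afterwards. A hedged usage sketch (callProvider is a placeholder for the actual RPC invocation):

// A sketch only: guard each call with the circuit breaker.
var breaker = NewDefaultCircuitBreaker(5, 10*time.Second) // open after 5 failures within a 10s window

func callWithBreaker(callProvider func() error) error {
	if !breaker.AllowRequest() {
		// Circuit is open: fail fast (or fall back to another provider)
		return errors.New("circuit breaker open")
	}
	if err := callProvider(); err != nil {
		breaker.Fail(err)
		return err
	}
	breaker.Success()
	return nil
}

In practice you would likely keep one breaker per provider rather than a single global one, so that one misbehaving provider does not block calls to the others.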

Conclusion

That is all for this installment; any comments, suggestions, or corrections are welcome.

Previous Posts

Implementing an RPC Framework from Scratch (Part 0)

Implementing an RPC Framework from Scratch (Part 1)

Implementing an RPC Framework from Scratch (Part 2)