Creating a Client
Let’s look at the code to create the client:
func newClient(config *Config) *Client {
Configuration of etcDV3
conf := clientv3.Config{
Endpoints: config.Endpoints,
DialTimeout: config.ConnectTimeout,
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(),
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
},
AutoSyncInterval: config.AutoSyncInterval,
}
...
// Call clientv3 method to connect
client, err := clientv3.New(conf)
iferr ! =nil {
config.logger.Panic("client etcd start panic", xlog.FieldMod(ecode.ModClientETCD), xlog.FieldErrKind(ecode.ErrKindAny), xlog.FieldErr(err), xlog.FieldValueAny(config))
}
}
Copy the code
Get and store
Kv interface of etcDV3:
type KV interface {
Put(ctx context.Context, key, val string, opts ... OpOption) (*PutResponse, error) Get(ctx context.Context, keystring, opts ... OpOption) (*GetResponse, error) Delete(ctx context.Context, keystring, opts ... OpOption) (*DeleteResponse, error) Compact(ctx context.Context, revint64, opts ... CompactOption) (*CompactResponse, error) Do(ctx context.Context, op Op) (OpResponse, error) Txn(ctx context.Context) Txn }Copy the code
- The Put, Get, and Delete methods encapsulate the concrete behavior of the Do() method.
Get and get by prefix
// GetKeyValue queries etcd key, returns mvccpb.KeyValue
func (client *Client) GetKeyValue(ctx context.Context, key string) (kv *mvccpb.KeyValue, err error) {
rp, err := client.Client.Get(ctx, key)
...
}
// GetPrefix get prefix
func (client *Client) GetPrefix(ctx context.Context, prefix string) (map[string]string, error) {
resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
iferr ! =nil {
return vars, err
}
...
}
Copy the code
The difference between getting a single key/value and getting a key/value with a prefix is through an OpOption method WithPrefix().
storage
Storage in Jupiter is not wrapped separately, but rather directly calls etcDV3’s client methods.
func (kv *kv) Put(ctx context.Context, key, val string, opts ... OpOption) (*PutResponse, error) {
Copy the code
Put method in etcDV3 client library. GRPC service registration is about writing service information to etCD.
delete
// DelPrefix is deleted by prefix
func (client *Client) DelPrefix(ctx context.Context, prefix string) (deleted int64, err error) {
resp, err := client.Delete(ctx, prefix, clientv3.WithPrefix())
...
}
Copy the code
Delete information based on the prefix.
Gets the value of multiple keys
To obtain the values of multiple keys in etCD, we can query the results of multiple keys through etCD’s transaction method Txn().
// GetValues queries etcd for keys prefixed by prefix.
func (client *Client) GetValues(ctx context.Context, keys ...string) (map[string]string, error) {
var (
firstRevision = int64(0)
vars = make(map[string]string)
maxTxnOps = 128 // Maximum number of submissions
getOps = make([]string.0, maxTxnOps)
)
// The specific transaction query processing method
doTxn := func(ops []string) error {
txnOps := make([]clientv3.Op, 0, maxTxnOps)
// Add the query operation
for _, k := range ops {
txnOps = append(txnOps, clientv3.OpGet(k,
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend),
clientv3.WithRev(firstRevision)))
}
// Commit the transactionresult, err := client.Txn(ctx).Then(txnOps...) .Commit()iferr ! =nil {
return err
}
// Process the result returned
for i, r := range result.Responses {
...
}
// Get the latest version number of the revision
if firstRevision == 0 {
firstRevision = result.Header.GetRevision()
}
return nil
}
for _, key := range keys {
// Add the key that needs to be committed, and judge whether the maximum number of commits is reached, then commit for transaction query
getOps = append(getOps, key)
if len(getOps) >= maxTxnOps {
iferr := doTxn(getOps); err ! =nil {
return vars, err
}
getOps = getOps[:0]}}// Check whether there are unsubmitted keys and query if there are
if len(getOps) > 0 {
iferr := doTxn(getOps); err ! =nil {
return vars, err
}
}
return vars, nil
}
Copy the code
Method begins by declaring the maxTxOps maximum number of commits. The doTxn method is the actual transaction query function and then commits the query by breaking the keys into the maxTxOps maximum number. This ensures that the query will not time out because the keys are too large and the query will be slow.
Continuous monitoring of
Create a batch to continuously monitor key changes:
// NewWatch creates continuous monitoring
func (client *Client) WatchPrefix(ctx context.Context, prefix string) (*Watch, error) {
resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
iferr ! =nil {
return nil, err
}
var w = &Watch{
revision: resp.Header.Revision,
eventChan: make(chan *clientv3.Event, 100),
incipientKVs: resp.Kvs,
}
xgo.Go(func(a) {
ctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
// Request monitoring
rch := client.Client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify(), clientv3.WithRev(w.revision))
for {
for n := range rch {
...
for _, ev := range n.Events {
select {
case w.eventChan <- ev:
default:
xlog.Error("watch etcd with prefix", xlog.Any("err"."block event chan, drop event message"))
}
}
}
ctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
if w.revision > 0 {
rch = client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify(), clientv3.WithRev(w.revision))
} else {
rch = client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
}
}
})
return w, nil
}
Copy the code
This is requested to a monitor channel through the Etcdv3 client’s Watch() method, which continually receives a value of type WatchChan, which is actually a WatchResponse structure with the return value pushed by etCD. Then we keep processing Events in the return value.
An Event has two types: PUT and DELETE, indicating that certain keys or values are updated or deleted.
Lease mechanism
Let’s start with lease, a distributed system technology family.
The essence of the lease mechanism in ETCD is to set an expiration time for K-V, after which the service needs to renew k-V at regular intervals, otherwise k-V will be automatically cleaned up.
Create lease code:
// Create a lease
func (reg *etcdv3Registry) getSession(k string, opts ... concurrency.SessionOption) (*concurrency.Session, error){... sess, err := concurrency.NewSession(reg.client.Client, opts...)iferr ! =nil {
return sess, err
}
reg.rmu.Lock()
reg.sessions[k] = sess
reg.rmu.Unlock()
return sess, nil
}
// Use the lease
if ttl := reg.Config.ServiceTTL.Seconds(); ttl > 0 {
sess, err := reg.getSession(key, concurrency.WithTTL(int(ttl)))
iferr ! =nil {
return err
}
opOptions = append(opOptions, clientv3.WithLease(sess.Lease()))
}
// Submit information to etCD
_, err := reg.client.Put(readCtx, key, val, opOptions...)
Copy the code
The lease is created in the Concurrency package of the ETCD library. The way to do this is simply to add WithLease() to the request options.
A distributed lock
Etcd’s distributed Lock is very similar to the sync.mutex we normally use, with two methods Lock() and Unlock().
// Mutex ...
type Mutex struct {
s *concurrency.Session
m *concurrency.Mutex
}
// NewMutex ...
func (client *Client) NewMutex(key string, opts ... concurrency.SessionOption) (mutex *Mutex, err error) {
mutex = &Mutex{}
// Default session TTL = 60s
mutex.s, err = concurrency.NewSession(client.Client, opts...)
iferr ! =nil {
return
}
mutex.m = concurrency.NewMutex(mutex.s, key)
return
}
// Lock ...
func (mutex *Mutex) Lock(timeout time.Duration) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return mutex.m.Lock(ctx)
}
// Unlock ...
func (mutex *Mutex) Unlock(a) (err error) {
err = mutex.m.Unlock(context.TODO())
iferr ! =nil {
return
}
return mutex.s.Close()
}
Copy the code
The difference is that distributed locks have an expiration time.
reference
- Etcd V3 principle analysis
- Summary of etCD use experience
Article series
- Jupiter framework entry introduction
- GRPC through ETCD service discovery and registration – source code analysis
- Jupiter -etcd client introduction