This analysis is based on the etcd v3.5 source code. Before starting, we need to set up a working environment for reading the source. First, clone the etcd source code from its GitHub repository, then use Docker to build a test cluster. The commands are as follows:
REGISTRY=quay.io/coreos/etcd

NAME_1=etcd-node-0
NAME_2=etcd-node-1
NAME_3=etcd-node-2

# Check the Docker subnet and adjust these addresses to match
HOST_1=172.20.0.2
HOST_2=172.20.0.3
HOST_3=172.20.0.4

PORT_1=2379
PORT_2=12379
PORT_3=22379
PORT_C_1=2380
PORT_C_2=12380
PORT_C_3=22380

CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_C_1},${NAME_2}=http://${HOST_2}:${PORT_C_2},${NAME_3}=http://${HOST_3}:${PORT_C_3}

# Make sure this directory exists and is writable
DATA_DIR=/var/folders/

# Create a Docker network so that cluster network partitions can be simulated
docker network create etcd_cluster

docker run \
  -p $PORT_1:$PORT_1 \
  -p $PORT_C_1:$PORT_C_1 \
  --volume=${DATA_DIR}${NAME_1}:/etcd-data \
  --name ${NAME_1} \
  --network etcd_cluster \
  ${REGISTRY}:v3.5.0 \
  /usr/local/bin/etcd \
  --name ${NAME_1} \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:$PORT_1 \
  --advertise-client-urls http://$HOST_1:$PORT_1 \
  --listen-peer-urls http://0.0.0.0:$PORT_C_1 \
  --initial-advertise-peer-urls http://$HOST_1:$PORT_C_1 \
  --initial-cluster ${CLUSTER} \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr

docker run \
  -p $PORT_2:$PORT_2 \
  -p $PORT_C_2:$PORT_C_2 \
  --volume=${DATA_DIR}${NAME_2}:/etcd-data \
  --name ${NAME_2} \
  --network etcd_cluster \
  ${REGISTRY}:v3.5.0 \
  /usr/local/bin/etcd \
  --name ${NAME_2} \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:$PORT_2 \
  --advertise-client-urls http://$HOST_2:$PORT_2 \
  --listen-peer-urls http://0.0.0.0:$PORT_C_2 \
  --initial-advertise-peer-urls http://$HOST_2:$PORT_C_2 \
  --initial-cluster ${CLUSTER} \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr

docker run \
  -p $PORT_3:$PORT_3 \
  -p $PORT_C_3:$PORT_C_3 \
  --volume=${DATA_DIR}${NAME_3}:/etcd-data \
  --name ${NAME_3} \
  --network etcd_cluster \
  ${REGISTRY}:v3.5.0 \
  /usr/local/bin/etcd \
  --name ${NAME_3} \
  --data-dir /etcd-data \
  --listen-client-urls http://0.0.0.0:$PORT_3 \
  --advertise-client-urls http://$HOST_3:$PORT_3 \
  --listen-peer-urls http://0.0.0.0:$PORT_C_3 \
  --initial-advertise-peer-urls http://$HOST_3:$PORT_C_3 \
  --initial-cluster ${CLUSTER} \
  --initial-cluster-token tkn \
  --initial-cluster-state new \
  --log-level info \
  --logger zap \
  --log-outputs stderr
As shown above, we created three etcd nodes to form a cluster. Now we can start the source code analysis.
ETCD Client Startup Process Analysis
Let’s look at a sample startup code:
cli, err := clientv3.New(clientv3.Config{
	Endpoints:   exampleEndpoints(),
	DialTimeout: dialTimeout,
})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
	log.Fatal(err)
}
A simple program only needs to provide the IP address and port of every node in the cluster to gain access. Listing all nodes of the etcd cluster matters, because that is what makes failover and load balancing possible. Alternatively, you can run a proxy node (an etcd gateway) that forwards requests, in which case only the proxy node's address needs to be filled in, at the cost of some performance.
1. ETCD Client startup process analysis
Let’s look at how the Client is created:
func newClient(cfg *Config) (*Client, error) {
	// -----A-----
	ctx, cancel := context.WithCancel(baseCtx)
	client := &Client{
		conn:     nil,
		cfg:      *cfg,
		creds:    creds,
		ctx:      ctx,
		cancel:   cancel,
		mu:       new(sync.RWMutex),
		callOpts: defaultCallOpts,
		lgMu:     new(sync.RWMutex),
	}
	// -----A-----

	// -----B-----
	client.resolver = resolver.New(cfg.Endpoints...)
	conn, err := client.dialWithBalancer()
	if err != nil {
		client.cancel()
		client.resolver.Close()
		return nil, err
	}
	client.conn = conn
	// -----B-----

	// -----C-----
	client.Cluster = NewCluster(client)
	client.KV = NewKV(client)
	client.Lease = NewLease(client)
	client.Watcher = NewWatcher(client)
	client.Auth = NewAuth(client)
	client.Maintenance = NewMaintenance(client)
	...
	// -----C-----
	return client, nil
}
Section A code analysis
It initializes an instance of client and passes the Config structure to it. What configuration items are included in Config?
type Config struct {
	// Addresses of the etcd server nodes.
	Endpoints []string `json:"endpoints"`

	// At each AutoSyncInterval, the etcd client automatically asks the etcd
	// server for the latest list of all nodes in the cluster.
	//
	// The default is 0, which means auto-sync is disabled.
	AutoSyncInterval time.Duration `json:"auto-sync-interval"`

	// Timeout for establishing the underlying gRPC connection.
	DialTimeout time.Duration `json:"dial-timeout"`

	// This option and DialKeepAliveTimeout enable the KeepAlive feature
	// provided by gRPC. KeepAlive keeps the underlying TCP connection valid
	// and detects broken connections in time.
	//
	// KeepAlive is disabled by default.
	DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`

	// After the client sends a keepalive ping, if the ping ack from the
	// server does not arrive within this time, the connection is closed and
	// an error is reported.
	DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`

	// Also a keepalive setting.
	// true: pings are sent even when the gRPC connection has no active streams.
	// false: no pings are sent when the gRPC connection has no active streams.
	PermitWithoutStream bool `json:"permit-without-stream"`

	// Maximum number of bytes that may be sent, 2 MiB by default.
	//
	// To store a larger KV, changing only this option is not enough, because
	// the etcd server also limits the request size (1.5 MiB by default).
	// Change the etcd server startup flag `--max-request-bytes` first, then
	// change this value.
	MaxCallSendMsgSize int

	// Maximum number of bytes that may be received, math.MaxInt32 by default.
	MaxCallRecvMsgSize int

	// HTTPS certificate configuration.
	TLS *tls.Config

	// Context, usually used to cancel operations.
	Context context.Context

	// When set, the client refuses to connect to an older etcd version.
	// What counts as older? Every version below v3.2.
	RejectOldCluster bool `json:"reject-old-cluster"`

	// gRPC connection options.
	DialOptions []grpc.DialOption

	// Logger configuration. etcd uses zap for logging.
	Logger    *zap.Logger
	LogConfig *zap.Config
	...
}
There are also some common configuration items that are relatively simple and are not listed here.
Section B code analysis
This section is the core of the entire code and does two things:
- Create the resolver
  The resolver is a gRPC concept. A DNS resolver, for example, converts a domain name into a real IP address. A service registry is also a resolution service: it translates service names into real IP addresses.
  This article does not expand on the concept. If you want to learn more about this part of gRPC, a very good gRPC source analysis blog is recommended at the end of the article.
  etcd implements its own resolver, which does two things:
  - It passes the etcd server addresses in Endpoints to the gRPC framework. Because etcd's own resolver does not support DNS resolution, Endpoints can only be IP addresses or Unix sockets.
  - It tells gRPC that, when there are multiple Endpoints, the load-balancing policy is round-robin. This point is important.
- dialWithBalancer() establishes the connection to the etcd server
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
	creds := c.credentialsForEndpoint(c.Endpoints()[0])
	opts := append(dopts, grpc.WithResolvers(c.resolver))
	return c.dial(creds, opts...)
}
The name of the method that establishes the connection to the etcd server is interesting. Although it is called dialWithBalancer, the body is very simple and contains no balancer at all. etcd uses its own resolver, which already has the load-balancing policy hard-coded: round_robin. Passing that resolver through grpc.WithResolvers() is therefore enough to get load balancing.
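To make the round-robin idea concrete, here is a minimal sketch in plain Go. It is not etcd's or gRPC's implementation: the `roundRobinPicker` type and its `pick` method are hypothetical names invented for illustration, and the endpoint addresses are simply the ones from the test cluster above.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobinPicker is a hypothetical, simplified stand-in for what a
// round_robin policy does: hand out endpoints in rotation.
// It assumes the endpoint list is non-empty.
type roundRobinPicker struct {
	mu        sync.Mutex
	endpoints []string
	next      int
}

func newRoundRobinPicker(endpoints []string) *roundRobinPicker {
	return &roundRobinPicker{endpoints: endpoints}
}

// pick returns the next endpoint and advances the cursor, wrapping around
// once the end of the list is reached.
func (p *roundRobinPicker) pick() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	ep := p.endpoints[p.next]
	p.next = (p.next + 1) % len(p.endpoints)
	return ep
}

func main() {
	p := newRoundRobinPicker([]string{
		"172.20.0.2:2379",
		"172.20.0.3:12379",
		"172.20.0.4:22379",
	})
	// The fourth pick wraps around to the first endpoint again.
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick())
	}
}
```

Because consecutive requests land on different nodes, a retry issued right after a failure naturally goes to another member of the cluster, which is the property the retry policy later relies on.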
dial() is a long method, but its overall logic is clear. After omitting the irrelevant code, it does the following:
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
	// First, etcd adds some of its own configuration to the gRPC framework
	// with this line, for example: the KeepAlive feature (mentioned in the
	// Config section), the TLS certificate configuration, and most
	// importantly the retry policy.
	opts, err := c.dialSetupOpts(creds, dopts...)
	...
	// If I pass the client both a non-zero DialTimeout and a context with a
	// timeout, which timeout is used?
	// A child context (dctx) is created from the parent context with
	// DialTimeout. It carries both limits, and whichever deadline comes
	// first takes effect.
	dctx := c.ctx
	if c.cfg.DialTimeout > 0 {
		var cancel context.CancelFunc
		dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
		defer cancel()
	}
	// Finally, call grpc.DialContext() to establish the connection.
	conn, err := grpc.DialContext(dctx, target, opts...)
	...
	return conn, nil
}
Section C code analysis
The code in section C simply initializes the functional interfaces, such as the KV interface (which provides Put, Get, and so on) and the Watcher interface (which watches keys). The details of each initialization will be discussed when the corresponding interface is analyzed.
Back to the startup process. After this initialization, getToken is called. When etcd's username and password authentication is enabled, it obtains the token that is then used to access the gRPC interfaces provided by etcd.
Then there are the RejectOldCluster and autoSync functions, which were mentioned in the Config section and won’t be covered here.
ETCD Client retry policy analysis
The analysis of the automatic retry strategy provided by the ETCD client is the focus of this article. Automatic retry is an important guarantee of ETCD’s high availability features. Before moving on, it is important to keep the following two concepts in mind:
- Unlike ordinary retries, automatic retries do not keep hitting the same node of the etcd cluster. Because etcd accesses the cluster through the gRPC framework with a round-robin load-balancing policy, each retry moves on to the next node in the cluster.
- Automatic retries only apply to certain errors, such as codes.Unavailable.
Let's take a look at how etcd uses the interceptors provided by gRPC to implement automatic retry, so that we can apply the same technique to our own gRPC projects:
// dialWithBalancer -> dial -> dialSetupOpts
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
	grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
	grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
)
Looking at the code above, there are only two steps to automatically retry:
- Create the backoff function, which calculates the retry wait time.
- With WithXXXInterceptor(), register a retry interceptor that will be called back every time GRPC has a request.
Note the line grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)): for the stream retry interceptor, the maximum number of retries is set to 0 (withMax(0)), meaning no retries. This is intentional, because retrying a stream on the client side is not supported by the interceptor. (If the client needs to retry a stream, it has to be handled separately, not through the interceptor.)
Let’s first look at how to calculate the wait time:
// waitBetween: retry interval
// jitterFraction: random jitter rate.
// For example, with the default retry interval of 25ms and a jitter rate of 0.1,
// the actual retry interval is 25 ± 2.5ms.
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
	return func(attempt uint) time.Duration {
		n := uint(len(c.Endpoints()))
		quorum := (n/2 + 1)
		if attempt%quorum == 0 {
			c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
			return jitterUp(waitBetween, jitterFraction)
		}
		c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
		return 0
	}
}
roundRobinQuorumBackoff returns a closure containing the logic to calculate the retry interval:
1. If the attempt number is a multiple of the cluster's quorum size, the actual interval is calculated and the retry happens after waiting for that interval.
2. Otherwise, 0 is returned, that is, retry immediately.
Remember the two concepts from earlier? The load-balancing policy is hard-coded as round-robin, and the retry logic is designed to work together with it. The effect is this: if a request to one node fails, the problem may lie with that single node while the rest of the cluster is healthy, so the client retries immediately and round-robin moves the request to the next node.
However, if the requests still fail after a quorum's worth of attempts, most of the nodes have been tried and the cluster itself probably has a problem. In that case the client computes an interval and waits before trying again, to see whether the problem has been resolved.
This also shows how many details the etcd client takes into account: even a simple retry interval calculation contains a small logical optimization.
So how is the retry interceptor implemented? Now look at the code for interceptors:
func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
	...
		// If the maximum number of retries is set to 0, no retries are performed.
		if callOpts.max == 0 {
			return invoker(ctx, method, req, reply, cc, grpcOpts...)
		}
		var lastErr error
		for attempt := uint(0); attempt < callOpts.max; attempt++ {
			// Calculate the retry interval and block until it has passed.
			if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
				return err
			}
			// Issue the gRPC request again.
			lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
			if lastErr == nil {
				// The retry succeeded, exit.
				return nil
			}
			// A context error has two possible sources:
			// 1. The server returned a context error (timed out, cancelled): retry.
			// 2. The client's own ctx reported an error: do not retry.
			if isContextError(lastErr) {
				if ctx.Err() != nil {
					// The client's own ctx also reported an error, do not retry, exit.
					return lastErr
				}
				// The error came from the server, retry directly.
				continue
			}
			if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
				// The auth token is invalid: fetch a new token, then retry.
				gterr := c.getToken(ctx)
				...
				continue
			}
			// Retries are made only for specific errors (codes.Unavailable).
			// Otherwise the error is returned and no retry is made.
			if !isSafeRetry(c.lg, lastErr, callOpts) {
				return lastErr
			}
		}
		return lastErr
	}
}
The code has been streamlined to some extent, but the main flow has been retained.
This completes the whole retry process for ETCD.
Conclusion
Through the code analysis of the whole startup process of ETCD, we can summarize the following points:
1. Endpoints are used both for load balancing and for calculating the quorum in the retry policy. Be sure to fill in all nodes in the cluster or enable AutoSync.
2. etcd wrote its own gRPC resolver, with the load-balancing policy hard-coded to round-robin. This is a useful reference for gRPC-related projects. The resolver can only resolve IP addresses and Unix sockets.
3. The etcd retry process only retries certain errors, so don't rely on automatic retry to cover every failure.
Startup flow chart, which lists the important functions in the entire startup process:
graph TD
Config --> New
New --> newClient
newClient --> resolver.New
resolver.New --> dialWithBalancer
dialWithBalancer --> dial
dial --> grpc.DialContext
Finally, this article touches on some gRPC basics. If you are not familiar with them, take a look at this blog (blog.csdn.net/u011582922/…), which covers them in great detail.