Service discovery: DiscoveryClient

Service discovery is accomplished through cooperation between the client and the server. This section begins by exploring the client-side mechanisms.

Some unimportant steps have been omitted from the source code shown in this article; refer to the original source for the complete flow.

The source-code analysis assumes a spring-boot application that depends on the actuator.

The versions used are Eureka-Client 1.9.21, spring-boot 2.3.0.RELEASE, and spring-cloud Hoxton.SR5.

Code initialization entry

com.netflix.discovery.DiscoveryClient#DiscoveryClient(
    ApplicationInfoManager applicationInfoManager, 
    EurekaClientConfig config, 
    AbstractDiscoveryClientOptionalArgs args,
    Provider<BackupRegistry> backupRegistryProvider, 
    EndpointRandomizer endpointRandomizer)

Core mechanics in the client

  • Lease renewal
  • Status updates
  • Fetching the registry

Service discovery client initialization process

The client uses three parts to provide the following functions:

  • Lease renewal (sending heartbeats)
  • Cache refresh (fetching the registry)
  • Instance state replication (pushing the local instance state to the remote registry)

TimedSupervisorTask implementation

Two of the three main functions above (lease renewal and registry fetching) are scheduled through TimedSupervisorTask, so this class is analyzed first.

This class extends TimerTask, but DiscoveryClient passes it as a Runnable when scheduling it through scheduler#schedule, so its role as a TimerTask can be ignored here.

Its important properties:

  • Three counters that record how scheduled runs fail: timeouts, rejections, and other exceptions:

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
  • The task to be scheduled:

    private final Runnable task;
  • The scheduling thread pool that this class uses to schedule itself:

    private final ScheduledExecutorService scheduler;
  • The thread pool that executes the wrapped task asynchronously:

    private final ThreadPoolExecutor executor;
  • The timeout of the scheduled task. If a run takes longer than this, the task is considered to have timed out:

    private final long timeoutMillis;
  • The delay before the next run, that is, how long to wait after one run completes before the next one starts. This value grows with repeated timeouts to reduce useless runs:

    private final AtomicLong delay;
  • The maximum interval between runs. The delay is doubled each time the task times out, and this value is the upper bound of that growth, so that scheduling is never postponed indefinitely:

    private final long maxDelay;
  • A gauge that records the number of active threads in the executor:

    private final LongGauge threadPoolLevelGauge;

This class contains little code. Apart from the constructor, it consists mainly of the run method, shown below:

@Override
public void run() {
    Future<?> future = null;
    try {
        // Submit the wrapped task (lease renewal, registry fetch, etc.) to the executor
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // If the task times out, a TimeoutException is thrown and control enters the catch block
        // The remaining statements in the try block run only if the result arrives within the timeout
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
        // The task is successfully executed and the delay is restored to the initial state
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    } catch (TimeoutException e) {
        logger.warn("task supervisor timed out", e);
        timeoutCounter.increment();
        // Timed out: read the current delay
        long currentDelay = delay.get();
        // Set the delay to 2 times, taking the minimum value between the maximum delay time and the double delay time, i.e., the maximum delay time cannot exceed maxDelay
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // Set the delay time
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.warn("task supervisor rejected the task", e);
        }

        rejectedCounter.increment();
    } catch (Throwable e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.warn("task supervisor threw an exception", e);
        }

        throwableCounter.increment();
    } finally {
        // If you get to this point, call the cancel method (only meaningful if it is not executed properly)
        if (future != null) {
            future.cancel(true);
        }

        if (!scheduler.isShutdown()) {
            // Schedule the next run using the delay set above
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}

Conclusion:

  1. The actual task runs asynchronously in a separate thread pool, which makes the timeout mechanism possible;
  2. If every run completes normally, the task is rescheduled using the initial timeout as the delay. If a run times out (the network may be unstable or the server may be faulty), the next run is postponed to reduce useless scheduling;
  3. Along the way, execution metrics are collected to make the task's behaviour easier to analyze.
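
The delay bookkeeping described in the conclusion can be isolated into a few lines. The following is a minimal sketch, not the Eureka class: BackoffSketch, onSuccess, onTimeout and the cap multiplier passed to the constructor are hypothetical names introduced for illustration, and the sketch assumes the upper bound is a fixed multiple of the initial timeout.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the delay bookkeeping only; not the Eureka class. */
class BackoffSketch {
    private final long timeoutMillis;
    private final long maxDelay;      // assumption: cap = timeoutMillis * capMultiplier
    private final AtomicLong delay;

    BackoffSketch(long timeoutMillis, int capMultiplier) {
        this.timeoutMillis = timeoutMillis;
        this.maxDelay = timeoutMillis * capMultiplier;
        this.delay = new AtomicLong(timeoutMillis);
    }

    /** A run finished in time: restore the initial delay. */
    long onSuccess() {
        delay.set(timeoutMillis);
        return delay.get();
    }

    /** A run timed out: double the delay, but never exceed maxDelay. */
    long onTimeout() {
        long current = delay.get();
        delay.compareAndSet(current, Math.min(maxDelay, current * 2));
        return delay.get();
    }

    public static void main(String[] args) {
        BackoffSketch sketch = new BackoffSketch(30_000, 10);
        for (int i = 1; i <= 5; i++) {
            System.out.println("after timeout " + i + ": " + sketch.onTimeout() + " ms");
        }
        System.out.println("after success: " + sketch.onSuccess() + " ms");
    }
}
```

With a 30-second timeout and a cap multiplier of 10, five consecutive timeouts give delays of 60, 120, 240, 300 and 300 seconds, and a single successful run brings the delay back to 30 seconds.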

Cache refresh mechanism

As seen earlier, the actual refresh task is wrapped as com.netflix.discovery.DiscoveryClient.CacheRefreshThread and handed to TimedSupervisorTask for scheduling.

Here is the CacheRefreshThread:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

The entry point is com.netflix.discovery.DiscoveryClient#refreshRegistry. The registry refresh steps are analyzed next.

In general, a full update fetches the server's entire registry directly, while an incremental update fetches only the changed part from the server and then updates the local data according to the action type (added, modified, or deleted) carried by each instance in the server's response (see com.netflix.discovery.DiscoveryClient#updateDelta). How the server produces incremental data when the client does not send a version number will be analyzed together with the server implementation.
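
The incremental update described above can be pictured with a small model. This is a rough sketch under simplifying assumptions: the local registry is reduced to a map from instance id to status, and DeltaSketch, Change and applyDelta are hypothetical names, not the types used by updateDelta.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of applying a delta to a local registry copy. */
class DeltaSketch {
    enum ActionType { ADDED, MODIFIED, DELETED }

    /** Simplified stand-in for one changed instance in a delta response. */
    static final class Change {
        final String instanceId;
        final String status;
        final ActionType action;

        Change(String instanceId, String status, ActionType action) {
            this.instanceId = instanceId;
            this.status = status;
            this.action = action;
        }
    }

    /** Applies each change to the local copy (instanceId -> status). */
    static void applyDelta(Map<String, String> local, List<Change> delta) {
        for (Change change : delta) {
            switch (change.action) {
                case ADDED:
                case MODIFIED:
                    local.put(change.instanceId, change.status);
                    break;
                case DELETED:
                    local.remove(change.instanceId);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> local = new LinkedHashMap<>();
        local.put("a:1", "UP");
        local.put("b:1", "UP");
        applyDelta(local, Arrays.asList(
                new Change("a:1", "DOWN", ActionType.MODIFIED),
                new Change("b:1", "UP", ActionType.DELETED),
                new Change("c:1", "STARTING", ActionType.ADDED)));
        System.out.println(local); // prints {a:1=DOWN, c:1=STARTING}
    }
}
```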

The cache refresh frequency is determined by the registryFetchIntervalSeconds parameter, which defaults to 30 seconds.

Lease renewal mechanism

If you review the startup process of DiscoveryClient again, you can see that renewal tasks (heartbeat tasks) are scheduled in the same way as registry refresh tasks.

Below is the declaration of the com.netflix.discovery.DiscoveryClient.HeartbeatThread class:

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

HeartbeatThread wraps a call to com.netflix.discovery.DiscoveryClient#renew and is handed, as a Runnable, to TimedSupervisorTask. The heartbeat mechanism is therefore also protected by the timeout and delay mechanism.

The renew method has little internal logic:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // Send a heartbeat request to the server through the API
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // A 404 is returned from the server, and a registration attempt is made
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            // Successful registration returns true
            return success;
        }
        // The status code is 200, which returns true
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

The logic comes down to two points:

  • If the heartbeat is sent successfully (status code 200), return true;
  • If the heartbeat response status code is 404, send a registration request and return true if the registration succeeds.

The renewal frequency is determined by the renewalIntervalInSecs parameter, which defaults to 30 seconds.
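
The two branches of the renewal logic can be exercised without a server by replacing the transport with a stub. The sketch below is illustrative only: RenewSketch, its Registry interface and the fixed helper are hypothetical and stand in for the real HTTP client, with status codes returned as plain integers.

```java
/** Hypothetical stand-alone model of the renew decision logic. */
class RenewSketch {
    /** Stub transport; each call returns an HTTP status code. */
    interface Registry {
        int sendHeartbeat();
        int register();
    }

    /** Builds a stub that always answers with the given status codes. */
    static Registry fixed(final int heartbeatStatus, final int registerStatus) {
        return new Registry() {
            @Override public int sendHeartbeat() { return heartbeatStatus; }
            @Override public int register() { return registerStatus; }
        };
    }

    static boolean renew(Registry registry) {
        try {
            int status = registry.sendHeartbeat();
            if (status == 404) {
                // The server does not know this instance: try to register again
                return registry.register() == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            // Any transport failure counts as a failed renewal
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(renew(fixed(200, 204))); // heartbeat accepted
        System.out.println(renew(fixed(404, 204))); // re-registration succeeds
        System.out.println(renew(fixed(404, 500))); // re-registration fails
        System.out.println(renew(fixed(500, 204))); // heartbeat rejected
    }
}
```

The four calls in main cover a successful heartbeat, a 404 followed by a successful registration, a 404 followed by a failed registration, and a heartbeat error; only the first two return true.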

Service registration

Two things need to be understood about service registration:

  1. When registration happens
  2. How registration works

First, the registration logic. The entry point is com.netflix.discovery.DiscoveryClient#register:

/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // Call the API to submit your instance information to the registry
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

The registration process itself is simple: it calls the API to send the instance's own information to the registry.

Second, the timing of registration:

  1. When DiscoveryClient is constructed at client startup, the registration method is called once;
  2. In the renew method, if the heartbeat response status code is 404, the registration method is called;
  3. In the instance replicator's com.netflix.discovery.InstanceInfoReplicator#run, if the instance information is dirty, it is registered.

Instance information refresh mechanism

Before looking at the instance information refresh mechanism, we first need to understand the instance information class com.netflix.appinfo.InstanceInfo.

The important attributes in the class are as follows:

/** Client instance id, unique within an application, e.g. 192.168.18.109:blog-service:8010 */
private volatile String instanceId;
/** Name of the application the instance belongs to; by default the service name in upper case, e.g. BLOG-SERVICE */
private volatile String appName;
/** Instance IP address */
private volatile String ipAddr;
/** Port of the client; the default value is 7001 */
private volatile int port = DEFAULT_PORT;
/** Secure port of the client; the default value is 7002 */
private volatile int securePort = DEFAULT_SECURE_PORT;
/** Virtual IP address (logical address), normally the service name, e.g. blog-service */
private volatile String vipAddress;
/** Secure virtual IP address */
private volatile String secureVipAddress;
/** Whether the secure port is enabled; the default value is false */
private volatile boolean isSecurePortEnabled = false;
/** Whether the non-secure port is enabled; the default value is true */
private volatile boolean isUnsecurePortEnabled = true;
/** Data center information */
private volatile DataCenterInfo dataCenterInfo;
/** Host name */
private volatile String hostName;
/** Instance status */
private volatile InstanceStatus status = InstanceStatus.UP;
/**
 * Overridden instance status; the initial value is UNKNOWN.
 * This field is only meaningful on the server side: the status of an instanceInfo held by the server
 * can be changed by changing its overriddenStatus.
 */
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
/** True if the instance information is "dirty", i.e. it has changed locally but has not yet been synchronized to the server */
private volatile boolean isInstanceInfoDirty = false;
/** Lease (renewal) information */
private volatile LeaseInfo leaseInfo;
/** Whether this instance is the coordinating discovery server */
private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
/** Client metadata, which users can customize through configuration */
private volatile Map<String, String> metadata;
/** The last time the information was updated on the server */
private volatile Long lastUpdatedTimestamp;
/** The last time the instance information changed */
private volatile Long lastDirtyTimestamp;
/** On the server, the type of change applied to the instance (ADDED, MODIFIED, DELETED) */
private volatile ActionType actionType;
/** Version field of the instance information */
private String version = VERSION_UNKNOWN;

The status field holds the service state of the client instance. The possible values of the InstanceStatus enumeration are:

  • UP: able to receive traffic;
  • DOWN: do not send traffic; the health check has failed;
  • STARTING: starting up; do not send traffic;
  • OUT_OF_SERVICE: not serving; do not send traffic. Used on shutdown;
  • UNKNOWN: unknown state.

This class holds the information needed for registration and for discovery (use) by other components.

The instance information refresh method is com.netflix.discovery.DiscoveryClient#refreshInstanceInfo. Here’s a sequence diagram of it:

ApplicationInfoManager simply manages instance information.

Several classes are involved in the figure:

  • EurekaHealthCheckHandler: a class in the spring-cloud-netflix-eureka-client package that implements com.netflix.appinfo.HealthCheckHandler and integrates Spring's health check mechanism. It obtains the health indicator beans in the container through applicationContext.getBeansOfType(HealthIndicator.class) and uses them to perform health checks.
  • org.springframework.boot.actuate.health.HealthIndicator: a health indicator from the spring-boot-actuator package that provides the Health health() method for obtaining health information. The health check implementations involved in the figure are listed below (the list varies with the dependencies introduced, so it is for reference only):
    • org.springframework.cloud.netflix.hystrix.HystrixHealthIndicator
    • org.springframework.boot.actuate.system.DiskSpaceHealthIndicator
    • org.springframework.boot.actuate.health.PingHealthIndicator: a health indicator that always returns UP; it is the default implementation and serves as the fallback;
    • org.springframework.cloud.health.RefreshScopeHealthIndicator
  • org.springframework.boot.actuate.health.SimpleStatusAggregator: a simple implementation of org.springframework.boot.actuate.health.StatusAggregator that aggregates a group of statuses into a single resulting status;
  • com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener: a listener interface for instance status changes (the observer).

In general, the instance information is updated in three parts:

  • Update the host information (hostname and IP);
  • Update the lease renewal information;
  • Update the instance status through the health checker.

As described below, the only place where the instance information refresh is called is the instance information replicator, namely com.netflix.discovery.InstanceInfoReplicator#run.

Instance information replicator

The main function of the instance information replicator is to refresh the information of the current client instance and synchronize it to the server if there is any change.

The implementation is mainly in the com.netflix.discovery.InstanceInfoReplicator class.

The main attributes are as follows:

/** Used to communicate with the server */
private final DiscoveryClient discoveryClient;
/** A reference to the instance information */
private final InstanceInfo instanceInfo;
/** Replication interval */
private final int replicationIntervalSeconds;
/** The scheduler the replicator uses to schedule itself; it has one core thread */
private final ScheduledExecutorService scheduler;
/** Holds a reference to the most recently scheduled task, mainly so that it can be cancelled when an on-demand update runs */
private final AtomicReference<Future> scheduledPeriodicRef;
/** Indicates whether the replicator has been started */
private final AtomicBoolean started;
/** The rate limiter, which mainly limits on-demand (forced) replication */
private final RateLimiter rateLimiter;
/** The burst size; the larger it is, the more operations can pass in a given time */
private final int burstSize;
/** Allowed rate per minute, computed from burstSize and replicationIntervalSeconds: 60 * this.burstSize / this.replicationIntervalSeconds */
private final int allowedRatePerMinute;

The start method is com.netflix.discovery.InstanceInfoReplicator#start. The replicator object is both constructed and started in com.netflix.discovery.DiscoveryClient#initScheduledTasks; calling start submits the first delayed task, as follows:

public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        // Mark the instance as dirty so that its state is copied to the remote server on the first run
        instanceInfo.setIsDirty();
        // Schedule itself for the first time
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        // Save a reference to the submitted task
        scheduledPeriodicRef.set(next);
    }
}

After the commit, the main body of execution falls into the run method:

public void run() {
    try {
        // Refresh the client instance information
        discoveryClient.refreshInstanceInfo();

        // If the instance information has changed, register it
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Call the API to register the client
            discoveryClient.register();
            // Mark the instance information as synchronized
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Whatever happens, schedule the next run
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        // Save a reference to the newly scheduled task
        scheduledPeriodicRef.set(next);
    }
}

As you can see, what the replicator does is:

  1. Refresh local client instance information (including status);
  2. If necessary, synchronize the local client instance information to the remote server by calling the registration API.
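
One detail in run is worth a closer look: the timestamp read before registering is passed back when the dirty flag is cleared. A plausible reason is to avoid losing a change that arrives while the registration call is in flight. The model below is a simplified sketch built on that assumption (the flag is cleared only if no newer change was recorded after the synchronized timestamp); DirtyFlagSketch and its method names are hypothetical and are not the InstanceInfo API.

```java
/** Hypothetical model of a timestamp-guarded dirty flag. */
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Records a local change at the given time. */
    synchronized void setDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Returns the time of the last change if dirty, otherwise null. */
    synchronized Long dirtyTimestampOrNull() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if nothing changed after the synchronized timestamp. */
    synchronized void unsetDirty(long syncedTimestamp) {
        if (lastDirtyTimestamp <= syncedTimestamp) {
            dirty = false;
        }
    }

    synchronized boolean isDirty() {
        return dirty;
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        info.setDirty(100L);
        Long seen = info.dirtyTimestampOrNull(); // the replicator reads 100
        info.setDirty(150L);                     // a new change arrives during registration
        info.unsetDirty(seen);                   // stale timestamp: flag must stay set
        System.out.println(info.isDirty());      // prints true
        info.unsetDirty(info.dirtyTimestampOrNull());
        System.out.println(info.isDirty());      // prints false
    }
}
```

Under this model, the change recorded at time 150 is still pending after the first pass, so the next scheduled run picks it up and registers again.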

Besides scheduled replication, the replicator also supports forced, on-demand replication through the onDemandUpdate method:

public boolean onDemandUpdate() {
    // Make sure the rate limiter allows this on-demand update before performing it
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        // Proceed only if the scheduler has not been shut down
        if (!scheduler.isShutdown()) {
            // Run the on-demand update asynchronously
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    // Get the most recently scheduled update task (a regular scheduled one, or one rescheduled by an earlier on-demand update)
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    // If the previous task had just passed the last line of its run method,
                    // several tasks may be in the thread pool's queue at the same time. (open question)
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        // If the previous task has not completed, cancel it
                        latestPeriodic.cancel(false);
                    }
                    // Call the run method to perform the update
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

The logic here is that, once the rate limiter allows it, any previously scheduled task that has not finished is cancelled and the update task is then executed.

According to the class description, a new update task is always scheduled after the completion of an earlier update task. However, if an on-demand task is started, the scheduled automatic update task is discarded, and a new automatic task is scheduled at the end of the on-demand update.

Based on the comments in the previous code block, it seems possible for several tasks to be in the thread pool queue at the same time, in which case the behaviour would not match the class description. This question is not explored further here and is left as a TODO.

There are two occasions on which a forced update is triggered:

  1. When a health check handler is registered;
  2. When an instance status change event is received.

The frequency of instance state replication is determined by two parameters:

  1. instanceInfoReplicationIntervalSeconds: the replication interval. The default value is 30 seconds.
  2. onDemandUpdateStatusChange: if true, every instance status change notifies the instance status change listener, which triggers the replicator's on-demand update method. The default value is true.

Rate limiting mechanism

As mentioned in the previous section, the replicator uses a rate limiter to control the frequency of on-demand update tasks.

Now let's analyze the rate limiter.

The rate limiter implementation class is com.netflix.discovery.util.RateLimiter.

First look at the class comment:

/**
 * Rate limiter implementation is based on token bucket algorithm. There are two parameters:
 * <ul>
 * <li>
 *     burst size - maximum number of requests allowed into the system as a burst
 * </li>
 * <li>
 *     average rate - expected number of requests per second (RateLimiters using MINUTES is also supported)
 * </li>
 * </ul>
 *
 * @author Tomasz Bak
 */

Here’s what it says:

  1. The rate limiter is implemented with the token bucket algorithm;
  2. burst size is the maximum number of requests allowed in as a burst (i.e., the size of the token bucket);
  3. average rate is the expected number of requests per second (minutes are also supported, as specified in the constructor).

The important attributes in the class are:

/** Factor for converting the time unit passed to the constructor into milliseconds, so that rates expressed externally in that unit can be combined with the internal millisecond timestamps */
private final long rateToMsConversion;
/** The number of tokens currently consumed */
private final AtomicInteger consumedTokens = new AtomicInteger();
/** The last time tokens were added to the bucket */
private final AtomicLong lastRefillTime = new AtomicLong(0);

The main externally invoked method of the class to obtain the token is the acquire method:

public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
    // If the bucket size or the average rate is not positive, let the request through without limiting
    if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
        return true;
    }

    // Repopulate the token bucket if necessary
    refillToken(burstSize, averageRate, currentTimeMillis);
    // Consume a token
    return consumeToken(burstSize);
}

As you can see, the method first refills the bucket and then consumes a token.

Let’s start with the bucket filling method refillToken:

// Get the last time the bucket was filled
long refillTime = lastRefillTime.get();
// Calculate the interval between now and the last bucket filling time
long timeDelta = currentTimeMillis - refillTime;

// averageRate is the expected rate limit, expressed in the time unit given to the constructor.
// Conceptually the formula is timeDelta / rateToMsConversion * averageRate: convert the elapsed milliseconds into that unit,
// then multiply by the expected average rate to get the number of tokens generated during the elapsed time.
// Written in that easier-to-read order, however, integer division rounds down first and can give a different result, so the code multiplies before dividing.
long newTokens = timeDelta * averageRate / rateToMsConversion;
// If the newly generated token is greater than 0, the code block is entered
if (newTokens > 0) {
    // If refillTime equals 0, set the new fill timestamp to the current time
    // If it is not the first time to fill, calculate the fill time to avoid the inaccuracy caused by the previous round down triggered by the calculation of the new token
    long newRefillTime = refillTime == 0
        ? currentTimeMillis
        : refillTime + newTokens * rateToMsConversion / averageRate;
    // Set a new fill time, if CAS is set successfully, proceed to the next step, otherwise abandon this expired (behind other threads) calculation
    // If this fails, another thread has already populated the token based on this point in time
    if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
        // Loop until the CAS below succeeds
        while (true) {
            // This is thread safe, because every time the CAS on the consumed-token count fails,
            // all related data is read and recalculated from the current count
            // At the same time, because lastRefillTime has been successfully set by CAS, other threads will not generate tokens for this point in time
            int currentLevel = consumedTokens.get();
            // Take the smaller of the consumed-token count and the bucket size (the bucket size is passed in with each call, so it may have shrunk)
            int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
            // The new consumed-token count is the previous count minus the newly generated tokens, with a minimum of 0
            int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
            // Exit the loop only after the CAS succeeds
            if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
                return;
            }
        }
    }
}

To summarize, the refillToken method calculates the number of tokens that should have been generated between the last refill time and the current time and puts them into the bucket.

Next, look at the consumeToken method:

private boolean consumeToken(int burstSize) {
    while (true) {
        // Get the number of tokens currently consumed
        int currentLevel = consumedTokens.get();
        // If the number of consumed tokens has reached the bucket size, return false: the token could not be acquired
        if (currentLevel >= burstSize) {
            return false;
        }
        // Past the check above, a token may be available. If the CAS succeeds, the token has been acquired
        // If the CAS fails, the loop continues until one of the two if conditions is true
        if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
            return true;
        }
    }
}

The way to consume is to try to add one to consumedTokens without exceeding the size of the bucket.
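
To see the refill and consume steps work together, the logic can be condensed into a small stand-alone class driven by explicit timestamps. This is a sketch for experimentation, assuming a rate expressed per minute; TokenBucketSketch is a hypothetical name, and the code is a condensed imitation of the snippets above rather than the library class itself.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical condensed token bucket; the rate is expressed per minute. */
class TokenBucketSketch {
    private static final long RATE_TO_MS = 60_000L; // one minute in milliseconds
    private final AtomicInteger consumedTokens = new AtomicInteger();
    private final AtomicLong lastRefillTime = new AtomicLong(0);

    boolean acquire(int burstSize, long averageRate, long nowMillis) {
        if (burstSize <= 0 || averageRate <= 0) {
            return true; // invalid limits: let everything through
        }
        refill(burstSize, averageRate, nowMillis);
        return consume(burstSize);
    }

    private void refill(int burstSize, long averageRate, long nowMillis) {
        long refillTime = lastRefillTime.get();
        long newTokens = (nowMillis - refillTime) * averageRate / RATE_TO_MS;
        if (newTokens <= 0) {
            return; // not enough time has passed to generate a token
        }
        long newRefillTime = refillTime == 0
                ? nowMillis
                : refillTime + newTokens * RATE_TO_MS / averageRate;
        if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
            while (true) {
                int current = consumedTokens.get();
                int adjusted = Math.min(current, burstSize);
                int updated = (int) Math.max(0, adjusted - newTokens);
                if (consumedTokens.compareAndSet(current, updated)) {
                    return;
                }
            }
        }
    }

    private boolean consume(int burstSize) {
        while (true) {
            int current = consumedTokens.get();
            if (current >= burstSize) {
                return false; // bucket exhausted
            }
            if (consumedTokens.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        TokenBucketSketch limiter = new TokenBucketSketch();
        // Bucket size 2, 4 tokens per minute (one token every 15 seconds)
        System.out.println(limiter.acquire(2, 4, 1_000L));  // true
        System.out.println(limiter.acquire(2, 4, 1_000L));  // true
        System.out.println(limiter.acquire(2, 4, 1_000L));  // false: burst used up
        System.out.println(limiter.acquire(2, 4, 16_000L)); // true: one token refilled
        System.out.println(limiter.acquire(2, 4, 16_000L)); // false
    }
}
```

Passing the clock in as a parameter, as the three-argument acquire in the original does, makes the behaviour deterministic and easy to test.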

Service offline

The method for taking a service offline is com.netflix.discovery.DiscoveryClient#unregister, as follows:

void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            // Call API to cancel registration
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

The logic in the method is simple, just call the server API to unregister.

Client Shutdown

The client shutdown method is com.netflix.discovery.DiscoveryClient#shutdown, as follows:

public synchronized void shutdown() {
    // The method is synchronized, and the CAS below ensures the shutdown logic runs only once
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        // Unregister the status change listener
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        // Cancel previously scheduled tasks (including refresh registry, send heartbeat, instance information replicator)
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown()) {
            // Set the state of the instance to DOWN
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Unregister the instance
            unregister();
        }

        // Close the communication client
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        // Close the monitor
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient"); }}Copy the code

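Shutdown essentially reverses what client initialization did: it unregisters the listener, cancels the scheduled tasks, unregisters the instance, and closes the transport and the monitors.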
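The "run the teardown only once" guard used in shutdown can be sketched in isolation. The class below is a hypothetical stand-in, not Eureka code; the step names are placeholders that only make the effect of the guard observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a client that owns resources; not Eureka code.
class ShutdownOnceSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Records the teardown steps so that the effect of the guard is observable.
    final List<String> steps = new ArrayList<>();

    synchronized void shutdown() {
        // Only the first caller flips false -> true; later calls skip the body.
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("unregister listener");
            steps.add("cancel scheduled tasks");
            steps.add("unregister instance");
            steps.add("close transport");
        }
    }

    public static void main(String[] args) {
        ShutdownOnceSketch client = new ShutdownOnceSketch();
        client.shutdown();
        client.shutdown(); // no effect: the flag is already true
        System.out.println(client.steps.size()); // prints 4
    }
}
```

Calling shutdown a second time leaves the recorded steps unchanged, which is the property the compareAndSet check is there to provide.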

Other important classes

  • com.netflix.discovery.shared.Applications: holds all the registry information fetched from eureka-server;
  • com.netflix.discovery.util.ThresholdLevelsMetric: staleness monitor (used by the heartbeat and registry-fetch monitors closed in shutdown);

Related configuration

Eureka abstracts the configuration as an interface, so callers can change how the configuration is loaded simply by swapping the implementation class.

This interface is com.netflix.discovery.EurekaClientConfig; each method represents one configuration item.

In spring-cloud, the configuration is provided by the following implementation classes:

  • org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean: annotated with @ConfigurationProperties("eureka.instance"), holds the configuration of the instance;
  • org.springframework.cloud.netflix.eureka.EurekaClientConfigBean: annotated with @ConfigurationProperties("eureka.client"), holds the configuration of the client;
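The idea of hiding configuration behind an interface can be illustrated with a minimal sketch. The interface and both implementations below are hypothetical and heavily cut down; only the two method names are borrowed from EurekaClientConfig, and the map keys are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, cut-down analogue of a configuration interface: one method per setting.
interface ClientConfigSketch {
    int getRegistryFetchIntervalSeconds();
    boolean shouldRegisterWithEureka();
}

// Implementation 1: hard-coded defaults.
class DefaultClientConfigSketch implements ClientConfigSketch {
    public int getRegistryFetchIntervalSeconds() { return 30; }
    public boolean shouldRegisterWithEureka() { return true; }
}

// Implementation 2: values looked up in a map, falling back to the same defaults.
class MapClientConfigSketch implements ClientConfigSketch {
    private final Map<String, String> props;

    MapClientConfigSketch(Map<String, String> props) { this.props = props; }

    public int getRegistryFetchIntervalSeconds() {
        return Integer.parseInt(props.getOrDefault("registryFetchIntervalSeconds", "30"));
    }

    public boolean shouldRegisterWithEureka() {
        return Boolean.parseBoolean(props.getOrDefault("registerWithEureka", "true"));
    }
}

class ConfigDemoSketch {
    // The caller depends only on the interface, never on how values are loaded.
    static String describe(ClientConfigSketch config) {
        return "fetch every " + config.getRegistryFetchIntervalSeconds()
            + "s, register=" + config.shouldRegisterWithEureka();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("registryFetchIntervalSeconds", "5");
        props.put("registerWithEureka", "false");
        System.out.println(describe(new DefaultClientConfigSketch()));
        System.out.println(describe(new MapClientConfigSketch(props)));
    }
}
```

Because describe only sees the interface, switching from defaults to map-backed values requires no change in the calling code.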

Let’s start with the important instance-related configurations:

/** * actuator prefix */
private String actuatorPrefix = "/actuator";

/** * App name. The default value from the environment is spring.application.name */
private String appname = UNKNOWN;

/** * An insecure port for receiving traffic */
private int nonSecurePort = 80;

/** * Secure communication port */
private int securePort = 443;

/** * Whether to enable insecure ports to receive traffic */
private boolean nonSecurePortEnabled = true;

/** * Whether to enable secure ports */
private boolean securePortEnabled;

/** * Time interval for renewal */
private int leaseRenewalIntervalInSeconds = 30;

/** * How long the server waits without receiving a renewal before it considers the instance unavailable. * This value must be greater than leaseRenewalIntervalInSeconds. * If it is too large, traffic may be routed to an instance that is no longer available; if it is too small, the instance may be removed because of transient network fluctuations. */
private int leaseExpirationDurationInSeconds = 90;

/** * Virtual host name; defaults to spring.application.name */
private String virtualHostName = UNKNOWN;

/** * Unique instance id within the application. In spring-cloud, the default value is calculated by * org.springframework.cloud.commons.util.IdUtils#getDefaultInstanceId(resolver) */
private String instanceId;


/** * instance key-value pair information, as metadata, other services can also access */
private Map<String, String> metadataMap = new HashMap<>();

/** * Data center information */
private DataCenterInfo dataCenterInfo = new MyDataCenterInfo(
    DataCenterInfo.Name.MyOwn);

/** * IP address of the instance */
private String ipAddress;


/** * Health check path */
private String healthCheckUrlPath = actuatorPrefix + "/health";

/** * Whether to prefer the IP address over the hostname */
private boolean preferIpAddress = false;

/** * Initial instance status information */
private InstanceStatus initialStatus = InstanceStatus.UP;
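To make the relationship between leaseRenewalIntervalInSeconds and leaseExpirationDurationInSeconds concrete, here is a back-of-the-envelope sketch. The helper is hypothetical and not part of Eureka or spring-cloud; it only checks the "expiration must be greater than the renewal interval" constraint and counts how many whole renewal intervals fit into the expiration window. It does not model how the server actually schedules eviction.

```java
// Hypothetical helper, not part of Eureka or spring-cloud.
class LeaseWindowSketch {
    // Returns how many whole renewal intervals fit into the expiration window.
    static int renewalIntervalsBeforeExpiry(int renewalIntervalSeconds, int expirationDurationSeconds) {
        if (renewalIntervalSeconds <= 0) {
            throw new IllegalArgumentException("renewal interval must be positive");
        }
        if (expirationDurationSeconds <= renewalIntervalSeconds) {
            throw new IllegalArgumentException("expiration must be greater than the renewal interval");
        }
        return expirationDurationSeconds / renewalIntervalSeconds;
    }

    public static void main(String[] args) {
        // With the defaults listed above: 30s renewal, 90s expiration.
        System.out.println(renewalIntervalsBeforeExpiry(30, 90)); // prints 3
    }
}
```

With the default values, three renewal intervals fit into the expiration window, which is why a single delayed heartbeat does not by itself exhaust the lease.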

Next, the important client-related configurations:

/** * Whether to enable the client */
private boolean enabled = true;

/** * Communication client configuration */
@NestedConfigurationProperty
private EurekaTransportConfig transport = new CloudEurekaTransportConfig();

/** * The interval between retrieving registry information */
private int registryFetchIntervalSeconds = 30;

/** * The interval between instance information being copied to the server */
private int instanceInfoReplicationIntervalSeconds = 30;

/** * When will the instance information be copied to the server initially */
private int initialInstanceInfoReplicationIntervalSeconds = 40;

/** * The interval between pulling server information */
private int eurekaServiceUrlPollIntervalSeconds = 5 * MINUTES;

/** * The timeout for reading information from the server */
private int eurekaServerReadTimeoutSeconds = 8;

/** * Connection service timeout */
private int eurekaServerConnectTimeoutSeconds = 5;

/** * An alternate degraded registry implementation in case the registry fails to be retrieved from the server */
private String backupRegistryImpl;

/** * Total number of connections from clients to all servers */
private int eurekaServerTotalConnections = 200;


/** * The maximum multiplier applied to the delay of the next heartbeat task when a sequence of timeouts occurs (exponential back-off bound) */
private int heartbeatExecutorExponentialBackOffBound = 10;


/** * The maximum multiplier applied to the delay of the next registry refresh task when a sequence of timeouts occurs (exponential back-off bound) */
private int cacheRefreshExecutorExponentialBackOffBound = 10;

/** * A list of urls used to communicate with the server */
private Map<String, String> serviceUrl = new HashMap<>();


/** * Whether to enable gzip to compress network communication data */
private boolean gZipContent = true;

/** * Whether this instance should be registered with the server so that other components can discover and use */
private boolean registerWithEureka = true;


/** * Whether to disable incremental (delta) fetching of the registry. Defaults to false, i.e. delta fetching is enabled. */
private boolean disableDelta;


/** * Whether to fetch registry information from the server */
private boolean fetchRegistry = true;


/** * Whether to force the instance information to be copied immediately when the instance state changes */
private boolean onDemandUpdateStatusChange = true;


/** * Whether the client is explicitly offline from the server when closed */
private boolean shouldUnregisterOnShutdown = true;

/** * Whether to force the client to register at initialization */
private boolean shouldEnforceRegistrationAtInit = false;
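The two exponential back-off bounds above are easier to read with numbers. The sketch below assumes that the delay doubles after each consecutive timeout and is capped at the timeout multiplied by the bound; the class and method names are hypothetical, and the 30-second timeout is an assumption chosen to match the default intervals.

```java
// Hypothetical illustration of a capped exponential back-off; not Eureka code.
class BackOffSketch {
    // Doubles the current delay, but never beyond timeoutMs * expBackOffBound.
    static long nextDelayAfterTimeout(long currentDelayMs, long timeoutMs, int expBackOffBound) {
        long maxDelayMs = timeoutMs * expBackOffBound;
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    public static void main(String[] args) {
        long timeoutMs = 30_000L; // assumed: same as the 30s default interval
        int bound = 10;           // default value of the two bounds listed above
        long delayMs = timeoutMs;
        for (int i = 1; i <= 5; i++) {
            delayMs = nextDelayAfterTimeout(delayMs, timeoutMs, bound);
            System.out.println("after timeout " + i + ": " + delayMs + " ms");
        }
    }
}
```

Under these assumptions the delay grows from 30 s to 60 s, 120 s, 240 s and then stays at 300 s, so the bound limits how far a struggling task can drift from its normal schedule.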

Conclusion

This completes the analysis of how Eureka-Client works. Several parts were very rewarding for me, such as the timeout and delay mechanisms and the rate limiting.

What is worth learning:

  • Examples of CAS usage;
  • The task scheduling model;
  • The implementation of the rate limiting mechanism;
  • Scalability considerations;
  • The way configuration is abstracted behind an interface;
