Overview

This article analyzes the implementation principles and business details of Eureka from the perspective of its source code. It starts with the server configuration for cluster mode and a demo client configuration.

Eureka’s history

Spring Cloud's notice on stopping updates to some Netflix projects

Eureka 2.x is no longer developed, while Eureka 1.x remains in maintenance.

spring.io/blog/2019/0…

Spring Cloud Hoxton release notes

The Hoxton release adds support for reactive programming.

spring.io/blog/2019/1…

Cluster Mode Configuration

Version Overview:

spring-boot 2.4.2

spring-cloud 2020.0.1

Server Configuration

  1. pom.xml

    <properties>
      <spring-boot.version>2.4.2</spring-boot.version>
      <spring-cloud.version>2020.0.1</spring-cloud.version>
    </properties>
    
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
      </dependency>
    </dependencies>
    
    <dependencyManagement>
      <dependencies>
        <!-- spring boot 2.4.2 -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-dependencies</artifactId>
          <version>${spring-boot.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
        <!-- spring cloud 2020.0.1 -->
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-dependencies</artifactId>
          <version>${spring-cloud.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. application.yml

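    server:
      port: 3001
    eureka:
      server:
        enable-self-preservation: false # Disable the self-preservation mechanism
        eviction-interval-timer-in-ms: 4000 # Interval of the eviction task (milliseconds)
      instance:
        hostname: eureka3001.com # Must match this node's host in the service URLs below
      client:
        register-with-eureka: false # Do not register this server as a client
        fetch-registry: false # No need to fetch registry information from a server
        service-url:
          # The map key must be written as defaultZone; the hyphenated default-zone is not recognized.
          # Each node needs its own hostname (e.g. eureka3001.com to eureka3003.com mapped in the hosts file);
          # peers that share this node's hostname are treated as "self" and are not replicated to.
          defaultZone: http://eureka3001.com:3001/eureka,http://eureka3002.com:3002/eureka,http://eureka3003.com:3003/eureka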
  3. Startup class

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
    
    @SpringBootApplication
    @EnableEurekaServer
    public class EurekaServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(EurekaServerApplication.class, args);
        }
    }

Client Configuration

  1. pom.xml

    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
      </dependency>
    
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
    </dependencies>
  2. application.yml

    server:
      port: 6000
    
    eureka:
      client:
        serviceUrl:
          # Write the key as defaultZone; the hyphenated form default-zone does not take effect
          defaultZone: http://127.0.0.1:3000/eureka # Registration address provided by the Eureka server
      instance:
        instance-id: power-1 # Unique ID of this instance in the Eureka registry
        prefer-ip-address: true # Register (and display) the IP address instead of the hostname
        leaseRenewalIntervalInSeconds: 10 # Heartbeat (renewal) interval in seconds; default 30
        leaseExpirationDurationInSeconds: 30 # Seconds the server waits after the last heartbeat before removing the instance; default 90
    
    spring:
      application:
        name: service-client
  3. Startup class

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class EurekaClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(EurekaClientApplication.class, args);
        }
    }

Spring Boot Custom starter

Eureka Server source code

Eureka Server does not use Spring MVC as its web framework; it uses Jersey instead. Unlike Spring MVC, which dispatches requests through the DispatcherServlet, Jersey is registered here as a servlet Filter, and that filter dispatches incoming requests to the Jersey resources (such as ApplicationsResource and InstanceResource).

I. Service registration

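Registration information is stored in new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(). The outer String key is the service (application) name, the inner String key is the instance ID, Lease is the lease that wraps the instance and records its timestamps (such as the last renewal), and InstanceInfo is the service instance information.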
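To make the nested structure concrete, here is a minimal sketch of a two-level registry keyed first by application name and then by instance ID. SimpleInstance and SimpleLease are hypothetical stand-ins for Eureka's InstanceInfo and Lease<InstanceInfo>, reduced to the fields needed here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Eureka's InstanceInfo: only the fields needed here.
final class SimpleInstance {
    final String appName;
    final String instanceId;

    SimpleInstance(String appName, String instanceId) {
        this.appName = appName;
        this.instanceId = instanceId;
    }
}

// Hypothetical stand-in for Eureka's Lease<InstanceInfo>: wraps the instance (the "holder").
final class SimpleLease {
    final SimpleInstance holder;

    SimpleLease(SimpleInstance holder) {
        this.holder = holder;
    }
}

final class TwoLevelRegistry {
    // app name -> (instance ID -> lease), the same shape as AbstractInstanceRegistry.registry
    private final ConcurrentHashMap<String, Map<String, SimpleLease>> registry = new ConcurrentHashMap<>();

    void register(SimpleInstance instance) {
        // Create the per-application group atomically on the first registration of that app
        Map<String, SimpleLease> group =
                registry.computeIfAbsent(instance.appName, k -> new ConcurrentHashMap<>());
        group.put(instance.instanceId, new SimpleLease(instance));
    }

    SimpleLease lookup(String appName, String instanceId) {
        Map<String, SimpleLease> group = registry.get(appName);
        return group == null ? null : group.get(instanceId);
    }

    int instanceCount(String appName) {
        Map<String, SimpleLease> group = registry.get(appName);
        return group == null ? 0 : group.size();
    }
}
```

computeIfAbsent is used here in place of the explicit putIfAbsent sequence in register(); both ensure that concurrent first registrations of the same service end up sharing one inner map.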

// Service registration code
// AbstractInstanceRegistry
// registrant: the instance information sent with this registration request
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            // registry holds all registration information
            // gMap: all instances of this service, looked up by application name
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // The existing lease for an instance with the same ID, if any
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // The last operation timestamp of the currently existing microservice instance object
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // The passed timestamp of the registered instance
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // If the copy already on the server is newer, keep it instead of the incoming one
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    // getHolder() returns the wrapped InstanceInfo
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        // Recalculate the self-preservation renewal threshold
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Lease Lease object
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                // Carry over the service-up timestamp from the previous lease
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Service registration
            gMap.put(registrant.getId(), lease);
            
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
}
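The conflict rule in the middle of register() can be isolated into one function. This sketch assumes a hypothetical Candidate type that carries only an instance ID and a lastDirtyTimestamp: the incoming registration wins unless the copy already on the server is strictly newer.

```java
// Hypothetical minimal instance type: only the ID and the last dirty timestamp matter here.
final class Candidate {
    final String id;
    final long lastDirtyTimestamp;

    Candidate(String id, long lastDirtyTimestamp) {
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

final class RegistrationConflict {
    private RegistrationConflict() {}

    // Mirrors the comparison in register(): strictly greater (>) means the existing copy wins;
    // on a tie, or when there is no existing lease, the incoming copy is used.
    static Candidate resolve(Candidate existing, Candidate incoming) {
        if (existing != null && existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp) {
            return existing;
        }
        return incoming;
    }
}
```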

II. Service eviction

A timer task periodically checks every lease to decide whether the service has expired, i.e. whether the current time is later than the lease's lastUpdateTimestamp plus the lease duration.

Formula: current system time > lastUpdateTimestamp + lease duration (leaseExpirationDurationInSeconds: 90 s by default, 30 s in the client configuration above)
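The expiry formula can be written as a small, testable class. ExpiringLease is hypothetical: it takes the current time as a parameter instead of calling System.currentTimeMillis(), and additionalLeaseMs mirrors the compensation value that evict() passes to isExpired().

```java
import java.util.concurrent.TimeUnit;

// Hypothetical lease implementing the formula above:
// expired when now > lastUpdateTimestamp + leaseDuration + additionalLeaseMs.
final class ExpiringLease {
    private final long durationMs;
    private volatile long lastUpdateTimestamp;

    ExpiringLease(long durationSeconds, long nowMs) {
        this.durationMs = TimeUnit.SECONDS.toMillis(durationSeconds);
        this.lastUpdateTimestamp = nowMs;
    }

    // A heartbeat (renewal) moves the last update time forward.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs;
    }

    // additionalLeaseMs lets the eviction task compensate for delays such as GC pauses.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```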

  1. Evicting expired services

    // Runs every eviction-interval-timer-in-ms (60 s by default; 4000 ms in the configuration above)
    // and evicts at most (1 - renewalPercentThreshold), i.e. 15% by default, of the registry per run
    public void evict(long additionalLeaseMs) {
      logger.debug("Running the evict task");
    
      if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
      }
    
      // We collect first all expired items, to evict them in random order. For large eviction sets,
      // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
      // the impact should be evenly distributed across all applications.
      // Iterate through all the service information and check whether it is added to the expired list
      List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
      for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
          for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
            Lease<InstanceInfo> lease = leaseEntry.getValue();
            // Determine whether the service has expired
            if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
              expiredLeases.add(lease);
            }
          }
        }
      }
    
      // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
      // triggering self-preservation. Without that we would wipe out full registry.
      // Cap the number of evictions per run so that too many instances are not removed at once
      int registrySize = (int) getLocalRegistrySize();
      // serverConfig.getRenewalPercentThreshold() is 0.85 (85%) by default
      int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
      int evictionLimit = registrySize - registrySizeThreshold;
    
      int toEvict = Math.min(expiredLeases.size(), evictionLimit);
      if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        // Evict in random order (partial Fisher-Yates / Knuth shuffle)
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
          // Pick a random item (Knuth shuffle algorithm)
          int next = i + random.nextInt(expiredLeases.size() - i);
          Collections.swap(expiredLeases, i, next);
          Lease<InstanceInfo> lease = expiredLeases.get(i);
    
          String appName = lease.getHolder().getAppName();
          String id = lease.getHolder().getId();
          EXPIRED.increment();
          logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
          internalCancel(appName, id, false);
        }
      }
    }
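The capping and random selection in evict() can be reproduced in isolation. EvictionPlanner and pickVictims are hypothetical names; the arithmetic follows the code above: with renewalPercentThreshold = 0.85, at most registrySize - (int) (registrySize * 0.85) leases are evicted per run (15 for a registry of 100), and they are picked with a partial Fisher-Yates shuffle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionPlanner {
    private EvictionPlanner() {}

    // Same arithmetic as evict(): keep at least renewalPercentThreshold of the registry.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks min(expired.size(), limit) items in random order with a partial Fisher-Yates shuffle.
    static <T> List<T> pickVictims(List<T> expired, int limit, Random random) {
        List<T> copy = new ArrayList<>(expired); // do not mutate the caller's list
        int toEvict = Math.min(copy.size(), limit);
        for (int i = 0; i < toEvict; i++) {
            // Choose a random index in [i, size) and move that element to position i
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, Math.max(toEvict, 0)));
    }
}
```

Shuffling before evicting spreads the removals across applications, which is the reason given in the source comment: a deterministic order could wipe out whole apps before the cap is reached.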

III. Heartbeat (lease renewal)

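A heartbeat renews the lease: InstanceResource.renewLease handles the client's PUT request and updates the lease's last update time.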

// InstanceResource
@PUT
public Response renewLease(
  @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
  @QueryParam("overriddenstatus") String overriddenStatus,
  @QueryParam("status") String status,
  @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
  boolean isFromReplicaNode = "true".equals(isReplication);
  boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

  // Not found in the registry, immediately ask for a register
  if (!isSuccess) {
    logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
    return Response.status(Status.NOT_FOUND).build();
  }
  // Check if we need to sync based on dirty time stamp, the client
  // instance might have changed some value
  Response response;
  if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
    response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
    // Store the overridden status since the validation found out the node that replicates wins
    if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
        && (overriddenStatus != null)
        && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
        && isFromReplicaNode) {
      registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
    }
  } else {
    response = Response.ok().build();
  }
  logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
  return response;
}
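A 404 from renewLease tells the client that the server has no lease for it, and the client then registers again. This is a sketch of that decision only: the EurekaHttp interface and HeartbeatTask class are hypothetical stand-ins for Eureka's transport client, and 204 is the status Eureka returns for a successful registration.

```java
// Hypothetical transport abstraction standing in for Eureka's HTTP client.
interface EurekaHttp {
    int sendHeartbeat(String appName, String instanceId); // returns an HTTP status code
    int register(String appName, String instanceId);      // returns an HTTP status code
}

final class HeartbeatTask {
    private final EurekaHttp http;
    private final String appName;
    private final String instanceId;

    HeartbeatTask(EurekaHttp http, String appName, String instanceId) {
        this.http = http;
        this.appName = appName;
        this.instanceId = instanceId;
    }

    // Returns true if the server knows about this instance after the call.
    boolean renew() {
        int status = http.sendHeartbeat(appName, instanceId);
        if (status == 404) {
            // The server has no lease for us (for example it was evicted): register again.
            int registerStatus = http.register(appName, instanceId);
            return registerStatus == 204 || registerStatus == 200;
        }
        return status == 200;
    }
}
```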

IV. Cluster principle

  1. Cluster synchronization is handled by PeerAwareInstanceRegistryImpl.

  2. When a client registers with one server node, that node forwards the request to the other server nodes (its peers). Heartbeats, cancellations and status updates are replicated the same way.

    // Cluster synchronization
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
      Stopwatch tracer = action.getTimer().start();
      try {
        if (isReplication) {
          numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Stop if there are no peers, or if this request is itself a replication (prevents endless re-replication)
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
          return;
        }
    		
        // Synchronize to all nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
          // If the url represents this host, do not replicate to yourself.
          if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { // Exclude the current node
            continue;
          }
          replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
      } finally {
        tracer.stop();
      }
    }
  3. replicateInstanceActionsToPeers handles each synchronization event according to its action type

    // Cluster synchronization event processing
    private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
      try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
          case Cancel:    // cancel (deregister)
            node.cancel(appName, id);
            break;
          case Heartbeat: // heartbeat
            InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
            infoFromRegistry = getInstanceByAppAndId(appName, id, false);
            node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
            break;
          case Register:  // register
            node.register(info);
            break;
          case StatusUpdate:
            infoFromRegistry = getInstanceByAppAndId(appName, id, false);
            node.statusUpdate(appName, id, newStatus, infoFromRegistry);
            break;
          case DeleteStatusOverride:
            infoFromRegistry = getInstanceByAppAndId(appName, id, false);
            node.deleteStatusOverride(appName, id, infoFromRegistry);
            break;
        }
      } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
      } finally {
        CurrentRequestVersion.remove();
      }
    }
  4. Cluster synchronization at startup (syncUp)

    public int syncUp() {
      // Copy entire entry from neighboring DS node
      int count = 0;
    
      for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
          try {
            Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
          } catch (InterruptedException e) {
            logger.warn("Interrupted during registry transfer..");
            break;
          }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
          for (InstanceInfo instance : app.getInstances()) {
            try {
              if (isRegisterable(instance)) {
                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                count++;
              }
            } catch (Throwable t) {
              logger.error("During DS init copy", t);
            }
          }
        }
      }
      return count;
    }
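The loop-prevention rule in replicateToPeers is the key detail: a node forwards an action only if the action did not itself arrive as a replication. This is an in-memory sketch with a hypothetical PeerNode class; in Eureka the forwarding is an HTTP call made through PeerEurekaNode, with the replication flag carried in the PeerEurekaNode.HEADER_REPLICATION request header.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory server node; method calls replace the HTTP replication requests.
final class PeerNode {
    final String url;
    final List<PeerNode> peers = new ArrayList<>();
    final Set<String> registeredIds = new HashSet<>();
    int receivedCount = 0;

    PeerNode(String url) {
        this.url = url;
    }

    void register(String instanceId, boolean isReplication) {
        receivedCount++;
        registeredIds.add(instanceId);
        // Same rule as replicateToPeers(): a replicated request is never forwarded again
        if (isReplication) {
            return;
        }
        for (PeerNode peer : peers) {
            if (peer.url.equals(this.url)) {
                continue; // do not replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }
}
```

Without the isReplication check, the receiving peers would forward the registration back to the original node and to each other indefinitely.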

V. Self-preservation mechanism

  1. Whether self-preservation is enabled
// isSelfPreservationModeEnabled reads the enable-self-preservation configuration parameter
public boolean isSelfPreservationModeEnabled() {
  return serverConfig.shouldEnableSelfPreservation();
}
  2. Trigger condition: when a large number of heartbeats expire within a short period (in other words, many instances appear to be down at once), Eureka enters self-preservation and stops evicting leases. "A large number" is defined by a threshold: with the default settings, lease expiration stays enabled only while more than 85% of the expected heartbeats arrive each minute.
// numberOfRenewsPerMinThreshold is the threshold that triggers self-preservation
public boolean isLeaseExpirationEnabled() {
  if (!isSelfPreservationModeEnabled()) {
    // The self preservation mode is disabled, hence allowing the instances to expire.
    return true;
  }
  return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
  3. Recalculating the self-preservation threshold when a new service registers
// The lease does not exist and hence it is a new registration
synchronized (lock) {
  if (this.expectedNumberOfClientsSendingRenews > 0) {
    // Since the client wants to register it, increase the number of clients sending renews
    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
    updateRenewsPerMinThreshold();
  }
}

updateRenewsPerMinThreshold recalculates the self-preservation threshold:

// getExpectedClientRenewalIntervalSeconds() is 30 s by default, i.e. each client is expected to send 2 heartbeats per minute
// Formula: expected number of clients sending renewals (registered instances) * heartbeats per client per minute (60 s / 30 s by default) * renewal percent threshold (85% by default)
// If 15% or more of the expected heartbeats are missing, self-preservation is triggered
protected void updateRenewsPerMinThreshold() {
  this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                                              * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                                              * serverConfig.getRenewalPercentThreshold());
}
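A worked example of the formula, assuming 10 registered instances and the defaults (30 s expected renewal interval, 0.85 threshold): 10 × (60 / 30) × 0.85 = 17 renewals per minute. RenewalThreshold is a hypothetical wrapper around the same expression.

```java
final class RenewalThreshold {
    private RenewalThreshold() {}

    // Same expression as updateRenewsPerMinThreshold()
    static int renewsPerMinThreshold(int expectedClients, int expectedRenewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients
                * (60.0 / expectedRenewalIntervalSeconds)
                * renewalPercentThreshold);
    }
}
```

With 100 instances the same defaults give 170 renewals per minute.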

The threshold is also recalculated periodically by a timer task:

private void scheduleRenewalThresholdUpdateTask() {
  timer.schedule(new TimerTask() {
    @Override
    public void run() {
      updateRenewalThreshold();
    }
  }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                 serverConfig.getRenewalThresholdUpdateIntervalMs());
}

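Conclusion: for self-preservation to work correctly, the client's heartbeat interval (leaseRenewalIntervalInSeconds) must match the interval the server expects (expectedClientRenewalIntervalSeconds), so that the number of heartbeats clients send per minute equals the number the server expects.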
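To see why the intervals must match, take 10 registered instances and the default server settings, which give a threshold of 10 * (60 / 30) * 0.85 = 17 renewals per minute. The sketch below (hypothetical class and method names) counts the renewals actually received. With a matching 30 s client interval, losing 2 of the 10 instances drops renewals to 16 and triggers self-preservation; with the 10 s interval from the client configuration above, self-preservation only triggers once 8 of the 10 instances are gone.

```java
final class SelfPreservationCheck {
    private SelfPreservationCheck() {}

    // Same comparison as isLeaseExpirationEnabled() when self-preservation is turned on
    static boolean leaseExpirationEnabled(int renewsInLastMin, int renewsPerMinThreshold) {
        return renewsPerMinThreshold > 0 && renewsInLastMin > renewsPerMinThreshold;
    }

    // Renewals per minute actually received from the instances that are still alive
    static int renewsPerMinute(int aliveClients, int clientRenewalIntervalSeconds) {
        return aliveClients * (60 / clientRenewalIntervalSeconds);
    }
}
```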

VI. Caching mechanism

Eureka's three-level cache

  1. Read-only cache: ConcurrentHashMap (readOnlyCacheMap)
  2. Read-write cache: Guava LoadingCache (readWriteCacheMap)
  3. Actual data: ConcurrentHashMap (registry)

Purpose: compared with reading and writing the actual registry data directly, the cache layers reduce lock contention and improve efficiency. The cost is that the data can be temporarily inconsistent: what clients read is not guaranteed to reflect the registry in real time.

  1. Service discovery: fetching service information (ApplicationsResource.getContainers)
@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {

  boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
  String[] regions = null;
  if (!isRemoteRegionRequested) {
    EurekaMonitors.GET_ALL.increment();
  } else {
    regions = regionsStr.toLowerCase().split(",");
    Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
    EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
  }

  // Check if the server allows the access to the registry. The server can
  // restrict access if it is not
  // ready to serve traffic depending on various reasons.
  if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
    return Response.status(Status.FORBIDDEN).build();
  }
  CurrentRequestVersion.set(Version.toEnum(version));
  KeyType keyType = Key.KeyType.JSON;
  String returnMediaType = MediaType.APPLICATION_JSON;
  if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
    keyType = Key.KeyType.XML;
    returnMediaType = MediaType.APPLICATION_XML;
  }
  Key cacheKey = new Key(Key.EntityType.Application,
                         ResponseCacheImpl.ALL_APPS,
                         keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
                        );

  Response response;
  if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    // Read from the response cache (responseCache)
    response = Response.ok(responseCache.getGZIP(cacheKey))
      .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
      .header(HEADER_CONTENT_TYPE, returnMediaType)
      .build();
  } else {
    response = Response.ok(responseCache.get(cacheKey))
      .build();
  }
  CurrentRequestVersion.remove();
  return response;
}

Both the GZIP and the plain branch end up in ResponseCacheImpl.getValue:

// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
  Value payload = null;
  try {
    if (useReadOnlyCache) {
      // 1. Read-only cache
      final Value currentPayload = readOnlyCacheMap.get(key);
      if (currentPayload != null) {
        payload = currentPayload;
      } else {
        // 2. Read-write (Guava) cache; copy the result into the read-only cache
        payload = readWriteCacheMap.get(key);
        readOnlyCacheMap.put(key, payload);
      }
    } else {
      payload = readWriteCacheMap.get(key);
    }
  } catch (Throwable t) {
    logger.error("Cannot get value for key : {}", key, t);
  }
  return payload;
}

On a read-write cache miss, the Guava CacheLoader calls generatePayload, which reads the actual registry data:

private Value generatePayload(Key key) {
  Stopwatch tracer = null;
  try {
    String payload;
    switch (key.getEntityType()) {
      case Application:
        boolean isRemoteRegionRequested = key.hasRegions();

        if (ALL_APPS.equals(key.getName())) {
          if (isRemoteRegionRequested) {
            tracer = serializeAllAppsWithRemoteRegionTimer.start();
            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
          } else {
            tracer = serializeAllAppsTimer.start();
            payload = getPayLoad(key, registry.getApplications());
          }
        } else if (ALL_APPS_DELTA.equals(key.getName())) {
          if (isRemoteRegionRequested) {
            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
            versionDeltaWithRegions.incrementAndGet();
            versionDeltaWithRegionsLegacy.incrementAndGet();
            payload = getPayLoad(key,
                                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
          } else {
            tracer = serializeDeltaAppsTimer.start();
            versionDelta.incrementAndGet();
            versionDeltaLegacy.incrementAndGet();
            payload = getPayLoad(key, registry.getApplicationDeltas());
          }
        } else {
          tracer = serializeOneApptimer.start();
          payload = getPayLoad(key, registry.getApplication(key.getName()));
        }
        break;
      case VIP:
      case SVIP:
        tracer = serializeViptimer.start();
        payload = getPayLoad(key, getApplicationsForVip(key, registry));
        break;
      default:
        logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
        payload = "";
        break;
    }
    return new Value(payload);
  } finally {
    if (tracer != null) {
      tracer.stop();
    }
  }
}

Summary of cache reads:

  1. Look up the read-only cache first;
  2. If the read-only cache has no entry, look up the read-write cache;
  3. If the read-write cache has no entry either, its CacheLoader runs generatePayload to build the value from the actual registry data.
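The read path can be modeled with plain JDK maps. This is a simplified sketch, not ResponseCacheImpl itself: the read-write level is a ConcurrentHashMap whose computeIfAbsent stands in for Guava's LoadingCache, and the loader function stands in for generatePayload reading the registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ThreeLevelCache<K, V> {
    final Map<K, V> readOnly = new ConcurrentHashMap<>();  // level 1: read-only cache
    final Map<K, V> readWrite = new ConcurrentHashMap<>(); // level 2: stands in for the Guava cache
    private final Function<K, V> loader;                   // level 3: builds the value from the registry

    ThreeLevelCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        V value = readOnly.get(key);
        if (value != null) {
            return value;                               // 1. read-only hit
        }
        value = readWrite.computeIfAbsent(key, loader); // 2. read-write hit, or 3. load from the registry
        if (value != null) {
            readOnly.put(key, value);                   // populate the read-only cache, as getValue() does
        }
        return value;
    }

    // A registry change invalidates only the read-write level (compare invalidateCache()).
    void invalidate(K key) {
        readWrite.remove(key);
    }
}
```

Invalidating the read-write level leaves the old value in the read-only level until the next sync, which is the source of the temporary inconsistency described above.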

When does the cache change?

  1. The read-only cache is refreshed from the read-write cache by a scheduled task, every 30 seconds by default.
  2. When a key is missing from the read-only cache but present in the read-write cache, the value is copied into the read-only cache.
  3. Estimated delay before a change reaches callers: 30 s read-only cache sync + 30 s client-side cache refresh + Ribbon refresh (1 s) = 61 s.
// ResponseCacheImpl

Value getValue(final Key key, boolean useReadOnlyCache) {
  Value payload = null;
  try {
    if (useReadOnlyCache) {
      final Value currentPayload = readOnlyCacheMap.get(key);
      if (currentPayload != null) {
        payload = currentPayload;
      } else {
        // Read-only miss: read from the read-write cache and copy the value into the read-only cache
        payload = readWriteCacheMap.get(key);
        readOnlyCacheMap.put(key, payload);
      }
    } else {
      payload = readWriteCacheMap.get(key);
    }
  } catch (Throwable t) {
    logger.error("Cannot get value for key : {}", key, t);
  }
  return payload;
}

// Read-only cache update task: syncs from the read-write cache every 30 s by default
private TimerTask getCacheUpdateTask() {
  return new TimerTask() {
    @Override
    public void run() {
      logger.debug("Updating the client cache from response cache");
      for (Key key : readOnlyCacheMap.keySet()) {
        if (logger.isDebugEnabled()) {
          logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                       key.getEntityType(), key.getName(), key.getVersion(), key.getType());
        }
        try {
          CurrentRequestVersion.set(key.getVersion());
          Value cacheValue = readWriteCacheMap.get(key);
          Value currentCacheValue = readOnlyCacheMap.get(key);
          if (cacheValue != currentCacheValue) {
            readOnlyCacheMap.put(key, cacheValue);
          }
        } catch (Throwable th) {
          logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
        } finally {
          CurrentRequestVersion.remove();
        }
      }
    }
  };
}
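The periodic sync can be sketched with a ScheduledExecutorService instead of java.util.Timer. ReadOnlyCacheSync is hypothetical: two plain maps stand in for readOnlyCacheMap and readWriteCacheMap, and syncOnce() holds the body of the TimerTask so it can be called directly. Unlike Eureka, where readWriteCacheMap.get() loads a missing entry through the CacheLoader, this sketch simply skips keys that are absent from the read-write map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ReadOnlyCacheSync<K, V> {
    final Map<K, V> readOnly = new ConcurrentHashMap<>();
    final Map<K, V> readWrite = new ConcurrentHashMap<>();

    // Body of the periodic task: only keys already present in the read-only cache are refreshed.
    void syncOnce() {
        for (K key : readOnly.keySet()) {
            V fresh = readWrite.get(key);
            if (fresh != null && fresh != readOnly.get(key)) {
                readOnly.put(key, fresh);
            }
        }
    }

    // Runs syncOnce() at a fixed rate (e.g. 30 000 ms); the caller shuts the executor down.
    ScheduledExecutorService start(long intervalMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(this::syncOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```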

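Service discovery and registration are coordinated with a read-write lock, but in an inverted way: registration, modification and removal acquire the read lock, while service discovery, when it builds the incremental (delta) registry data, acquires the write lock.

This is designed to make the data returned to clients as accurate as possible: while the delta is being built under the write lock, no registration change can interleave with it, whereas ordinary registrations can still run concurrently with each other.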
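The inverted locking can be sketched with ReentrantReadWriteLock. InvertedLockRegistry and its methods are hypothetical; only the lock assignment follows the description: registrations share the read lock (the underlying collection is concurrent, so they can proceed in parallel), while building a delta snapshot takes the exclusive write lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class InvertedLockRegistry {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock read = rwLock.readLock();
    private final Lock write = rwLock.writeLock();
    // Recently changed instance IDs, comparable to recentlyChangedQueue
    private final ConcurrentLinkedQueue<String> recentlyChanged = new ConcurrentLinkedQueue<>();

    // Writers of registry data take the READ lock, so registrations can run concurrently.
    void register(String instanceId) {
        read.lock();
        try {
            recentlyChanged.add(instanceId);
        } finally {
            read.unlock();
        }
    }

    // The delta reader takes the WRITE lock, so no registration interleaves with the snapshot.
    List<String> deltaSnapshot() {
        write.lock();
        try {
            return new ArrayList<>(recentlyChanged);
        } finally {
            write.unlock();
        }
    }
}
```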

Eureka Client source code

The Eureka client talks to the server through the following interfaces:

  1. Registering the service at initialization
  2. Sending heartbeats
  3. Fetching the service list (full fetch)
  4. Fetching the service list (incremental fetch)
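These four interactions map onto Eureka's REST endpoints. The sketch below only builds the requests with the JDK 11 java.net.http API and does not send them; the base URL is the defaultZone from the client configuration above, while the application name SERVICE-CLIENT and the JSON body passed to register() are assumptions for illustration (a real registration sends the full InstanceInfo).

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class EurekaRequests {
    private final String baseUrl; // e.g. the defaultZone value, without a trailing slash

    EurekaRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // 1. Registration: POST {base}/apps/{appName} with the instance information as the body
    HttpRequest register(String appName, String instanceJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/" + appName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    // 2. Heartbeat: PUT {base}/apps/{appName}/{instanceId}
    HttpRequest heartbeat(String appName, String instanceId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/" + appName + "/" + instanceId))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // 3. Full fetch: GET {base}/apps/
    HttpRequest fetchAll() {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // 4. Incremental fetch: GET {base}/apps/delta
    HttpRequest fetchDelta() {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apps/delta"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```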

I. Initialization process

// DiscoveryClient
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
  if (args != null) {
    this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
    this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
    this.eventListeners.addAll(args.getEventListeners());
    this.preRegistrationHandler = args.preRegistrationHandler;
  } else {
    this.healthCheckCallbackProvider = null;
    this.healthCheckHandlerProvider = null;
    this.preRegistrationHandler = null;
  }

  this.applicationInfoManager = applicationInfoManager;
  InstanceInfo myInfo = applicationInfoManager.getInfo();

  clientConfig = config;
  staticClientConfig = clientConfig;
  transportConfig = config.getTransportConfig();
  instanceInfo = myInfo;
  if (myInfo != null) {
    appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
  } else {
    logger.warn("Setting instanceInfo to a passed in null value");
  }

  this.backupRegistryProvider = backupRegistryProvider;
  this.endpointRandomizer = endpointRandomizer;
  this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
  localRegionApps.set(new Applications());

  fetchRegistryGeneration = new AtomicLong(0);

  remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
  remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

  if (config.shouldFetchRegistry()) {
    this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[] {15L, 30L, 60L, 120L, 240L, 480L});
  } else {
    this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
  }

  if (config.shouldRegisterWithEureka()) {
    this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[] {15L, 30L, 60L, 120L, 240L, 480L});
  } else {
    this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
  }

  logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

  if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
    logger.info("Client configured to neither register nor query for data.");
    scheduler = null;
    heartbeatExecutor = null;
    cacheRefreshExecutor = null;
    eurekaTransport = null;
    instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);

    return;  // no need to setup up an network tasks and we are done
  }

  try {
    // default size of 2 - 1 each for heartbeat and cacheRefresh
    // Scheduling class for scheduled tasks
    scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
                                                 .setNameFormat("DiscoveryClient-%d")
                                                 .setDaemon(true)
                                                 .build());

    // Heartbeat thread pool
    heartbeatExecutor = new ThreadPoolExecutor(
      1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(),
      new ThreadFactoryBuilder()
      .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
      .setDaemon(true)
      .build()
    );  // use direct handoff

    // Cache refresh thread pool
    cacheRefreshExecutor = new ThreadPoolExecutor(
      1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(),
      new ThreadFactoryBuilder()
      .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
      .setDaemon(true)
      .build()
    );  // use direct handoff

    eurekaTransport = new EurekaTransport();
    scheduleServerEndpointTask(eurekaTransport, args);

    AzToRegionMapper azToRegionMapper;
    if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
      azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
    } else {
      azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
    }
    if (null != remoteRegionsToFetch.get()) {
      azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
    }
    instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
  } catch (Throwable e) {
    throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
  }
  
  // shouldFetchRegistry = false turns off service discovery: the client pulls no service lists and acts purely as a service provider
  if (clientConfig.shouldFetchRegistry()) {
    try {
      // Service discovery:
      // 1. fetch the registry from Eureka Server first
      // 2. register with Eureka afterwards (further down)
      boolean primaryFetchRegistryResult = fetchRegistry(false);
      if (!primaryFetchRegistryResult) {
        logger.info("Initial registry fetch from primary servers failed");
      }
      boolean backupFetchRegistryResult = true;
      // The primary servers returned no registry, so fall back to the backup registry
      if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
        backupFetchRegistryResult = false;
        logger.info("Initial registry fetch from backup servers failed");
      }
      if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
      }
    } catch (Throwable th) {
      logger.error("Fetch registry error at startup: {}", th.getMessage());
      throw new IllegalStateException(th);
    }
  }

  // call and execute the pre registration handler before all background tasks (inc registration) is started
  if (this.preRegistrationHandler != null) {
    this.preRegistrationHandler.beforeRegistration();
  }
  
  // shouldRegisterWithEureka = false: the client is a pure consumer and does not register itself
  if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
    try {
      // Service registration
      if (!register()) {
        throw new IllegalStateException("Registration error at startup. Invalid server response.");
      }
    } catch (Throwable th) {
      logger.error("Registration error at startup: {}", th.getMessage());
      throw new IllegalStateException(th);
    }
  }

  // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
  initScheduledTasks();

  try {
    Monitors.registerObject(this);
  } catch (Throwable e) {
    logger.warn("Cannot register timers", e);
  }

  // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
  // to work with DI'd DiscoveryClient
  DiscoveryManager.getInstance().setDiscoveryClient(this);
  DiscoveryManager.getInstance().setEurekaClientConfig(config);

  initTimestampMs = System.currentTimeMillis();
  initRegistrySize = this.getApplications().size();
  registrySize = initRegistrySize;
  logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
              initTimestampMs, initRegistrySize);
}
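The heartbeat and cache-refresh executors in the constructor combine a core size of 1 with a SynchronousQueue, which the source comments call direct handoff: a task goes straight to a thread, and when no thread is idle and the pool is already at its maximum size, the submission is rejected rather than queued. The sketch below reproduces that behaviour with plain java.util.concurrent; the class and method names are hypothetical, and the blocking task only stands in for a slow heartbeat.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the "direct handoff" pools used for heartbeat and cache refresh.
class DirectHandoffDemo {

    // Same shape as DiscoveryClient's executors: core 1, max = maxThreads,
    // keep-alive 0 s, and a SynchronousQueue that never stores tasks.
    static ThreadPoolExecutor newHandoffPool(int maxThreads) {
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Submits a second task while the first is still running and
    // reports whether that second submission was rejected.
    static boolean secondTaskRejected(int maxThreads) {
        ThreadPoolExecutor pool = newHandoffPool(maxThreads);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            started.countDown();
            awaitQuietly(release);      // simulate a slow heartbeat call
        };
        pool.execute(blocking);
        awaitQuietly(started);          // the single core thread is now busy
        boolean rejected = false;
        try {
            pool.execute(blocking);     // no idle thread, and the queue holds nothing
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a maximum of one thread the second submission is rejected; a larger maximum lets the pool start an extra thread instead. In DiscoveryClient that maximum comes from getHeartbeatExecutorThreadPoolSize() and getCacheRefreshExecutorThreadPoolSize().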

1. Service registration

// Service registration
// DiscoveryClient
boolean register() throws Throwable {
  logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
  EurekaHttpResponse<Void> httpResponse;
  try {
    httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  } catch (Exception e) {
    logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
    throw e;
  }
  if (logger.isInfoEnabled()) {
    logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
  }
  return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

// AbstractJerseyEurekaHttpClient
// Essentially a Jersey HTTP call to the Eureka Server registration endpoint
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
  String urlPath = "apps/" + info.getAppName();
  ClientResponse response = null;
  try {
    Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
    addExtraHeaders(resourceBuilder);
    response = resourceBuilder
      .header("Accept-Encoding", "gzip")
      .type(MediaType.APPLICATION_JSON_TYPE)
      .accept(MediaType.APPLICATION_JSON)
      .post(ClientResponse.class, info); // info: this client's instance data
    return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
  } finally {
    if (logger.isDebugEnabled()) {
      logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                   response == null ? "N/A" : response.getStatus());
    }
    if (response != null) {
      response.close();
    }
  }
}
// On the server side, the request is handled by ApplicationResource#addInstance
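Stripped of Jersey, registration is one POST of the instance's JSON to apps/{appName} under the configured service URL, and DiscoveryClient#register only treats 204 No Content as success. The sketch below builds an equivalent request with the JDK's java.net.http client (Java 11+) instead of Jersey; the class name, the example URL and the placeholder JSON body are assumptions, and the request is built but not sent. The Accept-Encoding: gzip header is left out because java.net.http does not decompress responses by itself.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical Jersey-free sketch of the registration request.
class RegisterRequestSketch {

    // POST {serviceUrl}/apps/{appName} with the instance serialized as JSON.
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    // DiscoveryClient#register only counts 204 No Content as a successful registration.
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }
}
```

Sending it would be HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()), followed by isRegistered(response.statusCode()).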

2. Heartbeat (renewal)

// DiscoveryClient
boolean renew() {
  EurekaHttpResponse<InstanceInfo> httpResponse;
  try {
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    // Check if NOT_FOUND 404 is returned
    if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
      REREGISTER_COUNTER.increment();
      logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
      long timestamp = instanceInfo.setIsDirtyWithTime();
      boolean success = register();
      if (success) {
        instanceInfo.unsetIsDirty(timestamp);
      }
      return success;
    }
    return httpResponse.getStatusCode() == Status.OK.getStatusCode();
  } catch (Throwable e) {
    logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    return false;
  }
}
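renew() boils down to two status codes: 200 means the lease was extended, and 404 means the server no longer knows this instance (for example because it was evicted), so the client registers again straight away. The sketch below isolates that decision behind a hypothetical EurekaCalls interface so it can run without a server; the interface is not part of Eureka's API. In the real client the heartbeat is an HTTP PUT to apps/{appName}/{instanceId}.

```java
// Hypothetical abstraction over the two remote calls renew() makes.
interface EurekaCalls {
    int sendHeartbeat();   // HTTP status of the heartbeat request
    boolean register();    // true if the server answered 204
}

class RenewSketch {

    // Mirrors the control flow of DiscoveryClient#renew.
    static boolean renew(EurekaCalls calls) {
        try {
            int status = calls.sendHeartbeat();
            if (status == 404) {
                // The server does not know this instance (e.g. evicted): register again.
                return calls.register();
            }
            return status == 200;
        } catch (RuntimeException e) {
            // Network failure: report it; the next scheduled heartbeat retries.
            return false;
        }
    }
}
```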

3. Service discovery

  1. Full fetch: pull all registration information from Eureka Server.
  2. Delta fetch: pull only the registry changes made in the last three minutes and, if there are any, merge them into the local cache.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
  Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

  try {
    // If the delta is disabled or if it is the first time, get all
    // applications
    // Get client cache information
    Applications applications = getApplications();

    if (clientConfig.shouldDisableDelta()  // configured to always pull the full registry
        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) // client only cares about a single VIP address
        || forceFullRegistryFetch // Force full pull
        || (applications == null) // first fetch: nothing cached locally yet
        || (applications.getRegisteredApplications().size() == 0)
        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    {
      logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
      logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
      logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
      logger.info("Application is null : {}", (applications == null));
      logger.info("Registered Applications size is zero : {}",
                  (applications.getRegisteredApplications().size() == 0));
      logger.info("Application version is -1: {}", (applications.getVersion() == -1));
      getAndStoreFullRegistry();       // full pull
    } else {
      getAndUpdateDelta(applications); // Incremental pull
    }
    applications.setAppsHashCode(applications.getReconcileHashCode());
    logTotalInstances();
  } catch (Throwable e) {
    logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
    return false;
  } finally {
    if (tracer != null) {
      tracer.stop();
    }
  }

  // Notify about cache refresh before updating the instance remote status
  onCacheRefreshed();

  // Update remote status based on refreshed data held in the cache
  updateInstanceRemoteStatus();

  // registry was fetched successfully, so return true
  return true;
}
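The condition in fetchRegistry decides between getAndStoreFullRegistry and getAndUpdateDelta. The sketch below restates it as a pure function over hypothetical, simplified inputs (a missing or empty local cache is folded into localAppCount == 0), which makes the rule easy to test: any single trigger forces a full fetch, otherwise the client asks for a delta.

```java
// Hypothetical restatement of the full-vs-delta decision in fetchRegistry.
class FetchDecision {

    enum Mode { FULL, DELTA }

    static Mode choose(boolean disableDelta, String singleVipAddress, boolean forceFull,
                       int localAppCount, long localVersion) {
        boolean full = disableDelta                                           // config: always pull everything
                || (singleVipAddress != null && !singleVipAddress.isEmpty()) // only one VIP is of interest
                || forceFull                                                  // caller insists on a full pull
                || localAppCount == 0                                         // empty local cache (first fetch)
                || localVersion == -1;                                        // local copy cannot take deltas
        return full ? Mode.FULL : Mode.DELTA;
    }
}
```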

The delta fetch takes a lock so that concurrent updates cannot corrupt the local registry.

// Incremental (delta) fetch
private void getAndUpdateDelta(Applications applications) throws Throwable {
  long currentUpdateGeneration = fetchRegistryGeneration.get();

  Applications delta = null;
  EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
  if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
    delta = httpResponse.getEntity();
  }

  if (delta == null) {
    logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
    getAndStoreFullRegistry();
  } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
    String reconcileHashCode = "";
    if (fetchRegistryUpdateLock.tryLock()) {
      try {
        updateDelta(delta);
        reconcileHashCode = getReconcileHashCode(applications);
      } finally {
        fetchRegistryUpdateLock.unlock();
      }
    } else {
      logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
    }
    // There is a diff in number of instances for some reason
    if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
      reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
    }
  } else {
    logger.warn("Not updating application delta as another thread is updating it already");
    logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
  }
}
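getAndUpdateDelta combines two guards: an AtomicLong generation counter, so a delta fetched before some other update completed is thrown away, and tryLock, so a thread that cannot get the lock skips the merge instead of blocking. The sketch below keeps only those two guards around a toy registry (instance id to status); the class, its delta format and the DELETED marker are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy registry guarded the way getAndUpdateDelta guards the real one.
class DeltaApplier {
    private final AtomicLong generation = new AtomicLong(0);
    private final ReentrantLock updateLock = new ReentrantLock();
    final Map<String, String> registry = new ConcurrentHashMap<>();

    // Read the generation before the (slow) remote fetch starts.
    long beginFetch() {
        return generation.get();
    }

    // Apply a delta fetched at observedGeneration; returns false if it was discarded.
    boolean apply(long observedGeneration, Map<String, String> delta) {
        // Another update finished since this fetch began: the delta is stale.
        if (!generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            return false;
        }
        // Never block the refresh thread: skip this round if the lock is taken.
        if (!updateLock.tryLock()) {
            return false;
        }
        try {
            for (Map.Entry<String, String> e : delta.entrySet()) {
                if ("DELETED".equals(e.getValue())) {
                    registry.remove(e.getKey());
                } else {
                    registry.put(e.getKey(), e.getValue());
                }
            }
            return true;
        } finally {
            updateLock.unlock();
        }
    }
}
```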

On the server, recentlyChangedQueue holds the registry changes made within the last 3 minutes, and delta requests are answered from this queue.

A scheduled task runs every 30 seconds and removes queue entries older than 3 minutes.
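The retention rule for recentlyChangedQueue can be sketched as follows: every register, cancel or status change is appended with a timestamp, delta requests are built from whatever is still queued, and the periodic task drops entries older than 3 minutes. Only queue entries are removed; the instances themselves stay registered. The class, the Change type and the explicit nowMs parameter (so the logic can be tested without waiting) are assumptions; Eureka itself keeps a ConcurrentLinkedQueue of RecentlyChangedItem.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the server's recentlyChangedQueue and its retention task.
class RecentChanges {
    static final long RETENTION_MS = 3 * 60 * 1000L;   // 3-minute delta window

    static final class Change {
        final String instanceId;
        final String action;      // e.g. ADDED, MODIFIED, DELETED
        final long timestampMs;

        Change(String instanceId, String action, long timestampMs) {
            this.instanceId = instanceId;
            this.action = action;
            this.timestampMs = timestampMs;
        }
    }

    private final ConcurrentLinkedQueue<Change> queue = new ConcurrentLinkedQueue<>();

    void record(String instanceId, String action, long nowMs) {
        queue.add(new Change(instanceId, action, nowMs));
    }

    // Body of the periodic retention task. Entries arrive in time order,
    // so it can stop at the first one still inside the window.
    void evictOld(long nowMs) {
        Change head;
        while ((head = queue.peek()) != null && head.timestampMs < nowMs - RETENTION_MS) {
            queue.poll();
        }
    }

    // The instance ids a delta response would currently be built from.
    List<String> deltaIds() {
        List<String> ids = new ArrayList<>();
        for (Change c : queue) {
            ids.add(c.instanceId);
        }
        return ids;
    }
}
```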

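The delta response also carries a hashCode computed over the server's full registry, as in the server-side snippet below.

On the client, the hashCode of the local registry after the delta has been merged must equal the hashCode sent by the server; if it does not, reconcileAndLogDifference fetches the full registry again.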

Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
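The hash code compared here is not Object#hashCode: Applications#getReconcileHashCode counts instances per status and concatenates the counts into a string. The sketch below assumes the format STATUS_count_ with statuses in sorted order (e.g. DOWN_1_UP_2_), which is our reading of the 1.x source rather than a documented contract, and shows the client-side check: merge the delta locally, recompute, and compare with the server's value.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the status-count "hash" and the client-side reconciliation check.
class ReconcileHash {

    // instances: instanceId -> status. The TreeMap keeps statuses sorted,
    // so equal counts always produce the same string.
    static String hashOf(Map<String, String> instances) {
        TreeMap<String, Integer> counts = new TreeMap<>();
        for (String status : instances.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    // true: local copy matches the server; false: the client should do a full fetch.
    static boolean inSync(Map<String, String> localAfterDelta, String serverHash) {
        return hashOf(localAfterDelta).equals(serverHash);
    }
}
```

Because only per-status counts are compared, this check catches lost or duplicated instances and status drift, but not two registries that differ only in which instances they contain.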

4. Service deregistration

  1. Manual deregistration: inject DiscoveryClient, then call its shutdown method.
// DiscoveryClient
public synchronized void shutdown() {
  if (isShutdown.compareAndSet(false, true)) {
    logger.info("Shutting down DiscoveryClient ...");

    if (statusChangeListener != null && applicationInfoManager != null) {
      applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
    }

    cancelScheduledTasks();

    // If APPINFO was registered
    if (applicationInfoManager != null
        && clientConfig.shouldRegisterWithEureka()
        && clientConfig.shouldUnregisterOnShutdown()) {
      applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
      unregister();
    }

    if (eurekaTransport != null) {
      eurekaTransport.shutdown();
    }

    heartbeatStalenessMonitor.shutdown();
    registryStalenessMonitor.shutdown();

    Monitors.unregisterObject(this);

    logger.info("Completed shut down of DiscoveryClient");
  }
}
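shutdown() is guarded by isShutdown.compareAndSet(false, true), so the cancel, mark-DOWN and unregister sequence runs at most once even if shutdown is called twice, for example by application code and then by a JVM shutdown hook. A minimal sketch of that guard, with hypothetical step names recorded in a list instead of real remote calls:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical client showing the run-once shutdown guard used by DiscoveryClient.
class ShutdownOnce {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    final List<String> steps = new ArrayList<>();

    synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("cancelScheduledTasks");   // stop heartbeat and refresh timers first
            steps.add("markDown");               // status DOWN so consumers stop routing here
            steps.add("unregister");             // DELETE apps/{app}/{id} on the server
            steps.add("closeTransport");
        }
    }
}
```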

Eureka summary

Eureka is an excellent service registry. In CAP terms it is an AP system: it prioritizes availability and partition tolerance over consistency.

  1. Eureka Server keeps all registration information in memory and answers registry queries through a multi-level cache.
    • Multi-level caching: reads go first to the level-1 cache, readOnlyCacheMap, then to the level-2 cache, readWriteCacheMap, and finally to the actual registry data.
    • Cache expiration: the level-2 cache is invalidated when the server receives a register, renew or cancel request, and when service eviction removes an instance; level-2 entries also expire on their own after a timeout.
    • Cache update: on a level-1 miss, the value is read from the level-2 cache (loaded from the registry if level 2 has no entry either) and then copied into level 1. A scheduled task also syncs level 1 from level 2, every 30 seconds by default.
  2. In a Eureka Server cluster, a client registers with just one server node; that node then replicates the registration to each of its peers.
  3. By default, the Eureka Client sends a heartbeat every 30 seconds to tell the Eureka Server it is still alive. If the server receives no renewal for 90 seconds, it removes the instance from the registry.
  4. Eureka Server self-preservation: when the renewals received in the last minute fall below 85% of the expected number (for example, because many instances stop sending heartbeats at once), the server enters self-preservation and stops evicting instances. Outside self-preservation, each eviction pass removes at most 15% of the registered instances.
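The read path in point 1 can be sketched as a read-only map in front of a read-write cache that loads from the registry on a miss, where writes invalidate only the read-write layer and a periodic task copies values into the read-only layer. Eureka builds the read-write layer on a Guava LoadingCache and keys it by more than an application name; this JDK-only version, its class name and its string payloads are simplifications.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical two-level response cache modeled on Eureka Server's read path.
class TwoLevelCache {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();   // level 1
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();  // level 2
    private final Function<String, String> registryLoader;                    // real data

    TwoLevelCache(Function<String, String> registryLoader) {
        this.registryLoader = registryLoader;
    }

    // Level 1 first; on a miss, level 2 (loading from the registry if needed),
    // then copy the value into level 1.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, registryLoader);
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    // Called on register / cancel: only level 2 is invalidated.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // Body of the periodic sync task: refresh every level-1 entry from level 2.
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, registryLoader);
            if (fresh != null) {
                readOnly.put(key, fresh);
            } else {
                readOnly.remove(key);
            }
        }
    }
}
```

This also shows why, with the read-only cache enabled, a change can take up to one sync interval to become visible to other clients: level 1 keeps serving the old value until the sync task runs.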
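Points 3 and 4 combine into one eviction pass. The sketch below uses the defaults quoted in those points (90-second lease, 85% renewal threshold) and assumes two expected renewals per instance per minute, matching a 30-second heartbeat; the method names are hypothetical, and Eureka's exact threshold bookkeeping and its random choice among expired leases are simplified away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical eviction pass combining lease expiry, self-preservation and the 15% cap.
class EvictionSketch {
    static final long LEASE_DURATION_MS = 90_000;          // no renewal for 90 s => expired
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85;

    // Assumed expectation: 2 renewals per instance per minute (30 s heartbeats).
    static boolean selfPreservationActive(int registeredInstances, int renewalsLastMinute) {
        int threshold = (int) (registeredInstances * 2 * RENEWAL_PERCENT_THRESHOLD);
        return renewalsLastMinute < threshold;
    }

    // lastRenewalMs: instanceId -> time of the last heartbeat.
    static List<String> evict(Map<String, Long> lastRenewalMs, int renewalsLastMinute, long nowMs) {
        List<String> evicted = new ArrayList<>();
        int registrySize = lastRenewalMs.size();
        if (selfPreservationActive(registrySize, renewalsLastMinute)) {
            return evicted;                                  // protect the registry: evict nothing
        }
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastRenewalMs.entrySet()) {
            if (nowMs - e.getValue() > LEASE_DURATION_MS) {
                expired.add(e.getKey());
            }
        }
        // Cap each pass at 15% of the registry (size minus its 85% threshold).
        int evictionLimit = registrySize - (int) (registrySize * RENEWAL_PERCENT_THRESHOLD);
        for (int i = 0; i < Math.min(expired.size(), evictionLimit); i++) {
            evicted.add(expired.get(i));
        }
        return evicted;
    }
}
```

For 10 registered instances, at least 17 renewals per minute keep self-preservation off, and a single pass evicts at most 10 - 8 = 2 expired instances.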