Overview
This article analyzes the implementation principles and business details of Eureka from the perspective of its source code. It begins with the configuration of a cluster-mode server and of a demo client.
Eureka’s history
Spring Cloud's announcement that some Netflix projects are no longer being updated:
Eureka 2.x is no longer developed, but Eureka 1.x is still maintained.
spring.io/blog/2019/0…
Spring Cloud Hoxton release notes
The Hoxton release adds support for reactive programming.
spring.io/blog/2019/1…
Cluster Mode Configuration
Version Overview:
Spring Boot 2.4.2
Spring Cloud 2020.0.1
Server Configuration
- pom.xml
<properties>
    <spring-boot.version>2.4.2</spring-boot.version>
    <spring-cloud.version>2020.0.1</spring-cloud.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <!-- spring boot 2.4.2 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- spring cloud 2020.0.1 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
- application.yml
server:
  port: 3001
eureka:
  server:
    enable-self-preservation: false        # turn off the self-protection mechanism
    eviction-interval-timer-in-ms: 4000    # interval of the eviction task (milliseconds)
  instance:
    hostname: eureka3000
  client:
    register-with-eureka: false  # do not register this server with itself as a client
    fetch-registry: false        # no need to fetch registry information from the server
    service-url:
      # the key must be written as defaultZone; a dashed key (default-zone) is not picked up
      defaultZone: http://eureka3001.com:3001/eureka,http://eureka3001.com:3002/eureka,http://eureka3001.com:3003/eureka
- Startup class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}
Client Configuration
- pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
- application.yml
server:
  port: 6000
eureka:
  client:
    serviceUrl:  # write defaultZone in camelCase; with a dash (default-zone) the setting has no effect
      defaultZone: http://127.0.0.1:3000/eureka  # registration address provided by the Eureka server
  instance:
    instance-id: power-1                   # unique ID of this instance in the Eureka server registry
    prefer-ip-address: true                # register and display the IP address instead of the hostname
    leaseRenewalIntervalInSeconds: 10      # how often the client sends a heartbeat to the server; default 30 s
    leaseExpirationDurationInSeconds: 30   # how long the server waits after the last heartbeat before removing the instance; default 90 s
spring:
  application:
    name: service-client
- Startup class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EurekaClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class, args);
    }
}
Spring Boot Custom starter
Eureka Server source code
Eureka does not use Spring MVC as its web framework; it uses Jersey underneath. Unlike Spring MVC, which dispatches requests through a servlet, Jersey is mounted here as a servlet Filter that dispatches the requests.
I. Service registration
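Registration information is stored in a two-level map: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); The outer String key is the service (application) name, the inner String key is the instance ID, Lease is the lease held for that instance, and InstanceInfo is the instance's service information.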
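A minimal sketch of the two-level registry using only JDK types. SimpleLease and RegistrySketch are hypothetical stand-ins for Lease<InstanceInfo> and AbstractInstanceRegistry (the instance info is reduced to a String); the sketch only shows how the per-application group is created race-free with putIfAbsent, the same pattern register() uses below.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Lease<InstanceInfo>: the instance info plus a timestamp.
final class SimpleLease {
    final String holder;              // stands in for InstanceInfo
    final long registeredAtMillis;

    SimpleLease(String holder, long registeredAtMillis) {
        this.holder = holder;
        this.registeredAtMillis = registeredAtMillis;
    }
}

// Hypothetical model of the registry: appName -> (instanceId -> lease).
final class RegistrySketch {
    private final ConcurrentHashMap<String, Map<String, SimpleLease>> registry =
            new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String info) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        if (gMap == null) {
            // Create the per-application group. If another thread won the race,
            // putIfAbsent returns that thread's map and we use it instead.
            Map<String, SimpleLease> gNewMap = new ConcurrentHashMap<>();
            gMap = registry.putIfAbsent(appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Re-registering the same instance ID replaces its lease.
        gMap.put(instanceId, new SimpleLease(info, System.currentTimeMillis()));
    }

    int instanceCount(String appName) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        return gMap == null ? 0 : gMap.size();
    }
}
```

On Java 8 and later, registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()) expresses the same idea in a single call.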
// Service registration code
// AbstractInstanceRegistry
// registrant is the registration information carried by this request
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
// registry is where all registration information is stored
// gMap is the group of instances of one microservice, looked up by service name
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// An existing microservice instance object
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// Last-modified (dirty) timestamp of the existing instance
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
// Dirty timestamp carried by the incoming registration
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// If the existing copy is newer, keep it instead of the incoming registration
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
// getHolder() returns the InstanceInfo held by the existing lease
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// Recalculate the self-protection threshold
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// Create the new lease object
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// Keep the original service-up timestamp
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// Register: store the lease under the instance ID
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
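The dirty-timestamp rule in register() can be isolated as a small pure function. This is an illustrative sketch: Instance and DirtyTimestampRule are hypothetical classes, not Eureka's InstanceInfo, and only the fields the rule needs are modelled.

```java
// Hypothetical, simplified view of an instance: only what the rule needs.
final class Instance {
    final String id;
    final long lastDirtyTimestamp;
    final String payload;

    Instance(String id, long lastDirtyTimestamp, String payload) {
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
        this.payload = payload;
    }
}

final class DirtyTimestampRule {
    // Mirrors the check in register(): the existing copy wins only if it is strictly newer.
    // On a tie (">" rather than ">="), the incoming registration is taken.
    static Instance chooseRegistrant(Instance existing, Instance incoming) {
        if (existing != null && existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp) {
            return existing;
        }
        return incoming;
    }
}
```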
II. Service expiration (taking services offline)
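A timer task periodically checks whether each service's lease has expired, i.e. whether the current time is later than its lastUpdateTimestamp plus the lease duration.
Formula: current system time > lastUpdateTimestamp + lease duration (leaseExpirationDurationInSeconds: 90 s by default, 30 s in the client configuration above) + additionalLeaseMs (compensation for a delayed eviction run)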
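The expiry formula can be sketched as a tiny lease class with an injectable clock so it is easy to test. LeaseSketch is hypothetical; it follows the formula above (lastUpdateTimestamp + duration + additionalLeaseMs) rather than reproducing Eureka's Lease class line by line.

```java
// Minimal sketch of lease expiry with an injectable clock (all times in milliseconds).
final class LeaseSketch {
    private final long durationMs;            // e.g. leaseExpirationDurationInSeconds * 1000
    private volatile long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // A heartbeat (renewal) moves the last-update timestamp forward.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs;
    }

    // Expired when: now > lastUpdateTimestamp + duration + additionalLeaseMs
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```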
- Service eviction
// Runs every eviction-interval-timer-in-ms (60 s by default, 4000 ms in the server configuration above).
// It collects all expired leases and evicts at most registrySize - registrySize * renewalPercentThreshold
// of them per run (15% of the registry by default).
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // Iterate through all the service information and add expired leases to the list
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                // Determine whether the lease has expired
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // If too many leases have expired, evict only part of them in this run
    int registrySize = (int) getLocalRegistrySize();
    // serverConfig.getRenewalPercentThreshold() is 0.85 (85%) by default
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        // Random eviction, similar to a shuffle algorithm
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
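The selection step of evict() (cap the number of evictions, then pick randomly with a partial Knuth/Fisher-Yates shuffle) can be sketched on its own. EvictionSketch is a hypothetical helper over plain lists; its arithmetic mirrors registrySizeThreshold, evictionLimit and toEvict above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch of evict()'s selection step: cap evictions at
// registrySize - (int) (registrySize * renewalPercentThreshold), then pick that many
// expired entries uniformly at random with a partial Fisher-Yates (Knuth) shuffle.
final class EvictionSketch {
    static <T> List<T> selectToEvict(List<T> expired, int registrySize,
                                     double renewalPercentThreshold, Random random) {
        List<T> candidates = new ArrayList<>(expired);   // do not mutate the caller's list
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        int evictionLimit = registrySize - registrySizeThreshold;
        int toEvict = Math.min(candidates.size(), evictionLimit);
        List<T> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            // Swap a random not-yet-chosen element into position i, as the Eureka loop does.
            int next = i + random.nextInt(candidates.size() - i);
            Collections.swap(candidates, i, next);
            chosen.add(candidates.get(i));
        }
        return chosen;
    }
}
```

With registrySize = 10 and the default 0.85, evictionLimit is 10 - 8 = 2, so at most two of five expired leases are removed in one run and the rest wait for later runs. Randomizing the choice spreads evictions across applications instead of wiping out whichever application happens to be iterated first.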
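III. Heartbeat (lease renewal)
A heartbeat renews the lease, i.e. updates its last-update timestamp. On the server it is handled by InstanceResource.renewLease: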
// InstanceResource
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
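The renew round trip can be modelled without HTTP. HeartbeatSketch is hypothetical: its server side answers 200 when it knows the instance and 404 otherwise, and its client side re-registers on 404, which is the behaviour the "immediately ask for a register" comment refers to.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical end-to-end sketch of the renew round trip (no HTTP involved).
final class HeartbeatSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;

    // Server side: instanceId -> last renewal time (ms)
    final Map<String, Long> leases = new ConcurrentHashMap<>();

    int renewLease(String instanceId, long nowMs) {
        // replace() only updates an existing key, like registry.renew() returning false when absent
        return leases.replace(instanceId, nowMs) != null ? OK : NOT_FOUND;
    }

    void register(String instanceId, long nowMs) {
        leases.put(instanceId, nowMs);
    }

    // Client side: send a heartbeat; on 404, register again.
    boolean clientHeartbeat(String instanceId, long nowMs) {
        int status = renewLease(instanceId, nowMs);
        if (status == NOT_FOUND) {
            register(instanceId, nowMs);
            return true;   // re-registered
        }
        return status == OK;
    }
}
```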
IV. Cluster principle
- Cluster synchronization is handled by PeerAwareInstanceRegistryImpl.
- When a client registers with one server node, that node forwards the request to the other server nodes.
// Cluster synchronization
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // Return if there are no peer nodes (peerEurekaNodes is empty) or if this request is itself
        // a replication (isReplication); this prevents endless re-replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Replicate to all peer nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                // Skip the current node
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
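The isReplication guard is what stops replication from bouncing between peers forever. A hypothetical in-memory sketch (PeerNode is not a Eureka class): each node applies the registration locally and forwards it only when the request did not itself come from a peer, skipping its own URL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of replicateToPeers(): apply locally, then forward to every other
// node with isReplication = true. Without the flag, A -> B -> A -> ... would never stop.
final class PeerNode {
    final String url;
    final List<String> registered = new ArrayList<>();
    final List<PeerNode> peers = new ArrayList<>();

    PeerNode(String url) {
        this.url = url;
    }

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId);
        if (isReplication) {
            return; // already a replica: do not replicate again
        }
        for (PeerNode node : peers) {
            if (node.url.equals(this.url)) {
                continue; // never replicate to ourselves (cf. isThisMyUrl)
            }
            node.register(instanceId, true);
        }
    }
}
```

Because every node's peer list here also contains the node itself, as the three-URL defaultZone in the server configuration does, the URL check matters as much as the flag.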
- replicateInstanceActionsToPeers: handling each replicated event for a peer node
// Cluster synchronization event processing
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                // cancel
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // register
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    } finally {
        CurrentRequestVersion.remove();
    }
}
- Cluster synchronization at startup (syncUp)
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
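The retry pattern of syncUp() in isolation: keep fetching until at least one instance has been copied or the retry budget runs out, sleeping between attempts. SyncUpSketch and its fetchPeerInstances supplier are hypothetical stand-ins for eurekaClient.getApplications() and register(..., true).

```java
import java.util.List;
import java.util.function.Supplier;

// Sketch of syncUp()'s retry loop over a hypothetical peer-registry supplier.
final class SyncUpSketch {
    static int syncUp(Supplier<List<String>> fetchPeerInstances, List<String> localRegistry,
                      int retries, long retryWaitMs) {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(retryWaitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    break;
                }
            }
            for (String instance : fetchPeerInstances.get()) {
                localRegistry.add(instance); // stands in for register(instance, ..., true)
                count++;
            }
        }
        return count;
    }
}
```

Unlike the Eureka code, the sketch restores the thread's interrupt flag before leaving the loop, which is the usual idiom when an InterruptedException is swallowed.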
V. Self-protection mechanism
- Whether to enable the self-protection mechanism
// isSelfPreservationModeEnabled reads the enable-self-preservation configuration parameter
public boolean isSelfPreservationModeEnabled() {
return serverConfig.shouldEnableSelfPreservation();
}
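- Trigger condition: the self-protection mechanism is triggered when a large number of heartbeats stop arriving within a short period, for example because many instances went down at once. "A large number" is defined by a threshold of 85% by default: if the renewals received in the last minute do not exceed 85% of the expected renewals, Eureka enters self-protection and stops evicting instances.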
// numberOfRenewsPerMinThreshold is the threshold that triggers the self-protection mechanism
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
- Updating the self-protection threshold during service registration
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
updateRenewsPerMinThreshold recalculates the self-protection threshold:
// getExpectedClientRenewalIntervalSeconds() defaults to 30 s, i.e. each client is expected to send two heartbeats per minute
// Formula: expected number of clients sending renewals (all registered instances) * heartbeats per client per minute (60 s / expected renewal interval, 30 s by default) * percentage that triggers self-protection (85% by default)
// If more than 15% of the expected heartbeats are missing, the self-protection mechanism is triggered
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
The threshold is also recalculated periodically by a timer task (every renewalThresholdUpdateIntervalMs, 15 minutes by default):
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
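Conclusion: for the self-protection mechanism to work properly, the number of heartbeats per minute the server expects (based on getExpectedClientRenewalIntervalSeconds()) must match the number clients actually send (based on leaseRenewalIntervalInSeconds).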
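To see why the intervals must match, the two formulas can be evaluated with concrete numbers. A small sketch, assuming 10 registered instances and the defaults (30 s expected interval, 0.85 threshold); SelfPreservationSketch is a hypothetical helper that simply restates updateRenewsPerMinThreshold() and isLeaseExpirationEnabled().

```java
// Restates the self-protection arithmetic with the configuration passed in explicitly.
final class SelfPreservationSketch {
    static int renewsPerMinThreshold(int expectedClients, int expectedRenewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients
                * (60.0 / expectedRenewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    // Eviction is allowed only while renewals in the last minute exceed the threshold.
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int threshold, long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsInLastMin > threshold;
    }
}
```

Here the threshold is (int) (10 * 2.0 * 0.85) = 17. With 30 s heartbeats, 8 surviving instances send 16 renewals per minute, so eviction stops once 2 of 10 instances are gone. The client configuration above uses leaseRenewalIntervalInSeconds: 10, i.e. 6 renewals per instance per minute; then even 3 surviving instances send 18 > 17, so eviction continues until 8 of 10 instances have disappeared.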
VI. Caching mechanism
Eureka's three-level cache
- Read-only cache (readOnlyCacheMap): ConcurrentHashMap
- Read/write cache (readWriteCacheMap): Guava cache
- Real data (registry): ConcurrentHashMap
Purpose: reading from the caches instead of operating on the real data directly reduces lock contention and improves efficiency. The trade-off is data inconsistency: at any given moment the cached data may lag behind the registry, so it is only eventually consistent.
- Service discovery, i.e. fetching service information (ApplicationsResource.getContainers):
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// responseCache cache object
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}
Both responseCache.get and responseCache.getGZIP end up calling getValue:
// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
// 1. Read-only cache
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
// 2. Read/write cache (Guava)
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
On a read/write cache miss, the Guava cache loader calls generatePayload, which queries the real data:
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
Summary of cache reads:
- Reads go to the read-only cache first;
- If the read-only cache has no entry, the read/write cache is queried;
- If the read/write cache has no entry either, its cache loader runs and generatePayload builds the value from the real data.
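The three-level read path and the periodic sync can be sketched with JDK maps. ThreeLevelCache is hypothetical: the Guava LoadingCache is replaced by ConcurrentHashMap.computeIfAbsent (so there is no expiry), and generatePayload is a caller-supplied function over the real data. It shows why a change can stay invisible to read-only readers until the next sync.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical three-level cache modelled on ResponseCacheImpl.
// generatePayload must never return null (ConcurrentHashMap rejects null values).
final class ThreeLevelCache<K, V> {
    private final Map<K, V> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final Map<K, V> readWriteCacheMap = new ConcurrentHashMap<>();
    private final Function<K, V> generatePayload;

    ThreeLevelCache(Function<K, V> generatePayload) {
        this.generatePayload = generatePayload;
    }

    V getValue(K key, boolean useReadOnlyCache) {
        if (useReadOnlyCache) {
            V current = readOnlyCacheMap.get(key);                 // 1. read-only cache
            if (current != null) {
                return current;
            }
            // 2. read/write cache, 3. real data on a miss
            V payload = readWriteCacheMap.computeIfAbsent(key, generatePayload);
            readOnlyCacheMap.put(key, payload);
            return payload;
        }
        return readWriteCacheMap.computeIfAbsent(key, generatePayload);
    }

    // On register/cancel only the read/write level is cleared, so the
    // read-only level keeps serving the old value until the next sync.
    void invalidate(K key) {
        readWriteCacheMap.remove(key);
    }

    // Body of the periodic (30 s by default) read-only cache update task.
    void syncReadOnlyCache() {
        for (K key : readOnlyCacheMap.keySet()) {
            V cacheValue = readWriteCacheMap.computeIfAbsent(key, generatePayload);
            if (cacheValue != readOnlyCacheMap.get(key)) {
                readOnlyCacheMap.put(key, cacheValue);
            }
        }
    }
}
```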
When does the cache change?
- The read-only cache is refreshed from the read/write cache only by a scheduled task, every 30 seconds by default.
- If an entry is missing from the read-only cache but present in the read/write cache, it is copied into the read-only cache on read. The read/write cache itself is invalidated on registration and similar changes (see invalidateCache in register() above).
- Worst-case delay before a change becomes visible: 30 s read-only cache sync + 30 s client cache delay + Ribbon (1 s) = 61 s.
// ResponseCacheImpl
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
// If the entry is missing from the read-only cache but found in the read/write cache, copy it into the read-only cache
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
// Read-only cache update task: synchronizes from the read/write cache every 30 seconds
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
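Locking: registration, status changes and removal take the read lock, while service discovery takes the write lock when it computes the incremental (delta) registry. The roles look inverted, but this lets many registrations proceed concurrently on the thread-safe maps, while a delta fetch waits for in-flight modifications and blocks new ones until it finishes.
This is designed to make the delta data as accurate as possible.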
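The inverted use of the read/write lock can be sketched with ReentrantReadWriteLock. InvertedLockRegistry is a hypothetical simplification: modifications append to a concurrent change queue under the shared read lock, while the delta reader copies the queue under the exclusive write lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Writers to the registry take the READ lock (shared); the delta reader takes the WRITE lock
// (exclusive) so that it never observes a half-finished batch of modifications.
final class InvertedLockRegistry {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock read = lock.readLock();
    private final ReentrantReadWriteLock.WriteLock write = lock.writeLock();
    private final ConcurrentLinkedQueue<String> recentlyChanged = new ConcurrentLinkedQueue<>();

    void register(String instanceId) {
        read.lock();                  // shared: concurrent registrations are allowed
        try {
            recentlyChanged.add("ADDED " + instanceId);
        } finally {
            read.unlock();
        }
    }

    List<String> getDelta() {
        write.lock();                 // exclusive: waits for in-flight registrations
        try {
            return new ArrayList<>(recentlyChanged);
        } finally {
            write.unlock();
        }
    }
}
```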
Eureka Client source code
The Eureka client calls the server's interfaces to:
- register the service during initialization;
- send heartbeats;
- pull the full service list;
- pull incremental (delta) updates to the service list.
I. Initialization process
// DiscoveryClient
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[] {15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[] {15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// Scheduling class for scheduled tasks
scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// Heartbeat thread pool
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// Thread pool for the cache-refresh (registry fetch) task
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// If shouldFetchRegistry is false, service discovery is turned off: the client does not pull other services' lists and acts only as a service provider
if (clientConfig.shouldFetchRegistry()) {
try {
// Service discovery
// 1. Go to Eureka first to get information
// 2. Then register with Eureka
boolean primaryFetchRegistryResult = fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
// If the primary servers returned no registry information, fall back to the backup registry
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// If shouldRegisterWithEureka is false, the client acts only as a consumer and does not register itself
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
// Service registration
if(! register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response."); }}catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw newIllegalStateException(th); }}// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
}
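The primary-then-backup fetch at the start of the constructor can be modeled on its own. The sketch below uses a hypothetical helper, InitialFetchSketch, which is not a Eureka API. Its two suppliers stand in for fetchRegistry(false) and fetchRegistryFromBackup(), and the boolean flag stands in for shouldEnforceFetchRegistryAtInit().

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of the initial fetch logic in the DiscoveryClient constructor. */
class InitialFetchSketch {

    /**
     * Returns true if the primary or the backup fetch succeeded.
     * Throws if both failed and enforceFetchAtInit is set.
     */
    static boolean initialFetch(BooleanSupplier primaryFetch,
                                BooleanSupplier backupFetch,
                                boolean enforceFetchAtInit) {
        boolean primaryOk = primaryFetch.getAsBoolean();
        // || short-circuits, so the backup servers are contacted only when the primary fetch failed
        boolean backupOk = primaryOk || backupFetch.getAsBoolean();
        if (!primaryOk && !backupOk && enforceFetchAtInit) {
            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
        return primaryOk || backupOk;
    }
}
```

If enforcement is off, a failed initial fetch does not stop startup. The scheduled fetch task started by initScheduledTasks() retries later.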
1. Service registration
// Service registration
// DiscoveryClient
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
// AbstractJerseyEurekaHttpClient
// Is essentially a Jersey call to the Eureka Server registration interface
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info); // the client's InstanceInfo is sent as the JSON request body
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
// On the server side, this request is handled by ApplicationResource#addInstance
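The Jersey call above reduces to a single POST to {serviceUrl}/apps/{appName}, and only a 204 counts as success. The sketch below shows the same request using the JDK 11 java.net.http.HttpRequest builder instead of Jersey. RegisterRequestSketch is a hypothetical class, and instanceJson is a placeholder for the serialized InstanceInfo, whose exact JSON shape is not shown here. The request is only built, never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical illustration of the registration request; not Eureka's own client. */
class RegisterRequestSketch {

    /** Builds POST {serviceUrl}/apps/{appName} with the same headers the Jersey client sets. */
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        // Join the base URL and the relative path with exactly one slash
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Accept-Encoding", "gzip")
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** DiscoveryClient#register() treats only 204 No Content as a successful registration. */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }
}
```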
2. Heartbeat renewal
// DiscoveryClient
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
// If the server returns 404 NOT_FOUND, it no longer knows this instance (e.g. it was evicted), so re-register
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
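The status handling in renew() is a small decision table. A 404 triggers re-registration, a 200 means the lease was renewed, and anything else is a failed heartbeat. The sketch below isolates that decision. RenewSketch is hypothetical, and the reRegister supplier stands in for register().

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of the heartbeat status handling in DiscoveryClient#renew(). */
class RenewSketch {

    static boolean handleHeartbeatStatus(int statusCode, BooleanSupplier reRegister) {
        if (statusCode == 404) {
            // The server has no lease for this instance: the heartbeat succeeds only if re-registration does
            return reRegister.getAsBoolean();
        }
        // 200 OK means the lease was renewed; any other status is a failed heartbeat
        return statusCode == 200;
    }
}
```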
3. Service discovery
- Full fetch: pull the complete registry from Eureka Server.
- Delta (incremental) fetch: pull only the changes the server recorded in the last three minutes and, if there are any, merge them into the local cache.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// Get client cache information
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta() // configured to always do a full fetch
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) // the client is only interested in a single VIP address
|| forceFullRegistryFetch // Force full pull
|| (applications == null) // no local registry yet (e.g. the first fetch at startup)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry(); // full pull
} else {
getAndUpdateDelta(applications); // Incremental pull
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
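The condition that chooses between a full fetch and a delta fetch can be expressed as a pure function. In the sketch below, FetchModeSketch is hypothetical. localAppCount == null models "no local Applications object yet", and localVersion models applications.getVersion().

```java
/** Hypothetical model of the full-vs-delta decision in fetchRegistry(). */
class FetchModeSketch {

    static boolean needsFullFetch(boolean disableDelta,
                                  String singleVipAddress,
                                  boolean forceFullFetch,
                                  Integer localAppCount,
                                  long localVersion) {
        return disableDelta                                                  // configured for full fetches only
                || (singleVipAddress != null && !singleVipAddress.isEmpty()) // interested in a single VIP address
                || forceFullFetch                                            // caller forces a full fetch
                || localAppCount == null                                     // nothing cached yet
                || localAppCount == 0                                        // empty local registry
                || localVersion == -1;                                       // local registry does not support delta
    }
}
```

The null check comes before localAppCount == 0, so the short-circuit prevents unboxing a null Integer.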
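The delta fetch is protected against concurrent updates. A generation counter updated with compareAndSet and the fetchRegistryUpdateLock together ensure that only one thread applies a delta at a time.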
// Service incremental pull
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
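The concurrency guard in getAndUpdateDelta combines two checks. First, a compareAndSet on a generation counter discards a delta if another fetch has advanced the generation since this one started. Second, tryLock skips the update instead of blocking if another thread holds the lock. The sketch below reproduces that pattern with standard JDK classes. DeltaGuardSketch and its methods are hypothetical, and the Runnable stands in for updateDelta(delta).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of the generation CAS + tryLock guard used for delta updates. */
class DeltaGuardSketch {

    private final AtomicLong fetchGeneration = new AtomicLong(0);
    private final ReentrantLock updateLock = new ReentrantLock();
    private int appliedDeltas = 0; // only modified while holding updateLock

    /** Applies the update only if the generation is unchanged and the lock is free. */
    boolean applyDelta(long generationSeenAtStart, Runnable update) {
        if (!fetchGeneration.compareAndSet(generationSeenAtStart, generationSeenAtStart + 1)) {
            return false; // another fetch advanced the generation: this delta is stale
        }
        if (!updateLock.tryLock()) {
            return false; // another thread is updating the local registry: skip, do not block
        }
        try {
            update.run();
            appliedDeltas++;
            return true;
        } finally {
            updateLock.unlock();
        }
    }

    long currentGeneration() { return fetchGeneration.get(); }

    int appliedDeltas() { return appliedDeltas; }
}
```

Skipping rather than waiting is acceptable here because the scheduled fetch runs again on the next interval.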
On the server side, recentlyChangedQueue holds the instances that changed within the last 3 minutes. A scheduled task runs every 30 seconds and removes entries older than that.
The server attaches the hashCode of its full registry to every delta. The client applies the delta to its local registry, computes the hashCode of the result, and compares it with the hashCode sent by the server. If they differ, it calls reconcileAndLogDifference, which fetches the full registry again.
// Server side: the delta response carries the reconcile hashCode of the full registry
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
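The hashCode comparison can be illustrated with a status-count string. The sketch assumes the reconcile hashCode is built by counting instances per status in sorted order and concatenating them as STATUS_count_ (for example DOWN_1_UP_2_). That format is modeled on Eureka's Applications#getReconcileHashCode as I understand it and should be treated as an assumption. ReconcileHashSketch is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the reconcile hashCode check after applying a delta. */
class ReconcileHashSketch {

    /** Counts instances per status (sorted by status name) and joins them as STATUS_count_. */
    static String reconcileHashCode(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys make the string deterministic
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    /** True when the local registry (after the delta) disagrees with the server's full-registry hash. */
    static boolean needsReconcile(List<String> localStatusesAfterDelta, String serverHash) {
        return !reconcileHashCode(localStatusesAfterDelta).equals(serverHash);
    }
}
```

A mismatch means the local copy has drifted, for example because a delta was lost. That is the case where the client falls back to a full fetch.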
4. Service deregistration
- To take a service offline manually, inject DiscoveryClient and call its shutdown method:
// DiscoveryClient
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
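shutdown() is idempotent. isShutdown.compareAndSet(false, true) succeeds only once, so repeated calls do nothing. The unregister step runs only when the instance was registered and shouldUnregisterOnShutdown is enabled. The sketch below keeps just that structure. ShutdownSketch is hypothetical, and the counter stands in for setting status DOWN and calling unregister().

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of the idempotent shutdown in DiscoveryClient. */
class ShutdownSketch {

    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private int unregisterCalls = 0;

    synchronized void shutdown(boolean shouldRegisterWithEureka, boolean shouldUnregisterOnShutdown) {
        // Only the first call flips the flag; later calls skip the whole cleanup
        if (isShutdown.compareAndSet(false, true)) {
            // In the real client, the status listener is removed and scheduled tasks are cancelled here
            if (shouldRegisterWithEureka && shouldUnregisterOnShutdown) {
                unregisterCalls++; // stands in for setInstanceStatus(DOWN) + unregister()
            }
        }
    }

    int unregisterCalls() { return unregisterCalls; }
}
```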
Eureka summary
Eureka is an excellent service registry that favors AP in CAP terms: it prioritizes availability and partition tolerance over strong consistency.
- Eureka Server keeps all registry information in memory and answers registry queries through a multi-level cache.
- Multi-level caching: a read first checks the level-1 cache **readOnlyCacheMap**, then the level-2 cache **readWriteCacheMap**, and finally the actual registry data.
- Cache expiry: a level-2 cache entry is invalidated when the server receives a register, renew, or cancel request. Evicting expired instances also invalidates it, and level-2 entries expire on their own (180 seconds after being written, by default).
- Cache update: on a level-1 miss, the value is read from the level-2 cache (which loads it from the registry if it is absent) and written back into the level-1 cache. Independently, a scheduled task synchronizes the level-1 cache from the level-2 cache, every 30 seconds by default.
- In a Eureka Server cluster, a client registers with only one server node. That node then iterates over its peer nodes and replicates the registration to them.
- By default, the Eureka Client sends a heartbeat every 30 seconds to inform the Eureka Server that the Client is alive and well. If the Eureka Server does not receive a renewal within 90 seconds, it removes the instance from the registry.
- Eureka Server self-protection mechanism: when the number of heartbeats received in the last minute drops below 85% of the expected number, self-protection is activated and the server stops evicting instances. Outside self-protection, each eviction run removes at most 15% of the registered instances.
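The read path and invalidation rules of the two-level cache can be sketched with two plain maps. TwoLevelCacheSketch is hypothetical; only the names readOnlyCacheMap and readWriteCacheMap come from the text. Keys and values are strings for simplicity, and the loader is assumed never to return null. The sketch omits the 180-second level-2 expiry and the timer itself, and exposes the periodic sync as a method instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical model of Eureka Server's level-1 / level-2 response cache. */
class TwoLevelCacheSketch {

    private final Map<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();  // level 1
    private final Map<String, String> readWriteCacheMap = new ConcurrentHashMap<>(); // level 2
    private final Function<String, String> registryLoader; // reads the real registry data (must not return null)

    TwoLevelCacheSketch(Function<String, String> registryLoader) {
        this.registryLoader = registryLoader;
    }

    /** Level 1 first; on a miss, level 2 (loading from the registry if absent), then back-fill level 1. */
    String get(String key) {
        String value = readOnlyCacheMap.get(key);
        if (value == null) {
            value = readWriteCacheMap.computeIfAbsent(key, registryLoader);
            readOnlyCacheMap.put(key, value);
        }
        return value;
    }

    /** Register, renew, cancel, or eviction: only the level-2 entry is invalidated. */
    void invalidate(String key) {
        readWriteCacheMap.remove(key);
    }

    /** What the periodic task does: refresh level-1 entries from level 2. */
    void syncReadOnlyFromReadWrite() {
        for (String key : readOnlyCacheMap.keySet()) {
            String fresh = readWriteCacheMap.computeIfAbsent(key, registryLoader);
            if (!fresh.equals(readOnlyCacheMap.get(key))) {
                readOnlyCacheMap.put(key, fresh);
            }
        }
    }
}
```

Between an invalidation and the next sync, level 1 still serves the old value. That staleness is the availability-over-consistency trade-off described in the summary.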