Get registry
Request entry point
Start by locating ApplicationsResource
// Full download
/**
 * Entry point for a full registry download (abridged excerpt; the parameter list and the
 * final response-building code are elided).
 *
 * Counts the request, answers 403 FORBIDDEN when the registry does not allow access, and
 * otherwise builds the cache key under which the payload is looked up in the response cache.
 */
public Response getContainers(...) {
    // A remote-region request is one that carries a non-empty region list.
    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    }

    // Whether the registry is accessible
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }

    // Define the cache key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    // The payload is then read from the response cache (response building elided in this excerpt):
    // responseCache.getGZIP(cacheKey) or responseCache.get(cacheKey)
}
// Incremental (delta) download works almost the same way; its entry point is:
public Response getContainerDifferential(...).
Copy the code
Build ResponseCacheImpl
ResponseCacheImpl reads from a read-only cache map and a read/write cache map
Let’s look at the ResponseCacheImpl constructor before we look at getting the data
// Read-only cache map
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// Read/write cache map
private final LoadingCache<Key, Value> readWriteCacheMap;

/**
 * Builds the read/write cache and, when the read-only cache is enabled, schedules the task
 * that periodically refreshes it (abridged excerpt; elided code is marked with "...").
 */
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.readWriteCacheMap =
            // Initial capacity comes from the server config (the article states the default is 1000)
            CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                    // Entries expire a configured number of seconds after being written
                    // (the article states the default is 180 s)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    ...
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            ...
                            // Get value
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // Whether to use the read-only cache; this can be configured on the Eureka server.
    if (shouldUseReadOnlyResponseCache) {
        // Schedule a task that refreshes readOnlyCacheMap every responseCacheUpdateIntervalMs
        // (the article states the default is 30 s). The first run is the current time rounded
        // down to a multiple of the interval, plus one interval.
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }
}

// Body of the task returned by getCacheUpdateTask():
// Traverse readOnlyCacheMap
for (Key key : readOnlyCacheMap.keySet()) {
    try {
        Value cacheValue = readWriteCacheMap.get(key);
        Value currentCacheValue = readOnlyCacheMap.get(key);
        // Reference comparison: when the read-only entry is not the same object as the
        // read/write entry, overwrite the read-only entry with the read/write one.
        if (cacheValue != currentCacheValue) {
            readOnlyCacheMap.put(key, cacheValue);
        }
    } // NOTE(review): the catch/finally that must follow this try is not shown in the excerpt
}
The generatePayload method explained
// We are only looking at the full processing
if (ALL_APPS.equals(key.getName())) {
boolean isRemoteRegionRequested = key.hasRegions();
if (isRemoteRegionRequested) {
// Get registry information
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
// Get registry informationpayload = getPayLoad(key, registry.getApplications()); }}else {
if (isRemoteRegionRequested) {
payload = getPayLoad(key, registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else{ payload = getPayLoad(key, registry.getApplicationDeltas()); }}Copy the code
Get registry
AbstractInstanceRegistry provides both the full fetch (local and remote registries) and the incremental (delta) fetch
/**
 * Full fetch: collects applications into a new {@code Applications} object and, when remote
 * regions are requested, merges in the applications of each known remote region
 * (abridged excerpt; elided code is marked with "...").
 *
 * @param remoteRegions names of the remote regions to include; may be null or empty
 */
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    // Whether any remote region was requested
    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    Applications apps = new Applications();
    apps.setVersion(1L);
    // (Elided in this excerpt) Get the local registry information, iterate over it and add it to apps.
    if (includeRemoteRegion) {
        for (String remoteRegion : remoteRegions) {
            RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
            if (null != remoteRegistry) {
                // Get the Applications on the remote end
                Applications remoteApps = remoteRegistry.getApplications();
                for (Application application : remoteApps.getRegisteredApplications()) {
                    if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                        // Look up the application already collected under this name
                        Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                        // None collected yet: register an empty one. In both cases the remote
                        // instances are appended to it below.
                        if (appInstanceTillNow == null) {
                            appInstanceTillNow = new Application(application.getName());
                            apps.addApplication(appInstanceTillNow);
                        }
                        for (InstanceInfo instanceInfo : application.getInstances()) {
                            appInstanceTillNow.addInstance(instanceInfo);
                        }
                    }
                }
            }
        }
    }
    ...
}

// Incremental fetch
/**
 * Incremental (delta) fetch: walks recentlyChangedQueue under the write lock and collects the
 * changed instances (abridged excerpt; elided code, including the return, is marked with "...").
 *
 * @param remoteRegions names of the remote regions to include
 */
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    // Acquire the lock before entering try, so finally never unlocks a lock that was not taken.
    write.lock();
    try {
        // Iterate over recentlyChangedQueue
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        while (iter.hasNext()) {
            ...
            // Unlike the full fetch, a new InstanceInfo copy is added, so that later changes to
            // the queued item do not alter the value already returned.
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }
    } finally {
        write.unlock();
    }
    ...
}

// recentlyChangedQueue is cleared periodically
/**
 * Constructor excerpt: schedules the task returned by getDeltaRetentionTask() on
 * deltaRetentionTimer.
 */
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
// The same configured interval is passed as both the second and third schedule argument
// (presumably initial delay and period -- confirm the timer type). The article states the
// default is 30 seconds.
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
/ / getDeltaRetentionTask, other getRetentionTimeInMSInDeltaQueue this can be configured, the default is 180 s
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break; }}Copy the code
With the source of the data in the cache covered, it’s time to look at the processing of getting the data from the cache
Fetch data from the cache
Fetch data from the cache
/**
 * Returns the gzipped form of the cached payload for the given key.
 *
 * @param key cache key to look up
 * @return the gzipped bytes, or null when no value could be obtained for the key
 */
public byte[] getGZIP(Key key) {
    // shouldUseReadOnlyResponseCache decides whether the read-only cache is consulted first
    // (the article states it defaults to true).
    final Value cached = getValue(key, shouldUseReadOnlyResponseCache);
    return (cached != null) ? cached.getGzipped() : null;
}
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
// This value is passed from above and defaults to true
if (useReadOnlyCache) {
// Get data from the read-only cache map
final Value currentPayload = readOnlyCacheMap.get(key);
if(currentPayload ! =null) {
payload = currentPayload;
} else {
// Get data from the read/write map
payload = readWriteCacheMap.get(key);
//readOnlyCacheMap is updated periodically. If the readWirtemap file is empty at first, the readWirtemap file is automatically updated laterreadOnlyCacheMap.put(key, payload); }}else{ payload = readWriteCacheMap.get(key); }}catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
Copy the code
Conclusion
- Fetching the registry comes in two forms: incremental (delta) fetch and full fetch
- When studying the full fetch, pay particular attention to the read/write-separated cache design: a read-only map in front of the read/write cache