Nacos source code analysis
1. Configuration center
1. Complete dataId format
${prefix}-${spring.profiles.active}.${file-extension}
- prefix defaults to the value of spring.application.name and can be overridden with spring.cloud.nacos.config.prefix
- spring.profiles.active is the profile of the current environment
- file-extension is set with spring.cloud.nacos.config.file-extension; the supported values are yaml and properties
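The format above can be sketched as a small helper. DataIdSketch and buildDataId are hypothetical names used only for illustration, and dropping the "-${spring.profiles.active}" part when no profile is active is an assumption about the fallback behaviour, not something shown in this article.

```java
// Hypothetical helper for illustration; not part of Nacos or Spring Cloud Alibaba.
final class DataIdSketch {

    /** Joins the three parts as ${prefix}-${profile}.${fileExtension}. */
    static String buildDataId(String prefix, String profile, String fileExtension) {
        if (profile == null || profile.isEmpty()) {
            // Assumption: without an active profile the "-${profile}" part is dropped.
            return prefix + "." + fileExtension;
        }
        return prefix + "-" + profile + "." + fileExtension;
    }

    public static void main(String[] args) {
        System.out.println(buildDataId("order-service", "dev", "yaml"));      // order-service-dev.yaml
        System.out.println(buildDataId("order-service", "", "properties"));   // order-service.properties
    }
}
```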
2. How Spring Cloud integrates the Nacos configuration center
- SpringApplication.run() starts the Spring Boot project.
public ConfigurableApplicationContext run(String... args) {
    try {
        // Process the input arguments
        ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
        // Prepare the environment
        ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
        configureIgnoreBeanInfo(environment);
        // Print the banner
        Banner printedBanner = printBanner(environment);
        // Create the context
        context = createApplicationContext();
        // Instantiate the classes configured in spring.factories
        exceptionReporters = getSpringFactoriesInstances(
                SpringBootExceptionReporter.class,
                new Class[] { ConfigurableApplicationContext.class }, context);
        // Prepare the context
        prepareContext(context, environment, listeners, applicationArguments, printedBanner);
        // Refresh the context, which loads the beans into the container
        refreshContext(context);
        afterRefresh(context, applicationArguments);
        stopWatch.stop();
        if (this.logStartupInfo) {
            new StartupInfoLogger(this.mainApplicationClass)
                    .logStarted(getApplicationLog(), stopWatch);
        }
        listeners.started(context);
        callRunners(context, applicationArguments);
    }
    // catch block omitted in this excerpt
    return context;
}
- prepareContext() prepares the application context.
private void prepareContext(ConfigurableApplicationContext context,
        ConfigurableEnvironment environment, SpringApplicationRunListeners listeners,
        ApplicationArguments applicationArguments, Banner printedBanner) {
    context.setEnvironment(environment);
    postProcessApplicationContext(context);
    // Call every class that implements the ApplicationContextInitializer interface
    applyInitializers(context);
    listeners.contextPrepared(context);
}
- PropertySourceBootstrapConfiguration implements the ApplicationContextInitializer interface. Its initialize() method iterates over every class that implements PropertySourceLocator and calls locator.locate(), which returns a PropertySource. One of these locators is NacosPropertySourceLocator, whose PropertySource holds the configuration from the Nacos configuration center. The collected sources are finally written into the context.
public void initialize(ConfigurableApplicationContext applicationContext) {
    CompositePropertySource composite = new CompositePropertySource(
            BOOTSTRAP_PROPERTY_SOURCE_NAME);
    AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
    boolean empty = true;
    ConfigurableEnvironment environment = applicationContext.getEnvironment();
    // Collect the additional configuration from every locator
    for (PropertySourceLocator locator : this.propertySourceLocators) {
        PropertySource<?> source = null;
        source = locator.locate(environment);
        if (source == null) {
            continue;
        }
        logger.info("Located property source: " + source);
        composite.addPropertySource(source);
        empty = false;
    }
    // Load the collected sources into the context
    if (!empty) {
        MutablePropertySources propertySources = environment.getPropertySources();
        String logConfig = environment.resolvePlaceholders("${logging.config:}");
        LogFile logFile = LogFile.get(environment);
        if (propertySources.contains(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
            propertySources.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);
        }
        insertPropertySources(propertySources, composite);
        reinitializeLoggingSystem(environment, logConfig, logFile);
        setLogLevels(applicationContext, environment);
        handleIncludedProfiles(environment);
    }
}
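The collection loop in initialize() can be illustrated without Spring. The sketch below uses hypothetical stand-ins (Locator, collect, getProperty) in place of PropertySourceLocator and CompositePropertySource; it only shows the two ideas visible above: locators that return null are skipped, and a lookup walks the collected sources in order, so the first source that has a key wins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for PropertySourceLocator and CompositePropertySource.
final class CompositeLookupSketch {

    /** A locator returns a property map, or null when it has nothing to contribute. */
    interface Locator {
        Map<String, String> locate();
    }

    /** Collects every non-null result in locator order, like the loop in initialize(). */
    static List<Map<String, String>> collect(List<Locator> locators) {
        List<Map<String, String>> composite = new ArrayList<>();
        for (Locator locator : locators) {
            Map<String, String> source = locator.locate();
            if (source == null) {
                continue; // skip locators that found nothing
            }
            composite.add(source);
        }
        return composite;
    }

    /** Returns the value from the first source that contains the key, or null. */
    static String getProperty(List<Map<String, String>> composite, String key) {
        for (Map<String, String> source : composite) {
            if (source.containsKey(key)) {
                return source.get(key);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Locator> locators = List.<Locator>of(
                () -> Map.of("server.port", "8081"),
                () -> null,
                () -> Map.of("server.port", "9090", "app.name", "demo"));
        List<Map<String, String>> composite = collect(locators);
        System.out.println(getProperty(composite, "server.port")); // 8081
        System.out.println(getProperty(composite, "app.name"));    // demo
    }
}
```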
So when is NacosPropertySourceLocator injected?
- On startup, a Spring Boot project reads the class names configured in spring.factories files and loads them through reflection. The spring.factories file of spring-cloud-alibaba-nacos-config registers NacosConfigBootstrapConfiguration, the Nacos configuration class.
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.alibaba.nacos.NacosConfigBootstrapConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.alibaba.nacos.NacosConfigAutoConfiguration,\
org.springframework.cloud.alibaba.nacos.endpoint.NacosConfigEndpointAutoConfiguration
org.springframework.boot.diagnostics.FailureAnalyzer=\
org.springframework.cloud.alibaba.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer
The NacosConfigBootstrapConfiguration class is shown below; it registers the NacosPropertySourceLocator bean.
@Configuration
public class NacosConfigBootstrapConfiguration {

    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator() {
        return new NacosPropertySourceLocator();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        return new NacosConfigProperties();
    }
}
- In NacosPropertySourceLocator.locate(), every configuration is loaded through loadNacosDataIfPresent(), which finally calls NacosConfigService.getConfig() to fetch the configuration from the configuration center.
@Override
public PropertySource<?> locate(Environment env) {
    // Create the NacosConfigService
    ConfigService configService = nacosConfigProperties.configServiceInstance();
    // Some basic configuration reads are omitted here
    CompositePropertySource composite = new CompositePropertySource(
            NACOS_PROPERTY_SOURCE_NAME);
    // Load the shared configuration
    loadSharedConfiguration(composite);
    // Load the extension configuration
    loadExtConfiguration(composite);
    // Load the application configuration
    loadApplicationConfiguration(composite, nacosGroup, dataIdPrefix, fileExtension);
    return composite;
}
3. How Nacos works
- Creating the NacosConfigService object creates the agent and the worker.
public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    // HTTP agent
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    // The worker class that does the main work
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}
- ClientWorker initializes two thread pools.
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    // Thread pool with a single thread that runs a scheduled task: check whether the
    // configuration has changed (constructor arguments are omitted in this excerpt)
    executor = Executors.newScheduledThreadPool();
    // Long-polling thread pool (constructor arguments are omitted in this excerpt)
    executorService = Executors.newScheduledThreadPool();
    // Run checkConfigInfo() with a fixed delay of 10 ms
    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
- checkConfigInfo() splits the listeners into batches so that no single task handles too much data.
public void checkConfigInfo() {
    // Number of listeners
    int listenerSize = cacheMap.get().size();
    // Round up to get the number of batches
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // Whether a task is already running needs careful thought: the task list is
            // currently unordered, so changes to it may cause problems
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
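The batch arithmetic in checkConfigInfo() is easy to check in isolation. In the sketch below, LongPollingBatchSketch and its methods are hypothetical, and the batch size of 3000 is an assumed value for illustration (the client reads the real one from ParamUtil.getPerTaskConfigSize()). The divisor is a double on purpose: with integer division the fraction would be lost before Math.ceil could round it up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the batch arithmetic; not Nacos code.
final class LongPollingBatchSketch {

    // Assumed batch size; a double so that the division below is not truncated.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    /** Number of long-polling tasks needed for the given number of listeners. */
    static int taskCount(int listenerSize) {
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    /** Ids of the tasks that still have to be started; running tasks are left alone. */
    static List<Integer> newTaskIds(int currentTaskCount, int listenerSize) {
        List<Integer> ids = new ArrayList<>();
        for (int i = currentTaskCount; i < taskCount(listenerSize); i++) {
            ids.add(i);
        }
        return ids;
    }

    public static void main(String[] args) {
        for (int size : new int[] {0, 1, 3000, 3001, 7500}) {
            System.out.println(size + " listeners -> " + taskCount(size) + " task(s)");
        }
        System.out.println(newTaskIds(1, 7500)); // [1, 2]
    }
}
```

Because the loop starts at the current task count, the number of tasks only grows: when listeners are removed, no task is stopped.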
- LongPollingRunnable.run()
public void run() {
    // 1. Check the local configuration
    // code ...
    // 2. Check the server configuration; see checkUpdateDataIds() below
    List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
    // 3. Request the changed data and write it to the cache
    content = getServerConfig(dataId, group, tenant, 3000L);
    // 4. Check whether the cached data has changed; if it has, notify the listener,
    //    which publishes a RefreshEvent (see NacosContextRefresher below)
    cacheData.checkListenerMd5();
    // code ...
}
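Step 4 relies on comparing content hashes. The following is a minimal sketch of that idea, assuming a cache entry that remembers the MD5 of its current content and the MD5 it last delivered; Md5ChangeSketch, CacheEntry and their fields are hypothetical names, not the Nacos classes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of MD5-based change detection; not Nacos code.
final class Md5ChangeSketch {

    /** Returns the MD5 of the content as 32 lowercase hex characters. */
    static String md5Hex(String content) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Cache entry that tracks the current hash and the hash last delivered to the listener. */
    static final class CacheEntry {
        private String content = "";
        private String md5 = md5Hex("");
        private String lastNotifiedMd5 = md5;
        final List<String> delivered = new ArrayList<>();

        void setContent(String newContent) {
            this.content = newContent;
            this.md5 = md5Hex(newContent);
        }

        /** Delivers the content only when its hash differs from the last delivered one. */
        void checkListenerMd5() {
            if (!md5.equals(lastNotifiedMd5)) {
                delivered.add(content); // a real listener callback would run here
                lastNotifiedMd5 = md5;
            }
        }
    }

    public static void main(String[] args) {
        CacheEntry entry = new CacheEntry();
        entry.setContent("timeout=1");
        entry.checkListenerMd5();
        entry.checkListenerMd5(); // unchanged, so nothing is delivered
        entry.setContent("timeout=2");
        entry.checkListenerMd5();
        System.out.println(entry.delivered); // [timeout=1, timeout=2]
    }
}
```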
- checkUpdateDataIds() calls checkUpdateConfigStr(), which sends a request to Nacos to get the changed configuration.
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
    // Send a request for the changed configuration; the default timeout is 30 seconds
    HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
            agent.getEncode(), timeout);
    return parseUpdateDataIdResponse(result.content);
}
- NacosContextRefresher publishes the RefreshEvent.
public void receiveConfigInfo(String configInfo) {
    loadCount.incrementAndGet();
    String md5 = "";
    if (!StringUtils.isEmpty(configInfo)) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))
                    .toString(16);
        } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
            LOGGER.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
        }
    }
    refreshHistory.add(dataId, md5);
    applicationContext.publishEvent(
            new RefreshEvent(this, null, "Refresh Nacos config"));
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Refresh Nacos config group{},dataId{}", group, dataId);
    }
}
- RefreshEventListener handles the configuration refresh event, which leads to the refresh() method below.
public synchronized Set<String> refresh() {
    Map<String, Object> before = extract(
            this.context.getEnvironment().getPropertySources());
    // Prepare the environment again
    addConfigFilesToEnvironment();
    Set<String> keys = changes(before,
            extract(this.context.getEnvironment().getPropertySources())).keySet();
    // Publish the changed keys; the property beans bound to the changed property
    // sources are replaced with new ones
    this.context.publishEvent(new EnvironmentChangeEvent(context, keys));
    // refreshAll() clears the beans cached by RefreshScope; they are re-created on the
    // next call, with the new property sources injected
    this.scope.refreshAll();
    return keys;
}
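The before/after comparison in refresh() boils down to a diff of two property maps. A minimal sketch of such a diff follows; ChangedKeysSketch and changedKeys are hypothetical names, and the real changes() helper in Spring Cloud may differ in detail.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of diffing two property snapshots; not Spring Cloud code.
final class ChangedKeysSketch {

    /** Returns the keys that were added, removed, or whose value differs. */
    static Set<String> changedKeys(Map<String, String> before, Map<String, String> after) {
        Set<String> keys = new TreeSet<>();
        for (String key : before.keySet()) {
            if (!Objects.equals(before.get(key), after.get(key))) {
                keys.add(key); // removed or modified
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                keys.add(key); // added
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, String> before = Map.of("a", "1", "b", "2", "c", "3");
        Map<String, String> after = Map.of("a", "1", "b", "20", "d", "4");
        System.out.println(changedKeys(before, after)); // [b, c, d]
    }
}
```

Only the changed keys are published, so listeners of the change event do not have to rescan the whole environment.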
2. Service registry
1. Service registration
- In the spring.factories configuration, NacosDiscoveryAutoConfiguration is the auto-configuration class for service registration.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.alibaba.nacos.NacosDiscoveryAutoConfiguration,\
org.springframework.cloud.alibaba.nacos.ribbon.RibbonNacosAutoConfiguration,\
org.springframework.cloud.alibaba.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
org.springframework.cloud.alibaba.nacos.discovery.NacosDiscoveryClientAutoConfiguration
- NacosDiscoveryAutoConfiguration injects three beans into the container: NacosServiceRegistry, NacosRegistration, and NacosAutoServiceRegistration. Together they perform service registration.
public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}
- NacosServiceRegistry implements the ServiceRegistry interface provided by spring-cloud-commons and overrides the register and deregister methods. The register method mainly builds an Instance from the configuration and calls namingService.registerInstance() to register the service with the registry.
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);
    private final NacosDiscoveryProperties nacosDiscoveryProperties;
    private final NamingService namingService;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        this.namingService = nacosDiscoveryProperties.namingServiceInstance();
    }

    @Override
    public void register(Registration registration) {}

    @Override
    public void deregister(Registration registration) {}
}
- NacosRegistration implements Registration and ServiceInstance, and records some configuration information.
public class NacosRegistration implements Registration, ServiceInstance {

    public static final String MANAGEMENT_PORT = "management.port";
    public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";
    public static final String MANAGEMENT_ADDRESS = "management.address";
    public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path";

    private NacosDiscoveryProperties nacosDiscoveryProperties;
    private ApplicationContext context;

    public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        this.context = context;
    }

    @PostConstruct
    public void init() {
        Map<String, String> metadata = nacosDiscoveryProperties.getMetadata();
        Environment env = context.getEnvironment();
        String endpointBasePath = env.getProperty(MANAGEMENT_ENDPOINT_BASE_PATH);
        if (!StringUtils.isEmpty(endpointBasePath)) {
            metadata.put(MANAGEMENT_ENDPOINT_BASE_PATH, endpointBasePath);
        }
        Integer managementPort = ManagementServerPortUtils.getPort(context);
        if (null != managementPort) {
            metadata.put(MANAGEMENT_PORT, managementPort.toString());
            String contextPath = env
                    .getProperty("management.server.servlet.context-path");
            String address = env.getProperty("management.server.address");
            if (!StringUtils.isEmpty(contextPath)) {
                metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath);
            }
            if (!StringUtils.isEmpty(address)) {
                metadata.put(MANAGEMENT_ADDRESS, address);
            }
        }
    }

    @Override
    public String getServiceId() {
        return nacosDiscoveryProperties.getService();
    }

    @Override
    public String getHost() {
        return nacosDiscoveryProperties.getIp();
    }

    @Override
    public int getPort() {
        return nacosDiscoveryProperties.getPort();
    }

    public void setPort(int port) {
        this.nacosDiscoveryProperties.setPort(port);
    }

    @Override
    public boolean isSecure() {
        return nacosDiscoveryProperties.isSecure();
    }

    @Override
    public URI getUri() {
        return DefaultServiceInstance.getUri(this);
    }

    @Override
    public Map<String, String> getMetadata() {
        return nacosDiscoveryProperties.getMetadata();
    }

    public boolean isRegisterEnabled() {
        return nacosDiscoveryProperties.isRegisterEnabled();
    }

    public String getCluster() {
        return nacosDiscoveryProperties.getClusterName();
    }

    public float getRegisterWeight() {
        return nacosDiscoveryProperties.getWeight();
    }

    public NacosDiscoveryProperties getNacosDiscoveryProperties() {
        return nacosDiscoveryProperties;
    }

    public NamingService getNacosNamingService() {
        return nacosDiscoveryProperties.namingServiceInstance();
    }

    @Override
    public String toString() {
        return "NacosRegistration{" + "nacosDiscoveryProperties="
                + nacosDiscoveryProperties + '}';
    }
}
Now that the configuration information and the registration method are available, when is the registration method called to register the service to the registry? Let’s move on to the last Bean
- NacosAutoServiceRegistration itself does nothing special; the focus is on the abstract class it extends, AbstractAutoServiceRegistration.
public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {}
Spring Boot implements a publish-subscribe model and publishes a series of events during project startup. AbstractAutoServiceRegistration listens for WebServerInitializedEvent, which is published once the web server has started. Its register() method obtains the information held in the registration and calls serviceRegistry to complete the service registration. With Nacos, these are the two implementations injected into the container above, NacosServiceRegistry and NacosRegistration.
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware {

    private static final Log logger = LogFactory
            .getLog(AbstractAutoServiceRegistration.class);

    @EventListener(WebServerInitializedEvent.class)
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(
                    ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }

    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }
        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get()) {
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }

    @PreDestroy
    public void destroy() {
        stop();
    }

    protected void register() {
        this.serviceRegistry.register(getRegistration());
    }

    protected void deregister() {
        this.serviceRegistry.deregister(getRegistration());
    }

    public void stop() {
        if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
            deregister();
            if (shouldRegisterManagement()) {
                deregisterManagement();
            }
            this.serviceRegistry.close();
        }
    }
}
- Service health is checked through a heartbeat mechanism. When a service registers with Nacos, BeatReactor is started and the local service sends heartbeat packets at a fixed rate.
public class BeatReactor {

    private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
    private long clientBeatInterval = 10000L;
    private NamingProxy serverProxy;
    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap();

    public BeatReactor(NamingProxy serverProxy) {
        this.serverProxy = serverProxy;
        this.executorService.scheduleAtFixedRate(new BeatReactor.BeatProcessor(), 0L,
                this.clientBeatInterval, TimeUnit.MILLISECONDS);
    }

    public void addBeatInfo(String dom, BeatInfo beatInfo) {
        LogUtils.LOG.info("BEAT", "adding service:" + dom + " to beat map.");
        this.dom2Beat.put(dom, beatInfo);
    }

    public void removeBeatInfo(String dom) {
        LogUtils.LOG.info("BEAT", "removing service:" + dom + " from beat map.");
        this.dom2Beat.remove(dom);
    }

    class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            Map<String, String> params = new HashMap(2);
            params.put("beat", JSON.toJSONString(this.beatInfo));
            params.put("dom", this.beatInfo.getDom());
            try {
                String result = BeatReactor.this.serverProxy.callAllServers("/nacos/v1/ns/api/clientBeat", params);
                JSONObject jsonObject = JSON.parseObject(result);
                if (jsonObject != null) {
                    BeatReactor.this.clientBeatInterval = jsonObject.getLong("clientBeatInterval");
                }
            } catch (Exception var4) {
                LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(this.beatInfo), var4);
            }
        }
    }

    class BeatProcessor implements Runnable {
        BeatProcessor() {
        }

        public void run() {
            try {
                Iterator var1 = BeatReactor.this.dom2Beat.entrySet().iterator();
                while (var1.hasNext()) {
                    Entry<String, BeatInfo> entry = (Entry) var1.next();
                    BeatInfo beatInfo = (BeatInfo) entry.getValue();
                    BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
                    LogUtils.LOG.debug("BEAT", "send beat to server: ", new Object[]{beatInfo.toString()});
                }
            } catch (Exception var4) {
                LogUtils.LOG.error("CLIENT-BEAT", "Exception while scheduling beat.", var4);
            }
        }
    }
}
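The scheduling pattern used by BeatReactor (a single daemon thread that fires at a fixed rate) can be tried out with the JDK alone. The sketch below is an illustration under stated assumptions: HeartbeatSketch and sendBeats are hypothetical names, the heartbeat is just a counter instead of an HTTP call, and the interval is shortened to a few milliseconds so that the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of fixed-rate heartbeats; not Nacos code.
final class HeartbeatSketch {

    /** Returns true if the requested number of heartbeats was sent within five seconds. */
    static boolean sendBeats(int beats, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true); // a daemon thread does not keep the JVM alive
            thread.setName("heartbeat-sketch");
            return thread;
        });
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(beats);
        executor.scheduleAtFixedRate(() -> {
            // A real client would send the heartbeat request to the server here.
            sent.incrementAndGet();
            done.countDown();
        }, 0L, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("three beats sent: " + sendBeats(3, 20L));
    }
}
```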
2. Service discovery
- NacosNamingService creates a HostReactor at initialization to fetch, cache, and update service instance information
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
boolean loadCacheAtStart, int pollingThreadCount) {
executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.updatingMap = new ConcurrentHashMap<String, Object>();
// failoverReactor is used for failover
this.failoverReactor = new FailoverReactor(this, cacheDir);
// pushReceiver receives the messages that the Nacos server pushes over UDP
this.pushReceiver = new PushReceiver(this);
}
- PushReceiver starts a thread to receive UDP push messages from the server
public class PushReceiver implements Runnable {
private ScheduledExecutorService executorService;
private static final int UDP_MSS = 64 * 1024;
private DatagramSocket udpSocket;
private HostReactor hostReactor;
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
udpSocket = new DatagramSocket();
executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
@Override
public void run() {
while (true) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
public static class PushPacket {
public String type;
public long lastRefTime;
public String data;
}
public int getUDPPort() {
return udpSocket.getLocalPort();
}
}
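The receive-then-acknowledge exchange in PushReceiver can be reproduced on the loopback interface. This is a minimal sketch under stated assumptions: UdpPushAckSketch and pushAndAwaitAck are hypothetical names, the message is plain text rather than compressed JSON, the ack is simply the message prefixed with "ack:", and both ends run in one process.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a UDP push with an acknowledgement; not Nacos code.
final class UdpPushAckSketch {

    /** Sends one message to a local receiver thread and returns the ack it replies with. */
    static String pushAndAwaitAck(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket server = new DatagramSocket(0, loopback)) {
            receiver.setSoTimeout(2000);
            server.setSoTimeout(2000);

            // Receiver side: wait for one packet, then answer its sender with an ack.
            Thread receiverThread = new Thread(() -> {
                try {
                    byte[] buffer = new byte[64 * 1024];
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                    receiver.receive(packet);
                    String received = new String(packet.getData(), 0, packet.getLength(),
                            StandardCharsets.UTF_8);
                    byte[] ack = ("ack:" + received).getBytes(StandardCharsets.UTF_8);
                    receiver.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
                } catch (IOException e) {
                    // Timeout or closed socket: the caller notices because no ack arrives.
                }
            });
            receiverThread.setDaemon(true);
            receiverThread.start();

            // Server side: push the message to the port the receiver is bound to.
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

            byte[] buffer = new byte[64 * 1024];
            DatagramPacket reply = new DatagramPacket(buffer, buffer.length);
            server.receive(reply);
            return new String(reply.getData(), 0, reply.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pushAndAwaitAck("service-changed"));
    }
}
```

Binding the receiver to port 0 lets the operating system pick a free port, which is why the client has to report the chosen port to the server, as getUDPPort() does above.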
- The main methods of HostReactor
public class HostReactor {
// Get the service map
public Map<String, ServiceInfo> getServiceInfoMap() {
return serviceInfoMap;
}
// Add the update task
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
// Process the JSON returned by the server and call eventDispatcher to deliver the corresponding event
public ServiceInfo processServiceJSON(String json);
// Get service information from the local map
private ServiceInfo getServiceInfo0(String serviceName, String clusters);
// Get service information directly from the server
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters);
// Get service information
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
// 1. failoverReactor.isFailoverSwitch(): if the failover switch is on, read the data directly from the local failover cache
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 2. getServiceInfo0(serviceName, clusters) reads the locally cached data
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 3. If nothing is cached, call updateServiceNow() to fetch the data from the server immediately
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
}
// The service is currently being updated, so wait for the update to finish
else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// Schedule the update task for the service
scheduleUpdateIfAbsent(serviceName, clusters);
// Return the service
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
}
// Synchronously request the latest service data from the server
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
// Only query the server; the result is not processed
public void refreshOnly(String serviceName, String clusters) {
try {
serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}
// Update task: once started, it reschedules itself after every run
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
}
}
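The lookup order in getServiceInfo() (failover data first, then the local cache, then the server) can be condensed into a few lines. The sketch below is a simplification under stated assumptions: ServiceLookupSketch and its members are hypothetical, a service is reduced to a list of address strings, the server is a plain function, and the scheduled UpdateTask as well as the wait/notify hand-off are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration of the lookup order; not Nacos code.
final class ServiceLookupSketch {

    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, List<String>> failoverData;
    private final Function<String, List<String>> server;
    private final AtomicInteger serverCalls = new AtomicInteger();
    private volatile boolean failoverSwitch;

    ServiceLookupSketch(Function<String, List<String>> server,
                        Map<String, List<String>> failoverData) {
        this.server = server;
        this.failoverData = failoverData;
    }

    void setFailoverSwitch(boolean on) {
        this.failoverSwitch = on;
    }

    int serverCalls() {
        return serverCalls.get();
    }

    List<String> getInstances(String serviceName) {
        // 1. With the failover switch on, answer from the failover data only.
        if (failoverSwitch) {
            return failoverData.getOrDefault(serviceName, List.of());
        }
        // 2. Use the local cache; 3. on a miss, ask the server once and cache the result.
        return cache.computeIfAbsent(serviceName, name -> {
            serverCalls.incrementAndGet();
            return server.apply(name);
        });
    }

    public static void main(String[] args) {
        ServiceLookupSketch lookup = new ServiceLookupSketch(
                name -> List.of(name + "@10.0.0.1:8080"),
                Map.of("orders", List.of("orders@failover")));
        System.out.println(lookup.getInstances("orders")); // fetched from the server
        System.out.println(lookup.getInstances("orders")); // served from the cache
        System.out.println("server calls: " + lookup.serverCalls());
        lookup.setFailoverSwitch(true);
        System.out.println(lookup.getInstances("orders")); // failover data
    }
}
```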