Nacos source code analysis

1. Configure the center

1. DataId complete format

${prefix}-${spring.profiles.active}.${file-extension}
Copy the code
  • The default project name prefix that spring. The application. The name, but can be by spring. Cloud. Nacos. Config. The prefix configuration
  • Active Indicates the profile corresponding to the current environment
  • File – the extension by spring. Cloud. Nacos. Config. The file – the extension configuration, optional yaml, properties

2.Spring Cloud integrates Nacos configuration center principle

  1. Springapplication.run (), the SpringBoot project starts

    public ConfigurableApplicationContext run(String... args) {
    		try {
          // Process input parameters
    			ApplicationArguments applicationArguments = new DefaultApplicationArguments(
    					args);
          // Prepare the environment
    			ConfigurableEnvironment environment = prepareEnvironment(listeners,
    					applicationArguments);
    			configureIgnoreBeanInfo(environment);
          // Print the icon
    			Banner printedBanner = printBanner(environment);
          // Create context
    			context = createApplicationContext();
          // Get an instance of a class configured in Spring. factories
    			exceptionReporters = getSpringFactoriesInstances(
    					SpringBootExceptionReporter.class,
    					new Class[] { ConfigurableApplicationContext.class }, context);
          // Prepare the context
    			prepareContext(context, environment, listeners, applicationArguments,
    					printedBanner);
          // Refresh the context to load the Bean into the container
    			refreshContext(context);
    			afterRefresh(context, applicationArguments);
    			stopWatch.stop();
    			if (this.logStartupInfo) {
    				new StartupInfoLogger(this.mainApplicationClass)
    						.logStarted(getApplicationLog(), stopWatch);
    			}
    			listeners.started(context);
    			callRunners(context, applicationArguments);
    		}
    		return context;
    	}
    Copy the code
  2. This.preparecontext (), prepares the project context

    private void prepareContext(ConfigurableApplicationContext context, ConfigurableEnvironment environment, SpringApplicationRunListeners listeners, ApplicationArguments applicationArguments, Banner printedBanner) {
       context.setEnvironment(environment);
       postProcessApplicationContext(context);
       / / call all realize ApplicationContextInitializer interfaces subclass
       applyInitializers(context);
       listeners.contextPrepared(context);
    }
    Copy the code
  3. PropertySourceBootstrapConfiguration ApplicationContextInitializer interface is achieved, the initialize () method, to deal with all classes that implement PropertySourceLocator, The key is to call a locator. Locate () method, return PropertySource, and among them contains NacosPropertySourceLocator, return to contain Nacos configuration in PropertySource center configuration, finally writes it to the context

    public void initialize(ConfigurableApplicationContext applicationContext) {
       CompositePropertySource composite = new CompositePropertySource(
             BOOTSTRAP_PROPERTY_SOURCE_NAME);
       AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
       boolean empty = true;
       ConfigurableEnvironment environment = applicationContext.getEnvironment();
       // Get additional configuration
       for (PropertySourceLocator locator : this.propertySourceLocators) { PropertySource<? > source =null;
          source = locator.locate(environment);
          if (source == null) {
             continue;
          }
          logger.info("Located property source: " + source);
          composite.addPropertySource(source);
          empty = false;
       }
       // Load to context
       if(! empty) { MutablePropertySources propertySources = environment.getPropertySources(); String logConfig = environment.resolvePlaceholders("${logging.config:}");
          LogFile logFile = LogFile.get(environment);
          if(propertySources.contains(BOOTSTRAP_PROPERTY_SOURCE_NAME)) { propertySources.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME); } insertPropertySources(propertySources, composite); reinitializeLoggingSystem(environment, logConfig, logFile); setLogLevels(applicationContext, environment); handleIncludedProfiles(environment); }}Copy the code

    So when injection NacosPropertySourceLocator come in?

  4. The SpringBoot project starts up by reading the class names configured in the Spring.Factories file and loading them through reflection. Spring – the cloud – alibaba – nacos – the spring config. Factories NacosConfigBootstrapConfiguration file configuration nacos configuration class

    org.springframework.cloud.bootstrap.BootstrapConfiguration=\
    org.springframework.cloud.alibaba.nacos.NacosConfigBootstrapConfiguration
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.alibaba.nacos.NacosConfigAutoConfiguration,\
    org.springframework.cloud.alibaba.nacos.endpoint.NacosConfigEndpointAutoConfiguration
    org.springframework.boot.diagnostics.FailureAnalyzer=\
    org.springframework.cloud.alibaba.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer
    Copy the code

    NacosConfigBootstrapConfiguration class as follows, can see injected NacosPropertySourceLocator configuration in the class

    @Configuration
    public class NacosConfigBootstrapConfiguration {
    
       @Bean
       public NacosPropertySourceLocator nacosPropertySourceLocator(a) {
          return new NacosPropertySourceLocator();
       }
    
       @Bean
       @ConditionalOnMissingBean
       public NacosConfigProperties nacosConfigProperties(a) {
          return newNacosConfigProperties(); }}Copy the code
  5. NacosPropertySourceLocator. Locate () method, and finally all loading configuration is through loadNacosDataIfPresent (), and finally passed NacosConfigService. GetConfig loading configuration center configuration () method

    @Override
    publicPropertySource<? > locate(Environment env) {/ / create NacosConfigService
       ConfigService configService = nacosConfigProperties.configServiceInstance();
    	 // Omit some basic read configuration operations
       CompositePropertySource composite = new CompositePropertySource(
             NACOS_PROPERTY_SOURCE_NAME);
       // Load the share configuration
       loadSharedConfiguration(composite);
       // Load the extension configuration
       loadExtConfiguration(composite);
       // Loader configuration
       loadApplicationConfiguration(composite, nacosGroup, dataIdPrefix, fileExtension);
       return composite;
    }
    Copy the code

3. The principle of Nacos

  1. Create NacosConfigService object, generate agent, worker

    public NacosConfigService(Properties properties) throws NacosException {
            String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
            if (StringUtils.isBlank(encodeTmp)) {
                encode = Constants.ENCODE;
            } else {
                encode = encodeTmp.trim();
            }
            initNamespace(properties);
      			/ / HTTP proxy
            agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
            agent.start();
            // Important work class
            worker = new ClientWorker(agent, configFilterChainManager, properties);
        }
    Copy the code
  2. ClientWorker initializes two new thread pools

    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    		// Create a thread pool with only one thread and perform a scheduled task: check whether the configuration has changed
        executor = Executors.newScheduledThreadPool();
    		// Long rotation thread pool
        executorService = Executors.newScheduledThreadPool();
      	// execute tasks at a fixed interval of 10ms
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run(a) {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); }}},1L.10L, TimeUnit.MILLISECONDS);
    }
    Copy the code
  3. CheckConfigInfo () Executes tasks in batches to prevent too much data

    public void checkConfigInfo(a) {
        / / task
        int listenerSize = cacheMap.get().size();
        // Round up to the number of batches
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // It takes a lot of thinking to determine if the task is executing. The task list is now unordered. The process of change can be problematic
                executorService.execute(newLongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; }}Copy the code
  4. LongPollingRunnable.run()

    public void run(a) {
    	 //1. Check local configurations
       //code......         
       //2. Check the server configuration. For details, see 5
       List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
    	 //3. Request specific change data and write it to cache
       content = getServerConfig(dataId, group, tenant, 3000L);
       //4. Check whether the cached data has changed, notify the listener, and issue the RefreshEvent event if it has changed, as shown in 6
       cacheData.checkListenerMd5();
       //code....   
    }
    Copy the code
  5. CheckUpdateDataIds () calls checkUpdateConfigStr() to send a request to NACOS to get the changed configuration

    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
    				// Send a request to obtain the changed configuration. The timeout period is 30 seconds by default
            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                agent.getEncode(), timeout);
    				return parseUpdateDataIdResponse(result.content);
    }
    Copy the code
  6. NacosContextRefresher publishes RefreshEvent events

    public void receiveConfigInfo(String configInfo) {
       loadCount.incrementAndGet();
       String md5 = "";
       if(! StringUtils.isEmpty(configInfo)) {try {
             MessageDigest md = MessageDigest.getInstance("MD5");
             md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))
                   .toString(16);
          }
          catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
             LOGGER.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
          }
       }
       refreshHistory.add(dataId, md5);
       applicationContext.publishEvent(
             new RefreshEvent(this.null."Refresh Nacos config"));
       if (LOGGER.isDebugEnabled()) {
          LOGGER.debug("Refresh Nacos config group{},dataId{}", group, dataId); }}Copy the code
  7. RefreshEventListener handles the configuration refresh event

    public synchronized Set<String> refresh(a) {
    		Map<String, Object> before = extract(
    				this.context.getEnvironment().getPropertySources());
    		// Reprepare the environment
      	addConfigFilesToEnvironment();
    		Set<String> keys = changes(before,
    				extract(this.context.getEnvironment().getPropertySources())).keySet();
    		// Refresh the property sources-the property bean corresponding to the property source is replaced with a new one
      	this.context.publishEvent(new EnvironmentChangeEvent(context, keys));
    		// The scope refreshAll operation is triggered, which in the case of the RefreshScope clears the managed cache bean, which is then re-created when called again, injecting a new property source
      	this.scope.refreshAll();
    		return keys;
    	}
    Copy the code

2. Service registry

1. Service registration

  1. Look at the spring. The configuration of factories, NacosDiscoveryAutoConfiguration is related to the service registry automatically configure classes

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.alibaba.nacos.NacosDiscoveryAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.ribbon.RibbonNacosAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ org.springframework.cloud.alibaba.nacos.discovery.NacosDiscoveryClientAutoConfiguration
    Copy the code
  2. Injected NacosServiceRegistry NacosDiscoveryAutoConfiguration in the water in the container, NacosRegistration, NacosAutoServiceRegistration the three beans, Service registration is done by them

    public class NacosDiscoveryAutoConfiguration {
       @Bean
       public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) {
          return new NacosServiceRegistry(nacosDiscoveryProperties);
       }
    
       @Bean
       @ConditionalOnBean(AutoServiceRegistrationProperties.class)
       public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
          return new NacosRegistration(nacosDiscoveryProperties, context);
       }
    
       @Bean
       @ConditionalOnBean(AutoServiceRegistrationProperties.class)
       public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
          return newNacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); }}Copy the code
  3. NacosServiceRegistry, which implements the ServiceRegistry interface provided by Spring-Cloud-Commons and overwrites the register and deregister methods. In the register method is mainly the configuration file for Instance Instance, call the namingService. RegisterInstance () method, the service can be registered with the registry

    public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    
       private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);
    
       private final NacosDiscoveryProperties nacosDiscoveryProperties;
    
       private final NamingService namingService;
    
       public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
          this.nacosDiscoveryProperties = nacosDiscoveryProperties;
          this.namingService = nacosDiscoveryProperties.namingServiceInstance();
       }
    
       @Override
       public void register(Registration registration) {}@Override
       public void deregister(Registration registration) {}}Copy the code
  4. NacosRegistration implements Registration, ServiceInstance, which records some configuration information

    public class NacosRegistration implements Registration.ServiceInstance {
    
       public static final String MANAGEMENT_PORT = "management.port";
       public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";
       public static final String MANAGEMENT_ADDRESS = "management.address";
       public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path";
    
       private NacosDiscoveryProperties nacosDiscoveryProperties;
    
       private ApplicationContext context;
    
       public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
          this.nacosDiscoveryProperties = nacosDiscoveryProperties;
          this.context = context;
       }
    
       @PostConstruct
       public void init(a) {
    
          Map<String, String> metadata = nacosDiscoveryProperties.getMetadata();
          Environment env = context.getEnvironment();
    
          String endpointBasePath = env.getProperty(MANAGEMENT_ENDPOINT_BASE_PATH);
          if(! StringUtils.isEmpty(endpointBasePath)) { metadata.put(MANAGEMENT_ENDPOINT_BASE_PATH, endpointBasePath); } Integer managementPort = ManagementServerPortUtils.getPort(context);if (null! = managementPort) { metadata.put(MANAGEMENT_PORT, managementPort.toString()); String contextPath = env .getProperty("management.server.servlet.context-path");
             String address = env.getProperty("management.server.address");
             if(! StringUtils.isEmpty(contextPath)) { metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath); }if(! StringUtils.isEmpty(address)) { metadata.put(MANAGEMENT_ADDRESS, address); }}}@Override
       public String getServiceId(a) {
          return nacosDiscoveryProperties.getService();
       }
    
       @Override
       public String getHost(a) {
          return nacosDiscoveryProperties.getIp();
       }
    
       @Override
       public int getPort(a) {
          return nacosDiscoveryProperties.getPort();
       }
    
       public void setPort(int port) {
          this.nacosDiscoveryProperties.setPort(port);
       }
    
       @Override
       public boolean isSecure(a) {
          return nacosDiscoveryProperties.isSecure();
       }
    
       @Override
       public URI getUri(a) {
          return DefaultServiceInstance.getUri(this);
       }
    
       @Override
       public Map<String, String> getMetadata(a) {
          return nacosDiscoveryProperties.getMetadata();
       }
    
       public boolean isRegisterEnabled(a) {
          return nacosDiscoveryProperties.isRegisterEnabled();
       }
    
       public String getCluster(a) {
          return nacosDiscoveryProperties.getClusterName();
       }
    
       public float getRegisterWeight(a) {
          return nacosDiscoveryProperties.getWeight();
       }
    
       public NacosDiscoveryProperties getNacosDiscoveryProperties(a) {
          return nacosDiscoveryProperties;
       }
    
       public NamingService getNacosNamingService(a) {
          return nacosDiscoveryProperties.namingServiceInstance();
       }
    
       @Override
       public String toString(a) {
          return "NacosRegistration{" + "nacosDiscoveryProperties="
                + nacosDiscoveryProperties + '} '; }}Copy the code

Now that the configuration information and the registration method are available, when is the registration method called to register the service to the registry? Let’s move on to the last Bean

  1. The method to realize NacosAutoServiceRegistration nothing special, inherited abstract class AbstractAutoServiceRegistration focus on it

    public class NacosAutoServiceRegistration
          extends AbstractAutoServiceRegistration<Registration> {}Copy the code

    The Spring Boot framework implements a publish-subscribe model, in which a series of events are published during project startup. Listening in AbstractAutoServiceRegistration WebServerInitializedEvent (in the web server startup is released after the completion of the event), the register () to obtain the information in the registration, Call serviceRegistry to complete service registration and inherit nacos using the two implementations injected into the container above, NacosServiceRegistry and NacosRegistration

    public abstract class AbstractAutoServiceRegistration<R extends Registration>
          implements AutoServiceRegistration.ApplicationContextAware {
       private static final Log logger = LogFactory
             .getLog(AbstractAutoServiceRegistration.class);
       @EventListener(WebServerInitializedEvent.class)
       public void bind(WebServerInitializedEvent event) {
          ApplicationContext context = event.getApplicationContext();
          if (context instanceof ConfigurableWebServerApplicationContext) {
             if ("management".equals(
                   ((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return; }}this.port.compareAndSet(0, event.getWebServer().getPort());
          this.start();
       }
      
       public void start(a) {
          if(! isEnabled()) {if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
             }
             return;
          }
    
          // only initialize if nonSecurePort is greater than 0 and it isn't already running
          // because of containerPortInitializer below
          if (!this.running.get()) {
             register();
             if (shouldRegisterManagement()) {
                registerManagement();
             }
             this.context.publishEvent(
                   new InstanceRegisteredEvent<>(this, getConfiguration()));
             this.running.compareAndSet(false.true); }}@PreDestroy
       public void destroy(a) {
          stop();
       }
       protected void register(a) {
          this.serviceRegistry.register(getRegistration());
       }
       protected void deregister(a) {
          this.serviceRegistry.deregister(getRegistration());
       }
       public void stop(a) {
          if (this.getRunning().compareAndSet(true.false) && isEnabled()) {
             deregister();
             if (shouldRegisterManagement()) {
                deregisterManagement();
             }
             this.serviceRegistry.close(); }}}Copy the code
  2. Check the heartbeat mechanism service status. The local service sends heartbeat packets at a fixed rate with the BeatReactor enabled at nacOS registration

    public class BeatReactor {
        private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1.new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                returnthread; }});private long clientBeatInterval = 10000L;
        private NamingProxy serverProxy;
        public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap();
    
        public BeatReactor(NamingProxy serverProxy) {
            this.serverProxy = serverProxy;
            this.executorService.scheduleAtFixedRate(new BeatReactor.BeatProcessor(), 0L.this.clientBeatInterval, TimeUnit.MILLISECONDS);
        }
    
        public void addBeatInfo(String dom, BeatInfo beatInfo) {
            LogUtils.LOG.info("BEAT"."adding service:" + dom + " to beat map.");
            this.dom2Beat.put(dom, beatInfo);
        }
    
        public void removeBeatInfo(String dom) {
            LogUtils.LOG.info("BEAT"."removing service:" + dom + " from beat map.");
            this.dom2Beat.remove(dom);
        }
    
        class BeatTask implements Runnable {
            BeatInfo beatInfo;
    
            public BeatTask(BeatInfo beatInfo) {
                this.beatInfo = beatInfo;
            }
    
            public void run(a) {
                Map<String, String> params = new HashMap(2);
                params.put("beat", JSON.toJSONString(this.beatInfo));
                params.put("dom".this.beatInfo.getDom());
    
                try {
                    String result = BeatReactor.this.serverProxy.callAllServers("/nacos/v1/ns/api/clientBeat", params);
                    JSONObject jsonObject = JSON.parseObject(result);
                    if(jsonObject ! =null) {
                        BeatReactor.this.clientBeatInterval = jsonObject.getLong("clientBeatInterval"); }}catch (Exception var4) {
                    LogUtils.LOG.error("CLIENT-BEAT"."failed to send beat: " + JSON.toJSONString(this.beatInfo), var4); }}}class BeatProcessor implements Runnable {
            BeatProcessor() {
            }
    
            public void run(a) {
                try {
                    Iterator var1 = BeatReactor.this.dom2Beat.entrySet().iterator();
    
                    while(var1.hasNext()) {
                        Entry<String, BeatInfo> entry = (Entry)var1.next();
                        BeatInfo beatInfo = (BeatInfo)entry.getValue();
                        BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
                        LogUtils.LOG.debug("BEAT"."send beat to server: ".newObject[]{beatInfo.toString()}); }}catch (Exception var4) {
                    LogUtils.LOG.error("CLIENT-BEAT"."Exception while scheduling beat.", var4); }}}}Copy the code

2. Service discovery

  1. NacosNamingService creates hostReactor at initialization to get, save, and update service instance information
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                   boolean loadCacheAtStart, int pollingThreadCount) {

    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            returnthread; }});this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }
    this.updatingMap = new ConcurrentHashMap<String, Object>();
    //failoverReactor Used for failover
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    //pushReceiver is used to receive push messages from the Nacos server and transmit messages in UDP mode
    this.pushReceiver = new PushReceiver(this);
}
Copy the code
  1. PushReceiver starts a thread to receive UDP push messages from the server
public class PushReceiver implements Runnable {

    private ScheduledExecutorService executorService;

    private static final int UDP_MSS = 64 * 1024;

    private DatagramSocket udpSocket;

    private HostReactor hostReactor;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();

            executorService = new ScheduledThreadPoolExecutor(1.new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    returnthread; }}); executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e); }}@Override
    public void run(a) {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + \ \ "}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\" "
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + \ \ "}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e); }}}public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;
    }

    public int getUDPPort(a) {
        returnudpSocket.getLocalPort(); }}Copy the code
  1. HostReactor Describes the main methods
public class HostReactor {

  	// Get the service map
    public Map<String, ServiceInfo> getServiceInfoMap(a) {
        return serviceInfoMap;
    }
    // Add the update task
    public synchronizedScheduledFuture<? > addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }
    // The processing server returns json and calls eventDispatcher to deliver the corresponding event
    public ServiceInfo processServiceJSON(String json);
    // Get service information from the local map
    private ServiceInfo getServiceInfo0(String serviceName, String clusters);
		// Get service information directly from the server
    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters);
    // Get service information
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        / / 1. FailoverReactor. IsFailoverSwitch (), disaster according to switch to decide whether disaster from the local slow access data directly
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
      	//2. GetServiceInfo0 (serviceName, Clusters) caches data locally
				ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        //3. If no data is obtained, updateServiceNow will be updated
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);

        } 
      	// The current service is being updated while waiting
      	else if (updatingMap.containsKey(serviceName)) {

            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:"+ clusters, e); }}}}// Start the task update service
        scheduleUpdateIfAbsent(serviceName, clusters);
        // Return to service
        return serviceInfoMap.get(serviceObj.getKey());
    }

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) { ScheduledFuture<? > future = addTask(new UpdateTask(serviceName, clusters));
    }

  	// Request the server to synchronize the update service
    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {

            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
            if(StringUtils.isNotEmpty(result)) { processServiceJSON(result); }}catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if(oldService ! =null) {
                synchronized(oldService) { oldService.notifyAll(); }}}}// Just ask the service to do nothing
    public void refreshOnly(String serviceName, String clusters) {
        try {
            serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: "+ serviceName, e); }}// Update the task at a fixed speed after calling it once
    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        @Override
        public void run(a) {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }

                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }

                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: "+ serviceName, e); }}}}Copy the code