I. Entry point analysis
Entry point 1:
@EnableDiscoveryClient
What does the @EnableDiscoveryClient annotation do when you want to register a service with a registry?
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    // Defaults to true; when true, the service is registered automatically
    boolean autoRegister() default true;
}
The EnableDiscoveryClient source shows that the annotation has an autoRegister() attribute that defaults to true, and that it imports the EnableDiscoveryClientImportSelector class.
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        String[] imports = super.selectImports(metadata);

        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

        boolean autoRegister = attributes.getBoolean("autoRegister");

        if (autoRegister) {
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add(
                    "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        } else {
            Environment env = getEnvironment();
            if (ConfigurableEnvironment.class.isInstance(env)) {
                ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
                LinkedHashMap<String, Object> map = new LinkedHashMap<>();
                map.put("spring.cloud.service-registry.auto-registration.enabled", false);
                MapPropertySource propertySource = new MapPropertySource(
                        "springCloudDiscoveryClient", map);
                configEnv.getPropertySources().addLast(propertySource);
            }
        }

        return imports;
    }
}
The selectImports method reads the autoRegister attribute of the @EnableDiscoveryClient annotation.
When autoRegister = true, it adds the AutoServiceRegistrationConfiguration class to the imports, and that class is then loaded through Spring Boot's auto-configuration mechanism. When autoRegister = false, it instead sets spring.cloud.service-registry.auto-registration.enabled to false in the environment.
AutoServiceRegistrationConfiguration binds configuration properties from the YAML file (such as whether to register automatically):
@Configuration
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationConfiguration {
}
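The branch taken in selectImports can be reduced to a small plain-Java sketch. It has no Spring dependency, and the class name AutoRegisterSketch and the helper resolve are hypothetical: they only mirror the decision described above (add the configuration class when autoRegister is true, otherwise record a property that switches auto-registration off).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spring-free model of the autoRegister branch in selectImports.
class AutoRegisterSketch {

    static final String AUTO_CONFIG =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";
    static final String ENABLED_KEY =
            "spring.cloud.service-registry.auto-registration.enabled";

    // Returns the class names to import. When autoRegister is false, nothing is
    // added and the "enabled" property is set to false in the given map instead.
    static List<String> resolve(boolean autoRegister, List<String> baseImports,
                                Map<String, Object> properties) {
        List<String> imports = new ArrayList<>(baseImports);
        if (autoRegister) {
            imports.add(AUTO_CONFIG);
        } else {
            properties.put(ENABLED_KEY, false);
        }
        return imports;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        List<String> base = Collections.emptyList();
        System.out.println(resolve(true, base, props));  // contains the configuration class
        System.out.println(resolve(false, base, props)); // empty list
        System.out.println(props);                       // the property is now false
    }
}
```

In the real selector the property is added as a MapPropertySource at the end of the environment's property sources, so an explicit setting elsewhere still takes precedence.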
Entry point 2:
META-INF/spring.factories
The configuration classes listed in this file are loaded into the Spring container through Spring Boot's auto-configuration mechanism.
Here we look at NacosDiscoveryAutoConfiguration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

    /**
     * NacosServiceRegistry: implements service registration.
     */
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    /**
     * NacosRegistration: holds the basic data of the service.
     * This bean is only created when the container already contains
     * an AutoServiceRegistrationProperties bean.
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    /**
     * NacosAutoServiceRegistration: implements automatic service registration.
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}
II. Main line analysis
Automatic service registration
1) Principle analysis
Spring Cloud integrates several registries, such as Eureka and ZooKeeper. To support them uniformly there has to be a specification, and Spring Cloud Commons defines it.
In Spring Cloud Commons, the org.springframework.cloud.client.serviceregistry package contains AutoServiceRegistration, Registration, and ServiceRegistry, the three core interfaces for service registration.
1. AutoServiceRegistration
AutoServiceRegistration is used for automatic service registration, meaning that service information is registered with the registry automatically once the service has started.
public interface AutoServiceRegistration {
}
AutoServiceRegistration defines no methods. It is a marker interface whose purpose is to standardize implementations of automatic registration.
2. Registration
Registration stores service information and specifies what is registered with the registry.
public interface Registration extends ServiceInstance {
}
Registration extends ServiceInstance, which defines what information a service instance should expose.
public interface ServiceInstance {

    /**
     * The unique instance ID.
     * @return The unique instance ID as registered.
     */
    default String getInstanceId() {
        return null;
    }

    /**
     * @return The service ID as registered.
     */
    String getServiceId();

    /**
     * @return The hostname of the registered service instance.
     */
    String getHost();

    /**
     * @return The port of the registered service instance.
     */
    int getPort();

    /**
     * @return Whether the port of the registered service instance uses HTTPS.
     */
    boolean isSecure();

    /**
     * @return The service URI address.
     */
    URI getUri();

    /**
     * Service metadata; use it to attach additional information to the service.
     * @return The key/value pair metadata associated with the service instance.
     */
    Map<String, String> getMetadata();

    /**
     * @return The scheme of the service instance.
     */
    default String getScheme() {
        return null;
    }
}
3. ServiceRegistry
ServiceRegistry is the interface used to register services with the registry and to deregister them again.
public interface ServiceRegistry<R extends Registration> {

    // Register the service
    void register(R registration);

    // Deregister the service
    void deregister(R registration);

    // Close the registry
    void close();

    // Set the status of the registration
    void setStatus(R registration, String status);

    // Get the status of the registration
    <T> T getStatus(R registration);
}
How does Nacos implement this specification?
The entry point code above creates three important beans, each of which implements part of this specification. Next we look at what each of them does.
NacosRegistration:
This class holds the basic data of the service, such as the service name and IP address. It implements the Registration and ServiceInstance interfaces from spring-cloud-commons.
public class NacosRegistration implements Registration, ServiceInstance {

    public static final String MANAGEMENT_PORT = "management.port";

    public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";

    public static final String MANAGEMENT_ADDRESS = "management.address";

    public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path";

    private NacosDiscoveryProperties nacosDiscoveryProperties;

    private ApplicationContext context;

    public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        this.context = context;
    }

    // ... other methods omitted
}
NacosServiceRegistry:
This class implements the ServiceRegistry interface from spring-cloud-commons. Its register method converts the registration data into an Instance and then calls namingService.registerInstance(serviceId, group, instance) to register the service with the registry.
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);

    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    // Used to communicate with the Nacos registry, e.g. to register services with it
    private final NamingService namingService;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        this.namingService = nacosDiscoveryProperties.namingServiceInstance();
    }

    @Override
    public void register(Registration registration) {

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        // Convert the Registration into a Nacos Instance
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            // Register the service with the registry.
            // Under the hood this method sends an HTTP request to the server.
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            // rethrow a RuntimeException if the registration is failed.
            // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
            rethrowRuntimeException(e);
        }
    }

    // ... other methods omitted
}
NacosAutoServiceRegistration:
This class triggers the service registration.
The source shows that NacosAutoServiceRegistration implements the ApplicationListener interface through its parent class AbstractAutoServiceRegistration. Once the web server has started, the onApplicationEvent(WebServerInitializedEvent event) method is invoked, and this eventually calls serviceRegistry.register(R registration) to register the service with the registry.
So why is onApplicationEvent called? This is the observer design pattern, implemented by Spring's event listener mechanism: a class that implements ApplicationListener declares interest in an event type, and when that event is published, its onApplicationEvent method is invoked.
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        // Implements ApplicationListener and listens for WebServerInitializedEvent
        ApplicationListener<WebServerInitializedEvent> {

    // Invoked when a WebServerInitializedEvent is received
    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        // Delegates to bind
        bind(event);
    }

    @Deprecated
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                    .getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // Delegates to start
        this.start();
    }

    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get()) {
            this.context.publishEvent(
                    new InstancePreRegisteredEvent(this, getRegistration()));
            // Goes through a chain of calls and ends up in NacosServiceRegistry#register
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }
}
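The listener mechanism described above can be modelled without Spring. The following is a minimal sketch, not Spring's implementation: Publisher, Listener, ServerStartedEvent, and AutoRegistration are hypothetical stand-ins for the application context, ApplicationListener, WebServerInitializedEvent, and AbstractAutoServiceRegistration. It shows a listener being notified when an event is published and registering only once, guarded by a running flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free model of an event listener mechanism.
class EventListenerSketch {

    // Stand-in for an application event such as WebServerInitializedEvent.
    static final class ServerStartedEvent {
        final int port;

        ServerStartedEvent(int port) {
            this.port = port;
        }
    }

    // Stand-in for ApplicationListener<E>.
    interface Listener<E> {
        void onEvent(E event);
    }

    // Stand-in for the publishing side of the application context.
    static final class Publisher<E> {
        private final List<Listener<E>> listeners = new ArrayList<>();

        void addListener(Listener<E> listener) {
            listeners.add(listener);
        }

        // Publishing an event synchronously notifies every registered listener.
        void publish(E event) {
            for (Listener<E> listener : listeners) {
                listener.onEvent(event);
            }
        }
    }

    // Records a "registration" the first time the server-started event arrives.
    static final class AutoRegistration implements Listener<ServerStartedEvent> {
        final List<Integer> registeredPorts = new ArrayList<>();
        private boolean running;

        @Override
        public void onEvent(ServerStartedEvent event) {
            if (!running) { // same idea as the running flag checked in start()
                registeredPorts.add(event.port);
                running = true;
            }
        }
    }

    public static void main(String[] args) {
        Publisher<ServerStartedEvent> publisher = new Publisher<>();
        AutoRegistration registration = new AutoRegistration();
        publisher.addListener(registration);
        publisher.publish(new ServerStartedEvent(8080));
        publisher.publish(new ServerStartedEvent(8080)); // ignored, already running
        System.out.println(registration.registeredPorts);
    }
}
```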
So when is the WebServerInitializedEvent published? Let's use breakpoint debugging to observe:
2) Service registration details analysis
How does a client register a service with the server?
Client side
NacosServiceRegistry#register
A NacosNamingService is created when NacosServiceRegistry is instantiated, and register delegates to its registerInstance method:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        // Build the heartbeat information
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        // Start the heartbeat renewal
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // Register the service
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    Map<String, String> params = new HashMap<>(9);
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", serviceName);
    params.put("groupName", groupName);
    params.put("clusterName", instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, "POST");
}

public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
    params.put("namespaceId", this.getNamespaceId());
    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
        throw new IllegalArgumentException("no server available");
    } else {
        Exception exception = new Exception();
        if (servers != null && !servers.isEmpty()) {
            // Start at a random server, then rotate through the list until one call succeeds
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); ++i) {
                String server = (String) servers.get(index);
                try {
                    return this.callServer(api, params, server, method);
                } catch (NacosException var11) {
                    exception = var11;
                    LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                } catch (Exception var12) {
                    exception = var12;
                    LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                }
                index = (index + 1) % servers.size();
            }
        } else {
            // Retry mechanism: up to three attempts against the configured domain
            int i = 0;
            while (i < 3) {
                try {
                    return this.callServer(api, params, this.nacosDomain);
                } catch (Exception var13) {
                    exception = var13;
                    ++i;
                }
            }
        }
        // ... error handling omitted
    }
}

public String callServer(String api, Map<String, String> params, String curServer, String method)
        throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0L;
    this.checkSignature(params);
    List<String> headers = this.builderHeaders();
    String url;
    if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
        if (!curServer.contains(":")) {
            curServer = curServer + ":" + this.serverPort;
        }
        url = HttpClient.getPrefix() + curServer + api;
    } else {
        url = curServer + api;
    }
    // Send the HTTP request
    HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
    end = System.currentTimeMillis();
    MetricsMonitor
            .getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double) (end - start));
    if (200 == result.code) {
        return result.content;
    } else if (304 == result.code) {
        return "";
    } else {
        // ... error handling omitted
    }
}
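The server selection in reqAPI (pick a starting index, then rotate through the server list until one call succeeds) can be isolated in a short sketch. This is an illustration under assumptions, not the Nacos client code: ServerFailoverSketch and request are hypothetical names, the call function stands in for callServer, and the start index is passed in as a parameter instead of being drawn from Random so that the behaviour is reproducible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: try each server once, starting at startIndex and wrapping around.
class ServerFailoverSketch {

    // 'call' stands in for callServer and throws to signal a failed request.
    // startIndex is expected to be non-negative.
    static String request(List<String> servers, int startIndex, Function<String, String> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = startIndex % servers.size();
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed", last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8848", "b:8848", "c:8848");
        List<String> tried = new ArrayList<>();
        String result = request(servers, 1, server -> {
            tried.add(server);
            if (!server.startsWith("a")) {
                throw new RuntimeException("down: " + server);
            }
            return "ok from " + server;
        });
        System.out.println(tried + " -> " + result);
    }
}
```

Starting at a random position spreads registration requests across the cluster, while the rotation guarantees that every server is tried at most once per request.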
Server side
Now let's look at the server code. Its entry point is essentially a Controller.
We just need to find the Controller method that handles the request above (InstanceController#register):
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    // Obtain the namespaceId from the request parameters
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Obtain the service name, in the format groupName@@serviceName
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // Check the service name format
    checkServiceNameFormat(serviceName);
    // Build the Instance from the request parameters
    final Instance instance = parseInstance(request);

    // Registration entry point
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
1. Step into the registration entry point
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

    // Map<namespaceId, Map<groupName@@serviceName, Service>> serviceMap
    // If serviceMap does not contain this service yet, create an empty one
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
2.1 Step into the createEmptyService method
public void createEmptyService(String namespaceId, String serviceName,
boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
Three main things are done here:
- Put the service into serviceMap, the service registry
- Initialize the service and create a health check task (covered in the main line on health checks)
- Register a RecordListener whose onChange method runs when the corresponding change is detected. Nacos makes heavy use of the observer design pattern: operations such as instance registration and removal are abstracted into tasks in a blocking queue, and when a task arrives the listener handles it by executing onChange
Service: mainly concerned with instance changes
ServiceManager: mainly concerned with service changes
SwitchManager: switches between the AP model and the CP model
Many open source frameworks use asynchronous tasks and in-memory queues like this extensively to improve performance. These operations do not need to complete immediately after the write returns, so handling them asynchronously helps considerably.
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {

        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();

        // Store the service, initialize it, and set the listeners
        putServiceAndInit(service);
        // Related to persistence
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

private void putServiceAndInit(Service service) throws NacosException {
    // Map<namespaceId, Map<groupName@@serviceName, Service>> serviceMap
    putService(service);
    // Initialize the service and create a health check task
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
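The two-level serviceMap mentioned in the comments (namespaceId, then groupName@@serviceName, then Service) and the create-if-absent step can be sketched as follows. This is a simplified model, not the ServiceManager code: the ServiceMapSketch class and its nested Service type are hypothetical, and it uses ConcurrentHashMap.computeIfAbsent in place of the explicit null check and putService call shown above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a registry keyed by namespaceId, then by groupName@@serviceName.
class ServiceMapSketch {

    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Returns the stored service, or null when the namespace or service is unknown.
    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Creates and stores an empty service only when none exists yet.
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> new Service(namespaceId, name));
    }

    public static void main(String[] args) {
        ServiceMapSketch registry = new ServiceMapSketch();
        Service first = registry.createServiceIfAbsent("public", "DEFAULT_GROUP@@order-service");
        Service second = registry.createServiceIfAbsent("public", "DEFAULT_GROUP@@order-service");
        System.out.println(first == second); // the second call reuses the existing service
    }
}
```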
2.2 Step into the addInstance method
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Unique key of the service's instance list
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // The default implementation of consistencyService is DelegateConsistencyServiceImpl.
        // DelegateConsistencyServiceImpl is a static proxy that chooses the AP or CP model
        // depending on whether the instance is ephemeral.
        // Ephemeral instances are the default, so the default is AP,
        // implemented by DistroConsistencyServiceImpl.
        consistencyService.put(key, instances);
    }
}
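The delegation described in the comments above can be illustrated with a small sketch. Everything here is hypothetical: the interface and class names are made up for the example, both sides are backed by an in-memory map, and the "ephemeral." marker is an assumed key layout (the real layout is defined by KeyBuilder). The point is only that one entry object routes each put to the AP or CP implementation based on the key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a delegating consistency service that routes by key type.
class DelegateConsistencySketch {

    interface ConsistencyService {
        void put(String key, Object value);
    }

    // In-memory stand-in used for both the AP side and the CP side.
    static final class RecordingService implements ConsistencyService {
        final Map<String, Object> store = new HashMap<>();

        @Override
        public void put(String key, Object value) {
            store.put(key, value);
        }
    }

    static final class Delegate implements ConsistencyService {
        // Assumed marker for ephemeral keys, chosen for this example only.
        static final String EPHEMERAL_MARKER = "ephemeral.";

        private final ConsistencyService ephemeralService;  // AP side
        private final ConsistencyService persistentService; // CP side

        Delegate(ConsistencyService ephemeralService, ConsistencyService persistentService) {
            this.ephemeralService = ephemeralService;
            this.persistentService = persistentService;
        }

        @Override
        public void put(String key, Object value) {
            select(key).put(key, value);
        }

        // Ephemeral keys go to the AP implementation, everything else to CP.
        private ConsistencyService select(String key) {
            return key.contains(EPHEMERAL_MARKER) ? ephemeralService : persistentService;
        }
    }

    public static void main(String[] args) {
        RecordingService ap = new RecordingService();
        RecordingService cp = new RecordingService();
        Delegate delegate = new Delegate(ap, cp);
        delegate.put("iplist.ephemeral.public##order-service", "instances-a");
        delegate.put("iplist.public##order-service", "instances-b");
        System.out.println("AP keys: " + ap.store.keySet());
        System.out.println("CP keys: " + cp.store.keySet());
    }
}
```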
We mainly analyze the DistroConsistencyServiceImpl class, which implements the AP model.
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    // The notification task
    private volatile Notifier notifier = new Notifier();

    private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        // Start the Notifier task
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        // Synchronize the change to the other nodes in the cluster
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {

        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            // The Datum stores the instance list
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // The dataStore is what gets synchronized across the cluster
            dataStore.put(key, datum);
        }

        // Map<String, ConcurrentLinkedQueue<RecordListener>> listeners
        if (!listeners.containsKey(key)) {
            return;
        }

        // Add a change task
        notifier.addTask(key, DataOperation.CHANGE);
    }

    public class Notifier implements Runnable {

        // (the services map used below for de-duplication is declared in the full source)
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        public void addTask(String datumKey, DataOperation action) {

            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            for (; ; ) {
                try {
                    // Take the next task from the blocking queue
                    Pair<String, DataOperation> pair = tasks.take();
                    // Handle the task
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                if (!listeners.containsKey(datumKey)) {
                    return;
                }

                for (RecordListener listener : listeners.get(datumKey)) {

                    count++;

                    try {
                        if (action == DataOperation.CHANGE) {
                            // Execute the listener's onChange method
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }

                        if (action == DataOperation.DELETE) {
                            // Execute the listener's onDelete method
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                    }
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("");
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}
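The queueing behaviour of the Notifier (pending CHANGE tasks for the same key are de-duplicated, and the key is released again once the task is handled) can be tried out in isolation. The sketch below is a simplified model under assumptions: NotifierSketch, Task, and drain are hypothetical names, putIfAbsent replaces the containsKey/put pair, and drain handles the queued tasks on the calling thread instead of blocking on take() in a background loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a task queue that drops duplicate pending CHANGE tasks.
class NotifierSketch {

    enum Operation { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Operation operation;

        Task(String key, Operation operation) {
            this.key = key;
            this.operation = operation;
        }
    }

    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
    final List<String> handled = new ArrayList<>();

    void addTask(String key, Operation operation) {
        // A CHANGE for a key that already has a queued CHANGE is skipped.
        if (operation == Operation.CHANGE && pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return;
        }
        tasks.offer(new Task(key, operation));
    }

    // Handles every task queued so far; a real notifier would loop on take() instead.
    void drain() {
        Task task;
        while ((task = tasks.poll()) != null) {
            pending.remove(task.key); // the key may be queued again from now on
            handled.add(task.operation + ":" + task.key);
        }
    }

    public static void main(String[] args) {
        NotifierSketch notifier = new NotifierSketch();
        notifier.addTask("k1", Operation.CHANGE);
        notifier.addTask("k1", Operation.CHANGE); // dropped as a duplicate
        notifier.addTask("k1", Operation.DELETE);
        notifier.drain();
        System.out.println(notifier.handled);
    }
}
```

Dropping duplicate CHANGE tasks is safe because the handler always reads the latest value from the data store, so a single pending task covers any number of intermediate writes.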
3. From the analysis above, the registration itself is completed in the onChange method of Service
// Invoked when a registration task is received
@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    // Update the registered instances in the in-memory registry
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}
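The weight handling in onChange is a simple clamp, which the following sketch isolates. The class and method names are hypothetical; the bounds 10000.0 and 0.01 are the ones used in the code above. Note that a weight of exactly 0 and negative weights are left unchanged, as in the original.

```java
// Hypothetical sketch of the weight normalization applied in onChange.
class WeightClampSketch {

    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    // Caps large weights and raises very small positive weights; other values pass through.
    static double normalize(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        if (weight > 0.0D && weight < MIN_POSITIVE_WEIGHT) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight;
    }

    public static void main(String[] args) {
        System.out.println(normalize(20000.0D)); // capped at the maximum
        System.out.println(normalize(0.001D));   // raised to the minimum positive weight
        System.out.println(normalize(0.0D));     // zero is kept as is
        System.out.println(normalize(1.5D));     // in range, unchanged
    }
}
```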
Cluster synchronization
Initial full pull: when a Distro protocol node starts, it synchronizes the full data set from the other nodes (DistroLoadDataTask)
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
        DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
    this.memberManager = memberManager;
    this.distroComponentHolder = distroComponentHolder;
    this.distroTaskEngineHolder = distroTaskEngineHolder;
    this.distroConfig = distroConfig;
    // Start the verification and data loading tasks
    startVerifyTask();
}

private void startVerifyTask() {
    DistroCallback loadCallback = new DistroCallback() {
        @Override
        public void onSuccess() {
            loadCompleted = true;
        }

        @Override
        public void onFailed(Throwable throwable) {
            loadCompleted = false;
        }
    };
    GlobalExecutor.schedulePartitionDataTimedSync(
            new DistroVerifyTask(memberManager, distroComponentHolder),
            distroConfig.getVerifyIntervalMillis());
    // Submit the DistroLoadDataTask, whose run() calls load() to load the data
    GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

public class DistroLoadDataTask implements Runnable {

    @Override
    public void run() {
        try {
            // Load the data
            load();
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

    private void load() throws Exception {
        while (memberManager.allMembersWithoutSelf().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
            TimeUnit.SECONDS.sleep(1);
        }
        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
            TimeUnit.SECONDS.sleep(1);
        }
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                // This is where the data is actually loaded
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }

    // Synchronize all data from a remote node
    private boolean loadAllDataSnapshotFromRemote(String resourceType) {
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == transportAgent || null == dataProcessor) {
            return false;
        }
        for (Member each : memberManager.allMembersWithoutSelf()) {
            try {
                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                boolean result = dataProcessor.processSnapshot(distroData);
                if (result) {
                    return true;
                }
            } catch (Exception e) {
            }
        }
        return false;
    }
}
Incremental synchronization: as analyzed above, onPut is called during service registration, and afterwards the following method is called to synchronize the change across the cluster
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // Broadcast to every other member of the cluster
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(),
                distroKey.getResourceType(), each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // Publish a delayed task through distroTaskEngineHolder
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}
Heartbeat renewal
The registration source code analyzed above shows that registering a service also starts a scheduled heartbeat renewal task.
Client code
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        // Send the heartbeat
        long result = serverProxy.sendBeat(beatInfo);
        long nextTime = result > 0 ? result : beatInfo.getPeriod();
        // schedule() runs a task only once, so the task reschedules itself after every beat
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

public long sendBeat(BeatInfo beatInfo) {
    try {
        if (NAMING_LOGGER.isDebugEnabled()) {
            ......
        }
        Map<String, String> params = new HashMap<String, String>(4);
        params.put("beat", JSON.toJSONString(beatInfo));
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
        String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
        JSONObject jsonObject = JSON.parseObject(result);

        if (jsonObject != null) {
            return jsonObject.getLong("clientBeatInterval");
        }
    } catch (Exception e) {
        ...
    }
    return 0L;
}
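The self-rescheduling pattern used by BeatTask (a one-shot schedule call that queues the next run at the end of each run) can be reproduced with the JDK's ScheduledExecutorService alone. The sketch below is an illustration under assumptions: HeartbeatSketch and sendBeats are hypothetical, the sendBeat supplier stands in for serverProxy.sendBeat, the task reschedules itself rather than creating a new BeatTask, and the finished latch exists only so the demo can wait for the chain to end.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical sketch of a heartbeat task that reschedules itself after every beat.
class HeartbeatSketch {

    static final class BeatTask implements Runnable {
        private final ScheduledExecutorService executor;
        private final LongSupplier sendBeat; // returns the server-suggested interval, or 0
        private final long defaultPeriodMillis;
        private volatile boolean stopped;
        final CountDownLatch finished = new CountDownLatch(1);

        BeatTask(ScheduledExecutorService executor, LongSupplier sendBeat, long defaultPeriodMillis) {
            this.executor = executor;
            this.sendBeat = sendBeat;
            this.defaultPeriodMillis = defaultPeriodMillis;
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            if (stopped) {
                finished.countDown(); // demo only: signals that the chain has ended
                return;
            }
            long result = sendBeat.getAsLong();
            long nextTime = result > 0 ? result : defaultPeriodMillis;
            // schedule() fires once, so the next beat has to be queued explicitly
            executor.schedule(this, nextTime, TimeUnit.MILLISECONDS);
        }
    }

    // Sends 'count' beats (count >= 1), then stops the task and returns how many were sent.
    static int sendBeats(int count, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger sent = new AtomicInteger();
        BeatTask[] holder = new BeatTask[1];
        holder[0] = new BeatTask(executor, () -> {
            if (sent.incrementAndGet() >= count) {
                holder[0].stop();
            }
            return 0L; // 0 means the server suggested no interval
        }, periodMillis);
        executor.schedule(holder[0], 0, TimeUnit.MILLISECONDS);
        try {
            holder[0].finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("beats sent: " + sendBeats(3, 10L));
    }
}
```

Rescheduling after each beat, instead of using a fixed-rate schedule, lets the interval change between beats, which is how the client can follow the clientBeatInterval value returned by the server.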
So how does the server handle heartbeat renewal?
Server code
InstanceController#beat
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // Obtain the clusterName, ip and port values from the request parameters
    String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
            UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            // clusterName defaults to DEFAULT; a cluster named in the beat overrides it
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // Namespace, defaults to public
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);
    // Service name, in the format groupName@@serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // Check that serviceName follows the format Nacos expects,
    // so that a malformed heartbeat request is rejected early
    checkServiceNameFormat(serviceName);
    // Look up the instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        // If the instance does not exist, register a new one from clientBeat.
        // This recovers ephemeral instances that were lost, for example after a server restart.
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // The data model of Nacos is service --> cluster --> instance
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Start a heartbeat renewal task that updates the instance's last heartbeat time
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

public class ClientBeatProcessor implements Runnable {
    ...
    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        // Get the ip, port, cluster, and all instances in the cluster
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);

        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // Set the last heartbeat time
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
}
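The server-side flow above boils down to three steps: find the instance, register it from the beat if it is missing, then refresh its last heartbeat time and restore its healthy flag. The following is a simplified sketch of that idea only. `BeatRegistrySketch` and its methods are hypothetical names, the namespace, service, and cluster levels are collapsed into a single map keyed by `ip:port`, and concurrent registration is not handled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of server-side beat handling; not Nacos code. */
class BeatRegistrySketch {

    static class Instance {
        volatile long lastBeat;
        volatile boolean healthy;
    }

    // Assumption: one flat map instead of the service --> cluster --> instance model
    private final Map<String, Instance> instances = new ConcurrentHashMap<>();

    private static String key(String ip, int port) {
        return ip + ":" + port;
    }

    /** Handles one heartbeat; returns true if the instance had to be registered first. */
    boolean beat(String ip, int port, long now) {
        Instance existing = instances.get(key(ip, port));
        boolean registered = existing == null;
        Instance instance = registered ? new Instance() : existing;
        instance.lastBeat = now;  // refresh the last heartbeat time
        instance.healthy = true;  // a beat makes an unhealthy instance healthy again
        if (registered) {
            instances.put(key(ip, port), instance);
        }
        return registered;
    }

    /** Simulates the health check marking an instance as unhealthy. */
    void markUnhealthy(String ip, int port) {
        Instance instance = instances.get(key(ip, port));
        if (instance != null) {
            instance.healthy = false;
        }
    }

    boolean isHealthy(String ip, int port) {
        Instance instance = instances.get(key(ip, port));
        return instance != null && instance.healthy;
    }

    long lastBeat(String ip, int port) {
        Instance instance = instances.get(key(ip, port));
        return instance == null ? -1L : instance.lastBeat;
    }

    public static void main(String[] args) {
        BeatRegistrySketch registry = new BeatRegistrySketch();
        System.out.println("first beat registered: " + registry.beat("10.0.0.1", 8080, 1_000L));
        System.out.println("second beat registered: " + registry.beat("10.0.0.1", 8080, 2_000L));
    }
}
```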
Health check
The analysis of automatic registration showed that a health check task is started when a service is registered for the first time (ServiceManager#putServiceAndInit).
private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into serviceMap: Map<namespaceId, Map<groupName@@serviceName, Service>>
    putService(service);
    // Initialize the service and create a health check task
    service.init();
    consistencyService.listen(
            KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(
            KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

public void init() {
    // Create a health check task:
    // if an instance has sent no heartbeat for more than 15 seconds, its healthy property is set to false;
    // if it has sent no heartbeat for more than 30 seconds, the instance is removed
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
ClientBeatCheckTask: This is the health check task
public class ClientBeatCheckTask implements Runnable {

    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
                // Has the heartbeat timeout been exceeded?
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // Modify the instance health status
                            instance.setHealthy(false);
                            // Issue a ServiceChangeEvent; the onApplicationEvent method
                            // of the PushService listener will then execute
                            getPushService().serviceChanged(service);
                            // Publish a heartbeat timeout event
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                // Was the last heartbeat more than 30 s ago?
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // Delete the instance
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
}
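The two-pass check in ClientBeatCheckTask can be reduced to a small, testable function. The sketch below is an illustration under stated assumptions, not Nacos code: `HealthCheckSketch` is a hypothetical class, the 15-second and 30-second thresholds are hard-coded from the comments in init() (in Nacos they are read per instance), and eviction removes the entry from a local list instead of sending an HTTP delete request.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified model of the two-pass heartbeat check; not Nacos code. */
class HealthCheckSketch {

    // Assumed fixed thresholds, taken from the comments in Service#init
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long DELETE_TIMEOUT_MS = 30_000L;

    static class Instance {
        final String address;
        long lastBeat;
        boolean healthy = true;

        Instance(String address, long lastBeat) {
            this.address = address;
            this.lastBeat = lastBeat;
        }
    }

    /** Runs one check pass at time `now`; returns the addresses that were evicted. */
    static List<String> check(List<Instance> instances, long now) {
        // First pass: mark instances whose heartbeat timed out as unhealthy
        for (Instance instance : instances) {
            if (now - instance.lastBeat > HEARTBEAT_TIMEOUT_MS) {
                instance.healthy = false;
            }
        }
        // Second pass: remove instances that have been silent for too long
        List<String> evicted = new ArrayList<>();
        Iterator<Instance> iterator = instances.iterator();
        while (iterator.hasNext()) {
            Instance instance = iterator.next();
            if (now - instance.lastBeat > DELETE_TIMEOUT_MS) {
                evicted.add(instance.address);
                iterator.remove();
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<Instance> instances = new ArrayList<>();
        instances.add(new Instance("10.0.0.1:8080", 95_000L)); // last beat 5 s before the check
        instances.add(new Instance("10.0.0.2:8080", 80_000L)); // last beat 20 s before the check
        instances.add(new Instance("10.0.0.3:8080", 60_000L)); // last beat 40 s before the check
        System.out.println("evicted: " + check(instances, 100_000L));
    }
}
```

Passing `now` in as a parameter instead of calling `System.currentTimeMillis()` inside the check keeps the sketch deterministic and easy to test.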
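Service eviction
When more than 30 seconds have passed since an instance's last heartbeat, the eviction method deleteIp is called. It sends a delete request to the local server, which is handled by the deregister endpoint.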
private void deleteIp(Instance instance) {
    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());
        String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
        // Send the delete request
        HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    ...
                }
            }

            @Override
            public void onError(Throwable throwable) {
                ...
            }

            @Override
            public void onCancel() {
                ...
            }
        });
    } catch (Exception e) {
    }
}

@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
    Instance instance = getIpAddress(request);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    checkServiceNameFormat(serviceName);

    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
        return "ok";
    }
    // Remove the instance
    serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    return "ok";
}