1. Source code compilation
1.1 Download Source Code
Select Tag 1.4.1 from Github to download. nacos
1.2 build
Open the project with Idea and compile the entire project. Compile the nacos-consistency 1.4.1 directory, otherwise an error will be reported.
1.3 Single-node Startup
Adding VM Parameters
-Dnacos.standalone=true
Copy the code
And you’re done.
2. Core functions of Nacos
- Service registration: The Nacos Client registers its services with the Nacos Server by sending REST requests, providing its own metadata, such as IP addresses and ports. When Nacos Server receives the registration request, it stores the metadata information in a two-tier memory Map.
- Service heartbeat: After a service is registered, the Nacos Client maintains a timed heartbeat to continuously notify the Nacos Server that the service is always available and prevents it from being rejected. By default, heartbeat is sent every 5 seconds.
- Service health check: Nacos Server starts a scheduled task to check the health of registered service instances, sets its healthy attribute to false for instances that have not received a client heartbeat for more than 15 seconds (the client service does not detect it), and if an instance has not received a heartbeat for more than 30 seconds, Reject the instance directly (the deleted instance is re-registered if the heartbeat is resumed)
- Service discovery: When the service consumer (Nacos Client) invokes the service provider’s service, it sends a REST request to the Nacos Server to obtain the list of services registered above, which is cached locally in the Nacos Client. At the same time, a scheduled task is started in Nacos Client to periodically pull the server’s latest registry information to the local cache
- Service synchronization: Nacos Server clusters synchronize service instances with each other to ensure the consistency of service information.
3. The big picture
Before we look at the source code, let’s think about it a little bit. If we were to write our own Nacos, it would contain several modules.
- Nacos server: Provides service registration, service discovery, heartbeat tasks, service synchronization, and service health check functions.
- Nacos client: Provides a call method for each microservice to communicate with the Nacos server.
So how does the Nacos client communicate with the server, which is nothing more than HTTP calls, so the Nacos server needs to provide the call interface.
4. Source code entry
Referring to the previous article on the use of Nacos, Nacos, we found that the dependency file contains the following dependency.
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
Copy the code
The starter of nacOS is a starter that uses the SpingBoot autowage principle. If you’ve done SpringBoot autowage, you should know that there will be a Spring. factories file with the actual calling classes in it.
Okay, so we’ve got this file, look at the contents of that file.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
Copy the code
EnableAutoConfiguration this is the auto-assembly Key, and the value behind it is the actual calling class. Here, look at the first class NacosDiscoveryAutoConfiguration directly.
4.1 NacosDiscoveryAutoConfiguration
The class has three beans, the last of which is the core Bean.
@Configuration @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class }) public class NacosDiscoveryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); }}Copy the code
4.2 NacosAutoServiceRegistration
4.2.1 Construction method
Here the parent constructor is called again
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
Copy the code
4.3 AbstractAutoServiceRegistration
4.3.1 Construction method
The constructor of this class does nothing but assign nacosService stry to Service stry
protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,
AutoServiceRegistrationProperties properties) {
this.serviceRegistry = serviceRegistry;
this.properties = properties;
}
Copy the code
But as you can see from the class diagram, this class implements Spring’s event listener interface, which means that when the Spring container starts, the onApplicationEvent method of this class is executed.
4.3.2 onApplicationEvent
Here we call the bind method again.
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
Copy the code
4.3.3 the bind ()
Look at the source code there is a skill is encountered if judgment and then directly return, you can not first, because the probability is not the main process.
@ Deprecated public void bind (WebServerInitializedEvent event) {/ / access application context ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; This.port.com pareAndSet(0, event.getwebServer ().getPort())); // Execute the startup method this.start(); }Copy the code
4.3.4 the start ()
Go straight to the core method register()
public void start() { if (! isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (! this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); }}Copy the code
4.3.5 register ()
This is where the register method of the previous NacosServiceRegistry is called.
protected void register() {
this.serviceRegistry.register(getRegistration());
}
Copy the code
4.4 NacosServiceRegistry
4.4.1 the register ()
Here you can see from the printed log that after executing the registerInstance method, the service is registered, so the source entry function has been found.
public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..." ); return; } / / access service ID String serviceId = registration. GetServiceId (); Instance / / access to services, including service IP, port, weight, name of the cluster and the metadata Instance Instance = getNacosInstanceFromRegistration (registration); Try {/ / perform registration logic namingService. RegisterInstance (serviceId, instance); // As you can see from the log, a microservice is registered after the above method is executed. Log.info ("nacos registry, {} {}:{} register finished", serviceId, instance.getip (), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed... {},", serviceId, registration.toString(), e); }}Copy the code
So far, the source code entry function has been found. So far, all the code is the Nacos client code, that is, the code provided to the microservice, and does not involve the Nacos server code.
5. Service registration
5.1 Nacos Client side
5.1.1 NacosNamingService
Look at the registerInstance method of this class
@Override public void registerInstance(String serviceName, String groupName, Throws NacosException {// Determine whether the Instance is temporary if (instance.isephemeral ()) {// Set the heartbeat information BeatInfo BeatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); long instanceInterval = instance.getInstanceHeartBeatInterval(); beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval); beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } / / call the proxy class registration service serverProxy. RegisterService (NamingUtils. GetGroupedName (serviceName, groupName), groupName, instance); }Copy the code
Here is a direct look at the register instance code, if is the heartbeat service code, not for now.
5.1.2 NamingProxy
This is to send a request to the Nacos server, the address of the request is “/ Nacos /v1/ns/instance”, type is POST, to register the instance, the underlying use of HttpClient.
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
Copy the code
Let’s take a look at how the Nacos server handles this request.
5.2 Nacos Server side
For this HTTP request, there must be a Controller on the Server to receive it, so let’s look at InstanceController.
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
Copy the code
All the processing logic for the instance is in this Controller.
5.2.1 InstanceController. The register ()
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, Action = actiontypes. WRITE) public String Register (HttpServletRequest Request) throws Exception {// Obtain namespaceId final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); Final String serviceName = Webutils. required(request, commonparams.service_name); final String serviceName = Webutils. required(Request, commonParams.service_name); / / check service name cannot be empty NamingUtils. CheckServiceNameFormat (serviceName); Final Instance instance = parseInstance(request); / / registered instance serviceManager. RegisterInstance (namespaceId serviceName, instance); return "ok"; }Copy the code
5.2.2 ServiceManager
This class is the concrete processing class, which you can think of as the XXXService class when you write code.
5.2.2.1 registerInstance ()
The logic is very clear, Nacos source code, many methods are no more than a screen of Idea, looking very refreshing. This method creates the service instance.
Public void registerInstance(String namespaceId, String serviceName, Instance Instance) throws NacosException {// Create service, CreateEmptyService (namespaceId, serviceName, instance.isephemeral ()); Service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // Add service instance addInstance(namespaceId, serviceName, instance.isephemeral (), instance); }Copy the code
5.2.2.2 createServiceIfAbsent
The createEmptyService method calls createServiceIfAbsent directly.
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
Copy the code
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster Cluster) throws NacosException {// Obtaining the Service from the cache Service Service = getService(namespaceId, serviceName); // Since this is the first time to register, If (service == null) {loGGers.srv_log. info(" Creating Empty Service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); If (cluster! = null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // Put the service and initialize putServiceAndInit(service); if (! local) { addOrReplaceService(service); }}}Copy the code
5.2.2.3 putServiceAndInit
Private void putServiceAndInit(Service Service) throws NacosException {// Put the Service into the cache putService(Service); // Initialize the service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }Copy the code
5.2.2.4 putService ()
The DCL goes into the cached data structure
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
Copy the code
public void putService(Service service) { if (! serviceMap.containsKey(service.getNamespaceId())) { synchronized (putServiceLock) { if (! serviceMap.containsKey(service.getNamespaceId())) { serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>()); } } } serviceMap.get(service.getNamespaceId()).put(service.getName(), service); }Copy the code
5.2.2.5 init ()
Here is the logic of health check, below analysis.
public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); For (map.entry <String, Cluster> Entry: Clustermap.entrySet ()) {entry.getValue().setService(this); entry.getValue().init(); }}Copy the code
5.2.2.6 addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... Ips) throws NacosException {/ / create instances of temporary Key String Key = KeyBuilder. BuildInstanceListKey (namespaceId serviceName, ephemeral); Service = getService(namespaceId, serviceName); Synchronized (service) {List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); synchronized (service) {List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); / / call DelegateConsistencyServiceImpl. The put method consistencyService. Put (key, the instances); }}Copy the code
5.3 DelegateConsistencyServiceImpl
5.3.1 put
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
Copy the code
As Distro is a temporary instance, it will follow The Distro protocol, the AP mode, that Ali implemented himself.
5.4 DistroConsistencyServiceImpl
5.4.1 the put
public void put(String key, Record value) throws NacosException {
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
Copy the code
5.4.2 for onPut
Here notifier. AddTask is called, where notifier is a Runnable
public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } if (! listeners.containsKey(key)) { return; } notifier.addTask(key, DataOperation.CHANGE); }Copy the code
5.4.3 notifier. AddTask
Here tasks are a blocking queue, and here tasks are placed in a blocking queue.
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
Copy the code
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
Copy the code
So when do you get this task? Since Notifier is a Runnable, its run method must be executed.
5.4.4 notifier. Run
You can see that in the run method, we execute an endless loop, constantly fetching tasks from the queue and blocking if there are no tasks.
@Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ;) { try { Pair<String, DataOperation> pair = tasks.take(); // Handle task (pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); }}}Copy the code
So when do you execute the run method?
5.4.5 init
As you can see, after executing the constructor, the init() method is executed and a new Notifier task is added to the thread pool.
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
Copy the code
5.4.6 handle (pair)
private void handle(Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); Services. remove(datumKey); // Remove the current service from the cache. int count = 0; if (! listeners.containsKey(datumKey)) { return; } // loop for (RecordListener listener: listeners. Get (datumKey)) {count++; Try {// Update action takes update logic, If (action == dataoperation.change) {// Call service. onChange listener.onChange(datumKey, dataStore.get(datumKey).value); continue; If (action == dataoperation.delete) {listener.ondelete (datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); }}Copy the code
5.5 the Service
5.5.1 onChange
public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); // loop for Instance (Instance Instance: value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getweight () > 10000.0d) {instance.setweight (10000.0d); } if (instance.getweight () < 0.01d && instance.getweight () > 0.0d) {instance.setweight (0.01d); }} / / update the instance list, here named inaccurate updateIPs (value. GetInstanceList (), KeyBuilder. MatchEphemeralInstanceListKey (key)); recalculateChecksum(); }Copy the code
5.5.2 updateIPs
So let’s go straight to this paragraph here and update the list of instances
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
Copy the code
5.6 Cluster
5.6.1 updateIps
This uses the idea of CopyOnWrite, making a copy while writing.
public void updateIps(List<Instance> ips, boolean ephemeral) { Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (! ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() ! = oldIP.isHealthy()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName()); } if (ip.getWeight() ! = oldIP.getWeight()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString()); } } } List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; }}Copy the code
5.7 PushService
After the instance is updated, it will be actively pushed to the client. Here, Nacos is actively pushed. When the service list of the registry is updated, it will be directly pushed to the client.
This class still implements the ApplicationListener interface, so the onApplicationEvent method is still executed
5.7.1 onApplicationEvent
The method is too long to be completely posted, just look at this line, this is UDP sent, not TCP sent to Zookeeper.
udpPush(ackEntry);
Copy the code
The service registration process is complete.
6. Service heartbeat
Before we look at the code, let’s think about what the heartbeat service does. First, the heartbeat service is used to check whether the client is still alive. The connection between the server and client cannot be maintained all the time. Therefore, the client periodically sends the heartbeat service to the server to prove that the service is still alive.
When we looked at the client code, a heartbeat service was sent to the server before the instance was registered.
6.1 the client
6.1.1 NacosNamingService.registerInstance
Public void registerInstance(String serviceName, String groupName, Instance Instance) throws NacosException {// Temporary Instance, If (instance.isephemeral ()) {BeatInfo BeatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); long instanceInterval = instance.getInstanceHeartBeatInterval(); SetPeriod (instanceInterval == 0?) // Set the heartbeat interval. The default interval is 5 seconds. DEFAULT_HEART_BEAT_INTERVAL : instanceInterval); . / / the heart request beatReactor addBeatInfo (NamingUtils getGroupedName (serviceName, groupName), beatInfo); } serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }Copy the code
6.1.2 BeatReactor.addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo); Executorservice.schedule (new BeatTask(beatInfo), 0, timeunit.milliseconds); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }Copy the code
Also 6.1.3 BeatTask. Run
Execute the BeatTask task in the run() method as long as the client can send heartbeat instructions to the server.
public void run() { if (beatInfo.isStopped()) { return; } // Long result = serverProxy.sendBeat(beatInfo); Long nextTime = result > 0? result : beatInfo.getPeriod(); Executorservice.schedule (new BeatTask(beatInfo), nextTime, timeunit.milliseconds); }Copy the code
Let’s look at the server-side code
6.2 the service side
6.2.1 InstanceController. Beat
The code is too long. Here’s the process.
- The parameters are first parsed from the request value
- Build parameters
- Get Instance, register without it
- Get service instance, register if not
- Service. ProcessClientBeat (clientBeat) specific processing logic
6.2.2 Service. ProcessClientBeat
public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); . / / clientBeatProcessor mission HealthCheckReactor scheduleNow (clientBeatProcessor); }Copy the code
6.2.3 ClientBeatProcessor. Run
This is the last heartbeat time to update the client instance
public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } instance.setLastBeat(System.currentTimeMillis()); // Since the client sent a heartbeat request, the unhealthy service is updated to a healthy if (! instance.isMarked()) { if (! instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); // Re-push health service getPushService().servicechanged (service) to client; } } } } }Copy the code
7. Service health check
There is a service.init () method to register the Service, which contains the logic for the Service health check
Public void init () {/ / submission service health inspection task HealthCheckReactor scheduleCheck (clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); }}Copy the code
7.1 ClientBeatCheckTask
7.1.1 the run ()
public void run() { try { if (! getDistroMapper().responsible(service.getName())) { return; } if (! getSwitchDomain().isHealthCheckEnabled()) { return; } List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : Instances) {if (system.currentTimemillis () -instance.getLastBeat () > instance.getInstanceHeartBeatTimeOut()) { if (! Instance.ismarked ()) {if (instance.ishealthy ()) {// Service unhealthy instance.sethealthy (false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); // Push to client getPushService().servicechanged (service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (! getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } / / service more than 30 seconds if no response (System. CurrentTimeMillis () - instance. GetLastBeat () > instance. GetIpDeleteTimeout ()) {/ / delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); // Call the delete interface to delete the service deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); }}Copy the code
8. Service discovery
The service discovery is obtained on the server side according to the service name when the service interface is called for the first time. See the source code for this. Here is the underlying logic.
8.1 the client
8.1.1 NacosNamingService.getAllInstances
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; If (subscribe) {/ / access service list serviceInfo = hostReactor getServiceInfo (NamingUtils. GetGroupedName (serviceName, groupName), StringUtils.join(clusters, ",")); } else { serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } List<Instance> list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList<Instance>(); } return list; }Copy the code
8.1.2 HostReactor
8.1.2.1 getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // Obtain the service object ServiceInfo serviceObj = getServiceInfo0(serviceName, Clusters); If (null == serviceObj) {// Build the service object serviceObj = new ServiceInfo(serviceName, Clusters); Put (serviceobj.getKey (), serviceObj); // Add the service cache Map ServiceInfomap. put(serviceobj.getKey (), serviceObj); Updatingmap. put(serviceName, new Object()); // Update the service updateServiceNow(serviceName, Clusters); // Remove updatingmap. remove(serviceName) from the cache; } else if (updatingmap.containsKey (serviceName)) {// If (UPDATE_HOLD_INTERVAL > 0) {// Synchronize synchronized ServiceObj (UPDATE_HOLD_INTERVAL) {// Wait for 5 seconds. } catch (InterruptedException e) { NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); }}}} // Perform this operation in the thread pool, and periodically update the client's service cache scheduleUpdateIfAbsent(serviceName, Clusters). Return serviceInfomap. get(serviceobj.getKey ()); return serviceInfomap.get (serviceobj.getKey ()); }Copy the code
8.1.2.2 updateServiceNow
public void updateServiceNow(String serviceName, {// Obtain the service ServiceInfo oldService = getServiceInfo0(serviceName, clusters); String result = ServerProxy.queryList (serviceName, Clusters, PushReceiver.getUdPPort (), false); // Obtain the service list from the server. if (StringUtils.isNotEmpty(result)) { processServiceJSON(result); } } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } finally { if (oldService ! = null) { synchronized (oldService) { oldService.notifyAll(); }}}}Copy the code
8.1.2.3 scheduleUpdateIfAbsent
Create the UpdateTask execution
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) ! = null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) ! = null) { return; } ScheduledFuture<? > future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); }}Copy the code
8.1.2.4 UpdateTask. Run
public void run() { try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); If (serviceObj == null) {updateServiceNow(serviceName, Clusters); executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS); return; } / / service is not the latest, just back from the server to get the if (serviceObj. GetLastRefTime () < = lastRefTime) {updateServiceNow (serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else {// If the service has already been actively pushed by the server, this should not be overwritten. (This code feels useless because it only makes a request and does not update any data locally.) refreshOnly(serviceName, Clusters); } / / in the thread pool mission executor. The schedule (this, serviceObj getCacheMillis (), TimeUnit. MILLISECONDS); lastRefTime = serviceObj.getLastRefTime(); } catch (Throwable e) { NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); }}Copy the code
8.2 the service side
8.2.1 InstanceController. List
The first step is to obtain and construct parameters from the request value, and the key logic is in the doSrvIpxt method. (Teasing the naming…)
8.2.2 doSrvIpxt ()
This method simply gets all the information about the service from the instance.
8.2.3 service. SrvIPs
Get all instances from the cluster
8.2.4 allIPs
public List<Instance> allIPs(List<String> clusters) { List<Instance> result = new ArrayList<>(); for (String cluster : clusters) { Cluster clusterObj = clusterMap.get(cluster); if (clusterObj == null) { continue; } result.addall (clusterobj.allips ()); } return result; }Copy the code
8.2.5 clusterObj. AllIPs ()
Get all instances from registered information.
public List<Instance> allIPs() {
List<Instance> allInstances = new ArrayList<>();
allInstances.addAll(persistentInstances);
allInstances.addAll(ephemeralInstances);
return allInstances;
}
Copy the code
At this point, the stand-alone version of the Nacos registry source code analysis is complete, finally attached a whole flow chart.