WeChat official account: Operation and maintenance development story, author: Zheng Ge
In this article, we analyze how Nacos handles service registration as a service registry, working through the source code. I cover both the client and the server. On the client side, I use Spring Cloud Alibaba as the core client component. On the server side, I explain how Nacos implements both the AP and CP modes and how it chooses between them. Finally, I share some experience on locating the core classes when debugging the source.
First, a brief overview of my environment:
- JDK 1.8
- Nacos server 1.4.2
- Spring Boot 2.3.5.RELEASE
- Spring Cloud Hoxton.SR8
- Spring Cloud Alibaba 2.2.5.RELEASE
Nacos service architecture
Spring Boot is used as the service foundation for the platform. The position of Nacos in the service architecture is shown in the following figure:
Middleware with functions similar to Nacos includes Eureka, ZooKeeper, Consul, and Etcd. One of the biggest features of Nacos is that it supports both AP and CP modes: it uses the Distro protocol for AP and the Raft protocol for CP consistency.
Nacos client
Service registration client
Add the dependency
Nacos service registration is initiated by the client. It hooks into the event that Spring publishes once startup has completed and uses it to invoke the service registration method. First we need to import the spring-cloud-starter-alibaba-nacos-discovery dependency:
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
Analysis of the source code
For a Spring Boot component, we first look at its META-INF/spring.factories file:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
From this file I found that NacosServiceRegistryAutoConfiguration is the core configuration class for service registration. It defines three core beans:
- NacosServiceRegistry
- NacosRegistration
- NacosAutoServiceRegistration
NacosAutoServiceRegistration
NacosAutoServiceRegistration performs the automatic registration of the service with Nacos. It extends the abstract class AbstractAutoServiceRegistration.
AbstractAutoServiceRegistration implements the ApplicationContextAware and ApplicationListener<WebServerInitializedEvent> interfaces. Its onApplicationEvent method is called once the container has started and the web server has been initialized:
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}
This delegates to bind(event):
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                .getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}
The start() method is then called
public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}
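The bind() and start() flow above comes down to two atomic guards: the port is recorded only the first time it is reported, and registration runs only while the component is not yet marked as running. The following is a minimal stand-alone sketch of that pattern. RegistrationLifecycle and its Runnable callback are hypothetical names used for illustration, not Spring Cloud classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the guard logic seen in bind() and start().
final class RegistrationLifecycle {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Runnable registerAction;

    RegistrationLifecycle(Runnable registerAction) {
        this.registerAction = registerAction;
    }

    // Mirrors bind(): keep the first port that was reported, then start.
    void onWebServerReady(int serverPort) {
        port.compareAndSet(0, serverPort);
        start();
    }

    // Mirrors start(): register only if we are not already running.
    void start() {
        if (!running.get()) {
            registerAction.run();
            running.compareAndSet(false, true);
        }
    }

    int getPort() {
        return port.get();
    }

    boolean isRunning() {
        return running.get();
    }
}
```

A second "web server ready" event therefore neither overwrites the port nor triggers a second registration.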
Finally, register() is called. Internally it calls serviceRegistry.register() to complete the service registration:
private final ServiceRegistry<R> serviceRegistry;

protected void register() {
    this.serviceRegistry.register(getRegistration());
}
NacosServiceRegistry
The main purpose of the NacosServiceRegistry class is to implement service registration
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }

    // By default this returns a com.alibaba.nacos.client.naming.NacosNamingService instance created via reflection
    NamingService namingService = namingService();
    // Get the serviceId; by default it is taken from spring.application.name
    String serviceId = registration.getServiceId();
    // Get the group; the default is DEFAULT_GROUP
    String group = nacosDiscoveryProperties.getGroup();
    // Build the Instance to register
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        // Register the instance
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}
As we can see, it calls the namingService.registerInstance(serviceId, group, instance) method:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}
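The grouped service name built in registerInstance combines the group and the service name into one string. A small sketch of that composition is shown here, assuming the "@@" separator that the Nacos client uses between group and service. GroupedNames is a hypothetical helper written for this article, not the NamingUtils class itself.

```java
// Hypothetical helper that mimics how a grouped service name is composed.
final class GroupedNames {
    // Assumed separator between group name and service name.
    static final String SEPARATOR = "@@";

    static String groupedName(String serviceName, String groupName) {
        if (serviceName == null || serviceName.trim().isEmpty()) {
            throw new IllegalArgumentException("serviceName is blank");
        }
        if (groupName == null || groupName.trim().isEmpty()) {
            throw new IllegalArgumentException("groupName is blank");
        }
        return groupName + SEPARATOR + serviceName;
    }

    // Reverse operation: strip the group prefix if one is present.
    static String serviceNameOf(String groupedName) {
        int index = groupedName.indexOf(SEPARATOR);
        return index < 0 ? groupedName : groupedName.substring(index + SEPARATOR.length());
    }
}
```

For example, the service order-service in DEFAULT_GROUP would be sent to the server as DEFAULT_GROUP@@order-service under this assumption.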
For ephemeral instances, beatReactor.addBeatInfo() first schedules a task that sends heartbeat data to the server every 5 seconds. The method then calls serverProxy.registerService(groupedServiceName, groupName, instance):
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);

    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

    // POST /nacos/v1/ns/instance to register the service
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
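The periodic heartbeat that accompanies an ephemeral registration can be pictured with a plain ScheduledExecutorService. This is only a sketch under assumptions: HeartbeatSketch and runBeats are hypothetical names, and the real BeatReactor may schedule its tasks differently. The 5-second constant reflects the interval stated above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a periodic heartbeat sender.
final class HeartbeatSketch {
    // Interval described in the article: one beat every 5 seconds.
    static final long DEFAULT_PERIOD_MILLIS = 5_000L;

    // Sends beats at a fixed rate until `expected` beats went out or the timeout expires.
    // Returns true if all expected beats were sent in time.
    static boolean runBeats(Runnable sendBeat, int expected, long periodMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(expected);
        try {
            executor.scheduleAtFixedRate(() -> {
                sendBeat.run();      // in a real client this would be an HTTP call to the server
                latch.countDown();
            }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // stop beating, as when an instance is deregistered
        }
    }
}
```

If the beats stop arriving, the server can mark an ephemeral instance unhealthy and eventually remove it, which is why only ephemeral instances need this task.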
Service registration server
As a service registry, Nacos can run with both the AP and the CP architecture while it maintains the service list of our service center. Here is a simple data model for our list of services:
This is essentially the same Instance that NacosServiceRegistry builds on the client side. Returning to the source code, let's look directly at the server-side /nacos/v1/ns/instance endpoint, which is defined in the InstanceController#register method.
Service registration
The InstanceController#register method mainly parses the request parameters and then calls serviceManager.registerInstance. If it returns "ok", the registration succeeded.
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
This calls the registerInstance method:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
It then calls the addInstance() method:
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }
}
addInstance calls consistencyService.put(key, instances) to refresh all instances of the service. From the definition of consistencyService we can see that this invokes the put method of the DelegateConsistencyServiceImpl class. This is the point where the AP or CP mode is selected:
@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

// Selection of AP or CP mode:
// the Distro protocol is used in AP mode and the Raft protocol is used in CP mode.
private ConsistencyService mapConsistencyService(String key) {
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
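The delegate pattern above routes each write by inspecting the key, which in turn was built from the ephemeral flag. Here is a self-contained sketch of that idea. The key layout ("iplist." plus an "ephemeral." marker) and all class names are assumptions chosen for illustration and are not guaranteed to match the exact strings used by KeyBuilder.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal storage interface standing in for ConsistencyService.
interface ConsistencyStore {
    void put(String key, String value);
}

// Simple in-memory store so that we can observe where a write ended up.
final class MapStore implements ConsistencyStore {
    final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }
}

// Routes writes to the AP store or the CP store based on the key prefix.
final class DelegateStore implements ConsistencyStore {
    static final String PREFIX = "iplist.";               // assumed key prefix
    static final String EPHEMERAL_MARKER = "ephemeral.";  // assumed marker for ephemeral keys

    private final ConsistencyStore ephemeralStore;   // AP side (Distro in Nacos)
    private final ConsistencyStore persistentStore;  // CP side (Raft in Nacos)

    DelegateStore(ConsistencyStore ephemeralStore, ConsistencyStore persistentStore) {
        this.ephemeralStore = ephemeralStore;
        this.persistentStore = persistentStore;
    }

    static String buildKey(String namespaceId, String serviceName, boolean ephemeral) {
        return PREFIX + (ephemeral ? EPHEMERAL_MARKER : "") + namespaceId + "##" + serviceName;
    }

    private ConsistencyStore select(String key) {
        return key.startsWith(PREFIX + EPHEMERAL_MARKER) ? ephemeralStore : persistentStore;
    }

    @Override
    public void put(String key, String value) {
        select(key).put(key, value);
    }
}
```

The caller never names a mode explicitly; the ephemeral flag is encoded in the key once, and every later operation on that key lands on the same side.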
AP mode
Nacos defaults to AP mode, which is implemented with the Distro protocol. The interface involved is EphemeralConsistencyService, implemented by DistroConsistencyServiceImpl. Node information is stored mainly by calling its put method:
@Override
public void put(String key, Record value) throws NacosException {
    // Store the data
    onPut(key, value);
    // Synchronize the change to the other nodes
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}
onPut saves the data and queues a notification:
public void onPut(String key, Record value) {

    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // Store the data
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }

    notifier.addTask(key, DataOperation.CHANGE);
}
Inside addTask, tasks.offer(Pair.with(datumKey, action)) puts the registered instance information into the blocking queue tasks. The Notifier#run method then processes the queue asynchronously, which keeps registration efficient:
public class Notifier implements Runnable {

    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");

        for (; ; ) {
            try {
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

    private void handle(Pair<String, DataOperation> pair) {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        int count = 0;

        // Notify every listener registered for this key
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
    }
}
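The queue-plus-worker arrangement used by the Notifier is a classic producer/consumer pattern: the request thread only enqueues a key and returns, while a single background thread takes keys off the queue and notifies the listener. Below is a minimal sketch of that pattern. NotifierSketch, its capacity of 1024, and the processAll demo helper are all hypothetical and exist only to illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of a queue-backed notifier with a single consumer thread.
final class NotifierSketch implements Runnable {
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Consumer<String> listener;

    NotifierSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    // Producer side: non-blocking, returns false if the queue is full.
    boolean addTask(String datumKey) {
        return tasks.offer(datumKey);
    }

    // Consumer side: block until a task arrives, then notify the listener.
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                listener.accept(tasks.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop
            } catch (RuntimeException e) {
                System.err.println("error while notifying listener: " + e);
            }
        }
    }

    // Demo helper: enqueue all keys and return them in the order they were handled.
    static List<String> processAll(List<String> keys, long timeoutMillis) {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(keys.size());
        NotifierSketch notifier = new NotifierSketch(key -> {
            handled.add(key);
            done.countDown();
        });
        Thread worker = new Thread(notifier, "notifier-sketch");
        worker.setDaemon(true);
        worker.start();
        for (String key : keys) {
            notifier.addTask(key);
        }
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.interrupt();
        return new ArrayList<>(handled);
    }
}
```

Because a single thread drains a FIFO queue, changes are handled in the order they were enqueued, and a slow listener delays later notifications rather than the registering client.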
The notifier calls listener.onChange(datumKey, dataStore.get(datumKey).value). The listener here is actually our Service object:
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}
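The two weight checks in onChange clamp each instance weight into a fixed range while leaving a weight of exactly zero untouched. The rule can be isolated as a small pure function, shown here as a sketch. WeightRules is a hypothetical name; the bounds are the ones that appear in the listing above.

```java
// Hypothetical helper isolating the weight normalization applied in onChange.
final class WeightRules {
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    static double normalize(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;            // cap very large weights
        }
        if (weight < MIN_POSITIVE_WEIGHT && weight > 0.0D) {
            return MIN_POSITIVE_WEIGHT;   // lift tiny positive weights
        }
        return weight;                    // zero and in-range values are kept as they are
    }
}
```

Note that zero is deliberately not raised to the minimum, so an instance with weight 0 keeps that value.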
The updateIPs method updates the service instance information held in the registry's memory and notifies the subscribers of the current service over UDP:
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }

    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // Update the instance list of each cluster
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    // Push the change to the subscribers of this service
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
}
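The first half of updateIPs is a grouping step: every known cluster starts with an empty list, instances without a cluster name fall into a default cluster, and unknown clusters are created on the fly. The sketch below reproduces just that grouping with plain collections. ClusterGrouping and Inst are hypothetical types, and "DEFAULT" is assumed to be the default cluster name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "group instances by cluster" step in updateIPs.
final class ClusterGrouping {
    static final String DEFAULT_CLUSTER = "DEFAULT"; // assumed default cluster name

    // Minimal stand-in for a registered instance.
    static final class Inst {
        final String clusterName;
        final String address;

        Inst(String clusterName, String address) {
            this.clusterName = clusterName;
            this.address = address;
        }
    }

    static Map<String, List<String>> groupByCluster(List<String> knownClusters, List<Inst> instances) {
        Map<String, List<String>> byCluster = new LinkedHashMap<>();
        // Known clusters get an empty list so that clusters without instances are cleared.
        for (String cluster : knownClusters) {
            byCluster.put(cluster, new ArrayList<>());
        }
        for (Inst inst : instances) {
            if (inst == null) {
                continue; // malformed entry, skipped like in the original method
            }
            String cluster = (inst.clusterName == null || inst.clusterName.isEmpty())
                    ? DEFAULT_CLUSTER
                    : inst.clusterName;
            // Unknown clusters are created on first use.
            byCluster.computeIfAbsent(cluster, name -> new ArrayList<>()).add(inst.address);
        }
        return byCluster;
    }
}
```

Starting from empty lists for all known clusters matters: a cluster whose last instance disappeared must be updated with an empty list rather than left with stale entries.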
CP mode
Nacos implements CP mode with the Raft protocol. The implementation class is PersistentConsistencyServiceDelegateImpl, which delegates to the Raft-based implementation.
Let's first look at the put method:
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}
It calls raftCore.signalPublish(key, value). The main steps are as follows:
- Determine whether the current node is the Leader. If it is not, forward the request to the Leader node for processing.
- If it is the Leader, call onPublish(datum, peers.local()). This method persists to file through raftStore.updateTerm(local.term.get()) and then updates memory asynchronously through NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build()).
- Create a CountDownLatch(peers.majorityCount()). Success is returned only when at least N/2 + 1 nodes (a majority) have succeeded.
- Call /raft/datum/commit on the other Nacos nodes to synchronize the instance information.
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);

        final RaftPeer leader = getLeader();

        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }

    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

        onPublish(datum, peers.local());

        final String content = json.toString();

        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {

                }
            });
        }

        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
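The majority check in signalPublish relies on a CountDownLatch initialized to N/2 + 1: the Leader counts itself, each follower that acknowledges counts down once, and the write succeeds only if the latch reaches zero before the timeout. Here is a stand-alone sketch of that quorum wait. MajorityPublish is a hypothetical class, and followers are modeled as simple Supplier<Boolean> callbacks instead of HTTP calls.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a "wait for a majority of acknowledgements" publish step.
final class MajorityPublish {

    // Smallest number of nodes that forms a majority of a cluster with n nodes.
    static int majorityCount(int n) {
        return n / 2 + 1;
    }

    // followers: one callback per follower, returning true if that follower accepted the write.
    static boolean publish(List<Supplier<Boolean>> followers, long timeoutMillis) {
        int clusterSize = followers.size() + 1; // followers plus the leader itself
        CountDownLatch latch = new CountDownLatch(majorityCount(clusterSize));
        latch.countDown(); // the leader has already applied the write locally

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (Supplier<Boolean> follower : followers) {
                pool.execute(() -> {
                    if (follower.get()) {
                        latch.countDown(); // only successful acknowledgements count
                    }
                });
            }
            // True only if a majority acknowledged before the timeout.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In a three-node cluster the majority is 2, so the Leader plus one acknowledging follower is enough, while two failing followers cause the wait to time out and the publish to fail.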
Choosing between AP mode and CP mode
If a client node registers with Nacos using ephemeral=true, the Nacos cluster treats that node in AP mode through the Distro protocol. If a client node registers using ephemeral=false, the cluster treats that node in CP mode through the Raft protocol. The mode is decided by this attribute of the client at registration time, so AP and CP coexist in the same cluster and simply apply to different client nodes.
Nacos source debugging
Nacos startup file
To find the startup class of Nacos, we first need to find the jar that gets started.
Then we extract target/nacos-server.jar.
Extraction command and the resulting manifest (a jar is a zip archive, so unzip or jar -xf also works):
tar -zxvf nacos-server.jar
cat META-INF/MANIFEST.MF
Manifest-Version: 1.0
Implementation-Title: nacos-console 1.4.2
Implementation-Version: 1.4.2
Archiver-Version: Plexus Archiver
Built-By: xiweng.yy
Spring-Boot-Layers-Index: BOOT-INF/layers.idx
Specification-Vendor: Alibaba Group
Specification-Title: nacos-console 1.4.2
Implementation-Vendor-Id: com.alibaba.nacos
Spring-Boot-Version: 2.5.0-RC1
Implementation-Vendor: Alibaba Group
Main-Class: org.springframework.boot.loader.PropertiesLauncher
Spring-Boot-Classpath-Index: BOOT-INF/classpath.idx
Start-Class: com.alibaba.nacos.Nacos
Spring-Boot-Classes: BOOT-INF/classes/
Spring-Boot-Lib: BOOT-INF/lib/
Created-By: Apache Maven 3.6.3
Build-Jdk: 1.8.0_231
Specification-Version: 1.4.2
The Start-Class entry, com.alibaba.nacos.Nacos, is the startup class of this Spring Boot project.
Nacos debugging
With the startup class com.alibaba.nacos.Nacos identified, we can run it in IDEA to start and debug Nacos.