General documentation: Article directory Github: github.com/black-ant

1. Preface

Just a casual write-up, haha ~

Article purpose:

  • Troubleshoot the Debug direction of the Nacos creation function
  • Sort out the module system of Nacos components

Article outline: this document mainly covers the following parts

  • Nacos service discovery
  • Nacos configuration loading
  • Nacos health check
  • Nacos Routing policy

PS: This document references the official documentation; for quick-start usage, reading the official docs is recommended

2. Compilation of source code

2.1 Nacos source code compilation

// Step 1: Download the source code
https://github.com/alibaba/nacos.git

// Step 2: Compile Nacos
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U

// Step 3: Run the Server from the build output
nacos_code\distribution\target


2.2 Nacos source code running

Step 1: Download the Nacos source code
https://github.com/alibaba/nacos.git

// Step 2: Import Nacos into IDEA. It is a SpringBoot application; start from the com.alibaba.nacos.Nacos class

// Step 3: Modify the IDEA startup parameters
-Dnacos.standalone=true -Dnacos.home=C:\nacos

// -Dnacos.standalone=true : standalone (single-node) startup
// -Dnacos.home=C:\nacos   : home/log path

// PS: the Nacos Application class
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {

    public static void main(String[] args) {
        SpringApplication.run(Nacos.class, args);
    }
}

3. Module source code

<modules>
    <!-- Configuration management -->
    <module>config</module>
    <!-- Nacos kernel -->
    <module>core</module>
    <!-- Service discovery -->
    <module>naming</module>
    <!-- Address server -->
    <module>address</module>
    <!-- Unit tests -->
    <module>test</module>
    <!-- Interface abstraction -->
    <module>api</module>
    <!-- Client -->
    <module>client</module>
    <!-- Examples -->
    <module>example</module>
    <!-- Common tools -->
    <module>common</module>
    <!-- Server build and release -->
    <module>distribution</module>
    <!-- Console, graphical interface module -->
    <module>console</module>
    <!-- Metadata management -->
    <module>cmdb</module>
    <!-- TODO: Istio integration for flow control -->
    <module>istio</module>
    <!-- Consistency management -->
    <module>consistency</module>
    <!-- Permission control -->
    <module>auth</module>
    <!-- System information management: env read, conf read -->
    <module>sys</module>
</modules>


3.1 Nacos Service Discovery and Management (C0-C20)

Nacos's management of services is mainly centered in the Naming module. Combined with the Client, we can see what logic service discovery and management involve, and how to operate them

3.1.1 Obtaining the Service list on the Console

External interface:

  • C- CatalogController # listDetail : obtains the list of service details
  • C- CatalogController # instanceList : lists the instances of a specific service
  • C- CatalogController # serviceDetail : obtains the details of a specified service

The core is mainly handled through the ServiceManager. Here’s a look at the internal logic:

// Through the three interfaces, it is not hard to find that the final call core is the ServiceManager class
C01- ServiceManager
    // Inner classes
    PC- UpdatedServiceProcessor
    PC- ServiceUpdater
    PSC- ServiceChecksum
    PC- EmptyServiceAutoClean
    PC- ServiceReporter
    PSC- ServiceKey
    // Core fields
    F- Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    F- LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
    F- ConsistencyService consistencyService;
    // Common methods
    M- chooseServiceMap : obtains the Service map by namespace
    M- addUpdatedServiceToQueue : updates a Service
    M- onChange : called when a service changes
    M- onDelete : called when a service is deleted

// In addition, the ServiceManager has a dependency: ConsistencyService
C02- ConsistencyService
    M- remove : deletes data from the cluster
    M- get : obtains data from the cluster
    M- listen : listens for changes of a key in the cluster
    M- unListen : removes the listener for a key
    M- isAvailable : returns whether the current consistency status is available

// For CRUD operations (Delete, for example), the dependent ConsistencyService (DelegateConsistencyServiceImpl)
// is called to handle the consistency requirements

PS:C02_01 ConsistencyService architecture

When multiple services exist, how are they stored?

// As shown in the figure above, the related objects are stored in the clusterMap

// Note that there are two services
com.alibaba.nacos.api.naming.pojo.Service
com.alibaba.nacos.naming.core.Service


// Properties of the Service
C- Service
    I- com.alibaba.nacos.api.naming.pojo.Service
    F- Selector selector
    F- Map<String, Cluster> clusterMap = new HashMap<>();
    F- Boolean enabled
    F- Boolean resetWeight
    F- String token
    F- List<String> owners

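To make the two-level storage concrete, here is a minimal sketch (class and method names are illustrative stand-ins, not the real Nacos API) of how a serviceMap keyed first by namespace and then by grouped service name can be read and written:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServiceMapSketch {

    // Simplified stand-in for com.alibaba.nacos.naming.core.Service
    static class Service {
        final String name;
        Service(String name) { this.name = name; }
    }

    // namespaceId -> (grouped service name -> Service), mirroring ServiceManager#serviceMap
    static final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Mirrors the idea of chooseServiceMap: pick the inner map by namespace
    static Map<String, Service> chooseServiceMap(String namespaceId) {
        return serviceMap.get(namespaceId);
    }

    static void putService(String namespaceId, Service service) {
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                  .put(service.name, service);
    }
}
```

The ConcurrentHashMap at both levels means lookups and registrations need no explicit locking, which fits the high-frequency read path of service discovery.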

3.1.2 Control classes for Nacos services and Config

In Nacos, NamingService and ConfigService are used to control services and configuration respectively. The underlying implementation classes are NacosConfigService and NacosNamingService; NacosNamingService, for example, belongs to the com.alibaba.nacos.client.naming package.

// PS: note that the nacos-config-spring-boot-starter and nacos-discovery-spring-boot-starter packages are used

3.1.3 Exit and destruction of Nacos

When a registered service is shut down, Nacos deregisters it from the Server at the end of its lifecycle

Step 1: Closeable stops the related classes when the project stops. When we click stop, we can see the following log list:

com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.beat.BeatReactor do shutdown stop
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown begin
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown stop
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.core.HostReactor do shutdown begin
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown begin
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown stop
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown begin
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown stop
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.core.HostReactor do shutdown stop
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown begin
com.alibaba.nacos.client.naming          : [NamingHttpClientManager] Start destroying NacosRestTemplate
com.alibaba.nacos.client.naming          : [NamingHttpClientManager] Destruction finished
com.alibaba.nacos.client.naming          : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown stop

// As you can see, these are the closing operations; the main Close operations are implemented based on
// com.alibaba.nacos.common.lifecycle.Closeable
// PS: the call logic here is TODO

Step 2: NacosServiceRegistry logout

In addition to the Closeable shutdown, the Service is also deregistered, primarily by NacosServiceRegistry

public void deregister(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        return;
    }

    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    try {
        namingService.deregisterInstance(serviceId, group, registration.getHost(),
					registration.getPort(), nacosDiscoveryProperties.getClusterName());
    } catch (Exception e) {
        // log omitted
    }
}

// PS: we inherit ServiceRegistry to implement the destruction logic
C- AbstractAutoServiceRegistration

Nacos_Closeable system

3.1.4 Health Check Process

Nacos provides real-time health checks on services and prevents requests from being sent to unhealthy hosts or service instances. Nacos supports health checks at the transport layer (PING or TCP) and at the application layer (such as HTTP, MySQL, or user-defined checks).
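A transport-layer (TCP) check of the kind described above is, at its core, just a connection attempt with a timeout. A minimal illustrative sketch (not the Nacos implementation):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TcpCheckSketch {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // A transport-layer check like this only proves the port accepts connections,
    // not that the application behind it is working (that is what HTTP/MySQL checks add).
    static boolean isAlive(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```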

Health check related interfaces

  • Send an instance heartbeat (InstanceController) : /nacos/v1/ns/instance/beat
  • Update the health status of an instance (HealthController) : /nacos/v1/ns/health/instance

The Client initiates the heartbeat

The client periodically initiates heartbeat operations; the classes involved are:

C- BeatReactor
    PC- BeatTask : inner class

// There are two steps:

// Step 1: BeatReactor # addBeatInfo Add a scheduled task
 public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // Add the heartbeat information
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

// Step 2: Call the Server interface in BeatTask to perform heartbeat operations
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);


// PS: the default heartbeat interval is 5 seconds (com.alibaba.nacos.api.common.Constants)
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

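The client-side pattern above (a beat task that re-schedules itself after each run) can be sketched independently of Nacos. Names here are illustrative; in Nacos, the `sendBeat` callback corresponds to `serverProxy.sendBeat(beatInfo, lightBeatEnabled)`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BeatSketch {
    static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Mirrors the BeatTask pattern: run one beat, then schedule the next one after the beat period
    static void scheduleBeat(long periodMs, Runnable sendBeat) {
        executor.schedule(() -> {
            sendBeat.run();
            if (!executor.isShutdown()) {
                scheduleBeat(periodMs, sendBeat);  // re-arm, like BeatTask#run does
            }
        }, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

Re-scheduling after each run (instead of `scheduleAtFixedRate`) lets the period be adjusted between beats, which is what allows the server to push a new beat interval to the client.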

The server detects the heartbeat

// The server will also check the instance. The core class is ClientBeatCheckTask

// Step 1: Create the ClientBeatCheckTask
C- Service ? - the task is started when the Service is initialized

public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // ...
}

// You can also see the default time settings
public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}



// Step 2: Run the ClientBeatCheckTask
C- ClientBeatCheckTask ? - checks and updates the status of temporary instances, and deletes them if they have expired

public void run() {
    // ... omitted

    // Step 1: Get all instances
    List<Instance> instances = service.allIPs(true);

    for (Instance instance : instances) {
        // If the time since the last beat exceeds the heartbeat timeout, change the health status
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
            if (!instance.isMarked()) {
                if (instance.isHealthy()) {
                    instance.setHealthy(false);
                    getPushService().serviceChanged(service);
                    ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                }
            }
        }
    }

    if (!getGlobalConfig().isExpireInstance()) {
        return;
    }

    for (Instance instance : instances) {
        if (instance.isMarked()) {
            continue;
        }
        // If the time since the last beat exceeds the delete timeout, delete the instance
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
            deleteIp(instance);
        }
    }
}

// PS: the first (shorter) timeout only changes the health status; the second (longer) timeout removes the instance
    

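The two-stage timeout logic above (15 s → mark unhealthy, 30 s → remove) can be condensed into a small sketch with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BeatCheckSketch {
    // Mirror the defaults from com.alibaba.nacos.api.common.Constants
    static final long HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    static final long IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

    static class Inst {
        long lastBeat;
        boolean healthy = true;
        Inst(long lastBeat) { this.lastBeat = lastBeat; }
    }

    // Returns the surviving instances: past the first timeout an instance is
    // marked unhealthy; past the second (longer) timeout it is dropped entirely
    static List<Inst> check(List<Inst> instances, long now) {
        List<Inst> survivors = new ArrayList<>();
        for (Inst i : instances) {
            long silent = now - i.lastBeat;
            if (silent > IP_DELETE_TIMEOUT) continue;          // expired: delete the instance
            if (silent > HEART_BEAT_TIMEOUT) i.healthy = false; // timed out: mark unhealthy
            survivors.add(i);
        }
        return survivors;
    }
}
```

The gap between the two thresholds gives a flapping instance a window to come back before it is actually deregistered.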

ACK Health check main logic and push information

Digging further, I stumbled upon an ACK mechanism that starts with events. This mode is designed so that when a service changes on the Server, the update is pushed to the client through a udpClient over a UDP port

PS: If the client provides a UDP port when querying the service instance, the server will create a udpClient

for (Instance instance : service.allIPs(Lists.newArrayList(clusterName))) {
    if (instance.getIp().equals(ip) && instance.getPort() == port) {
        instance.setHealthy(valid);
        // Publish the ServiceChangeEvent
        pushService.serviceChanged(service);
        break;
    }
}

// ServiceChangeEvent event handling
C- PushService                                                             
public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                // Get PushClient collection from ServerName and namespace
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }
                
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                
                // Loop through all pushClients
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        clients.remove(client.toString());
                        continue;
                    }
                    
                    Receiver.AckEntry ackEntry;
                    // Get the cache key and get the entity data from the cache
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();
                        
                    }
                    
                    // Build the ACK entity class
                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    // Verify the UDP ACK and push the ACK entity
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }
    
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            return null;
        }
        
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }
        
        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            // Socket send
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                    TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }
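Stripped of Nacos specifics, the push side is a DatagramSocket send plus an ack wait (Nacos re-schedules a Retransmitter instead of blocking). A self-contained sketch of one send/ack round trip, with hypothetical names:

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

public class UdpPushSketch {
    // Sends one datagram and waits briefly for an ack; returns whether an ack arrived.
    // Nacos does not block like this: it schedules a Retransmitter and retries up to MAX_RETRY_TIMES.
    static boolean pushOnce(DatagramSocket socket, InetSocketAddress target, String payload, int timeoutMs)
            throws Exception {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(data, data.length, target));
        socket.setSoTimeout(timeoutMs);
        try {
            DatagramPacket ack = new DatagramPacket(new byte[1024], 1024);
            socket.receive(ack);  // in Nacos, a Receiver thread parses this into an AckEntry
            return true;
        } catch (SocketTimeoutException e) {
            return false;  // no ack: the Nacos side would retransmit
        }
    }
}
```

UDP keeps the push cheap, and the ack-plus-retry layer compensates for its unreliability.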

Usage of the health check threshold

In the Service configuration, you can set a floating-point number between 0 and 1 that defines the health protection threshold. The corresponding class is com.alibaba.nacos.api.naming.pojo.Service

C- Service
    F- name : service name
    F- protectThreshold : health protection threshold
    F- appName : application name
    F- groupName : group name
    F- metadata : metadata

// Usage of the threshold
C- InstanceController
	M- doSrvIpxt :

// Core logic
                                                                     
double threshold = service.getProtectThreshold();
// If the ratio of healthy instances to the total number of instances is at or below the threshold, the protection threshold is reached
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

    if (isCheck) {
        result.put("reachProtectThreshold", true);
    }
    ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
    ipMap.get(Boolean.FALSE).clear();
}

// PS: in this case, the instance the Client's Balancer picks may not actually be a healthy instance on the Server

But what is the purpose of a threshold?

If a large number of instances become abnormal, the pressure eventually falls on the few healthy instances, which can lead to a chain reaction

To avoid these conditions, all instances are returned when the health threshold is reached.

This is the purpose of ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));

Although the Client may encounter an exception instance, it can prevent the entire system from crashing
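The protection behavior can be sketched as a pure function (illustrative names, not the Nacos API):

```java
import java.util.ArrayList;
import java.util.List;

public class ProtectThresholdSketch {
    // Returns the instances handed to the client: if healthy/total <= threshold,
    // unhealthy instances are returned too, so the few healthy ones are not overloaded
    static <T> List<T> select(List<T> healthy, List<T> unhealthy, double threshold) {
        int total = healthy.size() + unhealthy.size();
        List<T> result = new ArrayList<>(healthy);
        if (total > 0 && (float) healthy.size() / total <= threshold) {
            result.addAll(unhealthy);  // protection threshold reached
        }
        return result;
    }
}
```

With threshold 0 (the default) the protection never triggers; with threshold 1 unhealthy instances are always included.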

/nacos/v1/ns/health/instance parameters

Name          Type     Required  Description
namespaceId   string   no        Namespace ID
serviceName   string   yes       Service name
groupName     string   no        Group name
clusterName   string   no        Cluster name
ip            string   yes       Service instance IP
port          int      yes       Service instance port
healthy       boolean  yes       Whether healthy

Treatment of weight

Weights are mainly configured in the Instance object

C- Instance
    M- instanceId
    M- ip
    M- port
    M- weight : weight
    M- healthy : health status
    M- enabled
    M- ephemeral
    M- clusterName
    M- serviceName
    M- metadata


PS: the weight takes effect when the Client uses it for load balancing

Refer to the original @ blog.csdn.net/krisdad/art…

public class NacosWeightLoadBalanceRule extends AbstractLoadBalancerRule {
 
  @Override
  public void initWithNiwsConfig(IClientConfig clientConfig) {}
 
  @Resource private NacosDiscoveryProperties nacosDiscoveryProperties;
 
  @Override
  public Server choose(Object key) {
    // 1. Obtain the name of the service
    BaseLoadBalancer loadBalancer = (BaseLoadBalancer) this.getLoadBalancer();
    String serverName = loadBalancer.getName();
    // 2. In this case, the Nacos Client will automatically implement the load balancing algorithm based on the weight
    NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
    try {
      Instance instance = namingService.selectOneHealthyInstance(serverName);
      return new NacosServer(instance);
    } catch (NacosException e) {
      e.printStackTrace();
    }
    return null;
  }
}

@Bean
public IRule getLoadBalancerRule() {
    return new NacosWeightLoadBalancerRule();
}


// PS: I think the weight is given to the Client to handle itself

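Weight-based selection of the kind selectOneHealthyInstance performs on the client boils down to weighted random choice. A minimal sketch (hypothetical names; the real Nacos client uses its own Chooser/Pairs implementation):

```java
import java.util.Random;

public class WeightedPickSketch {
    // Picks index i with probability weights[i] / sum(weights)
    static int pick(double[] weights, Random rnd) {
        double sum = 0;
        for (double w : weights) sum += w;
        double r = rnd.nextDouble() * sum;  // a point on the [0, sum) line
        for (int i = 0; i < weights.length; i++) {
            r -= weights[i];                // walk the segments until the point falls inside one
            if (r < 0) return i;
        }
        return weights.length - 1;          // numeric edge case
    }
}
```

An instance with weight 0 is never chosen, and doubling an instance's weight doubles its share of the traffic.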

Other points

// The default value of Nacos

@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;

@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
private int cleanEmptyServiceDelay;

@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
private int cleanEmptyServicePeriod;


3.2 Nacos configuration process (C30-C60)

3.2.1 Nacos configuration management

// To get the configuration, there are several main steps:
C30- NacosConfigService
    M30_01- getConfigInner(String tenant, String dataId, String group, long timeoutMs)
        - Builds a ConfigResponse and sets dataId, tenant and group on it
        1- Calls LocalConfigInfoProcessor.getFailover : prefers the local failover configuration
        2- Calls ClientWorker to get the remote configuration -> PS:M30_01_01
        3- If there is still nothing, LocalConfigInfoProcessor.getSnapshot reads the snapshot
        End- configFilterChainManager filter chain processing

// PS:M30_01_01 ClientWorker processing
// The remote service request is made in the ClientWorker. The core code is:
// agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);

// As you can see, there is no long-connection logic here; it is still a REST request.
// PS: according to the official site, 2.0 will use a long connection, so this should change
- Constants.CONFIG_CONTROLLER_PATH : /v1/cs/configs
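The three-level fallback above (failover file → remote → snapshot) reduces to a simple chain. A sketch with illustrative names, where each source returns null when it has nothing:

```java
import java.util.function.Supplier;

public class ConfigFallbackSketch {
    // Tries each source in order, mirroring getConfigInner's failover -> remote -> snapshot order.
    // A source that throws (e.g. the remote call when the server is down) falls through to the next one.
    static String getConfig(Supplier<String>... sources) {
        for (Supplier<String> source : sources) {
            try {
                String content = source.get();
                if (content != null) return content;
            } catch (RuntimeException e) {
                // remote failure falls through to the snapshot, as in Nacos
            }
        }
        return null;
    }
}
```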

The main logic of LocalConfigInfoProcessor

// The local configuration refers to a local file. Here is how the source code uses it:
C31- LocalConfigInfoProcessor
    M31_01- getFailover : obtains the local failover file -> PS:M31_01_01
    M31_02- saveSnapshot : saves the snapshot after the configuration is successfully obtained
        - Save path (prefix omitted): \nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\one1\test1

// PS:M31_01_01 localPath
C:\Users\10169\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1

Pro 1: What can we see from this source code?

Since local files exist, does that mean I can make the client use the local configuration first by placing a file under this path: <nacos>\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1 ?

The answer is: true

Pro 2: Use of the Filter

As you can see above, the configuration process includes a default Filter step: configFilterChainManager.doFilter(null, cr);

// With this logic, more processing becomes possible
C32- ConfigFilterChainManager ? - you can customize a Filter
    M32_01- addFilter
    M32_02- doFilter

TODO: how to insert a Filter here needs further study; there is no obvious interface for adding a Filter, which is strange…

3.2.2 Disaster recovery processing of Nacos configuration

Nacos's LocalConfigInfoProcessor provides disaster recovery (DR) functions in two modes: local configuration and snapshot processing

Local configuration

As stated above, this can be achieved by placing configuration under the specified local path

Configure the snapshot

The Nacos client SDK generates a snapshot of the configuration locally. When the client cannot connect to Nacos Server, the configuration snapshot can be used to provide overall disaster recovery capability for the system. Configuration snapshots are similar to a local commit in Git, and also similar to a cache: they are updated when appropriate, but there is no concept of cache expiration.
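A snapshot is, at its simplest, the last successfully fetched configuration written to disk. A minimal sketch of the save/read pair (paths and names are illustrative, simplified relative to LocalConfigInfoProcessor):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class SnapshotSketch {
    // Mirrors the idea of saveSnapshot: overwrite the on-disk copy after each successful fetch
    static void saveSnapshot(Path dir, String dataId, String content) throws IOException {
        Files.createDirectories(dir);
        Files.write(dir.resolve(dataId), content.getBytes(StandardCharsets.UTF_8));
    }

    // Mirrors the idea of getSnapshot: read the last good copy, or null if none exists
    static String getSnapshot(Path dir, String dataId) throws IOException {
        Path file = dir.resolve(dataId);
        if (!Files.exists(file)) return null;  // no local DR copy yet
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }
}
```

Because the snapshot is only rewritten on a successful fetch, it never expires; it simply lags behind the server while the server is unreachable.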

3.2.3 Nacos dynamic configuration processing

Dynamic configuration mainly refers to the monitoring of configuration changes:

Nacos uses long polling to detect configuration changes. The corresponding core class is the client's LongPollingRunnable, and the check happens in checkUpdateDataIds. Let's Debug it:


    
 class LongPollingRunnable implements Runnable {
        
        private final int taskId;
        
        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }
        
        @Override
        public void run() {

            // ... the core statement
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        }
    }

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isUseLocalConfigInfo()) {
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
                if (cacheData.isInitializing()) {
                    // It updates when cacheData occurs in cacheMap for the first time.
                    inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
                }
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}


List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
        
        Map<String, String> params = new HashMap<String, String>(2);
        params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
        Map<String, String> headers = new HashMap<String, String>(2);
    	// Long polling
        headers.put("Long-Pulling-Timeout", "" + timeout);
        
        // told server do not hang me up if new initializing cacheData added in
        if (isInitializingCacheList) {
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }
        
        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }
        
        try {
            // In order to prevent the server from handling the delay of the client's long task,
            // increase the client's read timeout to avoid this problem.
            
            long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
            // /v1/cs/configs/listener
            HttpRestResult<String> result = agent
                    .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                            readTimeoutMs);
            
            if (result.ok()) {
                setHealthServer(true);
                return parseUpdateDataIdResponse(result.getData());
            } else {
                setHealthServer(false);
            }
        } catch (Exception e) {
            setHealthServer(false);
            throw e;
        }
        return Collections.emptyList();
}


// The corresponding Controller is ConfigController
    public void listener(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
       
        / /...
        
        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        
        // do long-polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

// The corresponding polling interface
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
        
        // Long polling.
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
        
        // Compatible with short polling logic.
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
        
        // Compatible with short polling result.
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);
        
        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (version == null) {
            version = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(version);
        
        // Before version 2.0.4, the return value was put into the header.
        if (versionNum < START_LONG_POLLING_VERSION_NUM) {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        } else {
            request.setAttribute("content", newResult);
        }
        
        Loggers.AUTH.info("new content:" + newResult);
        
        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
}


// For more details on LongPollingService, see https://www.jianshu.com/p/acb9b1093a54

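The change-detection contract underneath the long poll is simple: the client sends (groupKey, md5) pairs, and the server returns the keys whose current content has a different md5. A minimal sketch of that comparison (illustrative, not the Nacos wire format that MD5Util parses):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class Md5CompareSketch {
    // clientMd5: the md5 per groupKey that the client last saw
    // serverMd5: the current md5 per groupKey on the server
    static List<String> changedKeys(Map<String, String> clientMd5, Map<String, String> serverMd5) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> e : clientMd5.entrySet()) {
            // a key counts as changed if the server holds a different md5 (or the config was removed)
            if (!Objects.equals(e.getValue(), serverMd5.get(e.getKey()))) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }
}
```

Comparing md5 digests instead of content keeps the probe request small no matter how large the configurations are.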

3.2.4 Nacos metadata

Let’s see, what is the metadata of Nacos?

Metadata is the description information of Nacos data (such as configuration and services): service version, weight, disaster recovery policy, load balancing policy, authentication configuration, and various custom labels. By scope, it can be classified into service-level, cluster-level, and instance-level metadata.

// Method 1: Configure this parameter when configuring services
spring:
  application:
    name: nacos-config-server
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        metadata:
          version: v1
              
// Method 2: configure it directly on the console page

// Method 3: API calls, using Delete, Update and other request types to determine the operation
// Batch update instance metadata (InstanceController) : /nacos/v1/ns/instance/metadata/batch
// Batch delete instance metadata (InstanceController) : /nacos/v1/ns/instance/metadata/batch

3.3 Nacos load balancing

Load balancing of Nacos belongs to dynamic DNS service

Dynamic DNS services support weighted routing, enabling you to implement load balancing at the middle layer, flexible routing policies, traffic control, and simple DNS resolution services on the data center Intranet. Dynamic DNS services also make it easier for you to implement DNS protocol-based service discovery to help you eliminate the risk of coupling to vendor-private service discovery apis.

From our knowledge of Feign, we know that balancing is processed in BaseLoadBalancer combined with a Rule. Here we analyze the structure through which the two work together

Step 1: Feign calls Nacos

We start from BaseLoadBalancer, Debug it, and arrive at PredicateBasedRule

C- PredicateBasedRule
    M- choose(Object key)
        - Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        ? - here you can see an lb.getAllServers() call; the lb here is a DynamicServerListLoadBalancer

// PS: from getAllServers you can see that all the Servers are kept in a List
public List<Server> getAllServers() {
	return Collections.unmodifiableList(allServerList);
}


// Trace where the list is populated, starting from the loading of the ILoadBalancer Bean. The call chain is:
C- RibbonClientConfiguration # ribbonLoadBalancer : builds an ILoadBalancer
C- ZoneAwareLoadBalancer : enters the ZoneAwareLoadBalancer constructor
C- DynamicServerListLoadBalancer : enters the constructor
C- DynamicServerListLoadBalancer # restOfInit : init operation
C- DynamicServerListLoadBalancer # updateListOfServers : the main flow that updates the Server List; the first Server List appears here, keep debugging
C- DynamicServerListLoadBalancer # updateAllServerList : sets the ServerList
    
    
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
        }
    }
    updateAllServerList(servers);
}

As you can see, there are 2 methods here that produce the Server list; a look at their class hierarchies makes things clear:
- serverListImpl.getUpdatedListOfServers()
- filter.getFilteredListOfServers(servers)

// NacosServerList is an implementation class that obtains the service list from Nacos
private List<NacosServer> getServers() {
    try {
        String group = discoveryProperties.getGroup();
        List<Instance> instances = discoveryProperties.namingServiceInstance()
					.selectInstances(serviceId, group, true);
        return instancesToServerList(instances);
    } catch (Exception e) {
        throw new IllegalStateException(....);
    }
}


// Load balancing policy
The load balancing policy is based on the client-side Balancer
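The core idea behind the client-side Balancer is weight-proportional selection. A minimal sketch of the idea follows; the caller supplies a random value in [0,1) so the result is deterministic and testable. This is an illustration under those assumptions, not the actual Nacos source:

```java
import java.util.List;

// Minimal sketch of weight-proportional instance selection, illustrating the
// idea behind Nacos client-side load balancing (not the real Balancer code).
public class WeightedPick {
    // weights: per-instance weights; r: a caller-supplied random value in [0, 1)
    public static int pick(List<Double> weights, double r) {
        double total = weights.stream().mapToDouble(Double::doubleValue).sum();
        double cursor = r * total;           // scale r onto the cumulative weight line
        double acc = 0;
        for (int i = 0; i < weights.size(); i++) {
            acc += weights.get(i);           // walk cumulative weights until we pass the cursor
            if (cursor < acc) {
                return i;
            }
        }
        return weights.size() - 1;           // guard against floating-point edge cases
    }
}
```

An instance with weight 3 is then picked three times as often as one with weight 1, which is exactly what the per-instance `weight` field registered with Nacos is for.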

3.4 Cluster processing

Nacos cluster

A Nacos cluster is relatively easy to set up: you only need to configure the corresponding server addresses in /conf/cluster.conf:


#it is ip
#example
127.0.0.1:8848
127.0.0.1:8849
127.0.0.1:8850

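The file format is plain text: one `ip:port` member per line, with `#` starting a comment. A minimal parser sketch of what the server has to do with this file (the class and method names here are hypothetical, not the actual `analyzeClusterConf` source):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of cluster.conf parsing: one member address per line,
// '#' starts a comment, blank lines are skipped.
public class ClusterConfSketch {
    public static List<String> parseClusterConf(List<String> lines) {
        List<String> members = new ArrayList<>();
        for (String line : lines) {
            // drop everything after a '#' comment marker, then trim whitespace
            int hash = line.indexOf('#');
            String cleaned = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!cleaned.isEmpty()) {
                members.add(cleaned);
            }
        }
        return members;
    }
}
```

Feeding the example file above through this sketch yields the three member addresses and drops the two comment lines.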

Nacos cluster source tracking

Looking at the source level, how does this logic work?

The core processing classes live in com.alibaba.nacos.core.cluster


// Step 1: Obtain the configuration method
C- EnvUtil 
public static String getClusterConfFilePath() {
	return Paths.get(getNacosHome(), "conf", "cluster.conf").toString();
}

// Read the Cluster configuration
public static List<String> readClusterConf() throws IOException {
    try (Reader reader = new InputStreamReader(new FileInputStream(new File(getClusterConfFilePath())),
                StandardCharsets.UTF_8)) {
        return analyzeClusterConf(reader);
    } catch (FileNotFoundException ignore) {
        List<String> tmp = new ArrayList<>();
        String clusters = EnvUtil.getMemberList();
        if (StringUtils.isNotBlank(clusters)) {
            String[] details = clusters.split(",");
            for (String item : details) {
                tmp.add(item.trim());
            }
        }
        return tmp;
    }
}

// Step 2: Using a Cluster
AbstractMemberLookup
    
// The main usage is concentrated in the ServerManager
C- ServerManager
 F- ServerMemberManager memberManager;

C- ServerMemberManager : cluster node management in Nacos
 M- init : cluster node manager initialization
 M- getSelf : obtains local node information
 M- getMemberAddressInfo : obtains member address information
 M- update : updates information about the target node
 M- isUnHealth : indicates whether the target node is unhealthy
 M- initAndStartLookup : initializes the addressing mode

// TODO: other methods are omitted. A later performance analysis is planned; the source walkthrough of the cluster will likely go into that article


Conclusion

This is a more application-oriented article with less source-code depth, mainly because the Nacos source is well layered and clearly structured and does not require deep digging.

In addition, Nacos 2.0 has also been released; according to the documentation it uses long-lived socket connections. If there is a chance, a later article will compare the differences between the two.

Appendix

Appendix 1: Manually call Server

package com.alibaba.nacos.discovery.service;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.healthcheck.AbstractHealthChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @Classname NacosClientService
 * @Description TODO
 * @Date 2021/5/26
 * @Created by zengzg
 */
@Component
public class NacosClientNodesService implements ApplicationRunner {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private NamingService namingService;

    @Value("${spring.cloud.nacos.config.server-addr}")
    private String serverAddr;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        namingService = NacosFactory.createNamingService(properties);
    }

    /**
     * Get the Nacos instance list
     * Parameter format:
     * {
     *      "instanceId": "192.168.0.97#9083#DEFAULT#DEFAULT_GROUP@@nacos-user-server",
     *      "ip": "192.168.0.97",
     *      "port": 9083,
     *      "weight": 1,            // the server to select can be decided by the weight
     *      "healthy": true,
     *      "enabled": true,
     *      "ephemeral": true,
     *      "clusterName": "DEFAULT",
     *      "serviceName": "DEFAULT_GROUP@@nacos-user-server",
     *      "metadata": {
     *           "preserved.register.source": "SPRING_CLOUD"
     *      },
     *      "ipDeleteTimeout": 30000,
     *      "instanceHeartBeatInterval": 5000,
     *      "instanceHeartBeatTimeOut": 15000
     * }
     *
     * @param serviceName
     * @return
     */
    public List<Instance> get(String serviceName) {
        List<Instance> content = new LinkedList<Instance>();
        try {
            content = namingService.getAllInstances(serviceName);
            logger.info("------> Config serviceName [{}] <-------", serviceName);
        } catch (NacosException e) {
            logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            e.printStackTrace();
        }

        return content;


    }

    /**
     * Register a Nacos instance
     *
     * @param serviceName
     * @param ip
     * @param port
     */
    public void createOrUpdate(String serviceName, String ip, Integer port) {
        try {
            logger.info("------> register instance serviceName[{}] - ip[{}] - port[{}] <-------", serviceName, ip, port);
            namingService.registerInstance(serviceName, ip, port, "TEST1");
        } catch (NacosException e) {
            logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Remove a Nacos instance
     *
     * @param serviceName
     * @param ip
     */
    public void delete(String serviceName, String ip, Integer port) {
        try {
            namingService.deregisterInstance(serviceName, ip, port, "DEFAULT");
            logger.info("------> deregister instance serviceName[{}] - ip[{}] Success <-------", serviceName, ip);
        } catch (NacosException e) {
            logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            e.printStackTrace();
        }
    }
}

Appendix 2: Call Config manually

public class NacosClientConfigService implements ApplicationRunner {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private ConfigService configService;

    @Value("${spring.cloud.nacos.config.server-addr}")
    private String serverAddr;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        configService = NacosFactory.createConfigService(properties);
    }


    /**
     * Get Nacos Config
     *
     * @param dataId
     * @param groupId
     * @return
     */
    public String get(String dataId, String groupId) {
        String content = "";
        try {
            content = configService.getConfig(dataId, groupId, 5000);
            logger.info("------> get Config DataID[{}] - GroupID[{}] Success, The value: [{}] <-------", dataId, groupId, content);

            configService.addListener(dataId, groupId, new ConfigListener());

        } catch (NacosException e) {
            logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            e.printStackTrace();
        }

        return content;


    }

    /**
     * Create Nacos Config
     *
     * @param dataId
     * @param groupId
     * @param content
     */
    public void createOrUpdate(String dataId, String groupId, String content) {
        try {
            logger.info("------> create Config DataID[{}] - GroupID[{}] Success, The value: [{}] <-------", dataId, groupId, content);
            configService.publishConfig(dataId, groupId, content);
        } catch (NacosException e) {
            logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Remove Nacos Config
     *
     * @param dataId
     * @param groupId
     */
    public void delete(String dataId, String groupId) {
        try {
            configService.removeConfig(dataId, groupId);
            logger.info("------> delete Config DataID[{}] - GroupID[{}] Success <-------", dataId, groupId);

            configService.removeListener(dataId, groupId, null);

        } catch (NacosException e) {
            logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            e.printStackTrace();
        }
    }
}

Appendix 3: Using NacosInjected

This can be done via the following dependencies:

<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.7</version>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-discovery-spring-boot-starter</artifactId>
    <version>0.2.7</version>
</dependency>


@NacosInjected
private ConfigService configService;

@NacosInjected
private NamingService namingService;


Appendix 4: Nacos Official Architecture Diagram (reposted)

This section is reposted as-is; see the official documentation at nacos.io/zh-cn/docs/…

Function:

  • Service management: Implement service CRUD, domain name CRUD, service health status check, and service weight management
  • Configuration management: realize CRUD of configuration management, version management, gray management, monitoring management, push trajectory, aggregated data and other functions
  • Metadata management: Provides metadata CRUD and tagging capabilities
  • Plug-in mechanism: the three modules can be divided and combined, and the SPI mechanism of extension point is realized
  • Event mechanism: realize asynchronous event notification, SDK data change asynchronous notification and other logic
  • Log module: manages log classification, log level, log portability (especially to avoid conflicts), log format, exception code + help document
  • Callback mechanism: the SDK notifies of data changes and calls back user code for processing in a unified mode; interfaces and data structures need to be extensible
  • Addressing mode: Address IP, domain name, Nameserver, broadcast and other addressing modes, need to be extensible
  • Push channel: Addresses performance push issues between servers and storage devices, between servers, and between servers and SDKS
  • Capacity management: Manages the capacity of each tenant and group to prevent storage from being overwritten and affecting service availability
  • Traffic management: Controls the request frequency, number of long links, packet size, and request flow control by tenant and group
  • Cache mechanism: disaster-recovery directory, local cache, and server cache mechanisms; tools are required to work with the disaster-recovery directory
  • Startup mode: Start different programs and UIs in single-machine deployment mode, configuration mode, service mode, DNS mode, or all mode
  • Consistency protocol: Different consistency mechanisms for different data and different consistency requirements
  • Storage module: solves persistent and non-persistent data storage and data fragmentation
  • Nameserver: Solves the problem of routing a Namespace to a ClusterID and the problem of mapping the user environment to the NACOS physical environment
  • CMDB: Solve metadata storage, interconnection problems with the third-party CMDB system, and solve the relationship between applications, people, and resources
  • Metrics: Expose standard Metrics data to facilitate integration with third-party monitoring systems
  • Trace: Expose standard Trace to facilitate communication with SLA system, log whitening, push Trace and other capabilities, as well as the metering and billing system
  • Access management: it is equivalent to the process of opening aliyun services and assigning identity, capacity and permission
  • User management: solve user management, login, SSO and other issues
  • Permission management: solve identity, access control, role management and other issues
  • Audit system: The extended interface is convenient to connect with different audit systems of companies
  • Notification system: Core data change, or operation, easy to get through through SMS system, notify the corresponding person of data change
  • OpenAPI: Exposes a standard REST-style HTTP interface that is easy to use and facilitates multi-language integration
  • Console: easy-to-use Console for service management and configuration management
  • SDK: Multilingual SDK
  • Agent: this mode is similar to dns-f, or it can be integrated with mesh solutions
  • CLI: Command line for lightweight management of products, like Git easy to use

Domain model:

The SDK class diagram: