Nacos service list management

Nacos offers an open API, /nacos/v1/ns/instance/list, for querying the service list. If we obtain services the Spring Cloud way, we ultimately rely on the Nacos client plus a load balancer for client-side load balancing.
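The open API can also be called directly. Below is a minimal sketch that uses only JDK 1.8 classes. It assumes a Nacos server at http://127.0.0.1:8848 (8848 is the Nacos default port) and the serviceName and healthyOnly query parameters from the Nacos Open API documentation. The class NacosInstanceListClient and its methods are hypothetical names chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that calls the Nacos open API /nacos/v1/ns/instance/list directly.
class NacosInstanceListClient {

    private final String serverAddr; // assumed, e.g. "http://127.0.0.1:8848"

    NacosInstanceListClient(String serverAddr) {
        this.serverAddr = serverAddr;
    }

    // Builds the query URL; the service name is URL-encoded so reserved characters survive.
    String buildUrl(String serviceName, boolean healthyOnly) {
        try {
            return serverAddr + "/nacos/v1/ns/instance/list?serviceName="
                    + URLEncoder.encode(serviceName, "UTF-8")
                    + "&healthyOnly=" + healthyOnly;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Sends the GET request and returns the raw JSON body.
    String listInstances(String serviceName, boolean healthyOnly) throws IOException {
        URL url = new URL(buildUrl(serviceName, healthyOnly));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }
}
```

Against a running server, listInstances("member-service", true) should return a JSON document whose hosts array lists the instances; the server-side section below shows how that response is assembled.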

Environment:

  • JDK 1.8
  • Nacos server 1.4.2
  • spring-boot 2.3.5.RELEASE
  • spring-cloud Hoxton.SR8
  • spring-cloud-alibaba 2.2.5.RELEASE

Ribbon

Introduction to Ribbon

Spring Cloud Ribbon is a client-side load-balancing tool built on Netflix Ribbon. Put simply, Ribbon is an open-source project released by Netflix whose main job is to provide client-side load-balancing algorithms and service invocation. The Ribbon client component offers a comprehensive set of configuration options such as timeouts and retries. For the machines behind the load balancer in your configuration, Ribbon automatically picks one to connect to according to a rule such as simple round robin or random selection, and it is easy to plug in a custom load-balancing algorithm.
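The two simplest rules mentioned above, round robin and random, fit in a few lines. This is a plain-Java illustration built on a hypothetical SimpleRule interface. It is not Ribbon's IRule API; Ribbon's own RoundRobinRule and RandomRule additionally check whether servers are alive.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical rule abstraction: pick one server out of the current list.
interface SimpleRule {
    String choose(List<String> servers);
}

// Round robin: walk the list in order and wrap around; AtomicInteger keeps it thread-safe.
class SimpleRoundRobinRule implements SimpleRule {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}

// Random: every server has the same probability of being picked.
class SimpleRandomRule implements SimpleRule {
    private final Random random;

    SimpleRandomRule(Random random) {
        this.random = random;
    }

    @Override
    public String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(random.nextInt(servers.size()));
    }
}
```

In Ribbon, a custom algorithm is plugged in in the same spirit, by supplying your own IRule implementation.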

Using Ribbon

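  1. First, define a RestTemplate bean annotated with @LoadBalanced so that it uses Ribbon's load-balancing policy:
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfig {

    @LoadBalanced
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}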
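  2. Inject the RestTemplate and call the remote interface by service name:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

@RestController
public class EchoController {
    @Autowired
    private RestTemplate restTemplate;

    // member-service is the serviceId registered in Nacos, not a real host name
    @RequestMapping(value = "/echo/{id}", method = RequestMethod.GET)
    public String echo(@PathVariable Long id) {
        return restTemplate.getForObject("http://member-service/member/get/" + id, String.class);
    }
}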

Ribbon source code analysis

  1. RestTemplate extends InterceptingHttpAccessor, whose interceptors field holds ClientHttpRequestInterceptor request interceptors.
  2. Ribbon's auto-configuration class, RibbonAutoConfiguration, is defined in spring-cloud-netflix-ribbon.
  3. RibbonAutoConfiguration is loaded before LoadBalancerAutoConfiguration, which lives in spring-cloud-commons and is the class that wires the interceptor into RestTemplate. The order matters because LoadBalancerAutoConfiguration requires a LoadBalancerClient bean, which RibbonAutoConfiguration provides. The code is as follows:
@Configuration(proxyBeanMethods = false)
// The RestTemplate class must be on the classpath
@ConditionalOnClass(RestTemplate.class)
// A LoadBalancerClient bean must exist in the container
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

   // Collect every RestTemplate bean annotated with @LoadBalanced
   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();

   // Collect the LoadBalancerRequestTransformer beans in the Spring container
   @Autowired(required = false)
   private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

   // afterSingletonsInstantiated() is invoked once all singleton beans are initialized.
   // The lambda applies every RestTemplateCustomizer to each RestTemplate instance
   @Bean
   public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
         final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
      return () -> restTemplateCustomizers.ifAvailable(customizers -> {
         for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
            for (RestTemplateCustomizer customizer : customizers) {
               customizer.customize(restTemplate);
            }
         }
      });
   }

   // LoadBalancerRequestFactory combines the LoadBalancerClient
   // with the LoadBalancerRequestTransformer instances
   @Bean
   @ConditionalOnMissingBean
   public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
      return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
   }

   // LoadBalancerInterceptor configuration (used when Spring Retry is not on the classpath)
   @Configuration(proxyBeanMethods = false)
   @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
   static class LoadBalancerInterceptorConfig {

      // Create the default LoadBalancerInterceptor
      @Bean
      public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,
            LoadBalancerRequestFactory requestFactory) {
         return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
      }

      // Created only if no RestTemplateCustomizer bean exists yet:
      // adds the loadBalancerInterceptor to each RestTemplate it customizes
      @Bean
      @ConditionalOnMissingBean
      public RestTemplateCustomizer restTemplateCustomizer(
            final LoadBalancerInterceptor loadBalancerInterceptor) {
         return restTemplate -> {
            List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
            list.add(loadBalancerInterceptor);
            restTemplate.setInterceptors(list);
         };
      }
   }

   // ...

}

From this code we can conclude:

  • To use load balancing, the RestTemplate class must be on the project's classpath and a LoadBalancerClient instance must exist in the Spring container.
  • In spring-cloud-netflix-ribbon, LoadBalancerClient has only one implementation: RibbonLoadBalancerClient.
  • Through Spring's SmartInitializingSingleton extension point, the customizer returned by restTemplateCustomizer() adds the LoadBalancerInterceptor to every @LoadBalanced RestTemplate.
  • In essence, load balancing is done by an interceptor: it uses RestTemplate's interceptor extension point to load-balance service requests.
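The interceptor mechanism can be illustrated without Spring. The sketch below uses hypothetical Execution and Interceptor interfaces shaped like ClientHttpRequestExecution and ClientHttpRequestInterceptor. The load-balancing interceptor sends nothing itself: it only rewrites the URI, turning the serviceId into a real address, and hands it down the chain. This is an illustration of the idea, not the spring-cloud-commons code.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for ClientHttpRequestExecution: runs the rest of the chain.
interface Execution {
    String execute(URI uri);
}

// Hypothetical stand-in for ClientHttpRequestInterceptor.
interface Interceptor {
    String intercept(URI uri, Execution next);
}

// Plays the role of LoadBalancerInterceptor: rewrites serviceId -> real address, then delegates.
class RewritingInterceptor implements Interceptor {
    private final UnaryOperator<URI> loadBalancer;

    RewritingInterceptor(UnaryOperator<URI> loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    @Override
    public String intercept(URI uri, Execution next) {
        return next.execute(loadBalancer.apply(uri));
    }
}

// Plays the role of InterceptingHttpAccessor/RestTemplate: holds interceptors and runs them in order.
class InterceptingClient {
    private final List<Interceptor> interceptors = new ArrayList<>();
    private final Execution transport; // the actual HTTP call at the end of the chain

    InterceptingClient(Execution transport) {
        this.transport = transport;
    }

    void addInterceptor(Interceptor interceptor) {
        interceptors.add(interceptor);
    }

    String get(URI uri) {
        return step(0, uri);
    }

    private String step(int index, URI uri) {
        if (index == interceptors.size()) {
            return transport.execute(uri);
        }
        return interceptors.get(index).intercept(uri, next -> step(index + 1, next));
    }
}
```

addInterceptor() here corresponds to what the RestTemplateCustomizer does with setInterceptors(): once the interceptor is registered, every call goes through it before reaching the transport.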

LoadBalancerInterceptor

LoadBalancerInterceptor hands the request over to LoadBalancerClient. The client first obtains an ILoadBalancer implementation, which is responsible for fetching and selecting services; it then uses the serviceName and the load-balancing algorithm to choose a Server object, and finally executes the request.

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

   // Load balancer client (RibbonLoadBalancerClient when Ribbon is used)
   private LoadBalancerClient loadBalancer;

   // Builds the LoadBalancerRequest
   private LoadBalancerRequestFactory requestFactory;

   // ...

   @Override
   public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
         final ClientHttpRequestExecution execution) throws IOException {
      final URI originalUri = request.getURI();
      // The host part of the URI is the serviceId, e.g. member-service
      String serviceName = originalUri.getHost();
      return this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
   }
}
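The interceptor relies on java.net.URI#getHost() to obtain the serviceName. A small sketch of that step, using a hypothetical helper named ServiceIds.serviceIdOf, shows one consequence: a serviceId containing an underscore is not a valid host name, so getHost() returns null and no serviceId can be extracted.

```java
import java.net.URI;

// Hypothetical helper mirroring the first step of LoadBalancerInterceptor.intercept():
// the host part of the request URI is treated as the serviceId.
final class ServiceIds {
    private ServiceIds() {
    }

    static String serviceIdOf(String url) {
        URI uri = URI.create(url);
        String host = uri.getHost();
        if (host == null) {
            // e.g. "http://member_service/..." : '_' is not allowed in a host name,
            // so java.net.URI parses the authority but reports no host
            throw new IllegalStateException("Request URI does not contain a valid hostname: " + uri);
        }
        return host;
    }
}
```

In practice this means service names registered in Nacos should use hyphens rather than underscores when they are called through a @LoadBalanced RestTemplate.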

RibbonLoadBalancerClient

Following this.loadBalancer.execute(), we find that every request is handled by RibbonLoadBalancerClient, which implements the LoadBalancerClient interface (shown here together with its parent, ServiceInstanceChooser):

public interface ServiceInstanceChooser {

   // Use serviceId to select a service instance
   ServiceInstance choose(String serviceId);

}

public interface LoadBalancerClient extends ServiceInstanceChooser {

   <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

   <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
         throws IOException;

   // Replace the serviceId in the URI with the instance's real host and port
   URI reconstructURI(ServiceInstance instance, URI original);

}

First, let's analyze the choose method of RibbonLoadBalancerClient:

@Override
public ServiceInstance choose(String serviceId) {
   return choose(serviceId, null);
}

// Select a specific service instance by service name
public ServiceInstance choose(String serviceId, Object hint) {
   Server server = getServer(getLoadBalancer(serviceId), hint);
   if (server == null) {
      return null;
   }
   return new RibbonServer(serviceId, server, isSecure(server, serviceId),
         serverIntrospector(serviceId).getMetadata(server));
}

// Select a load balancer by service name. The default is ZoneAwareLoadBalancer
protected ILoadBalancer getLoadBalancer(String serviceId) {
   return this.clientFactory.getLoadBalancer(serviceId);
}

// Ask the load balancer to choose a server
protected Server getServer(ILoadBalancer loadBalancer) {
   return getServer(loadBalancer, null);
}
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
   if (loadBalancer == null) {
      return null;
   }
   // Use 'default' on a null hint, or just pass it on?
   return loadBalancer.chooseServer(hint != null ? hint : "default");
}

LoadBalancerInterceptor delegates directly to loadBalancer.execute():

// The LoadBalancerRequest is created by LoadBalancerRequestFactory.createRequest(request, body, execution).
// It is an anonymous implementation of LoadBalancerRequest whose generic type is ClientHttpResponse,
// because the request is ultimately executed by ClientHttpRequestExecution.execute()
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
	return execute(serviceId, request, null);
}

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
	// Get the load balancer, then use it to choose a Server
	ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
	Server server = getServer(loadBalancer, hint);
	if (server == null) { // Throw an exception if not found. IllegalStateException is used here
		throw new IllegalStateException("No instances available for " + serviceId);
	}

	// Wrap the Server in a RibbonServer; isSecure: whether the connection to the server is secure
	// serverIntrospector: server introspection, configured via ServerIntrospectorProperties
	RibbonServer ribbonServer = new RibbonServer(serviceId, server,
			isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));

	// Call the overloaded interface method of this class
	return execute(serviceId, ribbonServer, request);
}

// This overload receives a ServiceInstance, i.e. a specific server has already been chosen
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {

	// RibbonServer is the only ServiceInstance type handled here
	Server server = null;
	if (serviceInstance instanceof RibbonServer) {
		server = ((RibbonServer) serviceInstance).getServer();
	}
	if (server == null) {
		throw new IllegalStateException("No instances available for " + serviceId);
	}

	// The execution context is bound to serviceId
	RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
	// ...
	// Actually send the request to the server and get the return value.
	// Because interceptors are involved, execute() here is InterceptingRequestExecution.execute(),
	// which calls ServiceRequestWrapper.getURI(), which in turn calls reconstructURI()
		T returnVal = request.apply(serviceInstance);
		return returnVal;
	// ... (exception handling)
}

returnVal is a ClientHttpResponse. It is finally passed to handleResponse(), which handles errors (if any); if there are none, it goes to the extractor, ResponseExtractor.extractData(response), and the whole request is complete.
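The reconstructURI() step mentioned in the comments above replaces the serviceId in the original URI with the chosen instance's host and port. Here is a hedged sketch of that idea using java.net.URI. The Instance class and UriRewriter are hypothetical. The real RibbonLoadBalancerClient.reconstructURI() also takes the secure flag and Ribbon's load-balancer context into account.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical selected instance (the role ServiceInstance/RibbonServer plays in the article).
final class Instance {
    final String host;
    final int port;

    Instance(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

final class UriRewriter {
    private UriRewriter() {
    }

    // Keep scheme, path, query and fragment; swap the serviceId host for the instance's host:port.
    static URI reconstructURI(Instance instance, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), instance.host, instance.port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild " + original, e);
        }
    }
}
```

For example, http://member-service/member/get/7?x=1 combined with an instance at 192.168.1.10:8081 becomes http://192.168.1.10:8081/member/get/7?x=1.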

ZoneAwareLoadBalancer

The following figure shows the class diagram of ZoneAwareLoadBalancer.

The core methods of its parent class, DynamicServerListLoadBalancer, are:

  • Complete the initialization: restOfInit(clientConfig)
  • Update the service list: updateListOfServers(). It calls ServerList.getUpdatedListOfServers(), which is implemented by the specific registry; for Nacos, the implementation is NacosServerList#getUpdatedListOfServers()
  • Update the full server list: updateAllServerList()
  • Set the full server list: setServersList()
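Stripped down, the update methods above amount to this: ask the ServerList for a fresh list, atomically swap it in, and repeat periodically. The sketch below uses a hypothetical Supplier as the ServerList and a ScheduledExecutorService as the updater. Ribbon's default PollingServerListUpdater follows the same polling idea, but this is not its code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Simplified model of DynamicServerListLoadBalancer's refresh path (hypothetical class).
class RefreshingServerList {
    // Stand-in for ServerList.getUpdatedListOfServers(), e.g. NacosServerList in this article
    private final Supplier<List<String>> serverList;
    // Current snapshot, replaced wholesale as setServersList() does
    private final AtomicReference<List<String>> allServers =
            new AtomicReference<>(Collections.<String>emptyList());
    private ScheduledExecutorService scheduler;

    RefreshingServerList(Supplier<List<String>> serverList) {
        this.serverList = serverList;
    }

    // One refresh round: fetch from the registry, then publish an immutable copy
    void updateListOfServers() {
        List<String> fresh = serverList.get();
        if (fresh != null) {
            allServers.set(Collections.unmodifiableList(new ArrayList<>(fresh)));
        }
    }

    List<String> getAllServers() {
        return allServers.get();
    }

    // Periodic refresh in the spirit of a polling updater
    void start(long periodMillis) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                updateListOfServers();
            } catch (RuntimeException e) {
                // keep the previous snapshot; an uncaught exception would cancel future runs
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

Readers always see a complete snapshot: a lookup between two refreshes keeps using the previous list even if the registry has already changed.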

The core methods of ZoneAwareLoadBalancer are:

  • Choose a service instance: chooseServer()
  • Get the load balancer for a zone: getLoadBalancer()
  • Choose a service instance within the selected zone: zoneLoadBalancer.chooseServer()
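Conceptually, ZoneAwareLoadBalancer chooses in two steps: pick a zone, then delegate to that zone's load balancer. The sketch below assumes the zone is drawn at random, weighted by its instance count, and uses round robin inside the zone. Ribbon's real implementation also excludes overloaded or failing zones (via zone snapshots and ZoneAvoidanceRule), which is omitted here. ZoneAwareChooser is a hypothetical name.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified two-step, zone-aware selection (not Ribbon's ZoneAwareLoadBalancer code).
class ZoneAwareChooser {
    private final Map<String, List<String>> serversByZone;             // zone -> servers
    private final Map<String, AtomicInteger> cursors = new HashMap<>(); // per-zone round-robin state
    private final Random random;

    ZoneAwareChooser(Map<String, List<String>> serversByZone, Random random) {
        this.serversByZone = new LinkedHashMap<>(serversByZone);
        this.random = random;
        for (String zone : serversByZone.keySet()) {
            cursors.put(zone, new AtomicInteger());
        }
    }

    // Step 1: choose a zone, weighted by how many servers it has (empty zones are never chosen)
    String chooseZone() {
        int total = 0;
        for (List<String> servers : serversByZone.values()) {
            total += servers.size();
        }
        if (total == 0) {
            return null;
        }
        int ticket = random.nextInt(total);
        for (Map.Entry<String, List<String>> e : serversByZone.entrySet()) {
            ticket -= e.getValue().size();
            if (ticket < 0) {
                return e.getKey();
            }
        }
        return null; // unreachable when total > 0
    }

    // Step 2: delegate to the chosen zone's "load balancer" (round robin here)
    String chooseServer() {
        String zone = chooseZone();
        if (zone == null) {
            return null;
        }
        List<String> servers = serversByZone.get(zone);
        int index = Math.floorMod(cursors.get(zone).getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```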

Ribbon summary

For a @LoadBalanced RestTemplate, I summarize the rules as follows:

  • The URL passed in as a String must be absolute (http://,…); otherwise an exception is thrown: java.lang.IllegalArgumentException: URI is not absolute
  • The serviceId is case-insensitive: http://order-service/… works the same as http://ORDER-SERVICE/…
  • Do not put a port after the serviceId

Finally, note that a @LoadBalanced RestTemplate can only address services by serviceId; it cannot send requests to a plain IP address or domain name. If your project needs both, define separate RestTemplate instances for the different scenarios.
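The rules listed above can be made concrete with a small pre-flight check. LoadBalancedUrlCheck is a hypothetical helper, not part of Spring Cloud, and it only illustrates the constraints rather than showing how Ribbon enforces them. It rejects relative URLs and URLs with a port after the serviceId, and lower-cases the serviceId so that ORDER-SERVICE and order-service yield the same key.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical pre-flight check for URLs passed to a @LoadBalanced RestTemplate.
final class LoadBalancedUrlCheck {
    private LoadBalancedUrlCheck() {
    }

    // Returns the normalized serviceId, or throws if the URL breaks one of the rules above.
    static String serviceIdFor(String url) {
        URI uri = URI.create(url);
        if (!uri.isAbsolute()) {
            // RestTemplate itself would fail with "URI is not absolute"
            throw new IllegalArgumentException("URI is not absolute: " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("No serviceId in: " + url);
        }
        if (uri.getPort() != -1) {
            throw new IllegalArgumentException("Do not put a port after the serviceId: " + url);
        }
        // Treat the serviceId case-insensitively
        return uri.getHost().toLowerCase(Locale.ROOT);
    }
}
```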

Nacos service query

Client query

With the default Nacos client, the service list is queried through NacosServerList#getUpdatedListOfServers():

public class NacosServerList extends AbstractServerList<NacosServer> {

	private NacosDiscoveryProperties discoveryProperties;

	private String serviceId;

	// ...

	@Override
	public List<NacosServer> getUpdatedListOfServers() {
		return getServers();
	}

	private List<NacosServer> getServers() {
		try {
			String group = discoveryProperties.getGroup();
			// discoveryProperties.namingServiceInstance() ultimately obtains
			// a com.alibaba.nacos.client.naming.NacosNamingService instance via reflection
			List<Instance> instances = discoveryProperties.namingServiceInstance()
					.selectInstances(serviceId, group, true);
			return instancesToServerList(instances);
		}
		catch (Exception e) {
			throw new IllegalStateException(
					"Can not get service instances from nacos, serviceId=" + serviceId, e);
		}
	}
}

This in turn calls the following selectInstances method:

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
                                      boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // subscribe defaults to true
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                                                 StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor
            .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                                              StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}

The core logic is in HostReactor#getServiceInfo. It converts the current serviceName and clusters into a key and then looks up the service information with getServiceInfo0; at this point only local data is queried.

If serviceObj is null, updateServiceNow calls /instance/list on the server to fetch the service information:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) {
            // UPDATE_HOLD_INTERVAL is a constant
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        // Wait at most 5 s; once serviceObj has been updated, notifyAll() is called
                        // from updateService(String serviceName, String clusters)
                        // Maximum delay 2s DEFAULT_DELAY = 1
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }

        // Schedule periodic updates of the service information
        scheduleUpdateIfAbsent(serviceName, clusters);

        // Return the latest value
        return serviceInfoMap.get(serviceObj.getKey());
    }

This explains why the first Ribbon call is slow: the service list has to be initialized first, and the service instances are then fetched from the server through the Nacos client.
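The pattern in HostReactor#getServiceInfo (read the local map first, and only query the server when the key is missing) is what makes the first call slow and later calls cheap. A minimal sketch, assuming a hypothetical remote query function. The key format serviceName@@clusters is an assumption meant to mirror ServiceInfo.getKey(). The failover, wait/notify and scheduled-update parts are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified cache-first lookup in the spirit of HostReactor#getServiceInfo (hypothetical class).
class ServiceInfoCache {
    // Local copy of service info: key -> instance list
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    // Hypothetical stand-in for updateServiceNow(): queries /instance/list for the given key
    private final Function<String, List<String>> remoteQuery;

    ServiceInfoCache(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    // Assumed key format: "serviceName@@clusters", or just serviceName when clusters is empty
    static String key(String serviceName, String clusters) {
        return (clusters == null || clusters.isEmpty()) ? serviceName : serviceName + "@@" + clusters;
    }

    List<String> getServiceInfo(String serviceName, String clusters) {
        // computeIfAbsent runs the remote query at most once per key, even under concurrency;
        // only the very first lookup pays for the network round trip
        return serviceInfoMap.computeIfAbsent(key(serviceName, clusters), remoteQuery);
    }
}
```

Using computeIfAbsent is a design choice of this sketch: it collapses the "put placeholder, update, remove from updatingMap" sequence of the original into one atomic step per key.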

Server-side processing

The server handles service instance queries through the /instance/list interface. Service instance information is stored in a ConcurrentHashMap:

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
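The two-level structure of serviceMap can be sketched as follows. ServiceRegistrySketch and its methods are hypothetical; only the Map(namespace, Map(group::serviceName, Service)) shape comes from the Nacos source above. The source comment writes the inner key as group::serviceName; this sketch assumes the two parts are joined with @@, as NamingUtils.getGroupedName does, and that separator should be treated as an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of ServiceManager's serviceMap: namespace -> (group@@serviceName -> service)
class ServiceRegistrySketch<S> {
    private final Map<String, Map<String, S>> serviceMap = new ConcurrentHashMap<>();

    // Assumed grouped-name format
    static String groupedName(String group, String serviceName) {
        return group + "@@" + serviceName;
    }

    // Create the namespace map on demand; keep the existing service if one is already registered
    S putServiceIfAbsent(String namespaceId, String group, String serviceName, S service) {
        Map<String, S> services = serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>());
        S existing = services.putIfAbsent(groupedName(group, serviceName), service);
        return existing != null ? existing : service;
    }

    // Two lookups: namespace first, then the grouped service name; null when either is missing
    S getService(String namespaceId, String groupedServiceName) {
        Map<String, S> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(groupedServiceName);
    }
}
```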

Queries are managed mainly through ServiceManager; the core entry method is InstanceController#doSrvIpxt:

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        
        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();
        
        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {
                
                pushService
                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                                pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }
        
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }
        
        checkIfDisabled(service);
        
        List<Instance> srvedIPs;
        
        // Query all services
        // The list of services will be updated internally
        // allInstances.addAll(persistentInstances);
        // allInstances.addAll(ephemeralInstances);
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
        
        // filter ips using selector:
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIPs = service.getSelector().select(clientIP, srvedIPs);
        }
        
        if (CollectionUtils.isEmpty(srvedIPs)) {
            
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }
            
            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.set("hosts", JacksonUtils.createEmptyArrayNode());
            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
            return result;
        }
        
        Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
        ipMap.put(Boolean.TRUE, new ArrayList<>());
        ipMap.put(Boolean.FALSE, new ArrayList<>());
        
        for (Instance ip : srvedIPs) {
            ipMap.get(ip.isHealthy()).add(ip);
        }
        
        if (isCheck) {
            result.put("reachProtectThreshold", false);
        }
        
        double threshold = service.getProtectThreshold();
        
        if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
            
            Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
            if (isCheck) {
                result.put("reachProtectThreshold", true);
            }
            
            ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
            ipMap.get(Boolean.FALSE).clear();
        }
        
        if (isCheck) {
            result.put("protectThreshold", service.getProtectThreshold());
            result.put("reachLocalSiteCallThreshold", false);
            
            return JacksonUtils.createEmptyJsonNode();
        }
        
        ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
        
        for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
            List<Instance> ips = entry.getValue();
            
            if (healthyOnly && !entry.getKey()) {
                continue;
            }
            
            for (Instance instance : ips) {
                
                // remove disabled instance:
                if (!instance.isEnabled()) {
                    continue;
                }
                
                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
                
                ipObj.put("ip", instance.getIp());
                ipObj.put("port", instance.getPort());
                // deprecated since nacos 1.0.0:
                ipObj.put("valid", entry.getKey());
                ipObj.put("healthy", entry.getKey());
                ipObj.put("marked", instance.isMarked());
                ipObj.put("instanceId", instance.getInstanceId());
                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
                ipObj.put("enabled", instance.isEnabled());
                ipObj.put("weight", instance.getWeight());
                ipObj.put("clusterName", instance.getClusterName());
                if (clientInfo.type == ClientInfo.ClientType.JAVA
                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    ipObj.put("serviceName", instance.getServiceName());
                } else {
                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                }
                
                ipObj.put("ephemeral", instance.isEphemeral());
                hosts.add(ipObj);
                
            }
        }
        
        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

The core logic above is:

  1. Call service.srvIPs to query all service instances of the requested clusters
  2. Internally, Cluster#allIPs returns every instance registered in the cluster, both persistent and ephemeral
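One step of doSrvIpxt worth isolating is the protect threshold. When the share of healthy instances drops to or below service.getProtectThreshold(), the server returns the unhealthy instances as well, so that the remaining healthy ones are not flooded with all the traffic. Below is a self-contained sketch of that rule. The Inst and ProtectThreshold classes are hypothetical, and the real method also filters disabled instances and builds the JSON response.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical instance model with only the fields the threshold logic needs.
final class Inst {
    final String address;
    final boolean healthy;

    Inst(String address, boolean healthy) {
        this.address = address;
        this.healthy = healthy;
    }
}

final class ProtectThreshold {
    private ProtectThreshold() {
    }

    // Mirrors the ipMap logic in doSrvIpxt: split by health; if the healthy ratio is
    // <= threshold, move every instance into the "healthy" bucket so all are returned.
    static List<Inst> select(List<Inst> all, double threshold, boolean healthyOnly) {
        List<Inst> healthy = new ArrayList<>();
        List<Inst> unhealthy = new ArrayList<>();
        for (Inst inst : all) {
            if (inst.healthy) {
                healthy.add(inst);
            } else {
                unhealthy.add(inst);
            }
        }
        if (all.isEmpty()) {
            return healthy; // doSrvIpxt returns early for an empty list as well
        }
        if ((float) healthy.size() / all.size() <= threshold) {
            healthy.addAll(unhealthy); // protect threshold reached: return all ips
            unhealthy.clear();
        }
        List<Inst> result = new ArrayList<>(healthy);
        if (!healthyOnly) {
            result.addAll(unhealthy);
        }
        return result;
    }
}
```

Note that once the threshold is reached, even a healthyOnly query receives the unhealthy instances, because they have been moved into the healthy bucket before the healthyOnly filter runs.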
