Nacos service list management
Nacos exposes open APIs; the instance list of a service can be fetched through /nacos/v1/ns/instance/list. If we obtain services the Spring Cloud way, we ultimately rely on the Nacos client plus a load balancer to do client-side load balancing.
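As a minimal sketch of how the open API mentioned above is addressed, the snippet below only assembles the request URL. The server address 127.0.0.1:8848 and the class name NacosInstanceListUrl are assumptions for illustration; serviceName and healthyOnly are query parameters of the instance list API.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Illustrative helper (not part of Nacos): builds the instance list query URL.
class NacosInstanceListUrl {

    static String build(String serverAddr, String serviceName, boolean healthyOnly) {
        try {
            return "http://" + serverAddr + "/nacos/v1/ns/instance/list"
                    + "?serviceName=" + URLEncoder.encode(serviceName, "UTF-8")
                    + "&healthyOnly=" + healthyOnly;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed local Nacos server address
        System.out.println(build("127.0.0.1:8848", "member-service", true));
    }
}
```

The resulting URL can be requested with any HTTP client; the response is the JSON document that the server-side code later in this article assembles.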
Environment:
JDK 1.8
Nacos server 1.4.2
spring-boot 2.3.5.RELEASE
spring-cloud Hoxton.SR8
spring-cloud-alibaba 2.2.5.RELEASE
Ribbon
Introduction to Ribbon
Spring Cloud Ribbon is a client-side load balancing tool built on Netflix Ribbon. Put simply, Ribbon is an open source project released by Netflix whose main job is to provide client-side load balancing algorithms and service invocation. The Ribbon client component offers a comprehensive set of configuration options such as connection timeouts and retries. Given the machines listed behind the load balancer in your configuration, Ribbon automatically picks one to connect to according to a rule such as simple round-robin polling or random selection. It is also easy to implement a custom load balancing algorithm with Ribbon.
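The two rules named above, simple polling and random selection, can be sketched in plain Java as follows. This is not Ribbon's own implementation; the class SimpleRules and its methods are hypothetical and only illustrate the idea of picking one server from a list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative selection rules, independent of the Ribbon API.
class SimpleRules {
    private final AtomicInteger counter = new AtomicInteger();
    private final Random rng = new Random();

    // Simple polling: walk through the list in order and wrap around
    String roundRobin(List<String> servers) {
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    // Random selection: every server has the same chance of being picked
    String random(List<String> servers) {
        return servers.get(rng.nextInt(servers.size()));
    }

    public static void main(String[] args) {
        SimpleRules rules = new SimpleRules();
        List<String> servers = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(rules.roundRobin(servers));
        }
        System.out.println(rules.random(servers));
    }
}
```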
Using Ribbon
- First, define a RestTemplate that uses the Ribbon policy:
@Configuration
public class RestTemplateConfig {
    @LoadBalanced
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
- Then call the remote interface locally through the RestTemplate:
@Autowired
private RestTemplate restTemplate;

@RequestMapping(value = "/echo/{id}", method = RequestMethod.GET)
public String echo(@PathVariable Long id) {
    return restTemplate.getForObject("http://member-service/member/get/" + id, String.class);
}
Ribbon source code analysis
- RestTemplate inherits from InterceptingHttpAccessor, whose interceptors field holds ClientHttpRequestInterceptor request interceptors.
- Ribbon's initialization class is RibbonAutoConfiguration, defined in spring-cloud-netflix-ribbon.
- Initialization also depends on the LoadBalancerAutoConfiguration configuration class, which lives in spring-cloud-commons. The code is as follows:
@Configuration(proxyBeanMethods = false)
// The RestTemplate class must exist in the project
@ConditionalOnClass(RestTemplate.class)
// There must be a LoadBalancerClient bean instance in the container
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    // Get all @LoadBalanced RestTemplate instances in the Spring container
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    // Get the LoadBalancerRequestTransformer instances in the Spring container
    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    // afterSingletonsInstantiated is called once all singleton beans have been initialized
    // Here it is implemented as a lambda that applies every RestTemplateCustomizer to each restTemplate instance
    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
            final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
    }

    // LoadBalancerRequestFactory factory class
    // It is built from the LoadBalancerClient instance and the LoadBalancerRequestTransformer instances
    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    // LoadBalancerInterceptor configuration
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        // Create an instance of the default LoadBalancerInterceptor
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        // A RestTemplateCustomizer instance is created if none exists yet
        // It adds the loadBalancerInterceptor interceptor to all of our restTemplate instances
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
    // ...
}
We can summarize the code above as follows:
- To use load balancing, the RestTemplate class must be on the project's classpath and the Spring container must contain a LoadBalancerClient instance. In spring-cloud-netflix-ribbon, LoadBalancerClient has only one implementation class: RibbonLoadBalancerClient.
- Through Spring's SmartInitializingSingleton extension point, restTemplateCustomizer() adds the LoadBalancerInterceptor interceptor to every RestTemplate.
- In essence, the load balancer works through an interceptor: it uses RestTemplate's interceptor extension point to load-balance service requests.
LoadBalancerInterceptor
The LoadBalancerInterceptor passes the request to the LoadBalancerClient for processing. It first selects an implementation of ILoadBalancer to handle fetching and selecting services. Then the serviceName and load balancing algorithm are used to select the Server object. Finally, the request is executed.
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    // Load balancing
    private LoadBalancerClient loadBalancer;
    // Build the request
    private LoadBalancerRequestFactory requestFactory;
    // ...
    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }
}
RibbonLoadBalancerClient
Tracing this.loadBalancer.execute shows that all requests are handled by RibbonLoadBalancerClient. It implements the LoadBalancerClient interface, which is defined as follows:
public interface ServiceInstanceChooser {
    // Use serviceId to select a service instance
    ServiceInstance choose(String serviceId);
}

public interface LoadBalancerClient extends ServiceInstanceChooser {
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    <T> T execute(String serviceId, ServiceInstance serviceInstance,
            LoadBalancerRequest<T> request) throws IOException;

    // Replace the service name in the URI with the concrete host and port of the instance
    URI reconstructURI(ServiceInstance instance, URI original);
}
First, let's look at the choose method of RibbonLoadBalancerClient:
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}
// Select a specific service instance by service name
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
// Select a load balancer by service name. The default is ZoneAwareLoadBalancer
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
// Get the service
protected Server getServer(ILoadBalancer loadBalancer) {
return getServer(loadBalancer, null);
}
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default");
}
LoadBalancerInterceptor delegates execution directly to the loadBalancer.execute() method:
// The LoadBalancerRequest is created by LoadBalancerRequestFactory.createRequest(request, body, execution)
// It implements the LoadBalancerRequest interface as an anonymous inner class with the generic type ClientHttpResponse
// because the actual execution is ultimately done by the executor: ClientHttpRequestExecution.execute()
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
return execute(serviceId, request, null);
}
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
// Get the load balancer, then get a serverInstance instance
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) { // Throw an exception if not found. IllegalStateException is used here
throw new IllegalStateException("No instances available for " + serviceId);
}
// Adapt the Server into a RibbonServer. isSecure: whether the client connection is secure
// serverIntrospector: introspection; see the configuration class ServerIntrospectorProperties
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));
// Call the overloaded interface method of this class
return execute(serviceId, ribbonServer, request);
}
// Its argument is ServiceInstance --> A unique Server instance has been identified
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
// RibbonServer is the only ServiceInstance implementation this method accepts
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
// The execution context is bound to serviceId
RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
// ...
// Actually send the request to the server and get the return value
// Because interceptors are present, what runs here is InterceptingRequestExecution.execute()
// which calls ServiceRequestWrapper.getURI(), which in turn calls reconstructURI()
T returnVal = request.apply(serviceInstance);
return returnVal;
// ... exception handling
}
returnVal is a ClientHttpResponse. It is finally passed to handleResponse() to handle errors (if any); if there are none, it is handed to the extractor, ResponseExtractor.extractData(response), and the request is complete.
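The step where the service name in the request URI is swapped for a concrete instance address can be sketched with java.net.URI alone. This is an illustration of the idea behind reconstructURI, not the Ribbon code; the class UriRewriteSketch and the instance address 192.168.0.10:8080 are assumptions.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative only: swaps the logical service name for a concrete host and port.
class UriRewriteSketch {

    static URI reconstruct(URI original, String host, int port) {
        try {
            // Keep scheme, path, query and fragment; replace only host and port
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://member-service/member/get/1");
        // The interceptor reads the service name from the host part of the URI
        String serviceName = original.getHost();
        // Assumed instance address, standing in for the server chosen by the load balancer
        URI target = reconstruct(original, "192.168.0.10", 8080);
        System.out.println(serviceName + " -> " + target);
    }
}
```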
ZoneAwareLoadBalancer
The following figure shows the class diagram of ZoneAwareLoadBalancer.
Its parent class is DynamicServerListLoadBalancer, whose core methods are:
- Reset and initialize: restOfInit(clientConfig)
- Update the service list: updateListOfServers(). This method calls ServerList.getUpdatedListOfServers(), which dispatches to the registry-specific implementation; for Nacos that is NacosServerList#getUpdatedListOfServers().
- Update all service lists: updateAllServerList()
- Set all service lists: setServersList()
The core methods of ZoneAwareLoadBalancer itself:
- Select a service instance: chooseServer()
- Select a load balancer: getLoadBalancer
- Select a service instance within the zone: zoneLoadBalancer.chooseServer
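The relationship between refreshing the server list and choosing a server can be sketched as below. This is a simplified model, not the Ribbon classes: RefreshingBalancerSketch is hypothetical, its Supplier stands in for ServerList.getUpdatedListOfServers(), and zones are left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified model of a load balancer backed by a refreshable server list.
class RefreshingBalancerSketch {
    // Stand-in for the registry-specific server list source
    private final Supplier<List<String>> serverListSource;
    private volatile List<String> servers = Collections.emptyList();
    private final AtomicInteger position = new AtomicInteger();

    RefreshingBalancerSketch(Supplier<List<String>> serverListSource) {
        this.serverListSource = serverListSource;
    }

    // Pull the current list from the source and replace the local copy
    void updateListOfServers() {
        servers = new ArrayList<>(serverListSource.get());
    }

    // Pick the next server from the local copy; null when the list is empty
    String chooseServer() {
        List<String> snapshot = servers;
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(Math.floorMod(position.getAndIncrement(), snapshot.size()));
    }

    public static void main(String[] args) {
        RefreshingBalancerSketch lb = new RefreshingBalancerSketch(
                () -> Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        System.out.println(lb.chooseServer()); // nothing loaded yet
        lb.updateListOfServers();
        System.out.println(lb.chooseServer());
    }
}
```

Choosing a server only reads the local copy, so a newly registered instance becomes visible only after the next refresh.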
Ribbon summary
For using a RestTemplate annotated with @LoadBalanced, I summarize as follows:
- The URL passed in as a String must be absolute (http://,…). Otherwise an exception is thrown: java.lang.IllegalArgumentException: URI is not absolute
- The serviceId is case insensitive (http://order-service/… has the same effect as http://ORDER-SERVICE/…)
- Do not put a port after the serviceId
Finally, note that a @LoadBalanced RestTemplate can only address a serviceId; it cannot send requests to a raw IP address or domain name. If your project needs both, define multiple RestTemplates for the different scenarios.
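The first two points can be checked with java.net.URI before a request is ever sent. The helper below is a hypothetical sketch, not part of Spring or Ribbon; it extracts the serviceId from the host position and compares serviceIds without regard to case.

```java
import java.net.URI;

// Hypothetical pre-flight checks for URLs given to a load-balanced RestTemplate.
class LoadBalancedUrlChecks {

    // Returns the host part, which a load-balanced RestTemplate treats as the serviceId
    static String serviceIdOf(String url) {
        URI uri = URI.create(url);
        if (!uri.isAbsolute()) {
            // Mirrors the message of the exception quoted in the list above
            throw new IllegalArgumentException("URI is not absolute");
        }
        return uri.getHost();
    }

    // Compares two URLs by serviceId, ignoring case
    static boolean sameService(String urlA, String urlB) {
        return serviceIdOf(urlA).equalsIgnoreCase(serviceIdOf(urlB));
    }

    public static void main(String[] args) {
        System.out.println(serviceIdOf("http://order-service/orders/1"));
        System.out.println(sameService("http://order-service/orders/1",
                "http://ORDER-SERVICE/orders/1"));
    }
}
```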
Nacos service query
Client query
With the default Nacos client, the service list is queried through NacosServerList#getUpdatedListOfServers().
public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            String group = discoveryProperties.getGroup();
            // discoveryProperties.namingServiceInstance()
            // ultimately obtains a com.alibaba.nacos.client.naming.NacosNamingService instance via reflection
            List<Instance> instances = discoveryProperties.namingServiceInstance()
                    .selectInstances(serviceId, group, true);
            return instancesToServerList(instances);
        }
        catch (Exception e) {
            throw new IllegalStateException(
                    "Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }
}
The selectInstances method is then called
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// subscribe defaults to true
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
In fact, the core logic is in hostReactor.getServiceInfo. When querying service information, it converts the current serviceName and clusters into a key and then uses getServiceInfo0 to look up the service information; this lookup reads local data.
If null == serviceObj, updateServiceNow calls /instance/list to query the service information from the server.
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
// UPDATE_HOLD_INTERVAL is a constant with a default value
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
// The maximum wait time is 5s. After updating serviceObj, notifyAll() is executed.
// Method entry updateService(String serviceName, String Clusters)
// Maximum delay 2s DEFAULT_DELAY = 1
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// Update service information through a scheduled task
scheduleUpdateIfAbsent(serviceName, clusters);
// Get the latest value
return serviceInfoMap.get(serviceObj.getKey());
}
This explains why the first call through Ribbon is slow: it has to initialize the service list and then query the service instances through the Nacos client.
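The local-first lookup described above can be modelled in a few lines. This is a sketch under assumptions, not the HostReactor code: CachedServiceLookup is hypothetical, the "@@" key separator is chosen for illustration, the remote query is a plain function, and the scheduled refresh and failover handling are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Simplified model: consult the local cache first, query the server only on a miss.
class CachedServiceLookup {
    private final Map<String, List<String>> localCache = new ConcurrentHashMap<>();
    // Stand-in for the remote /instance/list call
    private final Function<String, List<String>> remoteQuery;
    final AtomicInteger remoteCalls = new AtomicInteger();

    CachedServiceLookup(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    List<String> getInstances(String serviceName, String clusters) {
        // Illustrative key built from service name and clusters
        String key = serviceName + "@@" + clusters;
        return localCache.computeIfAbsent(key, k -> {
            remoteCalls.incrementAndGet();
            return remoteQuery.apply(serviceName);
        });
    }

    public static void main(String[] args) {
        CachedServiceLookup lookup = new CachedServiceLookup(
                name -> Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        lookup.getInstances("member-service", "DEFAULT");
        lookup.getInstances("member-service", "DEFAULT");
        System.out.println("remote calls: " + lookup.remoteCalls.get());
    }
}
```

Only the first lookup for a given key pays the cost of the remote query, which matches the slow first call noted above.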
Server-side processing
The server handles service instance queries through the /instance/list interface. First, note that service instance information is stored in a ConcurrentHashMap:
/** * Map(namespace, Map(group::serviceName, Service)). */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
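A two-level map like this one is read by namespace first and by grouped service name second. The sketch below uses String values as a stand-in for the Service type; the class ServiceRegistrySketch, the namespace "public" and the key "DEFAULT_GROUP@@member-service" are assumptions for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative two-level registry: namespace -> (grouped service name -> service).
class ServiceRegistrySketch {
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    void put(String namespace, String groupedName, String service) {
        // Create the inner map for the namespace on first use
        serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                .put(groupedName, service);
    }

    String get(String namespace, String groupedName) {
        Map<String, String> services = serviceMap.get(namespace);
        return services == null ? null : services.get(groupedName);
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        registry.put("public", "DEFAULT_GROUP@@member-service", "member-service definition");
        System.out.println(registry.get("public", "DEFAULT_GROUP@@member-service"));
        System.out.println(registry.get("dev", "DEFAULT_GROUP@@member-service"));
    }
}
```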
The query is mainly handled through ServiceManager; the core entry method is InstanceController#doSrvIpxt:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
checkIfDisabled(service);
List<Instance> srvedIPs;
// Query all services
// The list of services will be updated internally
// allInstances.addAll(persistentInstances);
// allInstances.addAll(ephemeralInstances);
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// filter ips using selector:
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.set("hosts", JacksonUtils.createEmptyArrayNode());
result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
if (isCheck) {
result.put("reachProtectThreshold", false);
}
double threshold = service.getProtectThreshold();
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return JacksonUtils.createEmptyJsonNode();
}
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
// remove disabled instance:
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
The core logic above is:
- Call the service.srvIPs method to query all service instances.
- Cluster#allIPs adds all registered instances, persistent and ephemeral, to the returned service list.
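The protect threshold branch in doSrvIpxt is easy to miss in the long listing, so here is a minimal sketch of just that rule: when the share of healthy instances is at or below the threshold, unhealthy instances are returned as well. The class ProtectThresholdSketch and the use of plain strings for instances are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative model of the protect threshold rule, using strings as instances.
class ProtectThresholdSketch {

    static List<String> select(List<String> healthy, List<String> unhealthy, double threshold) {
        int total = healthy.size() + unhealthy.size();
        List<String> result = new ArrayList<>(healthy);
        // Same comparison as in the listing: healthy / all <= threshold
        if (total > 0 && (float) healthy.size() / total <= threshold) {
            result.addAll(unhealthy);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> healthy = Arrays.asList("10.0.0.1");
        List<String> unhealthy = Arrays.asList("10.0.0.2", "10.0.0.3", "10.0.0.4");
        // 1 of 4 healthy is below a threshold of 0.5, so all four instances are returned
        System.out.println(select(healthy, unhealthy, 0.5));
        // With no unhealthy instances the ratio is 1.0, so only the healthy one is returned
        System.out.println(select(healthy, Collections.<String>emptyList(), 0.5));
    }
}
```

Returning unhealthy instances in this situation spreads traffic over more addresses instead of concentrating it on the few healthy ones.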