Nacos service registration and discovery blueprints
Why use service discovery?
Let’s say you’re writing some code that calls a service that has a REST API or Thrift API. To send a request, your code needs to know the network location (IP address and port) of the service instance. In traditional applications running on physical hardware, the network location of service instances is relatively static. For example, your code could read the network location from an occasionally updated configuration file. However, this is a much harder problem to solve in modern cloud-based microservices applications. Service instances have dynamically allocated network locations. In addition, the entire set of service instances changes dynamically due to automatic scaling, failures, and upgrades. Therefore, your client code needs to use a more precise service discovery mechanism. There are two main modes of service discovery: client-side discovery and server-side discovery. Let’s take a look at client discovery first.
What is the client discovery mode like?
When using the client discovery pattern, the client is responsible for determining the network location and request load balancing of available service instances. The client queries the Service Registry, which is a database of available service instances. The client then uses a load balancing algorithm to select an available service instance and make a request. The network location of the service instance is registered when the service registry is started. When the instance terminates, it is removed from the service registry. A heartbeat mechanism is typically used to periodically refresh the registration information of a service instance. Netflix OSS provides a good example of the client-side discovery pattern. Netflix Eureka is a service registry that provides a set of REST apis for managing service instance registrations and querying available instances. Netflix Ribbon is an IPC client that can be used with Eureka to load balance requests between available service instances. Eureka is discussed later in this chapter. The client-side discovery pattern has various advantages and disadvantages. This pattern is relatively simple, with no moving parts other than the service registry. In addition, because clients can discover available service instances, intelligent, application-specific load balancing decisions can be made, such as using consistent hashing. An important disadvantage of this pattern is that it couples the client to the service registry. You must implement client-side service discovery logic for each programming language and framework you use. Now that we’ve looked at client-side discovery, let’s look at server-side discovery.
Service discovery principle:
Service discovery is done by NacosWatch, which implements Spring’s Lifecycle interface and invokes the corresponding start () and stop () methods when a container is started and destroyed
Look at the source code
@ Override public void the start () {/ / cas set running state to true if (this.running.com pareAndSet (false, True)) {/ / delay a service discovery mission enclosing watchFuture = this. TaskScheduler. ScheduleWithFixedDelay (this: : nacosServicesWatch, this.properties.getWatchDelay()); }} @ Override public void the stop () {/ / set the status to false and then cancel the executing task if (this.running.com pareAndSet (true, false) && this.watchFuture ! = null) { this.watchFuture.cancel(true); } } public void nacosServicesWatch() { try { boolean changed = false; NamingService namingService = properties.namingServiceInstance(); / / for nacos server on the latest service providers ListView < String > ListView. = the properties namingServiceInstance () getServicesOfServer (1, Integer.MAX_VALUE); List<String> serviceList = listView.getData(); Set<String> currentServices = new HashSet<>(serviceList); currentServices.removeAll(cacheServices); if (currentServices.size() > 0) { changed = true; } // Unsubscribe services that have been taken offline, If (cacheServices. RemoveAll (new HashSet<>(serviceList)) && cacheServices. Size () > 0) {changed = true; for (String serviceName : cacheServices) { namingService.unsubscribe(serviceName, subscribeListeners.get(serviceName)); subscribeListeners.remove(serviceName); } } cacheServices = new HashSet<>(serviceList); For (String serviceName: cacheServices) {if (! subscribeListeners.containsKey(serviceName)) { EventListener eventListener = event -> NacosWatch.this.publisher .publishEvent(new HeartbeatEvent(NacosWatch.this, nacosWatchIndex.getAndIncrement())); subscribeListeners.put(serviceName, eventListener); namingService.subscribe(serviceName, eventListener); }} / / a publish event service change the if (changed) {this. Publisher. PublishEvent (new HeartbeatEvent (this, nacosWatchIndex.getAndIncrement())); } } catch (Exception e) { log.error("Error watching Nacos Service change", e); }}Copy the code
General process: The nacos client performs a delayed task of service subscription operation after the Spring container is started. When this task is executed, it first pulls the latest service list of the Nacos Server, and then compares it with the locally cached service list, unsubscribing the offline service. Then initiate a subscription operation to the nacOS Server to subscribe to all services
So how do service consumers perceive the status information of service providers in real time
1. After the subscription, the service consumer will perform a polling task (once every 1s) to pull the latest service provider information and update it in real time, which is implemented in HostReactor UpdateTask
public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private String clusters; private String serviceName; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } @override public void run() {try {// Obtain the current service information ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); // Pull the latest list of services for empty and then update it if (serviceObj == null) {updateServiceNow(serviceName, Clusters); // Continue polling executor.schedule(this, DEFAULT_DELAY, timeunit.milliseconds); return; } the if (serviceObj getLastRefTime () < = lastRefTime) {/ / update the current service for which the update operation updateServiceNow (serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } / / set up the service the latest update time lastRefTime = serviceObj. GetLastRefTime (); // Subscription cancelled if (! eventDispatcher.isSubscribed(serviceName, clusters) && ! futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task: NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } / / continue to the next polling executor. The schedule (this, serviceObj getCacheMillis (), TimeUnit. MILLISECONDS); } catch (Throwable e) { NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); }}}Copy the code
2. As mentioned above, when the service provider is registered, the NACOS server also has a corresponding heartbeat detection. When the heartbeat detection times out, that is, the heartbeat packet from the service provider is not received in time, the NACOS server determines that the service status is abnormal and then pushes the service information through UDP to inform the corresponding service consumers. Services to consumers through PushReceiver to handle udp protocol, HostReactor. ProcessServiceJson (String json) to update the local service list
/********************************PushReceiver*****************************/ public void run() { while (true) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); udpSocket.receive(packet); String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JSON.parseObject(json, PushPacket.class); String ack; If (" dom ". The equals (pushPacket type) | | "service". The equals (pushPacket. Type)) {/ / handle change information hostReactor.processServiceJSON(pushPacket.data); // send ack to server ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } else if ("dump".equals(pushPacket.type)) { // dump data to server ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\"" + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap())) + "\"}"; } else { // do nothing send ack only ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")), ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress())); } catch (Exception e) { NAMING_LOGGER.error("[NA] error while receiving push data", e); }}}Copy the code
Service registration and subscription I only explained the main process, NACOS Server side to deal with the source code too much will not be posted, according to the corresponding API interface into a look will know, NACOS source code is easier to understand, there is nothing particularly difficult to read the place, here just to provide you a look at the source code ideas, Specific detailed flow still needs reader oneself to read closely
Practical steps:
1. Add dependencies
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> < version > 2.1.1. RELEASE < / version > < / dependency >Copy the code
2. Configure the service provider
spring.application.name=zuul-gateway
server.port=19997
spring.cloud.nacos.discovery.server-addr=localhost:8848
Copy the code
3. Note Enable service registration discovery
@EnableDiscoveryClient
public class ZuulApplication {
...
}
Copy the code