Learning is not so utilitarian, two brothers with you from a higher dimension of easy to read source ~
Speaking of Nacos service subscription mechanism, it may feel very mysterious to those who do not know, this article will give you a deep understanding of Nacos 2.0 client subscription implementation. Due to the more involved content, it is divided into several, this is the first.
Nacos subscription overview
The Nacos subscription mechanism, if described in one sentence, is that the Nacos client obtains the list of instances from the registry every 6 seconds through a scheduled task. When the instance changes are discovered, the change event is published and the subscriber performs business processing. The update instance of the update instance, the update local cache of the update local cache.
The above picture shows the main flow of the subscription method, which involves a lot of content and complex processing details. You just need to get to the core here. Here’s a step-by-step analysis of the process through code and flow charts.
From subscription to scheduled task
The subscription mechanism we’re talking about here is essentially a quasi-real-time awareness of service discovery. As you have seen above, when the subscription method is executed, a scheduled task is triggered to periodically pull data from the server. So, in essence, the subscription mechanism is a way to implement service discovery, as opposed to a direct query interface.
NacosNamingService exposes many subscribe overloads. The purpose of overloading is to make you write fewer parameters. These parameters are handled by default by Nacos. Eventually these overloaded methods will all call the following method:
// NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
Copy the code
Methods we temporarily don’t chat and the event listeners directly see the subscribe method, here for NamingClientProxyDelegate clientProxy type. This class is instantiated when NacosNamingService is instantiated, as described in the previous section.
While clientProxy. The subscribe method is realized in NamingClientProxyDelegate:
// NamingClientProxyDelegate @Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); / / get in the cache ServiceInfo ServiceInfo result = serviceInfoHolder. GetServiceInfoMap () get (the serviceKey); If (null == result) {if (null == result) { Based on the result = grpcClientProxy gRPC agreement. The subscribe (serviceName, groupName, clusters); } / / timing scheduling UpdateTask serviceInfoUpdateService. ScheduleUpdateIfAbsent (serviceName, groupName, clusters); . / / ServiceInfo local cache handling serviceInfoHolder processServiceInfo (result); return result; }Copy the code
Does this look familiar? Yes, we covered this earlier when we analyzed Nacos Client Service Discovery. It seems that all paths lead to the same thing. Querying the list of services and subscribing end up calling the same method.
We talked about other processes in the last section, but here we focus on task scheduling:
// ServiceInfoUpdateService public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); if (futureMap.get(serviceKey) ! = null) { return; } synchronized (futureMap) { if (futureMap.get(serviceKey) ! = null) { return; } // Create UpdateTask ScheduledFuture<? > future = addTask(new UpdateTask(serviceName, groupName, clusters)); futureMap.put(serviceKey, future); }}Copy the code
This method consists of building the serviceKey, rejudging through the serviceKey, and adding the UpdateTask.
The implementation of addTask is to initiate a scheduled task:
// ServiceInfoUpdateService private synchronized ScheduledFuture<? > addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); }Copy the code
The scheduled task is executed one second later.
This is the end of the track. There are only two core functions: calling subscription methods and initiating scheduled tasks.
What the scheduled task did
UpdateTask encapsulates the core business logic of the subscription mechanism, so let’s start with a flowchart to see what we’ve done.
With the above flowchart, it’s pretty clear what UpdateTask does. Post all code for the run method directly:
public void run() { long delayTime = DEFAULT_DELAY; Try {// Check whether the registered Service is subscribed. If not, no more if (! changeNotifier.isSubscribed(groupName, serviceName, clusters) && ! futureMap.containsKey(serviceKey)) { NAMING_LOGGER .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters); return; } / / the cache service information ServiceInfo serviceObj = serviceInfoHolder. GetServiceInfoMap () get (the serviceKey); If (serviceObj == null) {// Obtain Service information from the registry server based on serviceName. ServiceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); serviceInfoHolder.processServiceInfo(serviceObj); lastRefTime = serviceObj.getLastRefTime(); return; } // Expired service (the last update time of the service is less than or equal to the cache refresh time), From the registry to query the if (serviceObj getLastRefTime () < = lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); . / processing/Service message serviceInfoHolder processServiceInfo (serviceObj); } / / refresh update time lastRefTime = serviceObj. GetLastRefTime (); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } // Next update cache time set, Defaults to 6 seconds / / TODO multiple time can be configured. The delayTime = serviceObj. GetCacheMillis () * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; ResetFailCount (); resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e); } finally {// Update time of the next schedule. FailCount is related to failCount. // failCount=0, then the next schedule is 6 seconds. Executor. Schedule (this, math.min (delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); }}Copy the code
The first way to determine whether a service has been subscribed is by using ChangeNotifier#isSubscribed:
public boolean isSubscribed(String groupName, String serviceName, String clusters) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
return CollectionUtils.isNotEmpty(eventListeners);
}
Copy the code
If you look at the source code for this method, you’ll see that listenerMap is the same EventListener registered by registerListener in the original subscribe method.
After the run method, the service processing is basically the same. First, check whether the cache contains ServiceInfo information. If no, query the registry, process ServiceInfo, and update the last processing time.
Compare last Update time with last Update time in the current ServiceInfo to determine whether ServiceInfo is invalid. If it fails, the registry is queried, ServiceInfo is processed, and the last processing time is updated.
The service logic finally calculates the execution time of the next scheduled task and uses delayTime to delay the execution. The default delayTime is 1000L * 6, which is 6 seconds. In finally, the next scheduled task is actually initiated. When an exception occurs, the next execution time depends on the number of failures, but the maximum time is less than one minute.
summary
In this article, we talk about the source code of Nacos client service subscription mechanism. The main steps are as follows:
Step 1: Subscribe to the method call and register an EventListener, after which UpdateTask is used to determine;
Step 2: Handle the subscription logic through the proxy class, using the same method as the instance list method;
Step 3: Execute the UpdateTask method using the scheduled task. The default interval is 6 seconds. If an exception occurs, the interval will be longer than 1 minute.
Step 4: The UpdateTask method will compare whether the local cache exists and whether the cache has expired. If it does not exist or has expired, query the registry to obtain the latest instance, update the last obtaining time, and process ServiceInfo.
Step 5: Recalculate the scheduled task time and execute the above process in a loop.
In the next article, we’ll build on this by showing how to process the instance information obtained in the ServiceInfoHolder#processServiceInfo method.
Author of SpringBoot Tech Insider, loves to delve into technology and write about it.
Official account: “Program New Horizon”, the official account of the blogger, welcome to follow ~
Technical exchange: please contact the blogger wechat id: Zhuan2quan