General documentation: Article directory Github: github.com/black-ant
1. Preface
Just a casual write-up, haha ~~~
Purpose of this article:
- Work out debugging entry points for the main Nacos features
- Sort out the module structure of the Nacos components
Outline: this document mainly covers the following parts:
- Nacos service discovery
- Nacos configuration loading
- Nacos health checks
- Nacos routing policy
PS: This document is based on the official documentation; for quick hands-on usage, reading the official docs directly is recommended.
2. Compilation of source code
2.1 Nacos source code compilation
// Step 1: Download the source code
git clone https://github.com/alibaba/nacos.git
// Step 2: Compile Nacos
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U
// Step 3: Run the server; the build output is located at
nacos_code\distribution\target
2.2 Nacos source code running
// Step 1: Download the Nacos source code
git clone https://github.com/alibaba/nacos.git
// Step 2: Import Nacos into IDEA as a Spring Boot project; the entry point is the com.alibaba.nacos.Nacos class
// Step 3: In IDEA, add the startup VM options
-Dnacos.standalone=true -Dnacos.home=C:\\nacos
// -Dnacos.standalone=true : standalone (single-node) startup
// -Dnacos.home=C:\nacos   : home / log path
// PS: the Nacos application class
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
    public static void main(String[] args) {
        SpringApplication.run(Nacos.class, args);
    }
}
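If you prefer not to edit the IDEA run configuration, the same flags can be set programmatically; a minimal sketch (the launcher class is hypothetical, and assumes the console module is on the classpath):
// Hypothetical local-debug launcher; nacos.standalone / nacos.home are the same
// properties passed via -D above. It simply delegates to the real entry point.
public class NacosLocalLauncher {

    public static void main(String[] args) {
        System.setProperty("nacos.standalone", "true"); // single-node mode
        System.setProperty("nacos.home", "C:\\nacos");  // home / log directory
        com.alibaba.nacos.Nacos.main(args);
    }
}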
3. Module source code
<modules>
    <!-- Configuration management -->
    <module>config</module>
    <!-- Nacos kernel -->
    <module>core</module>
    <!-- Service discovery -->
    <module>naming</module>
    <!-- Address server -->
    <module>address</module>
    <!-- Unit tests -->
    <module>test</module>
    <!-- Interface abstraction -->
    <module>api</module>
    <!-- Client -->
    <module>client</module>
    <!-- Examples -->
    <module>example</module>
    <!-- Common utilities -->
    <module>common</module>
    <!-- Server build and release -->
    <module>distribution</module>
    <!-- Console / graphical interface module -->
    <module>console</module>
    <!-- Metadata management -->
    <module>cmdb</module>
    <!-- TODO: Istio integration for traffic control -->
    <module>istio</module>
    <!-- Consistency management -->
    <module>consistency</module>
    <!-- Permission control -->
    <module>auth</module>
    <!-- System information management: env reading, conf reading -->
    <module>sys</module>
</modules>
3.1 Nacos Service Discovery and Management (C0-C20)
Nacos manages services mainly in the naming module. Combined with the client, this section looks at what logic is involved in service discovery and management, and how to operate it.
3.1.1 Obtaining the Service list on the Console
External interfaces:
- C- CatalogController # listDetail : obtains the list of service details
- C- CatalogController # instanceList : lists the instances of a specific service
- C- CatalogController # serviceDetail : gets the details of a specific service
The core handling goes through the ServiceManager. Here is a look at the internal logic:
Following these three interfaces, it is not hard to see that they all ultimately call into the ServiceManager class.
C01- ServiceManager
// Inner classes
PC- UpdatedServiceProcessor
PC- ServiceUpdater
PSC- ServiceChecksum
PC- EmptyServiceAutoClean
PC- ServiceReporter
PSC- ServiceKey :
// Core parameters
F- Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
F- LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
F- ConsistencyService consistencyService;
// Common methods
M- chooseServiceMap : obtains the service map by namespace
M- addUpdatedServiceToQueue : queues a service update
M- onChange : callback when a service changes
M- onDelete : callback when a service is deleted
// In addition, the ServiceManager has a dependency: ConsistencyService
C02- ConsistencyService
    M- put : publishes data to the cluster
    M- remove : deletes data from the cluster
    M- get : obtains data from the cluster
    M- listen : listens for changes of a key in the cluster
    M- unlisten : removes the listener for a key
    M- isAvailable : returns whether the consistency layer is currently available
As you can see, this is essentially CRUD plus listeners. Whenever the ServiceManager creates, updates, or deletes a service, it calls its ConsistencyService dependency (DelegateConsistencyServiceImpl) to handle the consistency requirements.
PS:C02_01 ConsistencyService architecture
When multiple services exist, how are they stored?
// As shown in the figure above, the related objects are stored in the clusterMap
// Note that there are two Service classes
com.alibaba.nacos.api.naming.pojo.Service
com.alibaba.nacos.naming.core.Service
// Properties of the Service
C- Service
I- com.alibaba.nacos.api.naming.pojo.Service
F- Selector selector
F- Map<String, Cluster> clusterMap = new HashMap<>();
F- Boolean enabled
F- Boolean resetWeight
F- String token
F- List<String> owners
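To make the storage layout concrete, here is a minimal sketch of how a lookup walks these maps. The inner-map key format "group@@serviceName" and the getClusterMap() accessor are assumptions based on the fields listed above:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.Service;

// Sketch: walking serviceMap (namespace -> "group@@serviceName" -> Service -> clusterMap).
class ServiceMapSketch {

    // Same shape as ServiceManager.serviceMap
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    Cluster lookup(String namespaceId, String groupedServiceName, String clusterName) {
        Map<String, Service> services = serviceMap.get(namespaceId);   // 1. by namespace
        if (services == null) {
            return null;
        }
        Service service = services.get(groupedServiceName);            // 2. e.g. "DEFAULT_GROUP@@nacos-user-server"
        if (service == null) {
            return null;
        }
        return service.getClusterMap().get(clusterName);               // 3. the Cluster holds the instances
    }
}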
3.1.2 Control classes for Nacos services and Config
In Nacos, NamingService and ConfigService are used to control services and configuration respectively. The underlying implementations are NacosConfigService and NacosNamingService; the two classes belong to the com.alibaba.nacos.client package.
// PS: note that they come in via the nacos-config-spring-boot-starter and nacos-discovery-spring-boot-starter packages
3.1.3 Exit and destruction of Nacos
When a registered service shuts down, Nacos deregisters it from the server at the end of the lifecycle.
Step 1: Closeable implementations stop their related classes. When we click stop on the project, the following log list can be seen:
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.beat.BeatReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown begin
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Start destroying NacosRestTemplate
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Destruction of the end
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown stop
As you can see, these are the shutdown operations; the main close handling is implemented via com.alibaba.nacos.common.lifecycle.Closeable.
// PS: the call chain that triggers this is TODO
Step 2: NacosServiceRegistry deregistration
Besides the Closeable shutdown, the service itself is also deregistered, mainly through NacosServiceRegistry:
public void deregister(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        return;
    }
    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
    try {
        namingService.deregisterInstance(serviceId, group, registration.getHost(),
                registration.getPort(), nacosDiscoveryProperties.getClusterName());
    } catch (Exception e) {
        // log omitted
    }
}
// PS: the destruction logic is driven through the ServiceRegistry abstraction
C- AbstractAutoServiceRegistration
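For a plain SDK client outside Spring Cloud, the same deregistration can be done explicitly; a minimal sketch, assuming a NamingService built as in Appendix 1, with serviceName/ip/port as placeholders:
import javax.annotation.PreDestroy;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;

// Sketch: deregister explicitly on shutdown.
class ManualDeregistration {

    private final NamingService namingService;

    ManualDeregistration(NamingService namingService) {
        this.namingService = namingService;
    }

    @PreDestroy
    void shutdown() throws NacosException {
        namingService.deregisterInstance("nacos-user-server", "192.168.0.97", 9083);
    }
}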
Nacos_Closeable system
3.1.4 Health Check Process
Nacos provides real-time health checks on services and prevents requests from being sent to unhealthy hosts or service instances. Nacos supports health checks at the transport layer (PING or TCP) and at the application layer (such as HTTP, MySQL, user defined).
Health check related interfaces
- Send an instance heartbeat (InstanceController) : /nacos/v1/ns/instance/beat
- Update the health status of an instance (HealthController) : /nacos/v1/ns/health/instance
The client initiates the heartbeat
The client periodically initiates heartbeat operations; the classes involved:
C- BeatReactor
    PC- BeatTask : inner class
// There are two steps:
// Step 1: BeatReactor # addBeatInfo adds a scheduled task
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// Add the heartbeat information
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
// Step 2: Call the Server interface in BeatTask to perform heartbeat operations
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
// PS: the default heartbeat interval is 5 seconds (com.alibaba.nacos.api.common.Constants)
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
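PS: these defaults can also be overridden per instance through its metadata. A minimal sketch; the preserved.* key names are my assumption of the conventional ones (see com.alibaba.nacos.api.naming.PreservedMetadataKeys in your client version):
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

class HeartbeatTuning {

    // Sketch: override the default 5s / 15s / 30s timings for one instance.
    static void registerWithFastBeat(NamingService namingService) throws NacosException {
        Instance instance = new Instance();
        instance.setIp("192.168.0.97");
        instance.setPort(9083);
        instance.getMetadata().put("preserved.heart.beat.interval", "2000"); // beat every 2s
        instance.getMetadata().put("preserved.heart.beat.timeout", "6000");  // unhealthy after 6s
        instance.getMetadata().put("preserved.ip.delete.timeout", "12000");  // deleted after 12s
        namingService.registerInstance("nacos-user-server", instance);
    }
}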
The server detects the heartbeat
// The server will also check the instance. The core class is ClientBeatCheckTask
// Step 1: Create the ClientBeatCheckTask
C- Service
    ?- the task is started when the Service is initialized
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // ...
}
// You can also see the default timing settings
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
// Step 2: Run the ClientBeatCheckTask
C- ClientBeatCheckTask
    ?- checks and updates the status of temporary (ephemeral) instances, and deletes them if they have expired
public void run() {
    // ... omitted
    // Step 1: Get all instances
    List<Instance> instances = service.allIPs(true);
    for (Instance instance : instances) {
        // If no beat arrived within the heartbeat timeout, flip the health status
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
            if (!instance.isMarked()) {
                if (instance.isHealthy()) {
                    instance.setHealthy(false);
                    getPushService().serviceChanged(service);
                    ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                }
            }
        }
    }
    if (!getGlobalConfig().isExpireInstance()) {
        return;
    }
    for (Instance instance : instances) {
        if (instance.isMarked()) {
            continue;
        }
        // If no beat arrived within the delete timeout, remove the instance
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
            deleteIp(instance);
        }
    }
}
// PS: the first pass only changes the health status; the second pass removes expired instances
ACK: health-check push logic
While reading, I stumbled upon an ACK mechanism driven by events. The design: when a service changes on the server side, the update is pushed to clients via a udpClient over their UDP port.
PS: if a client provides a UDP port when querying service instances, the server creates a udpClient for it.
for (Instance instance : service.allIPs(Lists.newArrayList(clusterName))) {
    if (instance.getIp().equals(ip) && instance.getPort() == port) {
        instance.setHealthy(valid);
        // Publish the ServiceChangeEvent
        pushService.serviceChanged(service);
        break;
    }
}
// ServiceChangeEvent event handling
C- PushService
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            // Get the PushClient collection by service name and namespace
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            // Loop through all PushClients
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    clients.remove(client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                // Build the cache key and try to fetch the entity data from the cache
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                }
                // Build the ACK entity
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                // Push the ACK entity over UDP
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        return null;
    }
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        // Socket send
        udpSocket.send(ackEntry.origin);
        ackEntry.increaseRetryTime();
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return null;
    }
}
Usage of the health check threshold
In the service configuration you can set a floating-point number between 0 and 1 that defines the health-check protection threshold. The corresponding class is com.alibaba.nacos.api.naming.pojo.Service:
C- Service
    F- name : service name
    F- protectThreshold : protection threshold
    F- appName : application name
    F- groupName : group name
    F- metadata : metadata
// Usage of the threshold
C- InstanceController
    M- doSrvIpxt :
// Core logic
double threshold = service.getProtectThreshold();
// If the ratio of healthy instances to all instances of the service drops to the threshold or below, the protection threshold is reached
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
    if (isCheck) {
        result.put("reachProtectThreshold", true);
    }
    ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
    ipMap.get(Boolean.FALSE).clear();
}
// PS: once the threshold is reached, the list handed to the client load balancer also contains instances the server considers unhealthy
But what is the threshold for?
If a large number of instances become unhealthy, all traffic ends up on the few remaining healthy instances, which can overload them and trigger a cascading failure.
To avoid this, all instances are returned once the protection threshold is reached.
That is the purpose of ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
The client may then hit an unhealthy instance, but this prevents the whole system from collapsing.
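Restated in isolation, the protection rule amounts to the following (a self-contained sketch of the doSrvIpxt logic above, not the actual server code):
import java.util.ArrayList;
import java.util.List;

// Sketch of the protection-threshold rule from doSrvIpxt.
class ProtectThresholdSketch {

    // Returns the instances a client should see, given healthy/unhealthy splits.
    static <T> List<T> selectServable(List<T> healthy, List<T> unhealthy, double threshold) {
        int total = healthy.size() + unhealthy.size();
        // e.g. threshold = 0.6, 10 instances, 5 healthy: 5/10 = 0.5 <= 0.6 -> protection kicks in
        if (total > 0 && (float) healthy.size() / total <= threshold) {
            List<T> all = new ArrayList<>(healthy);
            all.addAll(unhealthy); // return everything to spread the load
            return all;
        }
        return healthy; // normal case: only healthy instances
    }
}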
Parameters of /nacos/v1/ns/health/instance
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| namespaceId | string | no | namespace ID |
| serviceName | string | yes | service name |
| groupName | string | no | group name |
| clusterName | string | no | cluster name |
| ip | string | yes | service instance IP |
| port | int | yes | service instance port |
| healthy | boolean | yes | whether the instance is healthy |
Handling of weights
Weights are configured on the Instance object:
C- Instance
    F- instanceId
    F- ip
    F- port
    F- weight : the instance weight
    F- healthy : health status
    F- enabled
    F- ephemeral
    F- clusterName
    F- serviceName
    F- metadata
PS: the weight takes effect on the client side when an instance is chosen.
Reference: blog.csdn.net/krisdad/art…
public class NacosWeightLoadBalanceRule extends AbstractLoadBalancerRule {

    @Resource
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }

    @Override
    public Server choose(Object key) {
        // 1. Obtain the name of the service
        BaseLoadBalancer loadBalancer = (BaseLoadBalancer) this.getLoadBalancer();
        String serverName = loadBalancer.getName();
        // 2. The Nacos client implements the weight-based load balancing algorithm itself
        NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
        try {
            Instance instance = namingService.selectOneHealthyInstance(serverName);
            return new NacosServer(instance);
        } catch (NacosException e) {
            e.printStackTrace();
        }
        return null;
    }
}

// Register the rule in a configuration class:
@Bean
public IRule getLoadBalancerRule() {
    return new NacosWeightLoadBalancerRule();
}

// PS: so the weight handling is effectively delegated to the client itself
Other points
// The default value of Nacos
@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;
@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
private int cleanEmptyServiceDelay;
@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
private int cleanEmptyServicePeriod;
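Correspondingly, these switches can be enabled in the server's application.properties, using exactly the property names from the @Value annotations above (values here are illustrative):
# Sketch: enable auto-cleaning of empty services on the Nacos server
nacos.naming.empty-service.auto-clean=true
nacos.naming.empty-service.clean.initial-delay-ms=60000
nacos.naming.empty-service.clean.period-time-ms=20000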
3.2 Nacos Configuration Process (C30-C60)
3.2.1 Nacos configuration management
// Getting a configuration involves the following main steps:
C30- NacosConfigService
    M30_01- getConfigInner(String tenant, String dataId, String group, long timeoutMs)
        - builds a ConfigResponse and sets dataId, tenant, group on it
        1- calls LocalConfigInfoProcessor.getFailover : the local failover configuration is preferred
        2- calls ClientWorker to fetch the remote configuration -> PS: M30_01_01
        3- if there is still nothing, falls back to LocalConfigInfoProcessor.getSnapshot : the snapshot
        End- configFilterChainManager runs the filter chain
// PS: M30_01_01 ClientWorker processing
The remote request is made inside ClientWorker; the core code is
agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
// As you can see, there is no long-connection logic here; it is still a plain REST request. PS: according to the official site, 2.0 will use a long connection, so this should change.
- Constants.CONFIG_CONTROLLER_PATH : /v1/cs/configs
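Restating the priority order in isolation: a minimal sketch of the failover -> server -> snapshot fallback (helper names are mine, standing in for LocalConfigInfoProcessor and ClientWorker calls):
// Sketch of the getConfigInner fallback order.
class ConfigLookupSketch {

    String getConfig(String dataId, String group, String tenant, long timeoutMs) {
        String content = readFailoverFile(dataId, group, tenant);        // 1. local failover wins outright
        if (content != null) {
            return content;
        }
        try {
            content = fetchFromServer(dataId, group, tenant, timeoutMs); // 2. normal path: ask the server
            saveSnapshot(dataId, group, tenant, content);                //    and refresh the snapshot
            return content;
        } catch (Exception serverUnreachable) {
            return readSnapshot(dataId, group, tenant);                  // 3. disaster recovery: last snapshot
        }
    }

    // Stand-ins for the real LocalConfigInfoProcessor / ClientWorker calls.
    String readFailoverFile(String d, String g, String t) { return null; }
    String fetchFromServer(String d, String g, String t, long ms) throws Exception { return "..."; }
    void saveSnapshot(String d, String g, String t, String content) { }
    String readSnapshot(String d, String g, String t) { return null; }
}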
Main logic of LocalConfigInfoProcessor
// "Local configuration" means local files. How the source uses them:
C31- LocalConfigInfoProcessor
    M31_01- getFailover : reads the failover file under localPath -> PS: M31_01_01
    M31_02- saveSnapshot : saves a snapshot after a configuration is fetched successfully
        ?- save path (prefix omitted) : \nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\one1\test1
// PS: M31_01_01 localPath
C:\Users\10169\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1
Pro 1: What can we see from this source code?
Since local files exist, does that mean I can make local configuration take effect first simply by placing a file under this path (nacos > config > fixed-127.0.0.1_8848_nacos > data > config-data > one1 > test1)?
The answer is: true.
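A quick way to try this (a sketch; the directory layout {user.home}\nacos\config\fixed-{ip}_{port}_nacos\data\config-data\{group}\{dataId} is inferred from the localPath above):
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: drop a failover file so the client prefers local content over the server.
class FailoverFileSketch {

    public static void main(String[] args) throws Exception {
        Path dir = Paths.get(System.getProperty("user.home"),
                "nacos", "config", "fixed-127.0.0.1_8848_nacos", "data", "config-data", "one1");
        Files.createDirectories(dir);
        Files.write(dir.resolve("test1"), "local-override=true".getBytes()); // dataId = test1
    }
}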
Pro 2: Use of Filter
As seen above, the configuration flow runs a default filter chain: configFilterChainManager.doFilter(null, cr);
// This hook makes additional processing of configurations possible
C32- ConfigFilterChainManager
    ?- custom filters can be registered
    M32_01- addFilter
    M32_02- doFilter
TODO: how to inject a Filter here still needs investigation; there seems to be no public interface for adding a Filter, which is strange...
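For reference, a sketch of what such a filter could look like, assuming the IConfigFilter interface from the api module's config.filter package (verify the exact signatures against your client version):
import com.alibaba.nacos.api.config.filter.IConfigFilter;
import com.alibaba.nacos.api.config.filter.IConfigFilterChain;
import com.alibaba.nacos.api.config.filter.IConfigRequest;
import com.alibaba.nacos.api.config.filter.IConfigResponse;
import com.alibaba.nacos.api.config.filter.IFilterConfig;
import com.alibaba.nacos.api.exception.NacosException;

// Sketch of a custom config filter (encryption/decryption of config content is the typical use case).
public class LoggingConfigFilter implements IConfigFilter {

    @Override
    public void init(IFilterConfig filterConfig) {
    }

    @Override
    public void doFilter(IConfigRequest request, IConfigResponse response, IConfigFilterChain filterChain)
            throws NacosException {
        // pre-processing (e.g. decrypt request content) would go here
        filterChain.doFilter(request, response);
        // post-processing (e.g. audit the fetched config) would go here
    }

    @Override
    public int getOrder() {
        return 0; // smaller runs earlier
    }

    @Override
    public String getFilterName() {
        return "loggingConfigFilter";
    }
}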
3.2.2 Nacos configuration disaster recovery
Nacos's LocalConfigInfoProcessor provides disaster recovery (DR) in two forms: local failover configuration and snapshot handling.
Local configuration
As stated above, this can be achieved by placing files under the specified path.
Configuration snapshots
The Nacos client SDK generates a snapshot of each configuration locally. When a client cannot connect to the Nacos server, the snapshot is used, giving the system overall disaster recovery capability. Configuration snapshots are similar to a local commit in Git, and also similar to a cache: they are updated at appropriate times, but there is no concept of cache expiration.
3.2.3 Nacos dynamic configuration processing
Dynamic configuration mainly refers to the monitoring of configuration changes:
Nacos uses long polling to detect configuration changes. The corresponding core entry point is LongPollingRunnable # checkUpdateDataIds; debug from there:
class LongPollingRunnable implements Runnable {

    private final int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        // ... the core statement:
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
    }
}

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // It updates when cacheData occurs in cacheMap for the first time.
                inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    // Long polling
    headers.put("Long-Pulling-Timeout", "" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        // /v1/cs/configs/listener
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        if (result.ok()) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
        }
    } catch (Exception e) {
        setHealthServer(false);
        throw e;
    }
    return Collections.emptyList();
}
// The corresponding Controller is ConfigController
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
    // ...
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
// The corresponding polling interface
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// Long polling.
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
    // Before version 2.0.4, the return value is put into the header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
Loggers.AUTH.info("new content:" + newResult);
// Disable cache.
response.setHeader("Pragma"."no-cache");
response.setDateHeader("Expires".0);
response.setHeader("Cache-Control"."no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
// For more details on LongPollingService, see https://www.jianshu.com/p/acb9b1093a54
3.2.4 Nacos metadata
Let’s see, what is the metadata of Nacos?
Nacos Data (such as configuration and service) description information, such as service version, weight, disaster recovery policy, load balancing policy, authentication configuration, and various user-defined labels. In terms of scope, it can be classified into service level meta information, cluster meta information, and instance meta information.
// Method 1: Configure this parameter when configuring services
spring:
application:
name: nacos-config-server
cloud:
nacos:
discovery:
        server-addr: 127.0.0.1:8848
metadata:
version: v1
// Method 2: configure it directly on the console page
// Method 3: API calls; the HTTP method (PUT / DELETE) determines the operation type
Batch update instance metadata (InstanceController) : /nacos/v1/ns/instance/metadata/batch
Batch delete instance metadata (InstanceController) : /nacos/v1/ns/instance/metadata/batch
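Method 1 has an SDK equivalent as well; a minimal sketch (serviceName/ip/port are placeholders) that attaches the same version label when registering an instance:
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

class MetadataRegistration {

    // Sketch: register an instance carrying custom metadata (the version label
    // mirrors the YAML example above).
    static void register(NamingService namingService) throws NacosException {
        Instance instance = new Instance();
        instance.setIp("192.168.0.97");
        instance.setPort(9083);
        instance.setWeight(1.0);
        instance.getMetadata().put("version", "v1");
        namingService.registerInstance("nacos-config-server", instance);
    }
}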
3.3 Nacos load balancing
Nacos load balancing falls under its dynamic DNS service.
Dynamic DNS services support weighted routing, enabling you to implement load balancing at the middle layer, flexible routing policies, traffic control, and simple DNS resolution services on the data center Intranet. Dynamic DNS services also make it easier for you to implement DNS protocol-based service discovery to help you eliminate the risk of coupling to vendor-private service discovery apis.
From what we know of Feign, the balancing is handled in BaseLoadBalancer combined with the Rule. Here we analyze how the two are wired together.
Step 1: Feign calls Nacos
Starting a debug session from BaseLoadBalancer, we land in PredicateBasedRule:
C- PredicateBasedRule
    M- choose(Object key)
        - Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        ?- note the lb.getAllServers() call here: lb is a DynamicServerListLoadBalancer
// PS: getAllServers shows that all servers sit in a plain List
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
// Tracing where that list is populated, starting from the loading of the ILoadBalancer bean; the call chain is:
C- RibbonClientConfiguration # ribbonLoadBalancer : builds an ILoadBalancer
C- ZoneAwareLoadBalancer : constructor
C- DynamicServerListLoadBalancer : constructor
C- DynamicServerListLoadBalancer # restOfInit : init operation
C- DynamicServerListLoadBalancer # updateListOfServers : the main flow that updates the server list; the first place the server list shows up, keep debugging from here
C- DynamicServerListLoadBalancer # updateAllServerList : sets the ServerList
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
        }
    }
    updateAllServerList(servers);
}
As you can see, there are two server-related calls here: serverListImpl.getUpdatedListOfServers() and filter.getFilteredListOfServers(servers).
// getUpdatedListOfServers has an implementation class, NacosServerList, which obtains the service list from Nacos:
private List<NacosServer> getServers() {
    try {
        String group = discoveryProperties.getGroup();
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, group, true);
        return instancesToServerList(instances);
    } catch (Exception e) {
        throw new IllegalStateException(....);
    }
}
// Load balancing policy
The actual balancing policy is then applied by the Ribbon rule on top of this list (see the weight rule above).
3.4 Cluster processing
Nacos cluster
Setting up a Nacos cluster is relatively easy: you only need to configure the corresponding node addresses in /conf/cluster.conf:
#it is ip
#example
127.0.0.1:8848
127.0.0.1:8849
127.0.0.1:8850
Nacos cluster source tracking
Looking at the source level, how does this logic work?
The core handling classes live in com.alibaba.nacos.core.cluster.
// Step 1: Obtain the configuration method
C- EnvUtil
public static String getClusterConfFilePath() {
return Paths.get(getNacosHome(), "conf"."cluster.conf").toString();
}
// Read the Cluster configuration
public static List<String> readClusterConf() throws IOException {
    try (Reader reader = new InputStreamReader(new FileInputStream(new File(getClusterConfFilePath())),
            StandardCharsets.UTF_8)) {
        return analyzeClusterConf(reader);
    } catch (FileNotFoundException ignore) {
        List<String> tmp = new ArrayList<>();
        String clusters = EnvUtil.getMemberList();
        if (StringUtils.isNotBlank(clusters)) {
            String[] details = clusters.split(",");
            for (String item : details) {
                tmp.add(item.trim());
            }
        }
        return tmp;
    }
}
// Step 2: Using the cluster
AbstractMemberLookup
// The main usage is concentrated in the ServerMemberManager
C- ServerManager
    F- ServerMemberManager memberManager;
C- ServerMemberManager : cluster node management in Nacos
    M- init : initializes the cluster node manager
    M- getSelf : obtains information about the local node
    M- getMemberAddressInfo : obtains the address info of the member nodes
    M- update : updates information about the target node
    M- isUnHealth : whether the target node is unhealthy
    M- initAndStartLookup : initializes the addressing (lookup) mode
// TODO: other methods omitted; the cluster source walkthrough is planned together with a later performance analysis
Conclusion
This article is application-oriented and goes less deep into the source code, mainly because the Nacos source is already clear and well structured and does not require deep digging.
In addition, Nacos 2.0 is being released; the documentation says it uses long-lived socket connections. If there is a chance, a follow-up will compare the differences between the two.
Appendix
Appendix 1: Manually calling the naming service
package com.alibaba.nacos.discovery.service;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.healthcheck.AbstractHealthChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * @Classname NacosClientService
 * @Description TODO
 * @Date 2021/5/26
 * @Created by zengzg
 */
@Component
public class NacosClientNodesService implements ApplicationRunner {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private NamingService namingService;
@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;
@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
namingService = NacosFactory.createNamingService(properties);
}
    /**
     * Get Nacos instances. A returned instance looks like:
     * {
     *   "instanceId": "192.168.0.97#9083#DEFAULT#DEFAULT_GROUP@@nacos-user-server",
     *   "ip": "192.168.0.97",
     *   "port": 9083,
     *   "weight": 1,            // the server side can be steered via the weight
     *   "healthy": true,
     *   "enabled": true,
     *   "ephemeral": true,
     *   "clusterName": "DEFAULT",
     *   "serviceName": "DEFAULT_GROUP@@nacos-user-server",
     *   "metadata": {
     *     "preserved.register.source": "SPRING_CLOUD"
     *   },
     *   "ipDeleteTimeout": 30000,
     *   "instanceHeartBeatInterval": 5000,
     *   "instanceHeartBeatTimeOut": 15000
     * }
     *
     * @param serviceName
     * @return
     */
public List<Instance> get(String serviceName) {
List<Instance> content = new LinkedList<Instance>();
try {
content = namingService.getAllInstances(serviceName);
            logger.info("------> get Config serviceName [{}] <-------", serviceName);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
return content;
}
    /**
     * Create (register) a Nacos instance
     *
     * @param serviceName
     * @param ip
     * @param port
     */
public void createOrUpdate(String serviceName, String ip, Integer port) {
try {
logger.info("-- -- -- -- -- - > create Config GroupID / {} - DataID [{}] Success, The value: [{}] < -- -- -- -- -- -- --", serviceName, ip, port);
namingService.registerInstance(serviceName, ip, port, "TEST1");
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage()); e.printStackTrace(); }}/** * remove Nacos Config **@param serviceName
* @param ip
*/
public void delete(String serviceName, String ip, Integer port) {
try {
namingService.deregisterInstance(serviceName, ip, port, "DEFAULT");
logger.info("-- -- -- -- -- - > delete the Config GroupID / {} - DataID [{}] Success < -- -- -- -- -- -- --", serviceName, ip);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage()); e.printStackTrace(); }}}Copy the code
Appendix 2: Manually calling the config service
public class NacosClientConfigService implements ApplicationRunner {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private ConfigService configService;
@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;
@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
configService = NacosFactory.createConfigService(properties);
}
    /**
     * Get a Nacos config
     *
     * @param dataId
     * @param groupId
     * @return
     */
public String get(String dataId, String groupId) {
String content = "";
try {
content = configService.getConfig(dataId, groupId, 5000);
logger.info("-- -- -- -- -- - > get The Config GroupID / {} - DataID [{}] Success, The value: [{}] < -- -- -- -- -- -- --", dataId, groupId, content);
configService.addListener(dataId, groupId, new ConfigListener());
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
return content;
}
    /**
     * Create a Nacos config
     *
     * @param dataId
     * @param groupId
     * @param content
     */
public void createOrUpdate(String dataId, String groupId, String content) {
try {
logger.info("-- -- -- -- -- - > create Config GroupID / {} - DataID [{}] Success, The value: [{}] < -- -- -- -- -- -- --", dataId, groupId, content);
configService.publishConfig(dataId, groupId, content);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage()); e.printStackTrace(); }}/** * remove Nacos Config **@param dataId
* @param groupId
*/
public void delete(String dataId, String groupId) {
try {
configService.removeConfig(dataId, groupId);
logger.info("-- -- -- -- -- - > delete the Config GroupID / {} - DataID [{}] Success < -- -- -- -- -- -- --", dataId, groupId);
configService.removeListener(dataId, groupId, null);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage()); e.printStackTrace(); }}}Copy the code
Appendix 3: Using NacosInjected
This can be done via the following dependencies:
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.7</version>
</dependency>
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-discovery-spring-boot-starter</artifactId>
    <version>0.2.7</version>
</dependency>
@NacosInjected
private ConfigService configService;
@NacosInjected
private NamingService namingService;
Appendix 4: Nacos official structure diagram (reproduced)
This part is reproduced as-is; see the official documentation @ nacos.io/zh-cn/docs/…
Function:
- Service management: Implement service CRUD, domain name CRUD, service health status check, and service weight management
- Configuration management: realize CRUD of configuration management, version management, gray management, monitoring management, push trajectory, aggregated data and other functions
- Metadata management: provides metadata CRUD and tagging capabilities
- Plug-in mechanism: the three modules can be divided and combined, and the SPI mechanism of extension point is realized
- Event mechanism: realize asynchronous event notification, SDK data change asynchronous notification and other logic
- Log module: manages log classification, log level, log portability (especially to avoid conflicts), log format, exception code + help document
- Callback mechanism: the SDK notifies data changes and calls back user code in a unified mode; interfaces and data structures need to be extensible
- Addressing mode: Address IP, domain name, Nameserver, broadcast and other addressing modes, need to be extensible
- Push channel: addresses push performance issues between servers and storage devices, between servers, and between servers and SDKs
- Capacity management: Manages the capacity of each tenant and group to prevent storage from being overwritten and affecting service availability
- Traffic management: Controls the request frequency, number of long links, packet size, and request flow control by tenant and group
- Cache mechanism: DR directory, local cache, and server cache mechanisms; tooling is required to use the DR directory
- Startup mode: Start different programs and UIs in single-machine deployment mode, configuration mode, service mode, DNS mode, or all mode
- Consistency protocol: Different consistency mechanisms for different data and different consistency requirements
- Storage module: solves persistent and non-persistent data storage and data fragmentation
- Nameserver: Solves the problem of routing a Namespace to a ClusterID and the problem of mapping the user environment to the NACOS physical environment
- CMDB: Solve metadata storage, interconnection problems with the third-party CMDB system, and solve the relationship between applications, people, and resources
- Metrics: expose standard metrics data to facilitate integration with third-party monitoring systems
- Trace: Expose standard Trace to facilitate communication with SLA system, log whitening, push Trace and other capabilities, as well as the metering and billing system
- Access management: it is equivalent to the process of opening aliyun services and assigning identity, capacity and permission
- User management: solve user management, login, SSO and other issues
- Permission management: solve identity, access control, role management and other issues
- Audit system: The extended interface is convenient to connect with different audit systems of companies
- Notification system: Core data change, or operation, easy to get through through SMS system, notify the corresponding person of data change
- OpenAPI: Exposes a standard REST-style HTTP interface that is easy to use and facilitates multi-language integration
- Console: easy-to-use Console for service management and configuration management
- SDK: Multilingual SDK
- Agent: this mode is similar to DNS-F, or it can be integrated with mesh solutions
- CLI: Command line for lightweight management of products, like Git easy to use
Domain model:
The SDK class diagram: