Preface
This article examines the ServiceReporter in Nacos.
ServiceManager.init
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {
/**
* Map<namespace, Map<group::serviceName, Service>>
*/
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
private Synchronizer synchronizer = new ServiceStatusSynchronizer();
private final Lock lock = new ReentrantLock();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private DistroMapper distroMapper;
@Autowired
private ServerListManager serverListManager;
@Autowired
private PushService pushService;
private final Object putServiceLock = new Object();
@PostConstruct
public void init() {
UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());
try {
Loggers.SRV_LOG.info("listen for service meta change");
consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
} catch (NacosException e) {
Loggers.SRV_LOG.error("listen for service meta change failed!"); }} / /... }Copy the code
- The ServiceManager init method schedules a ServiceReporter on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR with an initial delay of 60000 ms.
ServiceReporter
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
private class ServiceReporter implements Runnable {
@Override
public void run() {
try {
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allServiceNames.size() <= 0) {
//ignore
return;
}
for (String namespaceId : allServiceNames.keySet()) {
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
for (String serviceName : allServiceNames.get(namespaceId)) {
if(! distroMapper.responsible(serviceName)) {continue;
}
Service service = getService(namespaceId, serviceName);
if (service == null) {
continue;
}
service.recalculateChecksum();
checksum.addItem(serviceName, service.getChecksum());
}
Message msg = new Message();
msg.setData(JSON.toJSONString(checksum));
List<Server> sameSiteServers = serverListManager.getServers();
if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return;
}
for (Server server : sameSiteServers) {
if (server.getKey().equals(NetUtils.localServer())) {
continue;
}
synchronizer.send(server.getKey(), msg);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e); } finally { UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS); }}}Copy the code
- ServiceReporter implements the Runnable interface. Its run method iterates over allServiceNames, skips every service name for which distroMapper.responsible(serviceName) is false, calls recalculateChecksum on each remaining service and adds its checksum to the ServiceChecksum. It then builds a Message, iterates over sameSiteServers and sends the Message with synchronizer.send. Finally, it reschedules itself on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.
ServiceStatusSynchronizer
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer.java
public class ServiceStatusSynchronizer implements Synchronizer {
@Override
public void send(final String serverIP, Message msg) {
if(serverIP == null) {
return;
}
Map<String,String> params = new HashMap<String, String>(10);
params.put("statuses", msg.getData());
params.put("clientIP", NetUtils.localServer());
String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
url = "http://" + serverIP + RunningConfig.getContextPath() +
UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
}
try {
HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
@Override
public Integer onCompleted(Response response) throws Exception {
if(response.getStatusCode() ! = HttpURLConnection.HTTP_OK) { Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);
return 1;
}
return0; }}); } catch (Exception e) { Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
}
}
@Override
public Message get(String serverIP, String key) {
if(serverIP == null) {
return null;
}
Map<String,String> params = new HashMap<>(10);
params.put("key", key);
String result;
try {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIP, key);
}
result = NamingProxy.reqAPI(RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
return null;
}
if(result == null || result.equals(StringUtils.EMPTY)) {
return null;
}
Message msg = new Message();
msg.setData(result);
returnmsg; }}Copy the code
- ServiceStatusSynchronizer implements the Synchronizer interface. Its send method issues an asynchronous POST request that delivers the statuses to the target server.
summary
ServiceReporter implements the Runnable interface. Its run method iterates over allServiceNames, skips every service name for which distroMapper.responsible(serviceName) is false, calls recalculateChecksum on each remaining service and adds its checksum to the ServiceChecksum. It then builds a Message, iterates over sameSiteServers and sends the Message with synchronizer.send. Finally, it reschedules itself on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.
doc
- ServiceManager