This article focuses on service Porter of NACOS
Nacos – 1.1.3 / naming/SRC/main/Java/com/alibaba/nacos/naming/core/ServiceManager. Java
public class ServiceManager implements RecordListener<Service> {
* Map<namespace, Map<group::serviceName, Service>>
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
private Synchronizer synchronizer = new ServiceStatusSynchronizer();
private final Lock lock = new ReentrantLock();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
private SwitchDomain switchDomain;
private DistroMapper distroMapper;
private ServerListManager serverListManager;
private PushService pushService;
private final Object putServiceLock = new Object();
public void init() {
UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());
try {"listen for service meta change");
consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
} catch (NacosException e) {
Loggers.SRV_LOG.error("listen for service meta change failed!"); }} / /... }Copy the code
- The ServiceManager init method registers the ServiceReporter to utilsandCommons.service_synchronization_executor
Nacos – 1.1.3 / naming/SRC/main/Java/com/alibaba/nacos/naming/core/ServiceManager. Java
private class ServiceReporter implements Runnable {
public void run() {
try {
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allServiceNames.size() <= 0) {
for (String namespaceId : allServiceNames.keySet()) {
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
for (String serviceName : allServiceNames.get(namespaceId)) {
if(! distroMapper.responsible(serviceName)) {continue;
Service service = getService(namespaceId, serviceName);
if (service == null) {
checksum.addItem(serviceName, service.getChecksum());
Message msg = new Message();
List<Server> sameSiteServers = serverListManager.getServers();
if (sameSiteServers == null || sameSiteServers.size() <= 0) {
for (Server server : sameSiteServers) {
if (server.getKey().equals(NetUtils.localServer())) {
synchronizer.send(server.getKey(), msg);
} catch (Exception e) {
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e); } finally { UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS); }}}Copy the code
- ServiceReporter implements the Runnable interface. Its run method iterates through allServiceNames, retrievaldistRomapper. Responsible serviceName, recalculateChecksum, Then add it to the ServiceChecksum, construct Message, iterate over sameSiteServers and send the Message using synchronizer. Send. Finally, re-register ServiceReporter to utilsandCommons.service_synchronization_executor
Nacos – 1.1.3 / naming/SRC/main/Java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer Java
public class ServiceStatusSynchronizer implements Synchronizer {
public void send(final String serverIP, Message msg) {
if(serverIP == null) {
Map<String,String> params = new HashMap<String, String>(10);
params.put("statuses", msg.getData());
params.put("clientIP", NetUtils.localServer());
String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
url = "http://" + serverIP + RunningConfig.getContextPath() +
UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
try {
HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
public Integer onCompleted(Response response) throws Exception {
if(response.getStatusCode() ! = HttpURLConnection.HTTP_OK) { Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);
return 1;
return0; }}); } catch (Exception e) { Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
public Message get(String serverIP, String key) {
if(serverIP == null) {
return null;
Map<String,String> params = new HashMap<>(10);
params.put("key", key);
String result;
try {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIP, key);
result = NamingProxy.reqAPI(RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
return null;
if(result == null || result.equals(StringUtils.EMPTY)) {
return null;
Message msg = new Message();
returnmsg; }}Copy the code
- ServiceStatusSynchronizer implements the Synchronizer interface, its the send method will be executed asynchronously post request, shall notify to the target server statuses
ServiceReporter implements the Runnable interface. Its run method iterates through allServiceNames, retrievaldistRomapper. Responsible serviceName, recalculateChecksum, Then add it to the ServiceChecksum, construct Message, iterate over sameSiteServers and send the Message using synchronizer. Send. Finally, re-register ServiceReporter to utilsandCommons.service_synchronization_executor
- ServiceManager