Preface

This article looks at the ServiceReporter in Nacos.

ServiceManager.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    @PostConstruct
    public void init() {

        UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        UtilsAndCommons.SERVICE_UPDATE_EXECUTOR.submit(new UpdatedServiceProcessor());

        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

    //......
}
  • The init method of ServiceManager schedules a ServiceReporter on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR with an initial delay of 60000 ms
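
ScheduledExecutorService.schedule runs a task only once, so a task registered this way has to put itself back on the executor if it is meant to repeat. The following is a minimal sketch of that pattern using plain JDK executors. The Reporter class, the run limit, and the method names are hypothetical and are not Nacos code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfReschedulingSketch {

    /** Hypothetical stand-in for ServiceReporter: does its work, then re-arms itself. */
    static class Reporter implements Runnable {
        private final ScheduledExecutorService executor;
        private final long periodMillis;
        private final CountDownLatch remaining;
        final AtomicInteger runs = new AtomicInteger();

        Reporter(ScheduledExecutorService executor, long periodMillis, CountDownLatch remaining) {
            this.executor = executor;
            this.periodMillis = periodMillis;
            this.remaining = remaining;
        }

        @Override
        public void run() {
            try {
                // A real reporter would compute and send checksums here.
                runs.incrementAndGet();
            } finally {
                remaining.countDown();
                // schedule() is one-shot, so the task re-submits itself in finally,
                // which also covers the case where the work above threw.
                // Assumption for the demo only: stop after a fixed number of runs.
                if (remaining.getCount() > 0) {
                    executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    /** Runs the reporter totalRuns times and returns how many runs were observed. */
    static int runReporter(int totalRuns, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(totalRuns);
        Reporter reporter = new Reporter(executor, periodMillis, remaining);
        try {
            executor.schedule(reporter, periodMillis, TimeUnit.MILLISECONDS);
            remaining.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return reporter.runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runReporter(3, 50));
    }
}
```

Compared with scheduleAtFixedRate, re-submitting from the task lets the delay be re-read before every run, which matters when the period comes from a value that can change at runtime.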

ServiceReporter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

    private class ServiceReporter implements Runnable {

        @Override
        public void run() {
            try {

                Map<String, Set<String>> allServiceNames = getAllServiceNames();

                if (allServiceNames.size() <= 0) {
                    //ignore
                    return;
                }

                for (String namespaceId : allServiceNames.keySet()) {

                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                    for (String serviceName : allServiceNames.get(namespaceId)) {
                        if (!distroMapper.responsible(serviceName)) {
                            continue;
                        }

                        Service service = getService(namespaceId, serviceName);

                        if (service == null) {
                            continue;
                        }

                        service.recalculateChecksum();

                        checksum.addItem(serviceName, service.getChecksum());
                    }

                    Message msg = new Message();

                    msg.setData(JSON.toJSONString(checksum));

                    List<Server> sameSiteServers = serverListManager.getServers();

                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                        return;
                    }

                    for (Server server : sameSiteServers) {
                        if (server.getKey().equals(NetUtils.localServer())) {
                            continue;
                        }
                        synchronizer.send(server.getKey(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
                UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.schedule(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
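  • ServiceReporter implements the Runnable interface. Its run method iterates over allServiceNames namespace by namespace; for every service for which distroMapper.responsible(serviceName) returns true, it calls recalculateChecksum and adds the result to a ServiceChecksum. It then builds a Message from the checksum, iterates over sameSiteServers, and sends the Message to every server except the local one through synchronizer.send. Finally, in the finally block, it re-schedules itself on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR with the period from switchDomain.getServiceStatusSynchronizationPeriodMillis()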
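
The filter-then-checksum loop described above can be sketched in isolation. This is a simplified illustration, not the Nacos implementation: the responsible rule (hash of the service name modulo the server count) and the MD5-over-sorted-instances checksum are assumptions made for the example, and all class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ChecksumReportSketch {

    /** Assumed responsibility rule: hash of the service name modulo the server count. */
    static boolean responsible(String serviceName, int localIndex, int serverCount) {
        return Math.floorMod(serviceName.hashCode(), serverCount) == localIndex;
    }

    /** Assumed checksum: MD5 over the sorted instance addresses, as a hex string. */
    static String checksum(List<String> instances) {
        List<String> sorted = new ArrayList<>(instances);
        Collections.sort(sorted); // sorting makes the checksum independent of list order
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(String.join(",", sorted).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Builds serviceName -> checksum, skipping services this node is not responsible for. */
    static Map<String, String> buildReport(Map<String, List<String>> services,
                                           int localIndex, int serverCount) {
        Map<String, String> report = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : services.entrySet()) {
            if (!responsible(entry.getKey(), localIndex, serverCount)) {
                continue;
            }
            report.put(entry.getKey(), checksum(entry.getValue()));
        }
        return report;
    }

    public static void main(String[] args) {
        Map<String, List<String>> services = new HashMap<>();
        services.put("DEFAULT_GROUP@@orders", Arrays.asList("10.0.0.2:8080", "10.0.0.1:8080"));
        services.put("DEFAULT_GROUP@@users", Arrays.asList("10.0.0.3:8080"));
        // With two servers, each service lands in exactly one of the two reports.
        System.out.println(buildReport(services, 0, 2));
        System.out.println(buildReport(services, 1, 2));
    }
}
```

Because each node reports only the services it is responsible for, a receiver can compare the reported checksum with its own copy and refresh just the services that differ.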

ServiceStatusSynchronizer

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/ServiceStatusSynchronizer.java

public class ServiceStatusSynchronizer implements Synchronizer {
    @Override
    public void send(final String serverIP, Message msg) {
        if(serverIP == null) {
            return;
        }

        Map<String,String> params = new HashMap<String, String>(10);

        params.put("statuses", msg.getData());
        params.put("clientIP", NetUtils.localServer());


        String url = "http://" + serverIP + ":" + RunningConfig.getServerPort() + RunningConfig.getContextPath() +
                UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";

        if (serverIP.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
            url = "http://" + serverIP + RunningConfig.getContextPath() +
                    UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
        }

        try {
            HttpClient.asyncHttpPostLarge(url, null, JSON.toJSONString(params), new AsyncCompletionHandler() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}", serverIP);

                        return 1;
                    }
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
        }

    }

    @Override
    public Message get(String serverIP, String key) {
        if(serverIP == null) {
            return null;
        }

        Map<String,String> params = new HashMap<>(10);

        params.put("key", key);

        String result;
        try {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("[STATUS-SYNCHRONIZE] sync service status from: {}, service: {}", serverIP, key);
            }
            result = NamingProxy.reqAPI(RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance/" + "statuses", params, serverIP);
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] Failed to get service status from " + serverIP, e);
            return null;
        }

        if(result == null || result.equals(StringUtils.EMPTY)) {
            return null;
        }

        Message msg = new Message();
        msg.setData(result);

        return msg;
    }
}
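  • ServiceStatusSynchronizer implements the Synchronizer interface. Its send method issues an asynchronous POST request to the /service/status endpoint of the target server, carrying the checksum data in the statuses parameter and the local server address in clientIP; a non-200 response or an exception is only logged as a warning. Its get method requests /instance/statuses from the given server and wraps the result in a Message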
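
The URL handling in send has one branch worth spelling out: the default port is appended only when the server key does not already contain one. A minimal sketch of that rule follows. The statusUrl helper is hypothetical, and the port 8848, context path /nacos, and naming context /v1/ns are example values assumed for illustration.

```java
class StatusUrlSketch {

    // Separator between IP and port in a server key, as in the listing above.
    static final String IP_PORT_SPLITER = ":";

    /**
     * Hypothetical helper mirroring the branch in send(): use the port embedded
     * in the server key if present, otherwise fall back to the default port.
     */
    static String statusUrl(String serverKey, int defaultPort,
                            String contextPath, String namingContext) {
        String hostAndPort = serverKey.contains(IP_PORT_SPLITER)
                ? serverKey
                : serverKey + IP_PORT_SPLITER + defaultPort;
        return "http://" + hostAndPort + contextPath + namingContext + "/service/status";
    }

    public static void main(String[] args) {
        // Example values only; a real deployment reads them from its running configuration.
        System.out.println(statusUrl("10.0.0.1", 8848, "/nacos", "/v1/ns"));
        System.out.println(statusUrl("10.0.0.2:8849", 8848, "/nacos", "/v1/ns"));
    }
}
```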

Summary

ServiceReporter implements the Runnable interface. On each run it walks allServiceNames and, for the services this node is responsible for according to distroMapper.responsible(serviceName), calls recalculateChecksum and adds the checksum to a ServiceChecksum. It then wraps the checksums in a Message, sends it to the other servers in sameSiteServers through synchronizer.send, and finally re-schedules itself on UtilsAndCommons.SERVICE_SYNCHRONIZATION_EXECUTOR.

doc

  • ServiceManager